@@ -1656,6 +1656,8 @@ static bool rx_fragment_tree_finalize(udpard_tree_t* const root, const uint32_t
16561656/// The redundant interfaces may use distinct MTU, which requires special fragment tree handling.
16571657typedef struct
16581658{
1659+ bool busy ;
1660+
16591661 uint64_t transfer_id ; ///< Which transfer we're reassembling here.
16601662
16611663 udpard_us_t ts_min ; ///< Earliest frame timestamp, aka transfer reception timestamp.
@@ -1672,46 +1674,30 @@ typedef struct
16721674 udpard_tree_t * fragments ;
16731675} rx_slot_t ;
16741676
1675- static rx_slot_t * rx_slot_new (const udpard_mem_t slot_memory )
1676- {
1677- rx_slot_t * const slot = mem_alloc (slot_memory , sizeof (rx_slot_t ));
1678- if (slot != NULL ) {
1679- mem_zero (sizeof (* slot ), slot );
1680- slot -> ts_min = HEAT_DEATH ;
1681- slot -> ts_max = BIG_BANG ;
1682- slot -> covered_prefix = 0 ;
1683- slot -> crc_end = 0 ;
1684- slot -> crc = CRC_INITIAL ;
1685- slot -> fragments = NULL ;
1686- }
1687- return slot ;
1688- }
1689-
1690- static void rx_slot_destroy (rx_slot_t * const slot , const udpard_mem_t fragment_memory , const udpard_mem_t slot_memory )
1677+ static void rx_slot_reset (rx_slot_t * const slot , const udpard_mem_t fragment_memory )
16911678{
1692- UDPARD_ASSERT (slot != NULL );
16931679 udpard_fragment_free_all ((udpard_fragment_t * )slot -> fragments , udpard_make_deleter (fragment_memory ));
1694- mem_free (slot_memory , sizeof (rx_slot_t ), slot );
1680+ slot -> fragments = NULL ;
1681+ slot -> busy = false;
1682+ slot -> covered_prefix = 0U ;
1683+ slot -> crc_end = 0U ;
1684+ slot -> crc = CRC_INITIAL ;
16951685}
16961686
1697- typedef enum
1698- {
1699- rx_slot_not_done ,
1700- rx_slot_done ,
1701- rx_slot_reset ,
1702- } rx_slot_update_result_t ;
1703-
17041687/// The caller will accept the ownership of the fragments iff the result is true.
1705- static rx_slot_update_result_t rx_slot_update (rx_slot_t * const slot ,
1706- const udpard_us_t ts ,
1707- const udpard_mem_t fragment_memory ,
1708- const udpard_deleter_t payload_deleter ,
1709- rx_frame_t * const frame ,
1710- const size_t extent ,
1711- uint64_t * const errors_oom ,
1712- uint64_t * const errors_transfer_malformed )
1713- {
1714- if ((slot -> ts_min == HEAT_DEATH ) && (slot -> ts_max == BIG_BANG )) {
1688+ static bool rx_slot_update (rx_slot_t * const slot ,
1689+ const udpard_us_t ts ,
1690+ const udpard_mem_t fragment_memory ,
1691+ const udpard_deleter_t payload_deleter ,
1692+ rx_frame_t * const frame ,
1693+ const size_t extent ,
1694+ uint64_t * const errors_oom ,
1695+ uint64_t * const errors_transfer_malformed )
1696+ {
1697+ bool done = false;
1698+ if (!slot -> busy ) {
1699+ rx_slot_reset (slot , fragment_memory );
1700+ slot -> busy = true;
17151701 slot -> transfer_id = frame -> meta .transfer_id ;
17161702 slot -> ts_min = ts ;
17171703 slot -> ts_max = ts ;
@@ -1723,7 +1709,8 @@ static rx_slot_update_result_t rx_slot_update(rx_slot_t* const slot,
17231709 if ((slot -> total_size != frame -> meta .transfer_payload_size ) || (slot -> priority != frame -> meta .priority )) {
17241710 ++ * errors_transfer_malformed ;
17251711 mem_free_payload (payload_deleter , frame -> base .origin );
1726- return rx_slot_reset ;
1712+ rx_slot_reset (slot , fragment_memory );
1713+ return false;
17271714 }
17281715 const rx_fragment_tree_update_result_t tree_res = rx_fragment_tree_update (& slot -> fragments ,
17291716 fragment_memory ,
@@ -1746,12 +1733,14 @@ static rx_slot_update_result_t rx_slot_update(rx_slot_t* const slot,
17461733 }
17471734 if (tree_res == rx_fragment_tree_done ) {
17481735 if (rx_fragment_tree_finalize (slot -> fragments , slot -> crc )) {
1749- return rx_slot_done ;
1736+ slot -> busy = false;
1737+ done = true;
1738+ } else {
1739+ ++ * errors_transfer_malformed ;
1740+ rx_slot_reset (slot , fragment_memory );
17501741 }
1751- ++ * errors_transfer_malformed ;
1752- return rx_slot_reset ;
17531742 }
1754- return rx_slot_not_done ;
1743+ return done ;
17551744}
17561745
17571746// --------------------------------------------- SESSION & PORT ---------------------------------------------
@@ -1763,6 +1752,8 @@ typedef struct rx_session_t
17631752 udpard_tree_t index_remote_uid ; ///< Must be the first member.
17641753 udpard_remote_t remote ; ///< Most recent discovered reverse path for P2P to the sender.
17651754
1755+ udpard_rx_port_t * port ;
1756+
17661757 /// LRU last animated list for automatic retirement of stale sessions.
17671758 udpard_listed_t list_by_animation ;
17681759 udpard_us_t last_animated_ts ;
@@ -1774,9 +1765,10 @@ typedef struct rx_session_t
17741765
17751766 bool initialized ; ///< Set after the first frame is seen.
17761767
1777- udpard_rx_port_t * port ;
1778-
1779- rx_slot_t * slots [RX_SLOT_COUNT ];
1768+ // TODO: Static slots are taking too much space; allocate them dynamically instead.
1769+ // Each is <=56 bytes so it fits nicely into a 64-byte o1heap block.
1770+ // The slot state enum can be replaced with a simple "done" flag.
1771+ rx_slot_t slots [RX_SLOT_COUNT ];
17801772} rx_session_t ;
17811773
17821774/// The reassembly strategy is composed once at initialization time by choosing a vtable with the desired behavior.
@@ -1829,7 +1821,8 @@ static udpard_tree_t* cavl_factory_rx_session_by_remote_uid(void* const user)
18291821 out -> index_remote_uid = (udpard_tree_t ){ NULL , { NULL , NULL }, 0 };
18301822 out -> list_by_animation = (udpard_listed_t ){ NULL , NULL };
18311823 for (size_t i = 0 ; i < RX_SLOT_COUNT ; i ++ ) {
1832- out -> slots [i ] = NULL ;
1824+ out -> slots [i ].fragments = NULL ;
1825+ rx_slot_reset (& out -> slots [i ], args -> owner -> memory .fragment );
18331826 }
18341827 out -> remote .uid = args -> remote_uid ;
18351828 out -> port = args -> owner ;
@@ -1845,9 +1838,7 @@ static udpard_tree_t* cavl_factory_rx_session_by_remote_uid(void* const user)
18451838static void rx_session_free (rx_session_t * const self , udpard_list_t * const sessions_by_animation )
18461839{
18471840 for (size_t i = 0 ; i < RX_SLOT_COUNT ; i ++ ) {
1848- if (self -> slots [i ] != NULL ) {
1849- rx_slot_destroy (self -> slots [i ], self -> port -> memory .fragment , self -> port -> memory .session );
1850- }
1841+ rx_slot_reset (& self -> slots [i ], self -> port -> memory .fragment );
18511842 }
18521843 cavl2_remove (& self -> port -> index_session_by_remote_uid , & self -> index_remote_uid );
18531844 delist (sessions_by_animation , & self -> list_by_animation );
@@ -1875,44 +1866,46 @@ static void rx_session_eject(rx_session_t* const self, udpard_rx_t* const rx, rx
18751866
18761867 // Finally, reset the slot.
18771868 slot -> fragments = NULL ; // Transfer ownership to the application.
1878- rx_slot_destroy (slot , self -> port -> memory .fragment , self -> port -> memory . session );
1869+ rx_slot_reset (slot , self -> port -> memory .fragment );
18791870}
18801871
1881- /// Finds an existing in-progress slot with the specified transfer-ID, or allocates a new one. Returns NULL of OOM.
1872+ /// Finds an existing in-progress slot with the specified transfer-ID, or allocates a new one.
1873+ /// Allocation always succeeds so the result is never NULL, but it may cancel a stale slot with incomplete transfer.
18821874static rx_slot_t * rx_session_get_slot (rx_session_t * const self , const udpard_us_t ts , const uint64_t transfer_id )
18831875{
18841876 // First, check if one is in progress already; resume it if so.
18851877 for (size_t i = 0 ; i < RX_SLOT_COUNT ; i ++ ) {
1886- if (( self -> slots [i ] != NULL ) && (self -> slots [i ]-> transfer_id == transfer_id )) {
1887- return self -> slots [i ];
1878+ if (self -> slots [i ]. busy && (self -> slots [i ]. transfer_id == transfer_id )) {
1879+ return & self -> slots [i ];
18881880 }
18891881 }
18901882 // Use this opportunity to check for timed-out in-progress slots. This may free up a slot for the search below.
18911883 for (size_t i = 0 ; i < RX_SLOT_COUNT ; i ++ ) {
1892- if (( self -> slots [i ] != NULL ) && (ts >= (self -> slots [i ]-> ts_max + SESSION_LIFETIME ))) {
1893- rx_slot_destroy ( self -> slots [i ], self -> port -> memory .fragment , self -> port -> memory . session );
1884+ if (self -> slots [i ]. busy && (ts >= (self -> slots [i ]. ts_max + SESSION_LIFETIME ))) {
1885+ rx_slot_reset ( & self -> slots [i ], self -> port -> memory .fragment );
18941886 }
18951887 }
18961888 // This appears to be a new transfer, so we will need to allocate a new slot for it.
18971889 for (size_t i = 0 ; i < RX_SLOT_COUNT ; i ++ ) {
1898- if (self -> slots [i ] == NULL ) {
1899- self -> slots [i ] = rx_slot_new (self -> port -> memory .session ); // may fail
1900- return self -> slots [i ];
1890+ if (!self -> slots [i ].busy ) {
1891+ return & self -> slots [i ];
19011892 }
19021893 }
19031894 // All slots are currently occupied; find the oldest slot to sacrifice.
1904- size_t oldest_index = 0 ;
1895+ rx_slot_t * slot = NULL ;
1896+ udpard_us_t oldest_ts = HEAT_DEATH ;
19051897 for (size_t i = 0 ; i < RX_SLOT_COUNT ; i ++ ) {
1906- UDPARD_ASSERT (self -> slots [i ] != NULL ); // Checked this already.
1907- UDPARD_ASSERT (self -> slots [oldest_index ] != NULL );
1908- if ( self -> slots [ i ] -> ts_max < self -> slots [oldest_index ] -> ts_max ) {
1909- oldest_index = i ;
1898+ UDPARD_ASSERT (self -> slots [i ]. busy ); // Checked this already.
1899+ if (self -> slots [i ]. ts_max < oldest_ts ) {
1900+ oldest_ts = self -> slots [i ]. ts_max ;
1901+ slot = & self -> slots [ i ] ;
19101902 }
19111903 }
1904+ UDPARD_ASSERT ((slot != NULL ) && slot -> busy );
19121905 // It is probably just a stale transfer, so it's a no-brainer to evict it, it's probably dead anyway.
1913- rx_slot_destroy ( self -> slots [ oldest_index ] , self -> port -> memory .fragment , self -> port -> memory . session );
1914- self -> slots [ oldest_index ] = rx_slot_new ( self -> port -> memory . session ); // may fail
1915- return self -> slots [ oldest_index ] ;
1906+ rx_slot_reset ( slot , self -> port -> memory .fragment );
1907+ UDPARD_ASSERT (( slot != NULL ) && ! slot -> busy );
1908+ return slot ;
19161909}
19171910
19181911static void rx_session_update (rx_session_t * const self ,
@@ -1947,24 +1940,21 @@ static void rx_session_update(rx_session_t* const self,
19471940 // UNORDERED mode update. There are no other modes now -- there used to be ORDERED in an experimental revision once.
19481941 if (!rx_session_is_transfer_ejected (self , frame -> meta .transfer_id )) {
19491942 rx_slot_t * const slot = rx_session_get_slot (self , ts , frame -> meta .transfer_id ); // new or continuation
1950- if (slot == NULL ) {
1951- mem_free_payload (payload_deleter , frame -> base .origin );
1952- rx -> errors_oom ++ ;
1953- } else {
1954- const bool done = rx_slot_update (slot ,
1955- ts ,
1956- self -> port -> memory .fragment ,
1957- payload_deleter ,
1958- frame ,
1959- self -> port -> extent ,
1960- & rx -> errors_oom ,
1961- & rx -> errors_transfer_malformed );
1962- if (done ) {
1963- if (frame -> meta .kind == frame_msg_reliable ) {
1964- tx_send_ack (rx , ts , slot -> priority , slot -> transfer_id , self -> remote );
1965- }
1966- rx_session_eject (self , rx , slot );
1943+ UDPARD_ASSERT (slot != NULL );
1944+ UDPARD_ASSERT ((!slot -> busy ) || (slot -> transfer_id == frame -> meta .transfer_id ));
1945+ const bool done = rx_slot_update (slot ,
1946+ ts ,
1947+ self -> port -> memory .fragment ,
1948+ payload_deleter ,
1949+ frame ,
1950+ self -> port -> extent ,
1951+ & rx -> errors_oom ,
1952+ & rx -> errors_transfer_malformed );
1953+ if (done ) {
1954+ if (frame -> meta .kind == frame_msg_reliable ) {
1955+ tx_send_ack (rx , ts , slot -> priority , slot -> transfer_id , self -> remote );
19671956 }
1957+ rx_session_eject (self , rx , slot );
19681958 }
19691959 } else { // retransmit ACK if needed
19701960 if ((frame -> meta .kind == frame_msg_reliable ) && (frame -> base .offset == 0U )) {
0 commit comments