@@ -1656,8 +1656,6 @@ static bool rx_fragment_tree_finalize(udpard_tree_t* const root, const uint32_t
16561656/// The redundant interfaces may use distinct MTU, which requires special fragment tree handling.
16571657typedef struct
16581658{
1659- bool busy ;
1660-
16611659 uint64_t transfer_id ; ///< Which transfer we're reassembling here.
16621660
16631661 udpard_us_t ts_min ; ///< Earliest frame timestamp, aka transfer reception timestamp.
@@ -1674,30 +1672,46 @@ typedef struct
16741672 udpard_tree_t * fragments ;
16751673} rx_slot_t ;
16761674
1677- static void rx_slot_reset (rx_slot_t * const slot , const udpard_mem_t fragment_memory )
1675+ static rx_slot_t * rx_slot_new (const udpard_mem_t slot_memory )
1676+ {
1677+ rx_slot_t * const slot = mem_alloc (slot_memory , sizeof (rx_slot_t ));
1678+ if (slot != NULL ) {
1679+ mem_zero (sizeof (* slot ), slot );
1680+ slot -> ts_min = HEAT_DEATH ;
1681+ slot -> ts_max = BIG_BANG ;
1682+ slot -> covered_prefix = 0 ;
1683+ slot -> crc_end = 0 ;
1684+ slot -> crc = CRC_INITIAL ;
1685+ slot -> fragments = NULL ;
1686+ }
1687+ return slot ;
1688+ }
1689+
1690+ static void rx_slot_destroy (rx_slot_t * const slot , const udpard_mem_t fragment_memory , const udpard_mem_t slot_memory )
16781691{
1692+ UDPARD_ASSERT (slot != NULL );
16791693 udpard_fragment_free_all ((udpard_fragment_t * )slot -> fragments , udpard_make_deleter (fragment_memory ));
1680- slot -> fragments = NULL ;
1681- slot -> busy = false;
1682- slot -> covered_prefix = 0U ;
1683- slot -> crc_end = 0U ;
1684- slot -> crc = CRC_INITIAL ;
1694+ mem_free (slot_memory , sizeof (rx_slot_t ), slot );
16851695}
16861696
1697+ typedef enum
1698+ {
1699+ rx_slot_not_done ,
1700+ rx_slot_done ,
1701+ rx_slot_reset ,
1702+ } rx_slot_update_result_t ;
1703+
16871704/// The caller will accept the ownership of the fragments iff the result is true.
1688- static bool rx_slot_update (rx_slot_t * const slot ,
1689- const udpard_us_t ts ,
1690- const udpard_mem_t fragment_memory ,
1691- const udpard_deleter_t payload_deleter ,
1692- rx_frame_t * const frame ,
1693- const size_t extent ,
1694- uint64_t * const errors_oom ,
1695- uint64_t * const errors_transfer_malformed )
1696- {
1697- bool done = false;
1698- if (!slot -> busy ) {
1699- rx_slot_reset (slot , fragment_memory );
1700- slot -> busy = true;
1705+ static rx_slot_update_result_t rx_slot_update (rx_slot_t * const slot ,
1706+ const udpard_us_t ts ,
1707+ const udpard_mem_t fragment_memory ,
1708+ const udpard_deleter_t payload_deleter ,
1709+ rx_frame_t * const frame ,
1710+ const size_t extent ,
1711+ uint64_t * const errors_oom ,
1712+ uint64_t * const errors_transfer_malformed )
1713+ {
1714+ if ((slot -> ts_min == HEAT_DEATH ) && (slot -> ts_max == BIG_BANG )) {
17011715 slot -> transfer_id = frame -> meta .transfer_id ;
17021716 slot -> ts_min = ts ;
17031717 slot -> ts_max = ts ;
@@ -1709,8 +1723,7 @@ static bool rx_slot_update(rx_slot_t* const slot,
17091723 if ((slot -> total_size != frame -> meta .transfer_payload_size ) || (slot -> priority != frame -> meta .priority )) {
17101724 ++ * errors_transfer_malformed ;
17111725 mem_free_payload (payload_deleter , frame -> base .origin );
1712- rx_slot_reset (slot , fragment_memory );
1713- return false;
1726+ return rx_slot_reset ;
17141727 }
17151728 const rx_fragment_tree_update_result_t tree_res = rx_fragment_tree_update (& slot -> fragments ,
17161729 fragment_memory ,
@@ -1733,14 +1746,12 @@ static bool rx_slot_update(rx_slot_t* const slot,
17331746 }
17341747 if (tree_res == rx_fragment_tree_done ) {
17351748 if (rx_fragment_tree_finalize (slot -> fragments , slot -> crc )) {
1736- slot -> busy = false;
1737- done = true;
1738- } else {
1739- ++ * errors_transfer_malformed ;
1740- rx_slot_reset (slot , fragment_memory );
1749+ return rx_slot_done ;
17411750 }
1751+ ++ * errors_transfer_malformed ;
1752+ return rx_slot_reset ;
17421753 }
1743- return done ;
1754+ return rx_slot_not_done ;
17441755}
17451756
17461757// --------------------------------------------- SESSION & PORT ---------------------------------------------
@@ -1752,8 +1763,6 @@ typedef struct rx_session_t
17521763 udpard_tree_t index_remote_uid ; ///< Must be the first member.
17531764 udpard_remote_t remote ; ///< Most recent discovered reverse path for P2P to the sender.
17541765
1755- udpard_rx_port_t * port ;
1756-
17571766 /// LRU last animated list for automatic retirement of stale sessions.
17581767 udpard_listed_t list_by_animation ;
17591768 udpard_us_t last_animated_ts ;
@@ -1765,10 +1774,9 @@ typedef struct rx_session_t
17651774
17661775 bool initialized ; ///< Set after the first frame is seen.
17671776
1768- // TODO: Static slots are taking too much space; allocate them dynamically instead.
1769- // Each is <=56 bytes so it fits nicely into a 64-byte o1heap block.
1770- // The slot state enum can be replaced with a simple "done" flag.
1771- rx_slot_t slots [RX_SLOT_COUNT ];
1777+ udpard_rx_port_t * port ;
1778+
1779+ rx_slot_t * slots [RX_SLOT_COUNT ];
17721780} rx_session_t ;
17731781
17741782/// The reassembly strategy is composed once at initialization time by choosing a vtable with the desired behavior.
@@ -1821,8 +1829,7 @@ static udpard_tree_t* cavl_factory_rx_session_by_remote_uid(void* const user)
18211829 out -> index_remote_uid = (udpard_tree_t ){ NULL , { NULL , NULL }, 0 };
18221830 out -> list_by_animation = (udpard_listed_t ){ NULL , NULL };
18231831 for (size_t i = 0 ; i < RX_SLOT_COUNT ; i ++ ) {
1824- out -> slots [i ].fragments = NULL ;
1825- rx_slot_reset (& out -> slots [i ], args -> owner -> memory .fragment );
1832+ out -> slots [i ] = NULL ;
18261833 }
18271834 out -> remote .uid = args -> remote_uid ;
18281835 out -> port = args -> owner ;
@@ -1838,7 +1845,9 @@ static udpard_tree_t* cavl_factory_rx_session_by_remote_uid(void* const user)
18381845static void rx_session_free (rx_session_t * const self , udpard_list_t * const sessions_by_animation )
18391846{
18401847 for (size_t i = 0 ; i < RX_SLOT_COUNT ; i ++ ) {
1841- rx_slot_reset (& self -> slots [i ], self -> port -> memory .fragment );
1848+ if (self -> slots [i ] != NULL ) {
1849+ rx_slot_destroy (self -> slots [i ], self -> port -> memory .fragment , self -> port -> memory .session );
1850+ }
18421851 }
18431852 cavl2_remove (& self -> port -> index_session_by_remote_uid , & self -> index_remote_uid );
18441853 delist (sessions_by_animation , & self -> list_by_animation );
@@ -1866,46 +1875,44 @@ static void rx_session_eject(rx_session_t* const self, udpard_rx_t* const rx, rx
18661875
18671876 // Finally, reset the slot.
18681877 slot -> fragments = NULL ; // Transfer ownership to the application.
1869- rx_slot_reset (slot , self -> port -> memory .fragment );
1878+ rx_slot_destroy (slot , self -> port -> memory .fragment , self -> port -> memory . session );
18701879}
18711880
1872- /// Finds an existing in-progress slot with the specified transfer-ID, or allocates a new one.
1873- /// Allocation always succeeds so the result is never NULL, but it may cancel a stale slot with incomplete transfer.
1881+ /// Finds an existing in-progress slot with the specified transfer-ID, or allocates a new one. Returns NULL of OOM.
18741882static rx_slot_t * rx_session_get_slot (rx_session_t * const self , const udpard_us_t ts , const uint64_t transfer_id )
18751883{
18761884 // First, check if one is in progress already; resume it if so.
18771885 for (size_t i = 0 ; i < RX_SLOT_COUNT ; i ++ ) {
1878- if (self -> slots [i ]. busy && (self -> slots [i ]. transfer_id == transfer_id )) {
1879- return & self -> slots [i ];
1886+ if (( self -> slots [i ] != NULL ) && (self -> slots [i ]-> transfer_id == transfer_id )) {
1887+ return self -> slots [i ];
18801888 }
18811889 }
18821890 // Use this opportunity to check for timed-out in-progress slots. This may free up a slot for the search below.
18831891 for (size_t i = 0 ; i < RX_SLOT_COUNT ; i ++ ) {
1884- if (self -> slots [i ]. busy && (ts >= (self -> slots [i ]. ts_max + SESSION_LIFETIME ))) {
1885- rx_slot_reset ( & self -> slots [i ], self -> port -> memory .fragment );
1892+ if (( self -> slots [i ] != NULL ) && (ts >= (self -> slots [i ]-> ts_max + SESSION_LIFETIME ))) {
1893+ rx_slot_destroy ( self -> slots [i ], self -> port -> memory .fragment , self -> port -> memory . session );
18861894 }
18871895 }
18881896 // This appears to be a new transfer, so we will need to allocate a new slot for it.
18891897 for (size_t i = 0 ; i < RX_SLOT_COUNT ; i ++ ) {
1890- if (!self -> slots [i ].busy ) {
1891- return & self -> slots [i ];
1898+ if (self -> slots [i ] == NULL ) {
1899+ self -> slots [i ] = rx_slot_new (self -> port -> memory .session ); // may fail
1900+ return self -> slots [i ];
18921901 }
18931902 }
18941903 // All slots are currently occupied; find the oldest slot to sacrifice.
1895- rx_slot_t * slot = NULL ;
1896- udpard_us_t oldest_ts = HEAT_DEATH ;
1904+ size_t oldest_index = 0 ;
18971905 for (size_t i = 0 ; i < RX_SLOT_COUNT ; i ++ ) {
1898- UDPARD_ASSERT (self -> slots [i ]. busy ); // Checked this already.
1899- if (self -> slots [i ]. ts_max < oldest_ts ) {
1900- oldest_ts = self -> slots [i ]. ts_max ;
1901- slot = & self -> slots [ i ] ;
1906+ UDPARD_ASSERT (self -> slots [i ] != NULL ); // Checked this already.
1907+ UDPARD_ASSERT (self -> slots [oldest_index ] != NULL );
1908+ if ( self -> slots [ i ] -> ts_max < self -> slots [oldest_index ] -> ts_max ) {
1909+ oldest_index = i ;
19021910 }
19031911 }
1904- UDPARD_ASSERT ((slot != NULL ) && slot -> busy );
19051912 // It is probably just a stale transfer, so it's a no-brainer to evict it, it's probably dead anyway.
1906- rx_slot_reset ( slot , self -> port -> memory .fragment );
1907- UDPARD_ASSERT (( slot != NULL ) && ! slot -> busy );
1908- return slot ;
1913+ rx_slot_destroy ( self -> slots [ oldest_index ] , self -> port -> memory .fragment , self -> port -> memory . session );
1914+ self -> slots [ oldest_index ] = rx_slot_new ( self -> port -> memory . session ); // may fail
1915+ return self -> slots [ oldest_index ] ;
19091916}
19101917
19111918static void rx_session_update (rx_session_t * const self ,
@@ -1940,21 +1947,24 @@ static void rx_session_update(rx_session_t* const self,
19401947 // UNORDERED mode update. There are no other modes now -- there used to be ORDERED in an experimental revision once.
19411948 if (!rx_session_is_transfer_ejected (self , frame -> meta .transfer_id )) {
19421949 rx_slot_t * const slot = rx_session_get_slot (self , ts , frame -> meta .transfer_id ); // new or continuation
1943- UDPARD_ASSERT (slot != NULL );
1944- UDPARD_ASSERT ((!slot -> busy ) || (slot -> transfer_id == frame -> meta .transfer_id ));
1945- const bool done = rx_slot_update (slot ,
1946- ts ,
1947- self -> port -> memory .fragment ,
1948- payload_deleter ,
1949- frame ,
1950- self -> port -> extent ,
1951- & rx -> errors_oom ,
1952- & rx -> errors_transfer_malformed );
1953- if (done ) {
1954- if (frame -> meta .kind == frame_msg_reliable ) {
1955- tx_send_ack (rx , ts , slot -> priority , slot -> transfer_id , self -> remote );
1950+ if (slot == NULL ) {
1951+ mem_free_payload (payload_deleter , frame -> base .origin );
1952+ rx -> errors_oom ++ ;
1953+ } else {
1954+ const bool done = rx_slot_update (slot ,
1955+ ts ,
1956+ self -> port -> memory .fragment ,
1957+ payload_deleter ,
1958+ frame ,
1959+ self -> port -> extent ,
1960+ & rx -> errors_oom ,
1961+ & rx -> errors_transfer_malformed );
1962+ if (done ) {
1963+ if (frame -> meta .kind == frame_msg_reliable ) {
1964+ tx_send_ack (rx , ts , slot -> priority , slot -> transfer_id , self -> remote );
1965+ }
1966+ rx_session_eject (self , rx , slot );
19561967 }
1957- rx_session_eject (self , rx , slot );
19581968 }
19591969 } else { // retransmit ACK if needed
19601970 if ((frame -> meta .kind == frame_msg_reliable ) && (frame -> base .offset == 0U )) {
0 commit comments