@@ -544,8 +544,8 @@ static bool header_deserialize(const udpard_bytes_mut_t dgram_payload,
544544 const byte_t head = * ptr ++ ;
545545 const byte_t version = head & 0x1FU ;
546546 if (version == HEADER_VERSION ) {
547- out_meta -> priority = (udpard_prio_t )((byte_t )(head >> 5U ) & 0x07U );
548- const frame_kind_t kind = (frame_kind_t )* ptr ++ ;
547+ out_meta -> priority = (udpard_prio_t )((byte_t )(head >> 5U ) & 0x07U );
548+ out_meta -> kind = (frame_kind_t )* ptr ++ ;
549549 ptr += 2U ;
550550 ptr = deserialize_u32 (ptr , frame_index );
551551 ptr = deserialize_u32 (ptr , frame_payload_offset );
@@ -560,14 +560,15 @@ static bool header_deserialize(const udpard_bytes_mut_t dgram_payload,
560560 // Finalize the fields.
561561 * frame_index = HEADER_FRAME_INDEX_MAX & * frame_index ;
562562 // Validate the fields.
563- ok = ok && ((kind == frame_msg_best ) || (kind == frame_msg_reliable ) || (kind == frame_ack ));
563+ ok = ok && ((out_meta -> kind == frame_msg_best ) || (out_meta -> kind == frame_msg_reliable ) ||
564+ (out_meta -> kind == frame_ack ));
564565 ok = ok && (((uint64_t )* frame_payload_offset + (uint64_t )out_payload -> size ) <=
565566 (uint64_t )out_meta -> transfer_payload_size );
566567 ok = ok && ((0 == * frame_index ) == (0 == * frame_payload_offset ));
567568 // The prefix-CRC of the first frame of a transfer equals the CRC of its payload.
568569 ok = ok && ((0 < * frame_payload_offset ) || (crc_full (out_payload -> size , out_payload -> data ) == * prefix_crc ));
569570 // ACK frame requires zero offset, single-frame transfer.
570- ok = ok && ((kind != frame_ack ) || (* frame_payload_offset == 0U ));
571+ ok = ok && ((out_meta -> kind != frame_ack ) || (* frame_payload_offset == 0U ));
571572 } else {
572573 ok = false;
573574 }
@@ -1092,22 +1093,23 @@ static bool tx_push(udpard_tx_t* const tx,
10921093}
10931094
10941095/// Handle an ACK received from a remote node.
1095- static void tx_receive_ack (udpard_rx_t * const rx , const uint64_t transfer_id )
1096+ static void tx_receive_ack (udpard_rx_t * const rx , const uint64_t sender_uid , const uint64_t transfer_id )
10961097{
10971098 if (rx -> tx != NULL ) {
10981099 // A transfer-ID collision is astronomically unlikely: given 10k simultaneously pending reliable transfers,
10991100 // which is outside typical usage, the probability of a collision is about 1 in 500 billion. However, we
11001101 // take into account that the PRNG used to seed the transfer-ID may be imperfect, so we add explicit collision
1101- // handling.
1102- // The disambiguation policy is very simple: given an ambiguous match, acknowledge the oldest reliable topic.
1103- // In all practical scenarios at most a single iteration will be needed.
1102+ // handling. In all practical scenarios at most a single iteration will be needed.
11041103 const tx_key_transfer_id_t key = { .transfer_id = transfer_id , .seq_no = 0 };
11051104 tx_transfer_t * tr =
11061105 CAVL2_TO_OWNER (cavl2_lower_bound (rx -> tx -> index_transfer_id , & key , & tx_cavl_compare_transfer_id ),
11071106 tx_transfer_t ,
11081107 index_transfer_id );
11091108 while ((tr != NULL ) && (tr -> transfer_id == transfer_id )) {
1110- if (tr -> kind == frame_msg_reliable ) { // pick the first reliable and acknowledge it
1109+ // Outgoing reliable P2P transfers only accept acks from the intended recipient.
1110+ // Non-P2P accept acks from any sender.
1111+ const bool destination_match = !tr -> is_p2p || (tr -> p2p_remote .uid == sender_uid );
1112+ if ((tr -> kind == frame_msg_reliable ) && destination_match ) {
11111113 tx_transfer_retire (rx -> tx , tr , true);
11121114 break ;
11131115 }
@@ -1205,9 +1207,10 @@ bool udpard_tx_new(udpard_tx_t* const self,
12051207 self -> enqueued_frames_count = 0 ;
12061208 self -> next_seq_no = 0 ;
12071209 self -> memory = memory ;
1208- self -> index_staged = NULL ;
1209- self -> index_deadline = NULL ;
12101210 self -> index_transfer_id = NULL ;
1211+ self -> index_deadline = NULL ;
1212+ self -> index_staged = NULL ;
1213+ self -> agewise = (udpard_list_t ){ NULL , NULL };
12111214 self -> user = NULL ;
12121215 for (size_t i = 0 ; i < UDPARD_IFACE_COUNT_MAX ; i ++ ) {
12131216 self -> mtu [i ] = UDPARD_MTU_DEFAULT ;
@@ -2425,7 +2428,7 @@ bool udpard_rx_port_push(udpard_rx_t* const rx,
24252428 if (frame .meta .kind != frame_ack ) {
24262429 port -> vtable_private -> accept (rx , port , timestamp , source_ep , & frame , payload_deleter , iface_index );
24272430 } else {
2428- tx_receive_ack (rx , frame .meta .transfer_id );
2431+ tx_receive_ack (rx , frame .meta .sender_uid , frame . meta . transfer_id );
24292432 mem_free_payload (payload_deleter , frame .base .origin );
24302433 }
24312434 } else {
0 commit comments