|
4 | 4 | "context" |
5 | 5 | "encoding/json" |
6 | 6 | "fmt" |
| 7 | + "io" |
7 | 8 | "net/http" |
8 | 9 | "net/url" |
9 | 10 | "strings" |
@@ -574,6 +575,131 @@ func TestSendJoinPartialStateResponse(t *testing.T) { |
574 | 575 | must.HaveInOrder(t, sendJoinResp.ServersInRoom, []string{"hs1"}) |
575 | 576 | } |
576 | 577 |
|
| 578 | +// This test verifies that events sent into a room between a /make_join and |
| 579 | +// /send_join are not lost to the joining server. When an event is created |
| 580 | +// during the join handshake, the join event's prev_events (set at make_join |
| 581 | +// time) won't reference it, creating two forward extremities. The server |
| 582 | +// handling the join should ensure the joining server can discover the missed |
| 583 | +// event, for example by sending a follow-up event that references both |
| 584 | +// extremities, prompting the joining server to backfill. |
| 585 | +// |
| 586 | +// See https://github.com/element-hq/synapse/pull/19390 |
| 587 | +func TestEventBetweenMakeJoinAndSendJoinIsNotLost(t *testing.T) { |
| 588 | + deployment := complement.Deploy(t, 1) |
| 589 | + defer deployment.Destroy(t) |
| 590 | + |
| 591 | + alice := deployment.Register(t, "hs1", helpers.RegistrationOpts{}) |
| 592 | + |
| 593 | + // We track the message event ID sent between make_join and send_join. |
| 594 | + // After send_join, we wait for hs1 to send us either: |
| 595 | + // - the message event itself, or |
| 596 | + // - any event whose prev_events reference the message (e.g. a dummy event) |
| 597 | + var messageEventID string |
| 598 | + messageDiscoverableWaiter := helpers.NewWaiter() |
| 599 | + |
| 600 | + srv := federation.NewServer(t, deployment, |
| 601 | + federation.HandleKeyRequests(), |
| 602 | + ) |
| 603 | + srv.UnexpectedRequestsAreErrors = false |
| 604 | + |
| 605 | + // Custom /send handler: the Complement server won't be in the room until |
| 606 | + // send_join completes, so we can't use HandleTransactionRequests (which |
| 607 | + // requires the room in srv.rooms). Instead we parse the raw transaction. |
| 608 | + srv.Mux().Handle("/_matrix/federation/v1/send/{transactionID}", http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { |
| 609 | + body, _ := io.ReadAll(req.Body) |
| 610 | + txn := gjson.ParseBytes(body) |
| 611 | + txn.Get("pdus").ForEach(func(_, pdu gjson.Result) bool { |
| 612 | + eventID := pdu.Get("event_id").String() |
| 613 | + eventType := pdu.Get("type").String() |
| 614 | + t.Logf("Received PDU via /send: type=%s id=%s", eventType, eventID) |
| 615 | + |
| 616 | + if messageEventID == "" { |
| 617 | + return true |
| 618 | + } |
| 619 | + |
| 620 | + // Check if this IS the message event (server pushed it directly). |
| 621 | + if eventID == messageEventID { |
| 622 | + messageDiscoverableWaiter.Finish() |
| 623 | + return true |
| 624 | + } |
| 625 | + |
| 626 | + // Check if this event's prev_events reference the message |
| 627 | + // (e.g. a dummy event tying the forward extremities together). |
| 628 | + pdu.Get("prev_events").ForEach(func(_, prevEvent gjson.Result) bool { |
| 629 | + if prevEvent.String() == messageEventID { |
| 630 | + messageDiscoverableWaiter.Finish() |
| 631 | + return false |
| 632 | + } |
| 633 | + return true |
| 634 | + }) |
| 635 | + |
| 636 | + return true |
| 637 | + }) |
| 638 | + w.WriteHeader(200) |
| 639 | + w.Write([]byte(`{"pdus":{}}`)) |
| 640 | + })).Methods("PUT") |
| 641 | + |
| 642 | + cancel := srv.Listen() |
| 643 | + defer cancel() |
| 644 | + |
| 645 | + // Alice creates a room on hs1. |
| 646 | + roomID := alice.MustCreateRoom(t, map[string]interface{}{ |
| 647 | + "preset": "public_chat", |
| 648 | + }) |
| 649 | + |
| 650 | + charlie := srv.UserID("charlie") |
| 651 | + origin := srv.ServerName() |
| 652 | + fedClient := srv.FederationClient(deployment) |
| 653 | + |
| 654 | + // Step 1: make_join, hs1 returns a join event template whose prev_events |
| 655 | + // reflect the current room DAG tips. |
| 656 | + makeJoinResp, err := fedClient.MakeJoin( |
| 657 | + context.Background(), origin, |
| 658 | + deployment.GetFullyQualifiedHomeserverName(t, "hs1"), |
| 659 | + roomID, charlie, |
| 660 | + ) |
| 661 | + must.NotError(t, "MakeJoin", err) |
| 662 | + |
| 663 | + // Step 2: Alice sends a message on hs1. This advances the DAG past the |
| 664 | + // point captured by make_join's prev_events. The Complement server is not |
| 665 | + // yet in the room, so it won't receive this event via normal federation. |
| 666 | + messageEventID = alice.SendEventSynced(t, roomID, b.Event{ |
| 667 | + Type: "m.room.message", |
| 668 | + Content: map[string]interface{}{ |
| 669 | + "msgtype": "m.text", |
| 670 | + "body": "Message sent between make_join and send_join", |
| 671 | + }, |
| 672 | + }) |
| 673 | + t.Logf("Alice sent message %s between make_join and send_join", messageEventID) |
| 674 | + |
| 675 | + // Step 3: Build and sign the join event, then send_join. |
| 676 | + // The join event's prev_events are from step 1 (before the message), |
| 677 | + // so persisting it on hs1 creates two forward extremities: the message |
| 678 | + // and the join. |
| 679 | + verImpl, err := gomatrixserverlib.GetRoomVersion(makeJoinResp.RoomVersion) |
| 680 | + must.NotError(t, "GetRoomVersion", err) |
| 681 | + eb := verImpl.NewEventBuilderFromProtoEvent(&makeJoinResp.JoinEvent) |
| 682 | + joinEvent, err := eb.Build(time.Now(), srv.ServerName(), srv.KeyID, srv.Priv) |
| 683 | + must.NotError(t, "Build join event", err) |
| 684 | + |
| 685 | + _, err = fedClient.SendJoin( |
| 686 | + context.Background(), origin, |
| 687 | + deployment.GetFullyQualifiedHomeserverName(t, "hs1"), |
| 688 | + joinEvent, |
| 689 | + ) |
| 690 | + must.NotError(t, "SendJoin", err) |
| 691 | + |
| 692 | + // Step 4: hs1 should make the missed message discoverable to the joining |
| 693 | + // server. We accept either receiving the message event directly, or |
| 694 | + // receiving any event whose prev_events reference it (allowing the |
| 695 | + // joining server to backfill). |
| 696 | + messageDiscoverableWaiter.Waitf(t, 5*time.Second, |
| 697 | + "Timed out waiting for message event %s to become discoverable — "+ |
| 698 | + "the event sent between make_join and send_join was lost to the "+ |
| 699 | + "joining server", messageEventID, |
| 700 | + ) |
| 701 | +} |
| 702 | + |
577 | 703 | // given an event JSON, return the type and state_key, joined with a "|" |
578 | 704 | func typeAndStateKeyForEvent(result gjson.Result) string { |
579 | 705 | return strings.Join([]string{result.Map()["type"].Str, result.Map()["state_key"].Str}, "|") |
|
0 commit comments