Skip to content
12 changes: 9 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,7 @@ update-ca-certificates

## Sytest parity

As of 10 February 2023:
As of 29 October 2025:
```
$ go build ./cmd/sytest-coverage
$ ./sytest-coverage -v
Expand Down Expand Up @@ -507,7 +507,13 @@ $ ./sytest-coverage -v
✓ Can get rooms/{roomId}/members

30rooms/60version_upgrade 0/19 tests
30rooms/70publicroomslist 0/5 tests
30rooms/70publicroomslist 2/5 tests
× Asking for a remote rooms list, but supplying the local server's name, returns the local rooms list
× Can get remote public room list
× Can paginate public room list
✓ Can search public room list
✓ Name/topic keys are correct

31sync/01filter 2/2 tests
✓ Can create filter
✓ Can download filter
Expand Down Expand Up @@ -707,5 +713,5 @@ $ ./sytest-coverage -v
90jira/SYN-516 0/1 tests
90jira/SYN-627 0/1 tests

TOTAL: 220/610 tests converted
TOTAL: 222/610 tests converted
```
39 changes: 30 additions & 9 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (

"github.com/matrix-org/complement/b"
"github.com/matrix-org/complement/ct"
"github.com/matrix-org/complement/internal"
)

type ctxKey string
Expand Down Expand Up @@ -767,6 +768,9 @@ func (c *CSAPI) MustDo(t ct.TestLike, method string, paths []string, opts ...Req
// match.JSONKeyEqual("errcode", "M_INVALID_USERNAME"),
// },
// })
//
// The caller does not need to worry about closing the returned `http.Response.Body` as
// this is handled automatically.
func (c *CSAPI) Do(t ct.TestLike, method string, paths []string, opts ...RequestOpt) *http.Response {
t.Helper()
escapedPaths := make([]string, len(paths))
Expand Down Expand Up @@ -815,6 +819,30 @@ func (c *CSAPI) Do(t ct.TestLike, method string, paths []string, opts ...Request
if err != nil {
ct.Fatalf(t, "CSAPI.Do response returned error: %s", err)
}
// `defer` is function scoped but it's okay that we only clean up all requests at
// the end. To also be clear, `defer` arguments are evaluated at the time of the
// `defer` statement so we are only closing the original response body here. Our new
// response body will be untouched.
defer internal.CloseIO(
res.Body,
fmt.Sprintf(
"CSAPI.Do: response body from %s %s",
res.Request.Method,
res.Request.URL.String(),
),
)

// Make a copy of the response body so that downstream callers can read it multiple
// times if needed and don't need to worry about closing it.
var resBody []byte
if res.Body != nil {
resBody, err = io.ReadAll(res.Body)
if err != nil {
ct.Fatalf(t, "CSAPI.Do failed to read response body for RetryUntil check: %s", err)
}
res.Body = io.NopCloser(bytes.NewBuffer(resBody))
}

// debug log the response
if c.Debug && res != nil {
var dump []byte
Expand All @@ -824,19 +852,12 @@ func (c *CSAPI) Do(t ct.TestLike, method string, paths []string, opts ...Request
}
t.Logf("%s", string(dump))
}

if retryUntil == nil || retryUntil.timeout == 0 {
return res // don't retry
}

// check the condition, make a copy of the response body first in case the check consumes it
var resBody []byte
if res.Body != nil {
resBody, err = io.ReadAll(res.Body)
if err != nil {
ct.Fatalf(t, "CSAPI.Do failed to read response body for RetryUntil check: %s", err)
}
res.Body = io.NopCloser(bytes.NewBuffer(resBody))
}
// check the condition
if retryUntil.untilFn(res) {
// remake the response and return
res.Body = io.NopCloser(bytes.NewBuffer(resBody))
Expand Down
174 changes: 109 additions & 65 deletions client/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"net/http"
"net/url"
"reflect"
"slices"
"sort"
"strings"
"time"
Expand Down Expand Up @@ -269,95 +270,138 @@ func SyncPresenceHas(fromUser string, expectedPresence *string, checks ...func(g
}
}

// Checks that `userID` gets invited to `roomID`.
// syncMembershipIn checks that `userID` has `membership` in `roomID`, with optional
// extra checks on the found membership event.
//
// This checks different parts of the /sync response depending on the client making the request.
// If the client is also the person being invited to the room then the 'invite' block will be inspected.
// If the client is different to the person being invited then the 'join' block will be inspected.
func SyncInvitedTo(userID, roomID string) SyncCheckOpt {
return func(clientUserID string, topLevelSyncJSON gjson.Result) error {
// two forms which depend on what the client user is:
// - passively viewing an invite for a room you're joined to (timeline events)
// - actively being invited to a room.
if clientUserID == userID {
// active
err := checkArrayElements(
topLevelSyncJSON, "rooms.invite."+GjsonEscape(roomID)+".invite_state.events",
func(ev gjson.Result) bool {
return ev.Get("type").Str == "m.room.member" && ev.Get("state_key").Str == userID && ev.Get("content.membership").Str == "invite"
},
)
if err != nil {
return fmt.Errorf("SyncInvitedTo(%s): %s", roomID, err)
}
return nil
}
// passive
return SyncTimelineHas(roomID, func(ev gjson.Result) bool {
return ev.Get("type").Str == "m.room.member" && ev.Get("state_key").Str == userID && ev.Get("content.membership").Str == "invite"
})(clientUserID, topLevelSyncJSON)
}
}

// Check that `userID` gets joined to `roomID` by inspecting the join timeline for a membership event.
// This can be also used to passively observe another user's membership changes in a
// room although we assume that the observing client is joined to the room.
//
// Additional checks can be passed to narrow down the check, all must pass.
func SyncJoinedTo(userID, roomID string, checks ...func(gjson.Result) bool) SyncCheckOpt {
checkJoined := func(ev gjson.Result) bool {
if ev.Get("type").Str == "m.room.member" && ev.Get("state_key").Str == userID && ev.Get("content.membership").Str == "join" {
// Note: This will not work properly with leave/ban membership for initial syncs, see
// https://github.com/matrix-org/matrix-doc/issues/3537
func syncMembershipIn(userID, roomID, membership string, checks ...func(gjson.Result) bool) SyncCheckOpt {
checkMembership := func(ev gjson.Result) bool {
if ev.Get("type").Str == "m.room.member" && ev.Get("state_key").Str == userID && ev.Get("content.membership").Str == membership {
for _, check := range checks {
if !check(ev) {
// short-circuit, bail early
return false
}
}
// passed both basic join check and all other checks
// passed both basic membership check and all other checks
return true
}
return false
}
return func(clientUserID string, topLevelSyncJSON gjson.Result) error {
// Check both the timeline and the state events for the join event
// since on initial sync, the state events may only be in
// <room>.state.events.
// Check both the timeline and the state events for the membership event since on
// initial sync, the state events may only be in state. Additionally, state only
// covers the "updates for the room up to the start of the timeline."

// We assume the passively observing client user is joined to the room
roomTypeKey := "join"
// Otherwise, if the client is the user whose membership we are checking, we need to
// pick the correct room type JSON key based on the membership being checked.
if clientUserID == userID {
if membership == "join" {
roomTypeKey = "join"
} else if membership == "leave" || membership == "ban" {
roomTypeKey = "leave"
} else if membership == "invite" {
roomTypeKey = "invite"
} else if membership == "knock" {
roomTypeKey = "knock"
} else {
return fmt.Errorf("syncMembershipIn(%s, %s): unknown membership: %s", roomID, membership, membership)
}
}

// We assume the passively observing client user is joined to the room (`rooms.join.<roomID>.state`)
stateKey := "state"
// Otherwise, if the client is the user whose membership we are checking,
// we need to pick the correct JSON key based on the membership being checked.
if clientUserID == userID {
if membership == "join" || membership == "leave" || membership == "ban" {
stateKey = "state"
} else if membership == "invite" {
stateKey = "invite_state"
} else if membership == "knock" {
stateKey = "knock_state"
} else {
return fmt.Errorf("syncMembershipIn(%s, %s): unknown membership: %s", roomID, membership, membership)
}
}

// Check the state first as it's a better source of truth than the `timeline`.
//
// FIXME: Ideally, we'd use something like `state_after` to get the actual current
// state in the room instead of us assuming that no state resets/conflicts happen
// when we apply state from the `timeline` on top of the `state`. But `state_after`
// is gated behind a sync request parameter which we can't control here.
firstErr := checkArrayElements(
topLevelSyncJSON, "rooms.join."+GjsonEscape(roomID)+".timeline.events", checkJoined,
topLevelSyncJSON, "rooms."+roomTypeKey+"."+GjsonEscape(roomID)+"."+stateKey+".events", checkMembership,
)
if firstErr == nil {
return nil
}

secondErr := checkArrayElements(
topLevelSyncJSON, "rooms.join."+GjsonEscape(roomID)+".state.events", checkJoined,
)
if secondErr == nil {
return nil
// Check the timeline
//
// This is also important to differentiate between leave/ban because those both
// appear in the `leave` `roomTypeKey` and we need to specifically check the
// timeline for the membership event to differentiate them.
var secondErr error
// The `timeline` is only available for join/leave/ban memberships.
if slices.Contains([]string{"join", "leave", "ban"}, membership) ||
// We assume the passively observing client user is joined to the room (therefore
// has `timeline`).
clientUserID != userID {
secondErr = checkArrayElements(
topLevelSyncJSON, "rooms."+roomTypeKey+"."+GjsonEscape(roomID)+".timeline.events", checkMembership,
)
if secondErr == nil {
return nil
}
}
return fmt.Errorf("SyncJoinedTo(%s): %s & %s", roomID, firstErr, secondErr)

return fmt.Errorf("syncMembershipIn(%s, %s): %s & %s - %s", roomID, membership, firstErr, secondErr, topLevelSyncJSON)
}
}

// Check that `userID` is leaving `roomID` by inspecting the timeline for a membership event, or witnessing `roomID` in `rooms.leave`
// Checks that `userID` gets invited to `roomID`
//
// Additional checks can be passed to narrow down the check, all must pass.
func SyncInvitedTo(userID, roomID string, checks ...func(gjson.Result) bool) SyncCheckOpt {
return syncMembershipIn(userID, roomID, "invite", checks...)
}

// Checks that `userID` has knocked on `roomID`
//
// Additional checks can be passed to narrow down the check, all must pass.
func SyncKnockedOn(userID, roomID string, checks ...func(gjson.Result) bool) SyncCheckOpt {
return syncMembershipIn(userID, roomID, "knock", checks...)
}

// Check that `userID` gets joined to `roomID`
//
// Additional checks can be passed to narrow down the check, all must pass.
func SyncJoinedTo(userID, roomID string, checks ...func(gjson.Result) bool) SyncCheckOpt {
return syncMembershipIn(userID, roomID, "join", checks...)
}

// Check that `userID` has left the `roomID`
// Note: This will not work properly with initial syncs, see https://github.com/matrix-org/matrix-doc/issues/3537
func SyncLeftFrom(userID, roomID string) SyncCheckOpt {
return func(clientUserID string, topLevelSyncJSON gjson.Result) error {
// two forms which depend on what the client user is:
// - passively viewing a membership for a room you're joined in
// - actively leaving the room
if clientUserID == userID {
// active
events := topLevelSyncJSON.Get("rooms.leave." + GjsonEscape(roomID))
if !events.Exists() {
return fmt.Errorf("no leave section for room %s", roomID)
} else {
return nil
}
}
// passive
return SyncTimelineHas(roomID, func(ev gjson.Result) bool {
return ev.Get("type").Str == "m.room.member" && ev.Get("state_key").Str == userID && ev.Get("content.membership").Str == "leave"
})(clientUserID, topLevelSyncJSON)
}
//
// Additional checks can be passed to narrow down the check, all must pass.
func SyncLeftFrom(userID, roomID string, checks ...func(gjson.Result) bool) SyncCheckOpt {
return syncMembershipIn(userID, roomID, "leave", checks...)
}

// Check that `userID` is banned from the `roomID`
// Note: This will not work properly with initial syncs, see https://github.com/matrix-org/matrix-doc/issues/3537
//
// Additional checks can be passed to narrow down the check, all must pass.
func SyncBannedFrom(userID, roomID string, checks ...func(gjson.Result) bool) SyncCheckOpt {
return syncMembershipIn(userID, roomID, "ban", checks...)
}

// Calls the `check` function for each global account data event, and returns with success if the
Expand Down
11 changes: 10 additions & 1 deletion cmd/account-snapshot/internal/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ import (
"os"
"strconv"
"strings"

"github.com/matrix-org/complement/internal"
)

// LoadSyncData loads sync data from disk or by doing a /sync request
Expand Down Expand Up @@ -75,7 +77,14 @@ func doRequest(httpCli *http.Client, req *http.Request, token string) ([]byte, e
if err != nil {
return nil, fmt.Errorf("failed to perform request: %w", err)
}
defer res.Body.Close()
defer internal.CloseIO(
res.Body,
fmt.Sprintf(
"doRequest: response body from %s %s",
res.Request.Method,
res.Request.URL.String(),
),
)
if res.StatusCode != 200 {
return nil, fmt.Errorf("response returned %s", res.Status)
}
Expand Down
19 changes: 19 additions & 0 deletions federation/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
package federation

import (
"bytes"
"context"
"crypto/ed25519"
"crypto/rand"
Expand All @@ -12,6 +13,7 @@ import (
"encoding/json"
"encoding/pem"
"fmt"
"io"
"io/ioutil"
"math/big"
"net"
Expand All @@ -32,6 +34,7 @@ import (

"github.com/matrix-org/complement/config"
"github.com/matrix-org/complement/ct"
"github.com/matrix-org/complement/internal"
)

// Subset of Deployment used in federation
Expand Down Expand Up @@ -278,6 +281,9 @@ func (s *Server) SendFederationRequest(
// DoFederationRequest signs and sends an arbitrary federation request from this server, and returns the response.
//
// The requests will be routed according to the deployment map in `deployment`.
//
// The caller does not need to worry about closing the returned `http.Response.Body` as
// this is handled automatically.
func (s *Server) DoFederationRequest(
ctx context.Context,
t ct.TestLike,
Expand All @@ -297,12 +303,25 @@ func (s *Server) DoFederationRequest(

var resp *http.Response
resp, err = httpClient.DoHTTPRequest(ctx, httpReq)
defer internal.CloseIO(resp.Body, "DoFederationRequest: federation response body")

if httpError, ok := err.(gomatrix.HTTPError); ok {
t.Logf("[SSAPI] %s %s%s => error(%d): %s (%s)", req.Method(), req.Destination(), req.RequestURI(), httpError.Code, err, time.Since(start))
} else if err == nil {
t.Logf("[SSAPI] %s %s%s => %d (%s)", req.Method(), req.Destination(), req.RequestURI(), resp.StatusCode, time.Since(start))
}

// Make a copy of the response body so that downstream callers can read it multiple
// times if needed and don't need to worry about closing it.
var respBody []byte
if resp.Body != nil {
respBody, err = io.ReadAll(resp.Body)
if err != nil {
ct.Fatalf(t, "CSAPI.Do failed to read response body for RetryUntil check: %s", err)
}
resp.Body = io.NopCloser(bytes.NewBuffer(respBody))
}

return resp, err
}

Expand Down
Loading
Loading