Skip to content

Commit 61becf6

Browse files
Support newer api versions API
1 parent 97b4cb2 commit 61becf6

File tree

1 file changed

+18
-13
lines changed

1 file changed

+18
-13
lines changed

proxy/protocol/responses.go

Lines changed: 18 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ const (
1111
apiKeyMetadata = 3
1212
apiKeyFindCoordinator = 10
1313
apiKeySaslHandshake = 17
14-
apiKeyApiApiVersions = 18
14+
apiKeyApiVersions = 18
1515

1616
brokersKeyName = "brokers"
1717
hostKeyName = "host"
@@ -26,28 +26,25 @@ var (
2626
metadataResponseSchemaVersions = createMetadataResponseSchemaVersions()
2727
findCoordinatorResponseSchemaVersions = createFindCoordinatorResponseSchemaVersions()
2828
apiVersionsResponseSchemaVersions = createApiVersionsResponseSchemaVersions()
29-
apiVersionSchema = createApiVersionSchema()
3029
)
3130

32-
func createApiVersionSchema() Schema {
33-
return NewSchema("api_version",
31+
func createApiVersionsResponseSchemaVersions() []Schema {
32+
apiVersionV0 := NewSchema("api_version",
3433
&Mfield{Name: "api_key", Ty: TypeInt16},
3534
&Mfield{Name: "min_version", Ty: TypeInt16},
3635
&Mfield{Name: "max_version", Ty: TypeInt16},
3736
)
38-
}
3937

40-
func createApiVersionsResponseSchemaVersions() []Schema {
4138
// Version 0: error_code + api_keys
4239
apiVersionsResponseV0 := NewSchema("api_versions_response_v0",
4340
&Mfield{Name: "error_code", Ty: TypeInt16},
44-
&Array{Name: "api_keys", Ty: apiVersionSchema},
41+
&Array{Name: "api_keys", Ty: apiVersionV0},
4542
)
4643

4744
// Version 1: error_code + api_keys + throttle_time_ms
4845
apiVersionsResponseV1 := NewSchema("api_versions_response_v1",
4946
&Mfield{Name: "error_code", Ty: TypeInt16},
50-
&Array{Name: "api_keys", Ty: apiVersionSchema},
47+
&Array{Name: "api_keys", Ty: apiVersionV0},
5148
&Mfield{Name: "throttle_time_ms", Ty: TypeInt32},
5249
)
5350

@@ -389,8 +386,8 @@ func modifyApiVersionsResponse(decodedStruct *Struct, fn config.NetAddressMappin
389386
return errors.New("decoded struct must not be nil")
390387
}
391388

392-
versions, ok := decodedStruct.Get("api_keys").([]interface{})
393-
if !ok {
389+
versions, ok := decodedStruct.Get("api_keys").([]any)
390+
if !ok || len(versions) == 0 {
394391
return errors.New("versions not found")
395392
}
396393
for _, versionElement := range versions {
@@ -400,9 +397,17 @@ func modifyApiVersionsResponse(decodedStruct *Struct, fn config.NetAddressMappin
400397
}
401398
}
402399

400+
schema := versions[0].(*Struct).GetSchema()
401+
402+
values := []any{int16(17), int16(0), int16(1)}
403+
404+
if len(schema.GetFields()) > 3 {
405+
values = append(values, []rawTaggedField{})
406+
}
407+
403408
versions = append(versions, &Struct{
404-
Schema: apiVersionSchema,
405-
Values: []any{int16(17), int16(0), int16(0)},
409+
Schema: schema,
410+
Values: values,
406411
})
407412

408413
return decodedStruct.Replace("api_keys", versions)
@@ -550,7 +555,7 @@ func (f *responseModifier) Apply(resp []byte) ([]byte, error) {
550555

551556
func GetResponseModifier(apiKey int16, apiVersion int16, addressMappingFunc config.NetAddressMappingFunc) (ResponseModifier, error) {
552557
switch apiKey {
553-
case apiKeyApiApiVersions:
558+
case apiKeyApiVersions:
554559
return newResponseModifier(apiKey, apiVersion, addressMappingFunc, apiVersionsResponseSchemaVersions, modifyApiVersionsResponse)
555560
case apiKeyMetadata:
556561
return newResponseModifier(apiKey, apiVersion, addressMappingFunc, metadataResponseSchemaVersions, modifyMetadataResponse)

0 commit comments

Comments
 (0)