-
Notifications
You must be signed in to change notification settings - Fork 18
/
request.go
130 lines (111 loc) · 4.11 KB
/
request.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
package healer
import (
"encoding/binary"
)
// TODO: define a dedicated ApiKey type and change the API_XXX constants to use it.

// API key identifiers, one constant per request type. Each is declared as a
// uint16, the same type as RequestHeader.APIKey. The numbering is not
// contiguous (for example nothing is defined between 3 and 8).
// NOTE(review): the values look like the api_key numbers assigned by the
// Kafka wire protocol — confirm against the protocol reference before adding
// or changing entries.
const (
API_ProduceRequest uint16 = 0
API_FetchRequest uint16 = 1
API_OffsetRequest uint16 = 2
API_MetadataRequest uint16 = 3
API_OffsetCommitRequest uint16 = 8
API_OffsetFetchRequest uint16 = 9
API_FindCoordinator uint16 = 10
API_JoinGroup uint16 = 11
API_Heartbeat uint16 = 12
API_LeaveGroup uint16 = 13
API_SyncGroup uint16 = 14
API_DescribeGroups uint16 = 15
API_ListGroups uint16 = 16
API_SaslHandshake uint16 = 17
API_ApiVersions uint16 = 18
API_CreateTopics uint16 = 19
API_DeleteTopics uint16 = 20
API_DescribeConfigs uint16 = 32
API_AlterConfigs uint16 = 33
API_DescribeLogDirs uint16 = 35
API_SaslAuthenticate uint16 = 36
API_CreatePartitions uint16 = 37
// NOTE: unlike its siblings this name has an underscore after "Delete".
API_Delete_Groups uint16 = 42
API_ElectLeaders uint16 = 43
API_IncrementalAlterConfigs uint16 = 44
API_AlterPartitionReassignments uint16 = 45
API_ListPartitionReassignments uint16 = 46
)
// healer only implements these versions of the protocol, only version 0 is supported if not defined here
// It must be sorted from high to low
var availableVersions map[uint16][]uint16 = map[uint16][]uint16{
API_MetadataRequest: {7, 1},
API_FetchRequest: {10, 7, 0},
API_OffsetRequest: {1, 0},
API_CreatePartitions: {2, 0},
API_SaslHandshake: {1, 0},
API_OffsetCommitRequest: {2, 0},
API_OffsetFetchRequest: {1, 0},
}
// RequestHeader is the header used by all requests. It holds the API key,
// the API version, the correlation ID and the client ID.
type RequestHeader struct {
// APIKey is the key of the API being requested; presumably one of the
// API_* constants.
APIKey uint16
// APIVersion is the API version of the request; see SetVersion.
APIVersion uint16
// CorrelationID is the request's correlation ID; see SetCorrelationID.
CorrelationID uint32
// ClientID is written by Encode as a 2-byte big-endian length followed by
// the string's bytes.
ClientID string
}
// length returns the number of bytes the header occupies once encoded: the
// fixed-size fields plus the bytes of ClientID.
func (requestHeader *RequestHeader) length() int {
	// APIKey (2) + APIVersion (2) + CorrelationID (4) + ClientID length prefix (2).
	const fixedSize = 2 + 2 + 4 + 2
	return fixedSize + len(requestHeader.ClientID)
}
// Encode writes the request header into payload and returns the number of
// bytes the header occupies. It is used by all requests.
//
// Layout (big-endian): APIKey (2 bytes), APIVersion (2), CorrelationID (4),
// ClientID length (2), then the ClientID bytes.
//
// Encode panics if payload is too small to hold the 10 fixed bytes. The
// ClientID bytes are written with copy, so only as many as fit are stored,
// while the returned count always includes the full ClientID length. The
// length prefix is the ClientID length converted to uint16.
func (requestHeader *RequestHeader) Encode(payload []byte) int {
	order := binary.BigEndian
	clientID := requestHeader.ClientID

	order.PutUint16(payload[0:], requestHeader.APIKey)
	order.PutUint16(payload[2:], requestHeader.APIVersion)
	order.PutUint32(payload[4:], requestHeader.CorrelationID)
	order.PutUint16(payload[8:], uint16(len(clientID)))
	copy(payload[10:], clientID)

	return 10 + len(clientID)
}
// DecodeRequestHeader parses a request header from the start of payload and
// returns it together with the offset of the first byte after the header.
// It is only used in test cases.
//
// Expected layout (big-endian): APIKey (2 bytes), APIVersion (2),
// CorrelationID (4), ClientID length (2), then the ClientID bytes. No bounds
// are validated up front; a payload that is too short causes a panic.
func DecodeRequestHeader(payload []byte) (requestHeader RequestHeader, offset int) {
	order := binary.BigEndian

	apiKey := order.Uint16(payload)
	apiVersion := order.Uint16(payload[2:])
	correlationID := order.Uint32(payload[4:])
	// The ClientID starts right after the 10 fixed bytes.
	clientIDEnd := 10 + int(order.Uint16(payload[8:]))

	return RequestHeader{
		APIKey:        apiKey,
		APIVersion:    apiVersion,
		CorrelationID: correlationID,
		ClientID:      string(payload[10:clientIDEnd]),
	}, clientIDEnd
}
// API returns the APIKey stored in the request header, i.e. the API key of
// the request that holds this header.
func (requestHeader *RequestHeader) API() uint16 {
return requestHeader.APIKey
}
// Version returns the API version of the request, as stored in APIVersion.
func (requestHeader *RequestHeader) Version() uint16 {
return requestHeader.APIVersion
}
// SetCorrelationID sets the request's correlation ID by overwriting
// CorrelationID with c.
func (requestHeader *RequestHeader) SetCorrelationID(c uint32) {
requestHeader.CorrelationID = c
}
// SetVersion sets the request's API version by overwriting APIVersion with
// version. The assignment is unconditional.
func (requestHeader *RequestHeader) SetVersion(version uint16) {
// NOTE: a guard that returned early when APIVersion == -1 used to sit here
// and is disabled. APIVersion is a uint16, so it can never equal -1.
requestHeader.APIVersion = version
}
// Request is the interface implemented by every concrete request type.
//
// RequestHeader supplies API, SetCorrelationID and SetVersion with matching
// signatures, but its own Encode takes a destination buffer and returns a
// byte count, so RequestHeader by itself does not satisfy Request.
type Request interface {
// Encode returns the request as bytes; presumably the wire encoding for the
// given API version.
Encode(version uint16) []byte
// API returns the request's API key.
API() uint16
// SetCorrelationID sets the request's correlation ID.
SetCorrelationID(uint32)
// SetVersion sets the request's API version.
SetVersion(uint16)
}