-
Notifications
You must be signed in to change notification settings - Fork 18
/
alter_partition_reassignments_request.go
148 lines (121 loc) · 4.23 KB
/
alter_partition_reassignments_request.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
package healer
import (
"encoding/binary"
)
// AlterPartitionReassignmentsRequest is the request body of the
// AlterPartitionReassignments API. It embeds the common request header and
// carries a timeout together with the per-topic partition reassignments.
type AlterPartitionReassignmentsRequest struct {
*RequestHeader
// TimeoutMs is encoded as a 4-byte big-endian integer right after the header
// (milliseconds, judging by the field name).
TimeoutMs int32 `json:"timeout_ms"`
// Topics holds one entry per topic; AddAssignment appends to this slice.
Topics []*AlterPartitionReassignmentsTopic `json:"topics"`
}
// NewAlterPartitionReassignmentsRequest returns a request whose header is
// pre-filled with the AlterPartitionReassignments API key and API version 0,
// and whose timeout is set to timeoutMs. The returned request has no topics;
// use AddAssignment to populate it.
func NewAlterPartitionReassignmentsRequest(timeoutMs int32) (r AlterPartitionReassignmentsRequest) {
	return AlterPartitionReassignmentsRequest{
		RequestHeader: &RequestHeader{
			APIKey:     API_AlterPartitionReassignments,
			APIVersion: 0,
		},
		TimeoutMs: timeoutMs,
	}
}
// AddAssignment appends a reassignment of partitionID in topic to the request.
// If the topic is already present, the partition is added to that topic's
// list; otherwise a new topic entry is created.
//
// It does not check whether the same partition has already been added, and the
// replicas slice is stored as given (it is not copied).
func (r *AlterPartitionReassignmentsRequest) AddAssignment(topic string, partitionID int32, replicas []int32) {
	assignment := &AlterPartitionReassignmentsPartition{
		PartitionID: partitionID,
		Replicas:    replicas,
	}

	for i := range r.Topics {
		if existing := r.Topics[i]; existing.TopicName == topic {
			existing.Partitions = append(existing.Partitions, assignment)
			return
		}
	}

	// First assignment for this topic: start a new topic entry.
	r.Topics = append(r.Topics, &AlterPartitionReassignmentsTopic{
		TopicName:  topic,
		Partitions: []*AlterPartitionReassignmentsPartition{assignment},
	})
}
// length returns the number of bytes reserved for the request body (without
// the 4-byte size prefix). Encode uses it to size its buffer.
//
// The topic count is budgeted at 4 bytes here although Encode writes it as an
// unsigned varint, so the result is a reservation rather than the exact
// encoded size.
func (r *AlterPartitionReassignmentsRequest) length(version uint16) (requestLength int) {
	// TimeoutMs (4) + topic count (4) + trailing TAG_BUFFER (1).
	const fixedSize = 4 + 4 + 1

	requestLength = r.RequestHeader.length() + fixedSize
	for i := range r.Topics {
		requestLength += r.Topics[i].length(version)
	}
	return requestLength
}
// Encode serializes the request into a newly allocated byte slice.
//
// Layout of the returned slice:
//
//	4 bytes   big-endian size of everything that follows
//	header    as written by RequestHeader.Encode
//	1 byte    empty TAG_BUFFER of the header
//	4 bytes   TimeoutMs, big-endian
//	uvarint   len(Topics)+1 (compact array length)
//	topics    each written by AlterPartitionReassignmentsTopic.encode
//	1 byte    empty TAG_BUFFER
//
// version is only used to size the buffer via length. The buffer is allocated
// from that reservation and the result is trimmed to the bytes actually
// written.
func (r *AlterPartitionReassignmentsRequest) Encode(version uint16) []byte {
	requestLength := r.length(version)
	payload := make([]byte, requestLength+4)
	offset := 4 // skip the size prefix; it is filled in once the body is written

	offset += r.RequestHeader.Encode(payload[offset:])
	offset++ // TAG_BUFFER in header (empty: the freshly made buffer is zeroed)

	binary.BigEndian.PutUint32(payload[offset:], uint32(r.TimeoutMs))
	offset += 4

	offset += binary.PutUvarint(payload[offset:], 1+uint64(len(r.Topics)))
	for _, topic := range r.Topics {
		offset += topic.encode(payload[offset:])
	}
	offset++ // TAG_BUFFER

	// The size prefix counts the bytes after itself.
	binary.BigEndian.PutUint32(payload[0:], uint32(offset-4))
	return payload[:offset]
}
// AlterPartitionReassignmentsTopic is one topic entry of an
// AlterPartitionReassignmentsRequest: a topic name and the partitions of that
// topic to reassign.
type AlterPartitionReassignmentsTopic struct {
// TopicName is encoded as a uvarint length (len+1) followed by the raw bytes.
TopicName string `json:"topic_name"`
// Partitions is encoded as a uvarint count (len+1) followed by each partition.
Partitions []*AlterPartitionReassignmentsPartition `json:"partitions"`
// TAG_BUFFER
}
// length returns the number of bytes reserved for this topic entry: 2 bytes
// for the name length, the name itself, 4 bytes for the partition count, the
// reservation of every partition, and 1 byte for the TAG_BUFFER.
//
// encode writes both lengths as unsigned varints, so this is a reservation
// rather than the exact encoded size.
func (r *AlterPartitionReassignmentsTopic) length(version uint16) (length int) {
	length = 2 + len(r.TopicName) // name length prefix + name bytes
	length += 4                   // partition count
	for i := range r.Partitions {
		length += r.Partitions[i].length(version)
	}
	length++ // TAG_BUFFER
	return length
}
// encode writes this topic entry to the start of payload and returns the
// number of bytes consumed. The topic name and the partition list are each
// prefixed with their length plus one as an unsigned varint; a single byte is
// skipped at the end for the empty TAG_BUFFER, which relies on payload being
// zeroed at that position.
func (r *AlterPartitionReassignmentsTopic) encode(payload []byte) (offset int) {
	nameLen := uint64(len(r.TopicName)) + 1
	offset = binary.PutUvarint(payload, nameLen)
	offset += copy(payload[offset:], r.TopicName)

	partitionCount := uint64(len(r.Partitions)) + 1
	offset += binary.PutUvarint(payload[offset:], partitionCount)
	for i := range r.Partitions {
		offset += r.Partitions[i].encode(payload[offset:])
	}

	return offset + 1 // TAG_BUFFER
}
// AlterPartitionReassignmentsPartition is one partition entry of an
// AlterPartitionReassignmentsTopic: the partition to reassign and the replica
// list to place it on.
type AlterPartitionReassignmentsPartition struct {
	PartitionID int32 `json:"partition_id"`
	// Replicas is the target replica list. The field is nullable on the wire:
	// a nil slice is encoded as a null array (which the Kafka protocol defines
	// as "cancel the pending reassignment for this partition"), while a
	// non-nil slice, including an empty one, is encoded as a regular array.
	Replicas []int32 `json:"replicas"`
	// TAG_BUFFER
}

// length returns the number of bytes reserved for this partition entry:
// 4 bytes for PartitionID, 4 bytes for the replica count, 4 bytes per replica
// and 1 byte for the TAG_BUFFER. encode writes the count as an unsigned
// varint, so this is a reservation rather than the exact encoded size.
func (r *AlterPartitionReassignmentsPartition) length(version uint16) (length int) {
	length = 4                    // PartitionID
	length += 4                   // len(Replicas)
	length += 4 * len(r.Replicas) // replica IDs
	length++                      // TAG_BUFFER
	return length
}

// encode writes this partition entry to the start of payload and returns the
// number of bytes written. payload must be large enough to hold the entry; a
// buffer of length() bytes always is.
func (r *AlterPartitionReassignmentsPartition) encode(payload []byte) (offset int) {
	binary.BigEndian.PutUint32(payload[offset:], uint32(r.PartitionID))
	offset += 4

	if r.Replicas == nil {
		// Compact nullable array: a length of 0 means null.
		offset += binary.PutUvarint(payload[offset:], 0)
	} else {
		// Compact array: length is stored as N+1.
		offset += binary.PutUvarint(payload[offset:], 1+uint64(len(r.Replicas)))
		for _, replica := range r.Replicas {
			binary.BigEndian.PutUint32(payload[offset:], uint32(replica))
			offset += 4
		}
	}

	// Empty TAG_BUFFER, written explicitly so the result does not depend on
	// the caller handing in a zeroed buffer.
	payload[offset] = 0
	offset++
	return offset
}