-
Notifications
You must be signed in to change notification settings - Fork 18
/
client.go
157 lines (134 loc) · 3.78 KB
/
client.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
package healer
import (
"github.com/go-logr/logr"
)
type Client struct {
clientID string
logger logr.Logger
brokers *Brokers
}
// NewClient creates a new Client
func NewClient(bs, clientID string) (*Client, error) {
var err error
client := &Client{
clientID: clientID,
logger: GetLogger().WithName(clientID),
}
client.brokers, err = NewBrokers(bs)
return client, err
}
func (client *Client) WithLogger(logger logr.Logger) *Client {
client.logger = logger
return client
}
// Close closes the connections to kafka brokers
func (c *Client) Close() {
c.brokers.Close()
}
// RefreshMetadata refreshes metadata for c.brokers
func (c *Client) RefreshMetadata() {
}
// ListGroups lists all consumer groups from all brokers
func (c *Client) ListGroups() (groups []string, err error) {
for _, brokerinfo := range c.brokers.BrokersInfo() {
broker, err := c.brokers.GetBroker(brokerinfo.NodeID)
if err != nil {
c.logger.Error(err, "get broker failed", "NodeID", brokerinfo.NodeID)
return groups, err
}
response, err := broker.RequestListGroups(c.clientID)
if err != nil {
c.logger.Error(err, "get group list failed", "broker", broker.GetAddress())
return groups, err
}
for _, g := range response.Groups {
groups = append(groups, g.GroupID)
}
}
return groups, nil
}
func (c *Client) DescribeLogDirs(topics []string) (map[int32]DescribeLogDirsResponse, error) {
c.logger.Info("describe logdirs", "topics", topics)
meta, err := c.brokers.RequestMetaData(c.clientID, topics)
if err != nil {
return nil, err
}
type tp struct {
Topic string
PartitionID int32
}
brokerPartitions := make(map[int32][]tp)
for _, topic := range meta.TopicMetadatas {
topicName := topic.TopicName
for _, partition := range topic.PartitionMetadatas {
pid := partition.PartitionID
for _, b := range partition.Replicas {
if _, ok := brokerPartitions[b]; !ok {
brokerPartitions[b] = []tp{
{
Topic: topicName,
PartitionID: pid,
},
}
} else {
brokerPartitions[b] = append(brokerPartitions[b], tp{
Topic: topicName,
PartitionID: pid,
})
}
}
}
}
c.logger.Info("broker partitions", "brokerPartitions", brokerPartitions)
rst := make(map[int32]DescribeLogDirsResponse)
for b, topicPartitions := range brokerPartitions {
req := NewDescribeLogDirsRequest(c.clientID, nil)
for _, tp := range topicPartitions {
req.AddTopicPartition(tp.Topic, tp.PartitionID)
}
broker, err := c.brokers.GetBroker(b)
if err != nil {
return nil, err
}
resp, err := broker.RequestAndGet(req)
if err != nil {
c.logger.Error(err, "describe logdirs failed", "broker", broker.String())
continue
}
topicSet := make(map[string]struct{})
for _, t := range topics {
topicSet[t] = struct{}{}
}
r := resp.(DescribeLogDirsResponse)
rs := r.Results
for i := range rs {
theTopics := rs[i].Topics
filterdTopics := make([]DescribeLogDirsResponseTopic, 0)
for i := range theTopics {
if _, ok := topicSet[theTopics[i].TopicName]; ok {
filterdTopics = append(filterdTopics, theTopics[i])
}
}
rs[i].Topics = filterdTopics
}
filteredTopicResults := make([]DescribeLogDirsResponseResult, 0)
for i := range rs {
if len(rs[i].Topics) > 0 {
filteredTopicResults = append(filteredTopicResults, rs[i])
}
}
r.Results = filteredTopicResults
rst[b] = resp.(DescribeLogDirsResponse)
}
return rst, nil
}
func (c *Client) DeleteTopics(topics []string, timeoutMs int32) (r DeleteTopicsResponse, err error) {
c.logger.Info("delete topics", "topics", topics)
req := NewDeleteTopicsRequest(c.clientID, topics, timeoutMs)
controller, err := c.brokers.GetController()
if err != nil {
return r, err
}
resp, err := controller.RequestAndGet(req)
return resp.(DeleteTopicsResponse), err
}