From 8a21fe84160ac1c7f7b7a077e77e92d7e633ed03 Mon Sep 17 00:00:00 2001
From: Alok Kumar Singh
Date: Wed, 12 May 2021 14:17:43 +0530
Subject: [PATCH] Renamed watcher to client as its scope widens beyond watching

---
 redshiftsink/cmd/redshiftsink/main.go       |  2 +-
 .../controllers/realtime_calculator.go      | 20 +++++++-------
 .../controllers/redshiftsink_controller.go  | 26 +++++++++----------
 .../pkg/kafka/{watcher.go => client.go}     | 18 ++++++-------
 4 files changed, 33 insertions(+), 33 deletions(-)
 rename redshiftsink/pkg/kafka/{watcher.go => client.go} (94%)

diff --git a/redshiftsink/cmd/redshiftsink/main.go b/redshiftsink/cmd/redshiftsink/main.go
index 5b67cc1f8..9489c8282 100644
--- a/redshiftsink/cmd/redshiftsink/main.go
+++ b/redshiftsink/cmd/redshiftsink/main.go
@@ -106,7 +106,7 @@ func main() {
 		Log:                ctrl.Log.WithName("controllers").WithName("RedshiftSink"),
 		Scheme:             mgr.GetScheme(),
 		Recorder:           mgr.GetEventRecorderFor("redshiftsink-reconciler"),
-		KafkaWatchers:      new(sync.Map),
+		KafkaClients:       new(sync.Map),
 		KafkaTopicRegexes:  new(sync.Map),
 		KafkaTopicsCache:   new(sync.Map),
 		KafkaRealtimeCache: new(sync.Map),
diff --git a/redshiftsink/controllers/realtime_calculator.go b/redshiftsink/controllers/realtime_calculator.go
index 359fb8631..9c24ac186 100644
--- a/redshiftsink/controllers/realtime_calculator.go
+++ b/redshiftsink/controllers/realtime_calculator.go
@@ -16,9 +16,9 @@ type topicLast struct {
 }
 
 type realtimeCalculator struct {
-	rsk     *tipocav1.RedshiftSink
-	watcher kafka.Watcher
-	cache   *sync.Map
+	rsk         *tipocav1.RedshiftSink
+	kafkaClient kafka.Client
+	cache       *sync.Map
 
 	batchersRealtime []string
 	loadersRealtime  []string
@@ -31,14 +31,14 @@ type realtimeCalculator struct {
 
 func newRealtimeCalculator(
 	rsk *tipocav1.RedshiftSink,
-	watcher kafka.Watcher,
+	kafkaClient kafka.Client,
 	cache *sync.Map,
 	desiredVersion string,
 ) *realtimeCalculator {
 
 	return &realtimeCalculator{
 		rsk:          rsk,
-		watcher:      watcher,
+		kafkaClient:  kafkaClient,
 		cache:        cache,
 		batchersLast: []topicLast{},
 		loadersLast:  []topicLast{},
@@ -143,7 +143,7 @@ func (r *realtimeCalculator) fetchRealtimeInfo(
 	}
 
 	// batcher's lag analysis: a) get last
-	batcherLast, err := r.watcher.LastOffset(topic, 0)
+	batcherLast, err := r.kafkaClient.LastOffset(topic, 0)
 	if err != nil {
 		return info, fmt.Errorf("Error getting last offset for %s", topic)
 	}
@@ -151,7 +151,7 @@ func (r *realtimeCalculator) fetchRealtimeInfo(
 	klog.V(4).Infof("rsk/%s %s, lastOffset=%v", r.rsk.Name, topic, batcherLast)
 
 	// batcher's lag analysis: b) get current
-	batcherCurrent, err := r.watcher.CurrentOffset(
+	batcherCurrent, err := r.kafkaClient.CurrentOffset(
 		consumerGroupID(r.rsk.Name, r.rsk.Namespace, desiredGroupID, "-batcher"),
 		topic,
 		0,
@@ -173,7 +173,7 @@ func (r *realtimeCalculator) fetchRealtimeInfo(
 	}
 
 	// loader's lag analysis: a) get last
-	loaderLast, err := r.watcher.LastOffset(*loaderTopic, 0)
+	loaderLast, err := r.kafkaClient.LastOffset(*loaderTopic, 0)
 	if err != nil {
 		return info, fmt.Errorf("Error getting last offset for %s", *loaderTopic)
 	}
@@ -181,7 +181,7 @@ func (r *realtimeCalculator) fetchRealtimeInfo(
 	klog.V(4).Infof("rsk/%s %s, lastOffset=%v", r.rsk.Name, *loaderTopic, loaderLast)
 
 	// loader's lag analysis: b) get current
-	loaderCurrent, err := r.watcher.CurrentOffset(
+	loaderCurrent, err := r.kafkaClient.CurrentOffset(
 		consumerGroupID(r.rsk.Name, r.rsk.Namespace, desiredGroupID, "-loader"),
 		*loaderTopic,
 		0,
@@ -220,7 +220,7 @@ func (r *realtimeCalculator) calculate(reloading []string, currentRealtime []str
 	realtimeTopics := []string{}
 	current := toMap(currentRealtime)
-	allTopics, err := r.watcher.Topics()
+	allTopics, err := r.kafkaClient.Topics()
 	if err != nil {
 		klog.Errorf(
 			"Ignoring realtime update. Error fetching all topics, err:%v",
diff --git a/redshiftsink/controllers/redshiftsink_controller.go b/redshiftsink/controllers/redshiftsink_controller.go
index e371b05f1..a0bde3d38 100644
--- a/redshiftsink/controllers/redshiftsink_controller.go
+++ b/redshiftsink/controllers/redshiftsink_controller.go
@@ -51,7 +51,7 @@ type RedshiftSinkReconciler struct {
 	Recorder record.EventRecorder
 
 	KafkaTopicRegexes  *sync.Map
-	KafkaWatchers      *sync.Map
+	KafkaClients       *sync.Map
 	KafkaTopicsCache   *sync.Map
 	KafkaRealtimeCache *sync.Map
 	ReleaseCache       *sync.Map
@@ -182,7 +182,7 @@ func resultRequeueMilliSeconds(ms int) ctrl.Result {
 }
 
 func (r *RedshiftSinkReconciler) fetchLatestTopics(
-	kafkaWatcher kafka.Watcher,
+	kafkaClient kafka.Client,
 	regexes string,
 ) (
 	[]string,
@@ -193,7 +193,7 @@ func (r *RedshiftSinkReconciler) fetchLatestTopics(
 	topicsAppended := make(map[string]bool)
 	expressions := strings.Split(regexes, ",")
 
-	allTopics, err := kafkaWatcher.Topics()
+	allTopics, err := kafkaClient.Topics()
 	if err != nil {
 		return []string{}, err
 	}
@@ -262,11 +262,11 @@ func (r *RedshiftSinkReconciler) makeTLSConfig(secret map[string]string) (*kafka
 	return &configTLS, nil
 }
 
-func (r *RedshiftSinkReconciler) loadKafkaWatcher(
+func (r *RedshiftSinkReconciler) loadKafkaClient(
 	rsk *tipocav1.RedshiftSink,
 	tlsConfig *kafka.TLSConfig,
 ) (
-	kafka.Watcher,
+	kafka.Client,
 	error,
 ) {
 	values := ""
@@ -281,19 +281,19 @@ func (r *RedshiftSinkReconciler) loadKafkaWatcher(
 	values += kafkaVersion
 	hash := fmt.Sprintf("%x", sha1.Sum([]byte(values)))
 
-	watcher, ok := r.KafkaWatchers.Load(hash)
+	kc, ok := r.KafkaClients.Load(hash)
 	if ok {
-		return watcher.(kafka.Watcher), nil
+		return kc.(kafka.Client), nil
 	} else {
-		watcher, err := kafka.NewWatcher(
+		kc, err := kafka.NewClient(
 			brokers, kafkaVersion, *tlsConfig,
 		)
 		if err != nil {
 			return nil, err
 		}
-		r.KafkaWatchers.Store(hash, watcher)
+		r.KafkaClients.Store(hash, kc)
 
-		return watcher, nil
+		return kc, nil
 	}
 }
@@ -323,13 +323,13 @@ func (r *RedshiftSinkReconciler) reconcile(
 		return result, events, err
 	}
 
-	kafkaWatcher, err := r.loadKafkaWatcher(rsk, tlsConfig)
+	kafkaClient, err := r.loadKafkaClient(rsk, tlsConfig)
 	if err != nil {
 		return result, events, fmt.Errorf("Error fetching kafka watcher, %v", err)
 	}
 
 	kafkaTopics, err := r.fetchLatestTopics(
-		kafkaWatcher, rsk.Spec.KafkaTopicRegexes,
+		kafkaClient, rsk.Spec.KafkaTopicRegexes,
 	)
 	if err != nil {
 		return result, events, fmt.Errorf("Error fetching topics, err: %v", err)
 	}
@@ -440,7 +440,7 @@ func (r *RedshiftSinkReconciler) reconcile(
 	// Realtime status is always calculated to keep the CurrentOffset
 	// info updated in the rsk status. This is required so that low throughput
 	// release do not get blocked due to missing consumer group currentOffset.
-	calc := newRealtimeCalculator(rsk, kafkaWatcher, r.KafkaRealtimeCache, desiredMaskVersion)
+	calc := newRealtimeCalculator(rsk, kafkaClient, r.KafkaRealtimeCache, desiredMaskVersion)
 	currentRealtime := calc.calculate(status.reloading, status.realtime)
 	if len(status.reloading) > 0 {
 		klog.V(2).Infof("rsk/%v batchersRealtime: %d / %d (current=%d)", rsk.Name, len(calc.batchersRealtime), len(status.reloading), len(rsk.Status.BatcherReloadingTopics))
diff --git a/redshiftsink/pkg/kafka/watcher.go b/redshiftsink/pkg/kafka/client.go
similarity index 94%
rename from redshiftsink/pkg/kafka/watcher.go
rename to redshiftsink/pkg/kafka/client.go
index 915cd4c68..1b85d2a29 100644
--- a/redshiftsink/pkg/kafka/watcher.go
+++ b/redshiftsink/pkg/kafka/client.go
@@ -9,7 +9,7 @@ import (
 	"github.com/practo/klog/v2"
 )
 
-type Watcher interface {
+type Client interface {
 	// Topics return all the topics present in kafka, it keeps a cache
 	// which is refreshed every cacheValidity seconds
 	Topics() ([]string, error)
@@ -23,7 +23,7 @@ type Watcher interface {
 	CurrentOffset(id string, topic string, partition int32) (int64, error)
 }
 
-type kafkaWatch struct {
+type kafkaClient struct {
 	client               sarama.Client
 	cacheValidity        time.Duration
 	lastTopicRefreshTime *int64
@@ -35,12 +35,12 @@ type kafkaWatch struct {
 	topics []string
 }
 
-func NewWatcher(
+func NewClient(
 	brokers []string,
 	version string,
 	configTLS TLSConfig,
 ) (
-	Watcher, error,
+	Client, error,
 ) {
 	v, err := sarama.ParseKafkaVersion(version)
 	if err != nil {
@@ -63,7 +63,7 @@ func NewWatcher(
 		return nil, fmt.Errorf("Error creating client: %v\n", err)
 	}
 
-	return &kafkaWatch{
+	return &kafkaClient{
 		client:               client,
 		cacheValidity:        time.Second * time.Duration(30),
 		lastTopicRefreshTime: nil,
@@ -85,7 +85,7 @@ func cacheValid(validity time.Duration, lastCachedTime *int64) bool {
 
 // Topics get the latest topics after refreshing the client with the latest
 // it caches it for t.cacheValidity
-func (t *kafkaWatch) Topics() ([]string, error) {
+func (t *kafkaClient) Topics() ([]string, error) {
 	if cacheValid(t.cacheValidity, t.lastTopicRefreshTime) {
 		return t.topics, nil
 	}
@@ -113,11 +113,11 @@ func (t *kafkaWatch) Topics() ([]string, error) {
 	return t.topics, nil
 }
 
-func (t *kafkaWatch) LastOffset(topic string, partition int32) (int64, error) {
+func (t *kafkaClient) LastOffset(topic string, partition int32) (int64, error) {
 	return t.client.GetOffset(topic, partition, sarama.OffsetNewest)
 }
 
-func (t *kafkaWatch) fetchCurrentOffset(
+func (t *kafkaClient) fetchCurrentOffset(
 	id string,
 	topic string,
 	partition int32,
@@ -179,7 +179,7 @@ func (t *kafkaWatch) fetchCurrentOffset(
 	return defaultCurrentOffset, nil
 }
 
-func (t *kafkaWatch) CurrentOffset(
+func (t *kafkaClient) CurrentOffset(
 	id string,
 	topic string,
 	partition int32,
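
Usage sketch for reviewers: a minimal example of how a caller exercises the
renamed kafka.Client. The import path is inferred from the repository layout
above, and the broker address, Kafka version, and zero-value TLSConfig are
illustrative assumptions, not values taken from this patch.

package main

import (
	"fmt"
	"log"

	// Assumed import path, inferred from the redshiftsink/ tree above.
	"github.com/practo/tipoca-stream/redshiftsink/pkg/kafka"
)

func main() {
	// NewClient is the renamed NewWatcher; the signature is unchanged.
	// Broker address, version string, and the zero-value TLSConfig
	// (TLS left disabled) are placeholder assumptions.
	client, err := kafka.NewClient(
		[]string{"localhost:9092"},
		"2.6.0",
		kafka.TLSConfig{},
	)
	if err != nil {
		log.Fatalf("Error creating kafka client: %v", err)
	}

	// Topics() serves from a cache refreshed every 30 seconds,
	// per cacheValidity in the implementation above.
	topics, err := client.Topics()
	if err != nil {
		log.Fatalf("Error fetching topics: %v", err)
	}

	for _, topic := range topics {
		// LastOffset returns the newest offset of partition 0.
		last, err := client.LastOffset(topic, 0)
		if err != nil {
			log.Fatalf("Error getting last offset for %s: %v", topic, err)
		}
		fmt.Printf("%s partition 0 last offset: %d\n", topic, last)
	}
}

Note the design of loadKafkaClient above: the reconciler keeps one Client per
unique broker/version/TLS configuration by sha1-hashing those values and
storing the constructed Client in a sync.Map, so repeated reconciles reuse the
same underlying sarama connection instead of redialing Kafka.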