Reset offset to latest for zero throughput topics #221

Closed · wants to merge 1 commit
redshiftsink/cmd/redshiftsink/main.go (2 changes: 1 addition & 1 deletion)
@@ -106,7 +106,7 @@ func main() {
Log: ctrl.Log.WithName("controllers").WithName("RedshiftSink"),
Scheme: mgr.GetScheme(),
Recorder: mgr.GetEventRecorderFor("redshiftsink-reconciler"),
- KafkaWatchers: new(sync.Map),
+ KafkaClients: new(sync.Map),
KafkaTopicRegexes: new(sync.Map),
KafkaTopicsCache: new(sync.Map),
KafkaRealtimeCache: new(sync.Map),
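The only change in main.go is the renamed cache field wired into the reconciler. For readers unfamiliar with the pattern, a minimal self-contained sketch of how a sync.Map cache like KafkaClients behaves (the key and value below are placeholders, not from this PR):

package main

import (
	"fmt"
	"sync"
)

func main() {
	var clients sync.Map // stands in for RedshiftSinkReconciler.KafkaClients

	// Store accepts any value; the reconciler keeps one entry per broker-config hash.
	clients.Store("config-hash", "kafka-client-A")

	// Load returns an untyped value, so callers assert the stored type back,
	// much like loadKafkaClient does with kc.(kafka.Client).
	if v, ok := clients.Load("config-hash"); ok {
		fmt.Println(v.(string))
	}
}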
redshiftsink/controllers/realtime_calculator.go (20 changes: 10 additions & 10 deletions)
@@ -16,9 +16,9 @@ type topicLast struct {
}

type realtimeCalculator struct {
- rsk *tipocav1.RedshiftSink
- watcher kafka.Watcher
- cache *sync.Map
+ rsk *tipocav1.RedshiftSink
+ kafkaClient kafka.Client
+ cache *sync.Map

batchersRealtime []string
loadersRealtime []string
@@ -31,14 +31,14 @@ type realtimeCalculator struct

func newRealtimeCalculator(
rsk *tipocav1.RedshiftSink,
- watcher kafka.Watcher,
+ kafkaClient kafka.Client,
cache *sync.Map,
desiredVersion string,
) *realtimeCalculator {

return &realtimeCalculator{
rsk: rsk,
- watcher: watcher,
+ kafkaClient: kafkaClient,
cache: cache,
batchersLast: []topicLast{},
loadersLast: []topicLast{},
@@ -143,15 +143,15 @@ func (r *realtimeCalculator) fetchRealtimeInfo(
}

// batcher's lag analysis: a) get last
- batcherLast, err := r.watcher.LastOffset(topic, 0)
+ batcherLast, err := r.kafkaClient.LastOffset(topic, 0)
if err != nil {
return info, fmt.Errorf("Error getting last offset for %s", topic)
}
info.batcher.last = &batcherLast
klog.V(4).Infof("rsk/%s %s, lastOffset=%v", r.rsk.Name, topic, batcherLast)

// batcher's lag analysis: b) get current
- batcherCurrent, err := r.watcher.CurrentOffset(
+ batcherCurrent, err := r.kafkaClient.CurrentOffset(
consumerGroupID(r.rsk.Name, r.rsk.Namespace, desiredGroupID, "-batcher"),
topic,
0,
@@ -173,15 +173,15 @@
}

// loader's lag analysis: a) get last
- loaderLast, err := r.watcher.LastOffset(*loaderTopic, 0)
+ loaderLast, err := r.kafkaClient.LastOffset(*loaderTopic, 0)
if err != nil {
return info, fmt.Errorf("Error getting last offset for %s", *loaderTopic)
}
info.loader.last = &loaderLast
klog.V(4).Infof("rsk/%s %s, lastOffset=%v", r.rsk.Name, *loaderTopic, loaderLast)

// loader's lag analysis: b) get current
- loaderCurrent, err := r.watcher.CurrentOffset(
+ loaderCurrent, err := r.kafkaClient.CurrentOffset(
consumerGroupID(r.rsk.Name, r.rsk.Namespace, desiredGroupID, "-loader"),
*loaderTopic,
0,
@@ -220,7 +220,7 @@ func (r *realtimeCalculator) calculate(reloading []string, currentRealtime []str
realtimeTopics := []string{}
current := toMap(currentRealtime)

- allTopics, err := r.watcher.Topics()
+ allTopics, err := r.kafkaClient.Topics()
if err != nil {
klog.Errorf(
"Ignoring realtime update. Error fetching all topics, err:%v",
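The fetchRealtimeInfo comments above spell out the lag analysis: for each topic the calculator asks the renamed kafka.Client for the newest offset (LastOffset) and the consumer group's committed offset (CurrentOffset); the gap between them is the consumer lag. A minimal sketch of that arithmetic, assuming a maxLag threshold (the threshold name and the numbers are illustrative; the PR's actual realtime criteria are not visible in this diff):

package main

import "fmt"

// isRealtime reports whether a consumer is close enough to the head of
// the topic: lag = newest offset - committed offset.
func isRealtime(last, current, maxLag int64) bool {
	return last-current <= maxLag
}

func main() {
	// last would come from kafkaClient.LastOffset(topic, 0), and current
	// from kafkaClient.CurrentOffset(groupID, topic, 0).
	fmt.Println(isRealtime(1000, 990, 100)) // true: only 10 messages behind
	fmt.Println(isRealtime(1000, 200, 100)) // false: still catching up
}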
redshiftsink/controllers/redshiftsink_controller.go (26 changes: 13 additions & 13 deletions)
@@ -51,7 +51,7 @@ type RedshiftSinkReconciler struct
Recorder record.EventRecorder

KafkaTopicRegexes *sync.Map
- KafkaWatchers *sync.Map
+ KafkaClients *sync.Map
KafkaTopicsCache *sync.Map
KafkaRealtimeCache *sync.Map
ReleaseCache *sync.Map
@@ -182,7 +182,7 @@ func resultRequeueMilliSeconds(ms int) ctrl.Result {
}

func (r *RedshiftSinkReconciler) fetchLatestTopics(
- kafkaWatcher kafka.Watcher,
+ kafkaClient kafka.Client,
regexes string,
) (
[]string,
@@ -193,7 +193,7 @@
topicsAppended := make(map[string]bool)
expressions := strings.Split(regexes, ",")

- allTopics, err := kafkaWatcher.Topics()
+ allTopics, err := kafkaClient.Topics()
if err != nil {
return []string{}, err
}
@@ -262,11 +262,11 @@ func (r *RedshiftSinkReconciler) makeTLSConfig(secret map[string]string) (*kafka
return &configTLS, nil
}

- func (r *RedshiftSinkReconciler) loadKafkaWatcher(
+ func (r *RedshiftSinkReconciler) loadKafkaClient(
rsk *tipocav1.RedshiftSink,
tlsConfig *kafka.TLSConfig,
) (
- kafka.Watcher,
+ kafka.Client,
error,
) {
values := ""
@@ -281,19 +281,19 @@ func (r *RedshiftSinkReconciler) loadKafkaWatcher(
values += kafkaVersion
hash := fmt.Sprintf("%x", sha1.Sum([]byte(values)))

- watcher, ok := r.KafkaWatchers.Load(hash)
+ kc, ok := r.KafkaClients.Load(hash)
if ok {
- return watcher.(kafka.Watcher), nil
+ return kc.(kafka.Client), nil
} else {
- watcher, err := kafka.NewWatcher(
+ kc, err := kafka.NewClient(
brokers, kafkaVersion, *tlsConfig,
)
if err != nil {
return nil, err
}
- r.KafkaWatchers.Store(hash, watcher)
+ r.KafkaClients.Store(hash, kc)

- return watcher, nil
+ return kc, nil
}
}

@@ -323,13 +323,13 @@ func (r *RedshiftSinkReconciler) reconcile(
return result, events, err
}

- kafkaWatcher, err := r.loadKafkaWatcher(rsk, tlsConfig)
+ kafkaClient, err := r.loadKafkaClient(rsk, tlsConfig)
if err != nil {
return result, events, fmt.Errorf("Error fetching kafka watcher, %v", err)
}

kafkaTopics, err := r.fetchLatestTopics(
- kafkaWatcher, rsk.Spec.KafkaTopicRegexes,
+ kafkaClient, rsk.Spec.KafkaTopicRegexes,
)
if err != nil {
return result, events, fmt.Errorf("Error fetching topics, err: %v", err)
@@ -440,7 +440,7 @@ func (r *RedshiftSinkReconciler) reconcile(
// Realtime status is always calculated to keep the CurrentOffset
// info updated in the rsk status. This is required so that low throughput
// release do not get blocked due to missing consumer group currentOffset.
- calc := newRealtimeCalculator(rsk, kafkaWatcher, r.KafkaRealtimeCache, desiredMaskVersion)
+ calc := newRealtimeCalculator(rsk, kafkaClient, r.KafkaRealtimeCache, desiredMaskVersion)
currentRealtime := calc.calculate(status.reloading, status.realtime)
if len(status.reloading) > 0 {
klog.V(2).Infof("rsk/%v batchersRealtime: %d / %d (current=%d)", rsk.Name, len(calc.batchersRealtime), len(status.reloading), len(rsk.Status.BatcherReloadingTopics))
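loadKafkaClient keys its cache on a SHA-1 of the broker list plus the Kafka version, so every RedshiftSink pointing at the same cluster reuses one client instead of opening a new connection per reconcile. A self-contained sketch of that keying (the exact string concatenation is elided in this diff, so the join below is an assumption):

package main

import (
	"crypto/sha1"
	"fmt"
	"strings"
)

// clientCacheKey mimics the hashing in loadKafkaClient: identical
// broker/version combinations map to the same cached kafka.Client.
func clientCacheKey(brokers []string, version string) string {
	values := strings.Join(brokers, ",") + version
	return fmt.Sprintf("%x", sha1.Sum([]byte(values)))
}

func main() {
	k1 := clientCacheKey([]string{"b-1:9092", "b-2:9092"}, "2.6.0")
	k2 := clientCacheKey([]string{"b-1:9092", "b-2:9092"}, "2.6.0")
	fmt.Println(k1 == k2) // true: the second reconcile hits the cache
}

The remaining file renames the kafka package's Watcher interface and its kafkaWatch implementation to Client and kafkaClient.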
@@ -9,7 +9,7 @@ import (
"github.com/practo/klog/v2"
)

- type Watcher interface {
+ type Client interface {
// Topics return all the topics present in kafka, it keeps a cache
// which is refreshed every cacheValidity seconds
Topics() ([]string, error)
@@ -23,7 +23,7 @@
CurrentOffset(id string, topic string, partition int32) (int64, error)
}

- type kafkaWatch struct {
+ type kafkaClient struct {
client sarama.Client
cacheValidity time.Duration
lastTopicRefreshTime *int64
@@ -35,12 +35,12 @@ type kafkaWatch struct {
topics []string
}

- func NewWatcher(
+ func NewClient(
brokers []string,
version string,
configTLS TLSConfig,
) (
- Watcher, error,
+ Client, error,
) {
v, err := sarama.ParseKafkaVersion(version)
if err != nil {
@@ -63,7 +63,7 @@ func NewWatcher(
return nil, fmt.Errorf("Error creating client: %v\n", err)
}

- return &kafkaWatch{
+ return &kafkaClient{
client: client,
cacheValidity: time.Second * time.Duration(30),
lastTopicRefreshTime: nil,
@@ -85,7 +85,7 @@ func cacheValid(validity time.Duration, lastCachedTime *int64) bool {

// Topics get the latest topics after refreshing the client with the latest
// it caches it for t.cacheValidity
- func (t *kafkaWatch) Topics() ([]string, error) {
+ func (t *kafkaClient) Topics() ([]string, error) {
if cacheValid(t.cacheValidity, t.lastTopicRefreshTime) {
return t.topics, nil
}
@@ -113,11 +113,11 @@ func (t *kafkaWatch) Topics() ([]string, error) {
return t.topics, nil
}

- func (t *kafkaWatch) LastOffset(topic string, partition int32) (int64, error) {
+ func (t *kafkaClient) LastOffset(topic string, partition int32) (int64, error) {
return t.client.GetOffset(topic, partition, sarama.OffsetNewest)
}

- func (t *kafkaWatch) fetchCurrentOffset(
+ func (t *kafkaClient) fetchCurrentOffset(
id string,
topic string,
partition int32,
Expand Down Expand Up @@ -179,7 +179,7 @@ func (t *kafkaWatch) fetchCurrentOffset(
return defaultCurrentOffset, nil
}

- func (t *kafkaWatch) CurrentOffset(
+ func (t *kafkaClient) CurrentOffset(
id string,
topic string,
partition int32,
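Taken together, the renamed interface is small enough to show end to end. A usage sketch with an in-memory stand-in for the sarama-backed implementation (fakeClient and the group/topic names are invented for illustration; only the Client method set mirrors this PR):

package main

import "fmt"

// Client mirrors the renamed interface from this PR.
type Client interface {
	Topics() ([]string, error)
	LastOffset(topic string, partition int32) (int64, error)
	CurrentOffset(id string, topic string, partition int32) (int64, error)
}

// fakeClient is a test double standing in for the sarama-backed kafkaClient.
type fakeClient struct{}

func (f *fakeClient) Topics() ([]string, error) {
	return []string{"db.inventory.customers"}, nil
}

func (f *fakeClient) LastOffset(topic string, partition int32) (int64, error) {
	return 1000, nil // newest offset in the topic partition
}

func (f *fakeClient) CurrentOffset(id, topic string, partition int32) (int64, error) {
	return 990, nil // offset committed by the consumer group
}

func main() {
	var c Client = &fakeClient{}
	topics, _ := c.Topics()
	for _, topic := range topics {
		last, _ := c.LastOffset(topic, 0)
		current, _ := c.CurrentOffset("rsk-batcher-group", topic, 0)
		fmt.Printf("%s lag=%d\n", topic, last-current)
	}
}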