SinkGroupSpec, DeploymentUnit, MaxBytesPerBatch #172

Merged
merged 49 commits into from
Apr 1, 2021
49 commits
f2a7e3a
MaxBytesPerBatch is better than maxSize
alok87 Mar 21, 2021
458476a
Deployment Unit and SinkGroup spec
alok87 Mar 22, 2021
d8a6dac
Operator changes for SinkGroupSpec, DeploymentUnit, MaxBytesPerBatch
alok87 Mar 22, 2021
6b61180
Documentation improvements
alok87 Mar 23, 2021
ded4847
Loader changes for the new spec, resource Quanitity
alok87 Mar 23, 2021
0042a4e
Fix the spec
alok87 Mar 23, 2021
80a4b7b
Fix test
alok87 Mar 24, 2021
9101b8d
Fixes after self review
alok87 Mar 24, 2021
b1868be
Make deprecated fields optional
alok87 Mar 24, 2021
0db2ec1
Cleanup bug fix: name should be object name
alok87 Mar 24, 2021
e728eb6
Add omitempty; needed for deprecated particularly
alok87 Mar 24, 2021
14d89e2
Bug fix for image backward compatibility
alok87 Mar 24, 2021
42c004a
Bug fix for resource and tolerations backwardness
alok87 Mar 24, 2021
e7b1c6e
Use counter and not gauge
alok87 Mar 24, 2021
eff35eb
Unit configuration is parallel now and part of rsk spec
alok87 Mar 25, 2021
bca4380
Remove status info
alok87 Mar 25, 2021
8d4a484
Fix maxTopics not being set bug
alok87 Mar 25, 2021
774455f
Log improvements
alok87 Mar 25, 2021
509801a
Sort all states
alok87 Mar 26, 2021
7bacc5d
Realtime calculator refactored out.
alok87 Mar 26, 2021
3e66455
S3 path bug; make unique using consumerGroupID
alok87 Mar 27, 2021
9bcedd7
Keep consumerGroupID the first dir
alok87 Mar 27, 2021
02e1859
Unit allocation
alok87 Mar 28, 2021
8e4d327
Defaults by sinkgroup
alok87 Mar 29, 2021
cc33a7a
Realtime topics should run as main sink group spec
alok87 Mar 29, 2021
3effcad
SinkGroup name before tags
alok87 Mar 29, 2021
6118345
rm dead conf
alok87 Mar 29, 2021
8fc79d0
rm allowshuffle
alok87 Mar 30, 2021
cff86dd
Fix nil pointer bug
alok87 Mar 30, 2021
0492aa7
Fix nil pointer bug; change default
alok87 Mar 30, 2021
f778b87
When topic lag is empty take any combination
alok87 Mar 30, 2021
644e1ed
UnitID from table name
alok87 Mar 30, 2021
226ef26
Update lastCacheUpdate time only if cache miss
alok87 Mar 30, 2021
1b9f70c
Syntax fix
alok87 Mar 30, 2021
0b6fcfb
Use lastOffset and not lag for SJF
alok87 Mar 30, 2021
41994b9
K8s compatible name and fix test
alok87 Mar 31, 2021
c5f935d
Unit allocation debuggers
alok87 Mar 31, 2021
a7eb39c
Fix the deadlock
alok87 Mar 31, 2021
9f5ec93
Update batcherReloadingTopics whenever realtime status change
alok87 Mar 31, 2021
f12c010
Fix duplicate bug for allocator
alok87 Mar 31, 2021
c0f58e0
Fix bug: Batcher realtime should be removed
alok87 Mar 31, 2021
a1af1cd
Increase the name default
alok87 Mar 31, 2021
3760af2
Fix maxReloading unit decrement bug
alok87 Mar 31, 2021
1b503f3
Update logs info for delete
alok87 Mar 31, 2021
d2a0534
Lag info
alok87 Mar 31, 2021
2d85daf
More log info
alok87 Mar 31, 2021
78283ec
Debug flags for realtime calc
alok87 Apr 1, 2021
073d3c5
Fix batcher and loader realtime calc bug
alok87 Apr 1, 2021
febba1c
Stop and reset ticker after processing
alok87 Apr 1, 2021
50 changes: 27 additions & 23 deletions redshiftsink/README.md
@@ -50,28 +50,35 @@ spec:
maxLoaderLag: 10
batcher:
suspend: false
maxSize: 10
maxWaitSeconds: 30
maxConcurrency: 10
mask: true
maskFile: "github.com/practo/tipoca-stream/redshiftsink/pkg/transformer/masker/database.yaml"
podTemplate:
resources:
requests:
cpu: 100m
memory: 200Mi
sinkGroup:
all:
maxSizePerBatch: 10Mi
maxWaitSeconds: 30
maxConcurrency: 10
deploymentUnit:
podTemplate:
resources:
requests:
cpu: 100m
memory: 200Mi
loader:
suspend: false
maxSize: 10
maxWaitSeconds: 30
maxProcessingTime: 60000
redshiftSchema: "inventory"
redshiftGroup: "sales"
podTemplate:
resources:
requests:
cpu: 100m
memory: 200Mi
sinkGroup:
all:
maxSizePerBatch: 1Gi
maxWaitSeconds: 30
maxProcessingTime: 60000
deploymentUnit:
podTemplate:
resources:
requests:
cpu: 100m
memory: 200Mi

```

```bash
@@ -82,11 +89,6 @@ This will start syncing all the Kafka topics matching regex `"^db.inventory*"` f

### Configuration

### Redshiftsink Spec Documentation (TODO):
| Spec | Description | Mandatory |
| :------------ | :----------- |:------------|


## RedshiftSink Managed Pods
Redshiftsink performs the sink by creating two pods. Creating a RedshiftSink CRD installs the batcher and loader pods. Details of the batcher and loader pods are below:

@@ -113,7 +115,8 @@ Flags:

#### Metrics
```
rsk_batcher_messages_processed_per_second
rsk_batcher_bytes_processed
rsk_batcher_messages_processed
```

### Configuration
@@ -144,7 +147,8 @@ Flags:

#### Metrics
```
rsk_loader_messages_processed_per_second
rsk_loader_bytes_loaded
rsk_loader_messages_loaded
```

### Configuration
166 changes: 132 additions & 34 deletions redshiftsink/api/v1/redshiftsink_types.go
@@ -18,6 +18,7 @@ package v1

import (
corev1 "k8s.io/api/core/v1"
resource "k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

@@ -42,31 +43,109 @@ type RedshiftPodTemplateSpec struct {
Tolerations *[]corev1.Toleration `json:"tolerations,omitempty"`
}

// RedshiftBatcherSpec defines the desired state of RedshiftBatcher
type RedshiftBatcherSpec struct {
// Suspend when turned on makes sure no batcher pods
// are running for this CRD object. Default: false
Suspend bool `json:"suspend,omitempty"`
// DeploymentUnit is used to specify how many topics will run together in a unit
// and how much resources it needs.
type DeploymentUnit struct {
// PodTemplate describes the pod specification for the unit.
// +optional
PodTemplate *RedshiftPodTemplateSpec `json:"podTemplate,omitempty"`
}

// Max configurations for the batcher to batch
MaxSize int `json:"maxSize"`
MaxWaitSeconds int `json:"maxWaitSeconds"`
// SinkGroupSpec defines the specification for one of the three sinkgroups:
// 1. MainSinkGroup 2. ReloadSinkGroup 3. ReloadDupeSinkGroup
type SinkGroupSpec struct {
// MaxSizePerBatch is the maximum size of a batch, given as a quantity
// in bytes with an optional Ki, Mi or Gi suffix.
// Example values: 1000, 1Ki, 100Mi, 1Gi
// 1000 is 1000 bytes, 1Ki is 1 kibibyte (1024 bytes),
// 100Mi is 100 mebibytes, 1Gi is 1 gibibyte
// +optional
MaxSizePerBatch *resource.Quantity `json:"maxSizePerBatch,omitempty"`
// MaxWaitSeconds is the maximum time in seconds to wait before making a batch.
// A batch is made when this time elapses even if MaxSizePerBatch is not reached.
// +optional
MaxWaitSeconds *int `json:"maxWaitSeconds,omitempty"`
// MaxConcurrency is the maximum number of batch processors to run concurrently.
// This is useful when the sink group pod operates in asynchronous mode.
// Loader pods do not need this as they are synchronous.
// +optional
MaxConcurrency *int `json:"maxConcurrency,omitempty"`

// MaxProcessingTime is the sarama configuration MaxProcessingTime
// It is the max time in milliseconds required to consume one message.
// Defaults to 1000ms
// MaxProcessingTime is the max time in ms required to consume one message.
// The default for the batcher is 180000ms and for the loader is 600000ms.
// +optional
MaxProcessingTime *int32 `json:"maxProcessingTime,omitempty"`

// Mask when turned on enables masking of the data
// Default: false
// MaxReloadingUnits is the maximum number of units (pods) that can be launched
// based on the DeploymentUnit specification. Only valid for the Reload SinkGroup.
// At present this value is configurable only for the batcher.
// +optional
MaxReloadingUnits *int32 `json:"maxReloadingUnits,omitempty"`
// DeploymentUnit (pod) is the unit of deployment for the batcher or the loader.
// It lets the user specify the amount of resources needed to run one unit.
// The operator calculates the total number of units from the total number
// of topics and this unit spec. This largely solves the scaling issues
// described in #167.
// +optional
DeploymentUnit *DeploymentUnit `json:"deploymentUnit,omitempty"`
}
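
The `MaxSizePerBatch` examples above use Kubernetes quantity notation with binary suffixes. In the real type the field is a `resource.Quantity`, and the byte count comes from that package. The sketch below only illustrates how the suffixes map to bytes using the standard library; `parseBatchSize` is a hypothetical helper, not part of this PR or of apimachinery, and it assumes non-negative integers with an optional `Ki`, `Mi` or `Gi` suffix.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// parseBatchSize is a hypothetical, simplified stand-in for quantity
// parsing. It supports only plain byte counts and the binary suffixes
// Ki, Mi and Gi; everything else is rejected.
func parseBatchSize(s string) (int64, error) {
	multipliers := []struct {
		suffix string
		factor int64
	}{
		{"Ki", 1 << 10},
		{"Mi", 1 << 20},
		{"Gi", 1 << 30},
	}
	for _, m := range multipliers {
		if strings.HasSuffix(s, m.suffix) {
			n, err := strconv.ParseInt(strings.TrimSuffix(s, m.suffix), 10, 64)
			if err != nil {
				return 0, fmt.Errorf("invalid size %q: %w", s, err)
			}
			return n * m.factor, nil
		}
	}
	// No known suffix: treat the value as a plain number of bytes.
	n, err := strconv.ParseInt(s, 10, 64)
	if err != nil {
		return 0, fmt.Errorf("invalid size %q: %w", s, err)
	}
	return n, nil
}

func main() {
	for _, s := range []string{"1000", "1Ki", "100Mi", "1Gi"} {
		b, err := parseBatchSize(s)
		if err != nil {
			fmt.Println("error:", err)
			continue
		}
		fmt.Printf("%s = %d bytes\n", s, b)
	}
}
```

The README values `10Mi` (batcher) and `1Gi` (loader) would go through the same mapping.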

// SinkGroup is the group of batcher and loader pods based on the
// mask version, target table and the topic release status. This specification
// allows a different SinkGroupSpec for each type of SinkGroup, since the
// configuration required for the full sink and the realtime sink can differ.
// The precedence is as follows:
// a) If only All is specified, All is used for all SinkGroups.
// b) If All and Main are both specified, Main is used for the MainSinkGroup.
// c) If All and Reload are both specified, Reload is used for the ReloadSinkGroup.
// d) If All and ReloadDupe are both specified, ReloadDupe is used for the ReloadDupeSinkGroup.
// e) If none is specified, defaults are used for all of them.
type SinkGroup struct {
// All specifies a common specification for all SinkGroups
// +optional
All *SinkGroupSpec `json:"all,omitempty"`
// Main specifies the MainSinkGroup specification, overrides All
// +optional
Main *SinkGroupSpec `json:"main,omitempty"`
// Reload specifies the ReloadSinkGroup specification, overrides All
// +optional
Reload *SinkGroupSpec `json:"reload,omitempty"`
// ReloadDupe specifies the ReloadDupeSinkGroup specification, overrides All
// +optional
ReloadDupe *SinkGroupSpec `json:"reloadDupe,omitempty"`
}
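
The precedence described in the `SinkGroup` comment can be sketched as a small lookup. This is a minimal illustration, not the operator's code: the types are trimmed-down copies with one field, `resolveSpec` and the group name strings are hypothetical, and it assumes a whole spec is selected rather than merged field by field. It also assumes a group-specific spec is used when `All` is absent, which the comment does not cover.

```go
package main

import "fmt"

// SinkGroupSpec and SinkGroup are trimmed-down copies of the API types,
// keeping a single field so the example stays self-contained.
type SinkGroupSpec struct {
	MaxWaitSeconds *int
}

type SinkGroup struct {
	All, Main, Reload, ReloadDupe *SinkGroupSpec
}

// resolveSpec (hypothetical) picks the spec for one sink group:
// the group-specific spec wins over All, and defaults are used
// when neither is set.
func resolveSpec(sg *SinkGroup, group string, defaults *SinkGroupSpec) *SinkGroupSpec {
	if sg == nil {
		return defaults
	}
	var specific *SinkGroupSpec
	switch group {
	case "main":
		specific = sg.Main
	case "reload":
		specific = sg.Reload
	case "reloadDupe":
		specific = sg.ReloadDupe
	}
	if specific != nil {
		return specific
	}
	if sg.All != nil {
		return sg.All
	}
	return defaults
}

func intPtr(v int) *int { return &v }

func main() {
	defaults := &SinkGroupSpec{MaxWaitSeconds: intPtr(60)}
	sg := &SinkGroup{
		All:  &SinkGroupSpec{MaxWaitSeconds: intPtr(30)},
		Main: &SinkGroupSpec{MaxWaitSeconds: intPtr(5)},
	}
	for _, g := range []string{"main", "reload", "reloadDupe"} {
		fmt.Printf("%s: maxWaitSeconds=%d\n", g, *resolveSpec(sg, g, defaults).MaxWaitSeconds)
	}
}
```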

// RedshiftBatcherSpec defines the desired state of RedshiftBatcher
type RedshiftBatcherSpec struct {
// Suspend is used to suspend batcher pods. Defaults to false.
Suspend bool `json:"suspend,omitempty"`

// Mask when turned on enables masking of the data. Defaults to false
// +optional
Mask bool `json:"mask"`
// MaskFile to use to apply mask configurations
// +optional
MaskFile string `json:"maskFile"`
MaskFile string `json:"maskFile,omitempty"`
// +optional

// SinkGroup contains the specification for the main, reload and reloadDupe
// sinkgroups. The operator uses 3 groups to perform Redshiftsink. Topics
// which have never been released are part of the Reload SinkGroup; topics
// which get released move to the Main SinkGroup. The ReloadDupe SinkGroup
// is used to give realtime updates to the topics which are reloading.
// Defaults are used for all sinkGroups if none is specified.
// +optional
SinkGroup *SinkGroup `json:"sinkGroup,omitempty"`

// Template describes the pods that will be created.
// Deprecated: all of the fields below are deprecated in favour of SinkGroup, see #167
MaxSize int `json:"maxSize,omitempty"`
MaxWaitSeconds int `json:"maxWaitSeconds,omitempty"`
MaxConcurrency *int `json:"maxConcurrency,omitempty"`
// MaxProcessingTime is the sarama configuration MaxProcessingTime
// It is the max time in milliseconds required to consume one message.
// Defaults to 1000ms
MaxProcessingTime *int32 `json:"maxProcessingTime,omitempty"`
// PodTemplate describes the pods that will be created.
// If this is not specified, a default pod template is created
// +optional
PodTemplate *RedshiftPodTemplateSpec `json:"podTemplate,omitempty"`
@@ -78,25 +157,38 @@ type RedshiftLoaderSpec struct {
// are running for this CRD object. Default: false
Suspend bool `json:"suspend,omitempty"`

// Max configurations for the loader to batch the load
MaxSize int `json:"maxSize"`
MaxWaitSeconds int `json:"maxWaitSeconds"`

// MaxProcessingTime is the sarama configuration MaxProcessingTime
// It is the max time in milliseconds required to consume one message.
// Defaults to 600000ms (10mins)
MaxProcessingTime *int32 `json:"maxProcessingTime,omitempty"`
// SinkGroup contains the specification for the main, reload and reloadDupe
// sinkgroups. The operator uses 3 groups to perform Redshiftsink. Topics
// which have never been released are part of the Reload SinkGroup; topics
// which get released move to the Main SinkGroup. The ReloadDupe SinkGroup
// is used to give realtime updates to the topics which are reloading.
// Defaults are used for all sinkGroups if none is specified.
// +optional
SinkGroup *SinkGroup `json:"sinkGroup,omitempty"`

// RedshiftSchema to sink the data in
RedshiftSchema string `json:"redshiftSchema"`
// RedshiftMaxOpenConns is the maximum open connections allowed
// +optional
RedshiftMaxOpenConns *int `json:"redshiftMaxOpenConns,omitempty"`
// RedshiftMaxIdleConns is the maximum idle connections allowed
// +optional
RedshiftMaxIdleConns *int `json:"redshiftMaxIdleConns,omitempty"`
// RedshiftGroup is the group that is given access when new topics get released
RedshiftGroup *string `json:"redshiftGroup"`

// Template describes the pods that will be created.
// Deprecated: all of the fields below are deprecated in favour of SinkGroup, see #167
// Max configurations for the loader to batch the load
// +optional
MaxSize int `json:"maxSize,omitempty"`
// +optional
MaxWaitSeconds int `json:"maxWaitSeconds,omitempty"`
// MaxProcessingTime is the sarama configuration MaxProcessingTime
// It is the max time in milliseconds required to consume one message.
// Defaults to 600000ms (10mins)
// +optional
MaxProcessingTime *int32 `json:"maxProcessingTime,omitempty"`
// PodTemplate describes the pods that will be created.
// If this is not specified, a default pod template is created
// +optional
PodTemplate *RedshiftPodTemplateSpec `json:"podTemplate,omitempty"`
@@ -122,7 +214,7 @@ type RedshiftSinkSpec struct {
KafkaVersion string `json:"kafkaVersion"`
KafkaTopicRegexes string `json:"kafkaTopicRegexes"`
// +optional
KafkaLoaderTopicPrefix string `json:"kafkaLoaderTopicPrefix"`
KafkaLoaderTopicPrefix string `json:"kafkaLoaderTopicPrefix,omitempty"`

Batcher RedshiftBatcherSpec `json:"batcher"`
Loader RedshiftLoaderSpec `json:"loader"`
@@ -132,25 +224,25 @@ type RedshiftSinkSpec struct {
// This is relevant only if masking is turned on in mask configuration.
// It is used for live mask reloading.
// +optional
ReleaseCondition *ReleaseCondition `json:"releaseCondition"`
ReleaseCondition *ReleaseCondition `json:"releaseCondition,omitempty"`

// TopicReleaseCondition is considered instead of ReleaseCondition
// if it is defined for a topic. This is used for topics which
// do not work well with the central ReleaseCondition for all topics.
// +optional
TopicReleaseCondition map[string]ReleaseCondition `json:"topicReleaseCondition"`
TopicReleaseCondition map[string]ReleaseCondition `json:"topicReleaseCondition,omitempty"`
}

type ReleaseCondition struct {
// MaxBatcherLag is the maximum lag the batcher consumer group
// should have to be considered to be operating in realtime and
// to be considered for release.
MaxBatcherLag *int64 `json:"maxBatcherLag"`
MaxBatcherLag *int64 `json:"maxBatcherLag,omitempty"`

// MaxLoaderLag is the maximum lag the loader consumer group
// should have to be considered to be operating in realtime and
// to be considered for release.
MaxLoaderLag *int64 `json:"maxLoaderLag"`
MaxLoaderLag *int64 `json:"maxLoaderLag,omitempty"`
}
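
A minimal sketch of how the two lag bounds and the per-topic override could combine, under stated assumptions: `conditionFor`, `withinLag` and `isRealtime` are hypothetical helpers, the topic names are made up, and an unset bound (or a missing condition) is treated here as "no limit", which the source does not specify.

```go
package main

import "fmt"

// ReleaseCondition mirrors the API type, without JSON tags.
type ReleaseCondition struct {
	MaxBatcherLag *int64
	MaxLoaderLag  *int64
}

// conditionFor (hypothetical) returns the per-topic condition when one is
// defined for the topic, otherwise the central condition.
func conditionFor(topic string, central *ReleaseCondition, perTopic map[string]ReleaseCondition) *ReleaseCondition {
	if rc, ok := perTopic[topic]; ok {
		return &rc
	}
	return central
}

// withinLag reports whether lag does not exceed max.
// Assumption: a nil max means no limit.
func withinLag(lag int64, max *int64) bool {
	return max == nil || lag <= *max
}

// isRealtime (hypothetical) requires both consumer groups to be within
// their maximum lag. Assumption: a nil condition imposes no limit.
func isRealtime(batcherLag, loaderLag int64, rc *ReleaseCondition) bool {
	if rc == nil {
		return true
	}
	return withinLag(batcherLag, rc.MaxBatcherLag) && withinLag(loaderLag, rc.MaxLoaderLag)
}

func int64Ptr(v int64) *int64 { return &v }

func main() {
	central := &ReleaseCondition{MaxBatcherLag: int64Ptr(100), MaxLoaderLag: int64Ptr(10)}
	perTopic := map[string]ReleaseCondition{
		// Hypothetical high-throughput topic with looser bounds.
		"db.inventory.orders": {MaxBatcherLag: int64Ptr(1000), MaxLoaderLag: int64Ptr(50)},
	}
	for _, topic := range []string{"db.inventory.orders", "db.inventory.items"} {
		rc := conditionFor(topic, central, perTopic)
		fmt.Printf("%s realtime=%v\n", topic, isRealtime(500, 20, rc))
	}
}
```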

// MaskPhase is a label for the condition of a masking at the current time.
@@ -208,7 +300,7 @@ type MaskStatus struct {

type Group struct {
// LoaderTopicPrefix stores the name of the loader topic prefix
LoaderTopicPrefix string `json:"loaderTopicPrefix"`
LoaderTopicPrefix string `json:"loaderTopicPrefix,omitempty"`

// LoaderCurrentOffset stores the last read current offset of the consumer group
// This is required to determine if the consumer group has performed any
@@ -218,7 +310,7 @@ type Group struct {
// throughput consumer groups not getting moved to realtime from reloading.
// TODO: This is now a dead field once a group moves to released and
// should be cleaned after that (status needs to be updated)
LoaderCurrentOffset *int64 `json:"currentOffset"`
LoaderCurrentOffset *int64 `json:"currentOffset,omitempty"`

// ID stores the name of the consumer group for the topic
// based on this batcher and loader consumer groups are made
@@ -232,11 +324,17 @@ type RedshiftSinkStatus struct {

// MaskStatus stores the status of masking for topics if masking is enabled
// +optional
MaskStatus *MaskStatus `json:"maskStatus"`
MaskStatus *MaskStatus `json:"maskStatus,omitempty"`

// TopicGroup stores the group info for the topic
// +optional
TopicGroup map[string]Group `json:"topicGroups"`
TopicGroup map[string]Group `json:"topicGroups,omitempty"`

// BatcherReloadingTopics stores the list of topics which are currently reloading
// for the batcher deployments in the reload sink group.
// There is a limit on the maximum number of topics that can be reloaded (MaxReloadingUnits).
// +optional
BatcherReloadingTopics []string `json:"batcherReloadingTopics,omitempty"`
}
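
One way to picture the cap on `BatcherReloadingTopics` is a bounded, duplicate-free selection. The sketch below is an assumption-laden illustration, not the allocator from this PR: `allocateReloading` is a hypothetical helper, it assumes one topic per unit, keeps topics that are already reloading ahead of pending ones, and fills the remaining slots in input order.

```go
package main

import "fmt"

// allocateReloading (hypothetical) returns at most maxUnits topics:
// first those already reloading, then pending ones, without duplicates.
func allocateReloading(current, pending []string, maxUnits int) []string {
	if maxUnits <= 0 {
		return nil
	}
	allocated := make([]string, 0, maxUnits)
	seen := make(map[string]bool)
	add := func(topic string) {
		if len(allocated) >= maxUnits || seen[topic] {
			return
		}
		seen[topic] = true
		allocated = append(allocated, topic)
	}
	for _, t := range current {
		add(t)
	}
	for _, t := range pending {
		add(t)
	}
	return allocated
}

func main() {
	current := []string{"db.inventory.orders"}
	pending := []string{"db.inventory.orders", "db.inventory.items", "db.inventory.users"}
	fmt.Println(allocateReloading(current, pending, 2))
}
```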

// +kubebuilder:resource:path=redshiftsinks,shortName=rsk;rsks