feat: add batch checkpoint commit ticker (#87)
mhmtszr authored Feb 20, 2025
1 parent cfa869a commit 6f584d6
Showing 4 changed files with 43 additions and 19 deletions.
37 changes: 19 additions & 18 deletions README.md
@@ -98,24 +98,25 @@ Check out on [go-dcp](https://github.com/Trendyol/go-dcp#configuration)

### Elasticsearch Specific Configuration

| Variable                                    | Type              | Required | Default      | Description                                                                                                                                                   |
|---------------------------------------------|-------------------|----------|--------------|---------------------------------------------------------------------------------------------------------------------------------------------------------------|
| `elasticsearch.collectionIndexMapping`      | map[string]string | yes      |              | Defines which Couchbase collection events are written to which index.                                                                                          |
| `elasticsearch.urls`                        | []string          | yes      |              | Elasticsearch connection URLs.                                                                                                                                  |
| `elasticsearch.username`                    | string            | no       |              | Elasticsearch username.                                                                                                                                         |
| `elasticsearch.password`                    | string            | no       |              | Elasticsearch password.                                                                                                                                         |
| `elasticsearch.typeName`                    | string            | no       |              | Elasticsearch index type name.                                                                                                                                  |
| `elasticsearch.batchSizeLimit`              | int               | no       | 1000         | Maximum message count per batch; a flush is triggered when it is exceeded.                                                                                      |
| `elasticsearch.batchTickerDuration`         | time.Duration     | no       | 10s          | Interval at which the batch is flushed automatically, so long-waiting messages do not linger in the batch.                                                     |
| `elasticsearch.batchCommitTickerDuration`   | time.Duration     | no       | 0s           | Interval at which checkpoint offsets are committed. By default offsets are committed immediately after every batch flush; increasing this interval can improve performance. |
| `elasticsearch.batchByteSizeLimit`          | int, string       | no       | 10mb         | Maximum batch size in bytes; a flush is triggered when it is exceeded.                                                                                          |
| `elasticsearch.maxConnsPerHost`             | int               | no       | 512          | Maximum number of connections that may be established per host.                                                                                                |
| `elasticsearch.maxIdleConnDuration`         | time.Duration     | no       | 10s          | Idle keep-alive connections are closed after this duration.                                                                                                     |
| `elasticsearch.compressionEnabled`          | boolean           | no       | false        | Enables request compression, useful for large messages; CPU usage may increase.                                                                                |
| `elasticsearch.concurrentRequest`           | int               | no       | 1            | Concurrent bulk request count.                                                                                                                                  |
| `elasticsearch.disableDiscoverNodesOnStart` | boolean           | no       | false        | Disables node discovery when initializing the client.                                                                                                           |
| `elasticsearch.discoverNodesInterval`       | time.Duration     | no       | 5m           | Interval for periodic node discovery.                                                                                                                           |
| `elasticsearch.rejectionLog.index`          | string            | no       | cbes-rejects | Rejection log index name.                                                                                                                                       |
| `elasticsearch.rejectionLog.includeSource`  | boolean           | no       | false        | Whether to include the source document in the rejection log.                                                                                                    |

## Exposed metrics

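For context (not part of this commit), the snippet below is a minimal sketch of how the new option could be set when the connector configuration is built directly in Go rather than loaded from YAML. The import path and the surrounding setup are assumptions; only the `Elasticsearch` struct fields are taken from the config/config.go change shown below. Note that the field is a pointer, so leaving it nil keeps the previous behaviour of committing right after every flush.

```go
package main

import (
	"time"

	// Assumed import path for the connector's config package.
	"github.com/Trendyol/go-dcp-elasticsearch/config"
)

func main() {
	// Non-nil value: checkpoint offsets are committed at most once per interval.
	commitEvery := 2 * time.Minute

	// Partial configuration for illustration only; the remaining required
	// fields (URLs, collection-index mapping, etc.) are omitted here.
	esCfg := config.Elasticsearch{
		BatchSizeLimit:            1000,
		BatchTickerDuration:       10 * time.Second,
		BatchCommitTickerDuration: &commitEvery,
	}
	_ = esCfg
}
```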
1 change: 1 addition & 0 deletions config/config.go
@@ -21,6 +21,7 @@ type Elasticsearch struct {
RejectionLog RejectionLog `yaml:"rejectionLog"`
BatchSizeLimit int `yaml:"batchSizeLimit"`
BatchTickerDuration time.Duration `yaml:"batchTickerDuration"`
BatchCommitTickerDuration *time.Duration `yaml:"batchCommitTickerDuration"`
ConcurrentRequest int `yaml:"concurrentRequest"`
CompressionEnabled bool `yaml:"compressionEnabled"`
DisableDiscoverNodesOnStart bool `yaml:"disableDiscoverNodesOnStart"`
23 changes: 22 additions & 1 deletion elasticsearch/bulk/bulk.go
@@ -32,6 +32,7 @@ type Bulk struct {
batchKeys map[string]int
dcpCheckpointCommit func()
batchTicker *time.Ticker
batchCommitTicker *time.Ticker
isClosed chan bool
actionCh chan document.ESActionDocument
esClient *elasticsearch.Client
@@ -99,6 +100,10 @@ func NewBulk(
sinkResponseHandler: sinkResponseHandler,
}

if config.Elasticsearch.BatchCommitTickerDuration != nil {
bulk.batchCommitTicker = time.NewTicker(*config.Elasticsearch.BatchCommitTickerDuration)
}

if sinkResponseHandler != nil {
sinkResponseHandler.OnInit(&dcpElasticsearch.SinkResponseHandlerInitContext{
Config: config,
@@ -257,6 +262,9 @@ func getEsActionJSON(docID []byte, action document.EsAction, indexName string, r

func (b *Bulk) Close() {
b.batchTicker.Stop()
if b.batchCommitTicker != nil {
b.batchCommitTicker.Stop()
}

b.flushMessages()
}
@@ -284,8 +292,21 @@ func (b *Bulk) flushMessages() {
b.batchSize = 0
b.batchByteSize = 0
}
b.CheckAndCommit()
}

func (b *Bulk) CheckAndCommit() {
if b.batchCommitTicker == nil {
b.dcpCheckpointCommit()
return
}

select {
case <-b.batchCommitTicker.C:
b.dcpCheckpointCommit()
default:
return
}
}

func (b *Bulk) requestFunc(concurrentRequestIndex int, batchItems []BatchItem) func() error {
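The `CheckAndCommit` change above gates checkpoint commits behind a non-blocking read of the ticker channel: when no ticker is configured, every flush commits immediately; when one is configured, a commit only happens if the ticker has fired since the last commit. The self-contained sketch below is illustrative only (the helper name and the simulated flush loop are made up) and shows the same pattern in isolation.

```go
package main

import (
	"fmt"
	"time"
)

// commitIfDue invokes commit only when the ticker has fired since the last
// call; otherwise it returns immediately. A nil ticker means "commit every
// time", mirroring the nil check in the Bulk.CheckAndCommit change above.
func commitIfDue(ticker *time.Ticker, commit func()) {
	if ticker == nil {
		commit()
		return
	}
	select {
	case <-ticker.C:
		commit()
	default:
		// Ticker has not fired yet; skip this commit to reduce overhead.
	}
}

func main() {
	ticker := time.NewTicker(300 * time.Millisecond)
	defer ticker.Stop()

	for i := 0; i < 10; i++ {
		// Simulate a batch flush happening every 100ms.
		time.Sleep(100 * time.Millisecond)
		commitIfDue(ticker, func() { fmt.Println("checkpoint committed at flush", i) })
	}
}
```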
1 change: 1 addition & 0 deletions test/integration/config.yml
@@ -22,6 +22,7 @@ metadata:
elasticsearch:
batchSizeLimit: 1000
batchTickerDuration: 5s
batchCommitTickerDuration: 120s
batchByteSizeLimit: 90614720
concurrentRequest: 12
collectionIndexMapping:
