Skip to content

Commit

Permalink
Feature/fix routing add discover node (#40)
Browse files Browse the repository at this point in the history
* Fix routing field and add discover node implementation
  • Loading branch information
mhmtszr authored Oct 23, 2023
1 parent eec93ea commit 5fb7c57
Show file tree
Hide file tree
Showing 4 changed files with 38 additions and 27 deletions.
26 changes: 14 additions & 12 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -96,18 +96,20 @@ Check out on [go-dcp](https://github.com/Trendyol/go-dcp#configuration)

### Elasticsearch Specific Configuration

| Variable | Type | Required | Default | Description |
|----------------------------------------|-------------------|----------|----------|-----------------------------------------------------------------------------------------------------|
| `elasticsearch.collectionIndexMapping` | map[string]string | yes | | Defines which Couchbase collection events will be written to which index |
| `elasticsearch.urls` | []string | yes | | Elasticsearch connection urls |
| `elasticsearch.typeName` | string | no | _doc | Defines Elasticsearch index type name |
| `elasticsearch.batchSizeLimit` | int | no | 1000 | Maximum message count for batch, if exceed flush will be triggered. |
| `elasticsearch.batchTickerDuration` | time.Duration | no | 10s | Batch is being flushed automatically at specific time intervals for long waiting messages in batch. |
| `elasticsearch.batchByteSizeLimit` | int | no | 10485760 | Maximum size(byte) for batch, if exceed flush will be triggered. |
| `elasticsearch.maxConnsPerHost` | int | no | 512 | Maximum number of connections per each host which may be established |
| `elasticsearch.maxIdleConnDuration` | time.Duration | no | 10s | Idle keep-alive connections are closed after this duration. |
| `elasticsearch.compressionEnabled` | boolean | no | false | Compression can be used if message size is large, CPU usage may be affected. |
| `elasticsearch.concurrentRequest` | int | no | 1 | Concurrent bulk request count |
| Variable | Type | Required | Default | Description |
|---------------------------------------------|-------------------|----------|----------|-----------------------------------------------------------------------------------------------------|
| `elasticsearch.collectionIndexMapping` | map[string]string | yes | | Defines which Couchbase collection events will be written to which index |
| `elasticsearch.urls` | []string | yes | | Elasticsearch connection urls |
| `elasticsearch.typeName` | string | no | _doc | Defines Elasticsearch index type name |
| `elasticsearch.batchSizeLimit` | int | no | 1000 | Maximum message count for batch, if exceed flush will be triggered. |
| `elasticsearch.batchTickerDuration` | time.Duration | no | 10s | Batch is being flushed automatically at specific time intervals for long waiting messages in batch. |
| `elasticsearch.batchByteSizeLimit` | int | no | 10485760 | Maximum size(byte) for batch, if exceed flush will be triggered. |
| `elasticsearch.maxConnsPerHost` | int | no | 512 | Maximum number of connections per each host which may be established |
| `elasticsearch.maxIdleConnDuration` | time.Duration | no | 10s | Idle keep-alive connections are closed after this duration. |
| `elasticsearch.compressionEnabled` | boolean | no | false | Compression can be used if message size is large, CPU usage may be affected. |
| `elasticsearch.concurrentRequest` | int | no | 1 | Concurrent bulk request count |
| `elasticsearch.disableDiscoverNodesOnStart` | boolean | no | false | Disable discover nodes when initializing the client. |
| `elasticsearch.discoverNodesInterval` | time.Duration | no | 5m | Discover nodes periodically |

## Exposed metrics

Expand Down
27 changes: 17 additions & 10 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,18 @@ import (
)

type Elasticsearch struct {
CollectionIndexMapping map[string]string `yaml:"collectionIndexMapping"`
MaxConnsPerHost *int `yaml:"maxConnsPerHost"`
MaxIdleConnDuration *time.Duration `yaml:"maxIdleConnDuration"`
TypeName string `yaml:"typeName"`
Urls []string `yaml:"urls"`
BatchSizeLimit int `yaml:"batchSizeLimit"`
BatchByteSizeLimit int `yaml:"batchByteSizeLimit"`
BatchTickerDuration time.Duration `yaml:"batchTickerDuration"`
ConcurrentRequest int `yaml:"concurrentRequest"`
CompressionEnabled bool `yaml:"compressionEnabled"`
CollectionIndexMapping map[string]string `yaml:"collectionIndexMapping"`
MaxConnsPerHost *int `yaml:"maxConnsPerHost"`
MaxIdleConnDuration *time.Duration `yaml:"maxIdleConnDuration"`
DiscoverNodesInterval *time.Duration `yaml:"discoverNodesInterval"`
TypeName string `yaml:"typeName"`
Urls []string `yaml:"urls"`
BatchSizeLimit int `yaml:"batchSizeLimit"`
BatchByteSizeLimit int `yaml:"batchByteSizeLimit"`
BatchTickerDuration time.Duration `yaml:"batchTickerDuration"`
ConcurrentRequest int `yaml:"concurrentRequest"`
CompressionEnabled bool `yaml:"compressionEnabled"`
DisableDiscoverNodesOnStart bool `yaml:"disableDiscoverNodesOnStart"`
}

type Config struct {
Expand All @@ -40,4 +42,9 @@ func (c *Config) ApplyDefaults() {
if c.Elasticsearch.ConcurrentRequest == 0 {
c.Elasticsearch.ConcurrentRequest = 1
}

if c.Elasticsearch.DiscoverNodesInterval == nil {
duration := 5 * time.Minute
c.Elasticsearch.DiscoverNodesInterval = &duration
}
}
2 changes: 1 addition & 1 deletion elasticsearch/bulk/bulk.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ var (
deletePrefix = helper.Byte(`{"delete":{"_index":"`)
idPrefix = helper.Byte(`","_id":"`)
typePrefix = helper.Byte(`","_type":"`)
routingPrefix = helper.Byte(`","_routing":"`)
routingPrefix = helper.Byte(`","routing":"`)
postFix = helper.Byte(`"}}`)
)

Expand Down
10 changes: 6 additions & 4 deletions elasticsearch/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,12 @@ import (

func NewElasticClient(config *config.Config) (*elasticsearch.Client, error) {
es, err := elasticsearch.NewClient(elasticsearch.Config{
MaxRetries: math.MaxInt,
Addresses: config.Elasticsearch.Urls,
Transport: newTransport(config.Elasticsearch),
CompressRequestBody: config.Elasticsearch.CompressionEnabled,
MaxRetries: math.MaxInt,
Addresses: config.Elasticsearch.Urls,
Transport: newTransport(config.Elasticsearch),
CompressRequestBody: config.Elasticsearch.CompressionEnabled,
DiscoverNodesOnStart: !config.Elasticsearch.DisableDiscoverNodesOnStart,
DiscoverNodesInterval: *config.Elasticsearch.DiscoverNodesInterval,
})
if err != nil {
return nil, err
Expand Down

0 comments on commit 5fb7c57

Please sign in to comment.