Skip to content

Commit

Permalink
feat: pass config and es client to sink response handler context
Browse files Browse the repository at this point in the history
  • Loading branch information
erayarslan committed May 14, 2024
1 parent eb6147a commit 80c6fea
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 6 deletions.
12 changes: 9 additions & 3 deletions elasticsearch/bulk/bulk.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ type Bulk struct {
sinkResponseHandler dcpElasticsearch.SinkResponseHandler
metric *Metric
collectionIndexMapping map[string]string
config *config.Config
batchKeys map[string]int
dcpCheckpointCommit func()
batchTicker *time.Ticker
Expand Down Expand Up @@ -85,6 +86,7 @@ func NewBulk(
esClient: esClient,
metric: &Metric{},
collectionIndexMapping: config.Elasticsearch.CollectionIndexMapping,
config: config,
typeName: helper.Byte(config.Elasticsearch.TypeName),
readers: readers,
concurrentRequest: config.Elasticsearch.ConcurrentRequest,
Expand Down Expand Up @@ -374,12 +376,16 @@ func (b *Bulk) executeSinkResponseHandler(batchActions []*document.ESActionDocum
key := getActionKey(*action)
if _, ok := errorData[key]; ok {
b.sinkResponseHandler.OnError(&dcpElasticsearch.SinkResponseHandlerContext{
Action: action,
Err: fmt.Errorf(errorData[key]),
Action: action,
Err: fmt.Errorf(errorData[key]),
ElasticsearchClient: b.esClient,
Config: b.config,
})
} else {
b.sinkResponseHandler.OnSuccess(&dcpElasticsearch.SinkResponseHandlerContext{
Action: action,
Action: action,
ElasticsearchClient: b.esClient,
Config: b.config,
})
}
}
Expand Down
12 changes: 9 additions & 3 deletions elasticsearch/sink_response_handler.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,16 @@
package elasticsearch

import "github.com/Trendyol/go-dcp-elasticsearch/elasticsearch/document"
import (
"github.com/Trendyol/go-dcp-elasticsearch/config"
"github.com/Trendyol/go-dcp-elasticsearch/elasticsearch/document"
"github.com/elastic/go-elasticsearch/v7"
)

type SinkResponseHandlerContext struct {
Action *document.ESActionDocument
Err error
Action *document.ESActionDocument
Err error
Config *config.Config
ElasticsearchClient *elasticsearch.Client
}

type SinkResponseHandler interface {
Expand Down

0 comments on commit 80c6fea

Please sign in to comment.