Skip to content

Commit

Permalink
feat: qol improvements
Browse files Browse the repository at this point in the history
  • Loading branch information
erayarslan committed Nov 6, 2024
1 parent 76a2289 commit 65845f3
Show file tree
Hide file tree
Showing 8 changed files with 68 additions and 43 deletions.
11 changes: 9 additions & 2 deletions config/config.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
package config

import (
"errors"
"math"
"strconv"
"time"

"github.com/Trendyol/go-dcp/logger"

"github.com/Trendyol/go-dcp/config"
"github.com/Trendyol/go-dcp/helpers"
"github.com/segmentio/kafka-go"
Expand Down Expand Up @@ -50,13 +53,17 @@ func (k *Kafka) GetBalancer() kafka.Balancer {
case "Murmur2Balancer":
return kafka.Murmur2Balancer{}
default:
panic("invalid kafka balancer method, given: " + k.Balancer)
err := errors.New("invalid kafka balancer method, given: " + k.Balancer)
logger.Log.Error("error while get kafka balancer, err: %v", err)
panic(err)
}
}

func (k *Kafka) GetCompression() int8 {
if k.Compression < 0 || k.Compression > 4 {
panic("invalid kafka compression method, given: " + strconv.Itoa(int(k.Compression)))
err := errors.New("invalid kafka compression method, given: " + strconv.Itoa(int(k.Compression)))
logger.Log.Error("error while get kafka compression, err: %v", err)
panic(err)
}
return k.Compression
}
Expand Down
7 changes: 6 additions & 1 deletion connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,12 @@ func (c *connector) getTopicName(collectionName string, messageTopic string) str

topic := c.config.Kafka.CollectionTopicMapping[collectionName]
if topic == "" {
panic(fmt.Sprintf("there is no topic mapping for collection: %s on your configuration", collectionName))
err := fmt.Errorf(
"there is no topic mapping for collection: %s on your configuration",
collectionName,
)
logger.Log.Error("error while get topic name, err: %v", err)
panic(err)
}
return topic
}
Expand Down
73 changes: 42 additions & 31 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,41 +7,52 @@ services:
- "8091:8091"
- "11210:11210"
healthcheck:
test: [ "CMD", "curl", "-f", "http://user:123456@localhost:8091/pools/default/buckets/dcp-test" ]
test: [ "CMD", "curl", "-f", "http://user:password@localhost:8091/pools/default/buckets/dcp-test" ]
interval: 2s
timeout: 3s
retries: 60
redpanda:
image: docker.redpanda.com/redpandadata/redpanda
container_name: redpanda-1
command:
- redpanda
- start
- --smp
- '1'
- --reserve-memory
- 0M
- --overprovisioned
- --node-id
- '0'
- --kafka-addr
- PLAINTEXT://0.0.0.0:29092,OUTSIDE://0.0.0.0:9092
- --advertise-kafka-addr
- PLAINTEXT://redpanda:29092,OUTSIDE://localhost:9092
- --pandaproxy-addr
- PLAINTEXT://0.0.0.0:28082,OUTSIDE://0.0.0.0:8082
- --advertise-pandaproxy-addr
- PLAINTEXT://redpanda:28082,OUTSIDE://localhost:8082
zookeeper:
restart: always
image: confluentinc/cp-zookeeper:7.6.3
container_name: zookeeper
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
broker:
restart: always
image: confluentinc/cp-kafka:7.6.1
container_name: broker
ports:
- 8081:8081
- 8082:8082
- 9092:9092
- 28082:28082
- 29092:29092
- "9092:9092"
depends_on:
- zookeeper
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_INTERNAL:PLAINTEXT
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092,PLAINTEXT_INTERNAL://broker:29092
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
KAFKA_AUTO_CREATE_TOPICS_ENABLE: true
healthcheck:
test: [ "CMD", "curl", "-f", "http://localhost:8080" ]
interval: 2s
timeout: 3s
retries: 60
kowl:
restart: always
image: quay.io/cloudhut/kowl:master-59f68da
container_name: kowl
depends_on:
- broker
ports:
- "8081:8080"
environment:
KAFKA_BROKERS: 'broker:29092'
redpanda-topic-create:
image: docker.redpanda.com/redpandadata/redpanda
image: docker.redpanda.com/redpandadata/redpanda:v24.2.8
container_name: redpanda-topic-create
entrypoint: [ "bash", "-c", "sleep 5 && rpk topic create test --brokers redpanda:29092" ]
depends_on:
- redpanda
restart: "no"
- broker
entrypoint: [ "bash", "-c", "sleep 10 && rpk topic create topicname -c max.message.bytes=10485760 --brokers broker:29092" ]
4 changes: 2 additions & 2 deletions example/simple/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,11 @@ type sinkResponseHandler struct {
}

func (s *sinkResponseHandler) OnSuccess(ctx *kafka.SinkResponseHandlerContext) {
fmt.Printf("OnSuccess %v\n", string(ctx.Message.Value))
fmt.Printf("OnSuccess Key: %v, Len: %v\n", string(ctx.Message.Key), len(ctx.Message.Value))
}

func (s *sinkResponseHandler) OnError(ctx *kafka.SinkResponseHandlerContext) {
fmt.Printf("OnError %v\n", string(ctx.Message.Value))
fmt.Printf("OnError Key: %v, Len: %v, Err: %v\n", string(ctx.Message.Key), len(ctx.Message.Value), ctx.Err)
}

func main() {
Expand Down
1 change: 1 addition & 0 deletions kafka/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -300,6 +300,7 @@ func NewClient(config *config.Connector) Client {
config.Kafka.InterCA,
)
if err != nil {
logger.Log.Error("error while creating new tls content, err: %v", err)
panic(err)
}

Expand Down
2 changes: 1 addition & 1 deletion test/couchbase/Dockerfile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
FROM couchbase:community-7.6.1
FROM couchbase:7.6.3

ADD configure.sh /configure.sh
RUN chmod +x /configure.sh
Expand Down
7 changes: 4 additions & 3 deletions test/couchbase/configure.sh
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,12 @@ until [[ $(check_db) = 0 ]]; do
done

couchbase-cli cluster-init -c localhost --cluster-name Cluster --cluster-username user \
--cluster-password 123456 --services data --cluster-ramsize 1024
--cluster-password password --services data --cluster-ramsize 1024

couchbase-cli bucket-create -c couchbase --username user --password 123456 --bucket dcp-test --bucket-type couchbase --bucket-ramsize 1024
couchbase-cli bucket-create -c couchbase --username user --password password --bucket dcp-test --bucket-type couchbase --bucket-ramsize 768
couchbase-cli bucket-create -c couchbase --username user --password password --bucket checkpoint-bucket-name --bucket-type couchbase --bucket-ramsize 256

cbimport json -c couchbase://127.0.0.1 -u user -p 123456 --bucket-quota 1024 -b dcp-test -d file://opt/couchbase/samples/travel-sample.zip -f sample
cbimport json -c couchbase://127.0.0.1 -u user -p password --bucket-quota 768 -b dcp-test -d file://opt/couchbase/samples/travel-sample.zip -f sample

echo "couchbase-dev started"

Expand Down
6 changes: 3 additions & 3 deletions test/integration/config.yml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
hosts:
- localhost:8091
username: user
password: 123456
password: password
bucketName: dcp-test
rollbackMitigation:
disabled: true
Expand All @@ -21,7 +21,7 @@ metadata:
collection: _default
kafka:
collectionTopicMapping:
_default: test
_default: topicname
brokers:
- "localhost:9092"
readTimeout: 30s
Expand All @@ -31,4 +31,4 @@ kafka:
producerBatchTickerDuration: 5s
metadataTTL: 2400s
metadataTopics:
- "test"
- "topicname"

0 comments on commit 65845f3

Please sign in to comment.