Skip to content

Commit

Permalink
update cdc config (zilliztech#118)
Browse files Browse the repository at this point in the history
* update cdc config

Signed-off-by: zhuwenxing <wenxing.zhu@zilliz.com>

* update cdc config

Signed-off-by: zhuwenxing <wenxing.zhu@zilliz.com>

* update cdc config

Signed-off-by: zhuwenxing <wenxing.zhu@zilliz.com>

* update cdc config

Signed-off-by: zhuwenxing <wenxing.zhu@zilliz.com>

* update cdc config

Signed-off-by: zhuwenxing <wenxing.zhu@zilliz.com>

* update cdc config

Signed-off-by: zhuwenxing <wenxing.zhu@zilliz.com>

* update cdc config

Signed-off-by: zhuwenxing <wenxing.zhu@zilliz.com>

---------

Signed-off-by: zhuwenxing <wenxing.zhu@zilliz.com>
  • Loading branch information
zhuwenxing authored Aug 22, 2024
1 parent 2217d68 commit 82f9dd0
Show file tree
Hide file tree
Showing 6 changed files with 37 additions and 24 deletions.
5 changes: 3 additions & 2 deletions .github/workflows/ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ jobs:
timeout-minutes: 15
shell: bash
run: |
docker build --build-arg GIT_COMMIT_ARG=$(git rev-parse --short HEA) -t milvus-cdc:latest .
docker build --build-arg GIT_COMMIT_ARG=$(git rev-parse --short HEAD) -t milvus-cdc:latest .
docker tag milvus-cdc:latest milvusdb/milvus-cdc:latest
- name: Creating kind cluster
Expand Down Expand Up @@ -72,9 +72,10 @@ jobs:
working-directory: deployment/docker
shell: bash
run: |
docker compose up -d
docker compose --verbose up -d
sleep 20s
docker compose ps
docker compose logs || true
- name: Create CDC task
timeout-minutes: 15
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/release.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,6 @@ jobs:
- name: Build and push docker image
run: |
tag=$(git tag --sort=-version:refname | head -1)
docker build -t milvusdb/milvus-cdc:$tag -t milvusdb/milvus-cdc:latest .
docker build --build-arg GIT_COMMIT_ARG=$(git rev-parse --short HEAD) -t milvusdb/milvus-cdc:$tag -t milvusdb/milvus-cdc:latest .
docker push milvusdb/milvus-cdc:$tag
docker push milvusdb/milvus-cdc:latest
18 changes: 15 additions & 3 deletions Dockerfile
Original file line number Diff line number Diff line change
@@ -1,19 +1,31 @@
FROM golang:1.21 AS builder
FROM golang:1.21 AS builder

ENV CGO_ENABLED=1

# Add git to the builder stage
RUN apt-get update && apt-get install -y git && rm -rf /var/lib/apt/lists/*

ARG GIT_COMMIT_ARG
ENV CDC_GIT_COMMIT=${GIT_COMMIT_ARG}

WORKDIR /app
COPY . .
RUN cd server && make build && mv ../bin/cdc /app/milvus-cdc

# Modify this line to pass CDC_GIT_COMMIT to make
RUN cd server && make build GIT_COMMIT=${CDC_GIT_COMMIT} && mv ../bin/cdc /app/milvus-cdc

FROM debian:bookworm

WORKDIR /app

RUN apt-get update && apt-get install -y \
ca-certificates \
curl \
&& rm -rf /var/lib/apt/lists/*

COPY --from=builder /app/milvus-cdc ./
COPY --from=builder /app/server/configs ./configs

EXPOSE 8444

CMD ["/bin/bash", "-c", "cat /app/configs/cdc.yaml;/app/milvus-cdc"]
CMD ["/bin/bash", "-c", "cat /app/configs/cdc.yaml;/app/milvus-cdc"]
2 changes: 1 addition & 1 deletion server/Makefile
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
OBJPREFIX := "github.com/zilliztech/milvus-cdc/server/tag"
BUILD_TIME = $(shell date -u)
GIT_COMMIT = $(shell git rev-parse --short HEAD 2>/dev/null || echo $(CDC_GIT_COMMIT))
GIT_COMMIT ?= $(if $(CDC_GIT_COMMIT),$(CDC_GIT_COMMIT),$(shell git rev-parse --short HEAD))
GO_VERSION = $(shell go version)
BUILD_FLAGS = "-X '$(OBJPREFIX).BuildTime=$(BUILD_TIME)' -X '$(OBJPREFIX).GitCommit=$(GIT_COMMIT)' -X '$(OBJPREFIX).GoVersion=$(GO_VERSION)'"

Expand Down
2 changes: 1 addition & 1 deletion server/configs/cdc.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -44,4 +44,4 @@ sourceConfig:
# address: 127.0.0.1:9092
maxNameLength: 256
logLevel: info
detectDeadLock: false
detectDeadLock: false
32 changes: 16 additions & 16 deletions tests/testcases/test_cdc_sync_requests.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ def test_cdc_sync_create_collection_request(self, upstream_host, upstream_port,
# check collections in downstream
connections.disconnect("default")
connections.connect(host=downstream_host, port=downstream_port)
timeout = 30
timeout = 60
t0 = time.time()
log.info(f"all collections in downstream {list_collections()}")
while True and time.time() - t0 < timeout:
Expand Down Expand Up @@ -84,7 +84,7 @@ def test_cdc_sync_drop_collection_request(self, upstream_host, upstream_port, do
# check collections in downstream
connections.disconnect("default")
connections.connect(host=downstream_host, port=downstream_port)
timeout = 30
timeout = 60
t0 = time.time()
log.info(f"all collections in downstream {list_collections()}")
while True and time.time() - t0 < timeout:
Expand All @@ -109,7 +109,7 @@ def test_cdc_sync_drop_collection_request(self, upstream_host, upstream_port, do
# check collections in downstream
connections.disconnect("default")
connections.connect(host=downstream_host, port=downstream_port)
timeout = 30
timeout = 60
t0 = time.time()
log.info(f"all collections in downstream {list_collections()}")
while True and time.time() - t0 < timeout:
Expand Down Expand Up @@ -289,7 +289,6 @@ def test_cdc_sync_delete_entities_request(self, upstream_host, upstream_port, do
c_state = utility.load_state(collection_name)
if c_state != LoadState.Loaded:
log.info(f"collection state in downstream: {str(c_state)}")
continue
# get the number of entities in downstream
res = c_downstream.query(f"int64 in {[i for i in range(100)]}", timeout=10)
if len(res) != 100:
Expand All @@ -298,6 +297,7 @@ def test_cdc_sync_delete_entities_request(self, upstream_host, upstream_port, do
if len(res) == 0:
log.info(f"collection synced in downstream successfully cost time: {time.time() - t0:.2f}s")
break
c_state = utility.load_state(collection_name)
assert c_state == LoadState.Loaded
assert len(res) == 0

Expand All @@ -320,7 +320,7 @@ def test_cdc_sync_create_partition_request(self, upstream_host, upstream_port, d
connections.disconnect("default")
connections.connect(host=downstream_host, port=downstream_port)
c_downstream = Collection(name=collection_name)
timeout = 30
timeout = 60
t0 = time.time()
log.info(f"all collections in downstream {list_collections()}")
while True and time.time() - t0 < timeout:
Expand Down Expand Up @@ -352,7 +352,7 @@ def test_cdc_sync_drop_partition_request(self, upstream_host, upstream_port, dow
connections.disconnect("default")
connections.connect(host=downstream_host, port=downstream_port)
c_downstream = Collection(name=collection_name)
timeout = 30
timeout = 60
t0 = time.time()
log.info(f"all collections in downstream {list_collections()}")
while True and time.time() - t0 < timeout:
Expand All @@ -375,7 +375,7 @@ def test_cdc_sync_drop_partition_request(self, upstream_host, upstream_port, dow
connections.disconnect("default")
connections.connect(host=downstream_host, port=downstream_port)
c_downstream = Collection(name=collection_name)
timeout = 30
timeout = 60
t0 = time.time()
while True and time.time() - t0 < timeout:
# collections in subset of downstream
Expand Down Expand Up @@ -418,7 +418,7 @@ def test_cdc_sync_create_index_request(self, upstream_host, upstream_port, downs
connections.disconnect("default")
connections.connect(host=downstream_host, port=downstream_port)
c_downstream = Collection(name=collection_name)
timeout = 30
timeout = 60
t0 = time.time()
while True and time.time() - t0 < timeout:
# collections in subset of downstream
Expand Down Expand Up @@ -459,7 +459,7 @@ def test_cdc_sync_drop_index_request(self, upstream_host, upstream_port, downstr
connections.disconnect("default")
connections.connect(host=downstream_host, port=downstream_port)
c_downstream = Collection(name=collection_name)
timeout = 30
timeout = 60
t0 = time.time()
while True and time.time() - t0 < timeout:
# collections in subset of downstream
Expand All @@ -479,7 +479,7 @@ def test_cdc_sync_drop_index_request(self, upstream_host, upstream_port, downstr
connections.disconnect("default")
connections.connect(host=downstream_host, port=downstream_port)
c_downstream = Collection(name=collection_name)
timeout = 30
timeout = 60
t0 = time.time()
while True and time.time() - t0 < timeout:
# collections in subset of downstream
Expand Down Expand Up @@ -523,7 +523,7 @@ def test_cdc_sync_load_and_release_collection_request(self, upstream_host, upstr
connections.disconnect("default")
connections.connect(host=downstream_host, port=downstream_port)
c_downstream = Collection(name=collection_name)
timeout = 30
timeout = 60
replicas = None
t0 = time.time()
while True and time.time() - t0 < timeout:
Expand Down Expand Up @@ -552,7 +552,7 @@ def test_cdc_sync_load_and_release_collection_request(self, upstream_host, upstr
connections.disconnect("default")
connections.connect(host=downstream_host, port=downstream_port)
c_downstream = Collection(name=collection_name)
timeout = 30
timeout = 60
replicas = None
t0 = time.time()
while True and time.time() - t0 < timeout:
Expand Down Expand Up @@ -610,7 +610,7 @@ def test_cdc_sync_flush_collection_request(self, upstream_host, upstream_port, d
connections.connect(host=downstream_host, port=downstream_port)
c_downstream = Collection(name=collection_name)
log.info(f"number of entities in downstream: {c_downstream.num_entities}")
timeout = 30
timeout = 60
t0 = time.time()
while True and time.time() - t0 < timeout:
# get the number of entities in downstream
Expand Down Expand Up @@ -641,7 +641,7 @@ def test_cdc_sync_create_database_request(self, upstream_host, upstream_port, do
# check database in downstream
connections.disconnect("default")
connections.connect(host=downstream_host, port=downstream_port)
timeout = 30
timeout = 60
t0 = time.time()
while True and time.time() - t0 < timeout:
if db_name in db.list_database():
Expand Down Expand Up @@ -669,7 +669,7 @@ def test_cdc_sync_drop_database_request(self, upstream_host, upstream_port, down
# check database in downstream
connections.disconnect("default")
connections.connect(host=downstream_host, port=downstream_port)
timeout = 30
timeout = 60
t0 = time.time()
while True and time.time() - t0 < timeout:
if db_name in db.list_database():
Expand All @@ -689,7 +689,7 @@ def test_cdc_sync_drop_database_request(self, upstream_host, upstream_port, down
# check database in downstream
connections.disconnect("default")
connections.connect(host=downstream_host, port=downstream_port)
timeout = 30
timeout = 60
t0 = time.time()
while True and time.time() - t0 < timeout:
if db_name not in db.list_database():
Expand Down

0 comments on commit 82f9dd0

Please sign in to comment.