From 82f9dd0c483d906999712dadb48a5f70f4a17e8d Mon Sep 17 00:00:00 2001 From: zhuwenxing Date: Thu, 22 Aug 2024 11:11:04 +0800 Subject: [PATCH] update cdc config (#118) * update cdc config Signed-off-by: zhuwenxing * update cdc config Signed-off-by: zhuwenxing * update cdc config Signed-off-by: zhuwenxing * update cdc config Signed-off-by: zhuwenxing * update cdc config Signed-off-by: zhuwenxing * update cdc config Signed-off-by: zhuwenxing * update cdc config Signed-off-by: zhuwenxing --------- Signed-off-by: zhuwenxing --- .github/workflows/ci.yaml | 5 ++-- .github/workflows/release.yaml | 2 +- Dockerfile | 18 ++++++++++--- server/Makefile | 2 +- server/configs/cdc.yaml | 2 +- tests/testcases/test_cdc_sync_requests.py | 32 +++++++++++------------ 6 files changed, 37 insertions(+), 24 deletions(-) diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index aa8e85e5..4f58b982 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -30,7 +30,7 @@ jobs: timeout-minutes: 15 shell: bash run: | - docker build --build-arg GIT_COMMIT_ARG=$(git rev-parse --short HEA) -t milvus-cdc:latest . + docker build --build-arg GIT_COMMIT_ARG=$(git rev-parse --short HEAD) -t milvus-cdc:latest . docker tag milvus-cdc:latest milvusdb/milvus-cdc:latest - name: Creating kind cluster @@ -72,9 +72,10 @@ jobs: working-directory: deployment/docker shell: bash run: | - docker compose up -d + docker compose --verbose up -d sleep 20s docker compose ps + docker compose logs || true - name: Create CDC task timeout-minutes: 15 diff --git a/.github/workflows/release.yaml b/.github/workflows/release.yaml index 4f8f632a..0f7c1733 100644 --- a/.github/workflows/release.yaml +++ b/.github/workflows/release.yaml @@ -122,6 +122,6 @@ jobs: - name: Build and push docker image run: | tag=$(git tag --sort=-version:refname | head -1) - docker build -t milvusdb/milvus-cdc:$tag -t milvusdb/milvus-cdc:latest . + docker build --build-arg GIT_COMMIT_ARG=$(git rev-parse --short HEAD) -t milvusdb/milvus-cdc:$tag -t milvusdb/milvus-cdc:latest . docker push milvusdb/milvus-cdc:$tag docker push milvusdb/milvus-cdc:latest \ No newline at end of file diff --git a/Dockerfile b/Dockerfile index 4cf5a9fb..08be6513 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,19 +1,31 @@ -FROM golang:1.21 AS builder +FROM golang:1.21 AS builder + ENV CGO_ENABLED=1 + +# Add git to the builder stage +RUN apt-get update && apt-get install -y git && rm -rf /var/lib/apt/lists/* + ARG GIT_COMMIT_ARG ENV CDC_GIT_COMMIT=${GIT_COMMIT_ARG} + WORKDIR /app COPY . . -RUN cd server && make build && mv ../bin/cdc /app/milvus-cdc + +# Modify this line to pass CDC_GIT_COMMIT to make +RUN cd server && make build GIT_COMMIT=${CDC_GIT_COMMIT} && mv ../bin/cdc /app/milvus-cdc FROM debian:bookworm + WORKDIR /app + RUN apt-get update && apt-get install -y \ ca-certificates \ curl \ && rm -rf /var/lib/apt/lists/* + COPY --from=builder /app/milvus-cdc ./ COPY --from=builder /app/server/configs ./configs + EXPOSE 8444 -CMD ["/bin/bash", "-c", "cat /app/configs/cdc.yaml;/app/milvus-cdc"] +CMD ["/bin/bash", "-c", "cat /app/configs/cdc.yaml;/app/milvus-cdc"] \ No newline at end of file diff --git a/server/Makefile b/server/Makefile index 96a0b284..42cd827f 100644 --- a/server/Makefile +++ b/server/Makefile @@ -1,6 +1,6 @@ OBJPREFIX := "github.com/zilliztech/milvus-cdc/server/tag" BUILD_TIME = $(shell date -u) -GIT_COMMIT = $(shell git rev-parse --short HEAD 2>/dev/null || echo $(CDC_GIT_COMMIT)) +GIT_COMMIT ?= $(if $(CDC_GIT_COMMIT),$(CDC_GIT_COMMIT),$(shell git rev-parse --short HEAD)) GO_VERSION = $(shell go version) BUILD_FLAGS = "-X '$(OBJPREFIX).BuildTime=$(BUILD_TIME)' -X '$(OBJPREFIX).GitCommit=$(GIT_COMMIT)' -X '$(OBJPREFIX).GoVersion=$(GO_VERSION)'" diff --git a/server/configs/cdc.yaml b/server/configs/cdc.yaml index 8e94ed7b..a74230d1 100644 --- a/server/configs/cdc.yaml +++ b/server/configs/cdc.yaml @@ -44,4 +44,4 @@ sourceConfig: # address: 127.0.0.1:9092 maxNameLength: 256 logLevel: info -detectDeadLock: false +detectDeadLock: false \ No newline at end of file diff --git a/tests/testcases/test_cdc_sync_requests.py b/tests/testcases/test_cdc_sync_requests.py index 173794ec..3b32c0de 100644 --- a/tests/testcases/test_cdc_sync_requests.py +++ b/tests/testcases/test_cdc_sync_requests.py @@ -47,7 +47,7 @@ def test_cdc_sync_create_collection_request(self, upstream_host, upstream_port, # check collections in downstream connections.disconnect("default") connections.connect(host=downstream_host, port=downstream_port) - timeout = 30 + timeout = 60 t0 = time.time() log.info(f"all collections in downstream {list_collections()}") while True and time.time() - t0 < timeout: @@ -84,7 +84,7 @@ def test_cdc_sync_drop_collection_request(self, upstream_host, upstream_port, do # check collections in downstream connections.disconnect("default") connections.connect(host=downstream_host, port=downstream_port) - timeout = 30 + timeout = 60 t0 = time.time() log.info(f"all collections in downstream {list_collections()}") while True and time.time() - t0 < timeout: @@ -109,7 +109,7 @@ def test_cdc_sync_drop_collection_request(self, upstream_host, upstream_port, do # check collections in downstream connections.disconnect("default") connections.connect(host=downstream_host, port=downstream_port) - timeout = 30 + timeout = 60 t0 = time.time() log.info(f"all collections in downstream {list_collections()}") while True and time.time() - t0 < timeout: @@ -289,7 +289,6 @@ def test_cdc_sync_delete_entities_request(self, upstream_host, upstream_port, do c_state = utility.load_state(collection_name) if c_state != LoadState.Loaded: log.info(f"collection state in downstream: {str(c_state)}") - continue # get the number of entities in downstream res = c_downstream.query(f"int64 in {[i for i in range(100)]}", timeout=10) if len(res) != 100: @@ -298,6 +297,7 @@ def test_cdc_sync_delete_entities_request(self, upstream_host, upstream_port, do if len(res) == 0: log.info(f"collection synced in downstream successfully cost time: {time.time() - t0:.2f}s") break + c_state = utility.load_state(collection_name) assert c_state == LoadState.Loaded assert len(res) == 0 @@ -320,7 +320,7 @@ def test_cdc_sync_create_partition_request(self, upstream_host, upstream_port, d connections.disconnect("default") connections.connect(host=downstream_host, port=downstream_port) c_downstream = Collection(name=collection_name) - timeout = 30 + timeout = 60 t0 = time.time() log.info(f"all collections in downstream {list_collections()}") while True and time.time() - t0 < timeout: @@ -352,7 +352,7 @@ def test_cdc_sync_drop_partition_request(self, upstream_host, upstream_port, dow connections.disconnect("default") connections.connect(host=downstream_host, port=downstream_port) c_downstream = Collection(name=collection_name) - timeout = 30 + timeout = 60 t0 = time.time() log.info(f"all collections in downstream {list_collections()}") while True and time.time() - t0 < timeout: @@ -375,7 +375,7 @@ def test_cdc_sync_drop_partition_request(self, upstream_host, upstream_port, dow connections.disconnect("default") connections.connect(host=downstream_host, port=downstream_port) c_downstream = Collection(name=collection_name) - timeout = 30 + timeout = 60 t0 = time.time() while True and time.time() - t0 < timeout: # collections in subset of downstream @@ -418,7 +418,7 @@ def test_cdc_sync_create_index_request(self, upstream_host, upstream_port, downs connections.disconnect("default") connections.connect(host=downstream_host, port=downstream_port) c_downstream = Collection(name=collection_name) - timeout = 30 + timeout = 60 t0 = time.time() while True and time.time() - t0 < timeout: # collections in subset of downstream @@ -459,7 +459,7 @@ def test_cdc_sync_drop_index_request(self, upstream_host, upstream_port, downstr connections.disconnect("default") connections.connect(host=downstream_host, port=downstream_port) c_downstream = Collection(name=collection_name) - timeout = 30 + timeout = 60 t0 = time.time() while True and time.time() - t0 < timeout: # collections in subset of downstream @@ -479,7 +479,7 @@ def test_cdc_sync_drop_index_request(self, upstream_host, upstream_port, downstr connections.disconnect("default") connections.connect(host=downstream_host, port=downstream_port) c_downstream = Collection(name=collection_name) - timeout = 30 + timeout = 60 t0 = time.time() while True and time.time() - t0 < timeout: # collections in subset of downstream @@ -523,7 +523,7 @@ def test_cdc_sync_load_and_release_collection_request(self, upstream_host, upstr connections.disconnect("default") connections.connect(host=downstream_host, port=downstream_port) c_downstream = Collection(name=collection_name) - timeout = 30 + timeout = 60 replicas = None t0 = time.time() while True and time.time() - t0 < timeout: @@ -552,7 +552,7 @@ def test_cdc_sync_load_and_release_collection_request(self, upstream_host, upstr connections.disconnect("default") connections.connect(host=downstream_host, port=downstream_port) c_downstream = Collection(name=collection_name) - timeout = 30 + timeout = 60 replicas = None t0 = time.time() while True and time.time() - t0 < timeout: @@ -610,7 +610,7 @@ def test_cdc_sync_flush_collection_request(self, upstream_host, upstream_port, d connections.connect(host=downstream_host, port=downstream_port) c_downstream = Collection(name=collection_name) log.info(f"number of entities in downstream: {c_downstream.num_entities}") - timeout = 30 + timeout = 60 t0 = time.time() while True and time.time() - t0 < timeout: # get the number of entities in downstream @@ -641,7 +641,7 @@ def test_cdc_sync_create_database_request(self, upstream_host, upstream_port, do # check database in downstream connections.disconnect("default") connections.connect(host=downstream_host, port=downstream_port) - timeout = 30 + timeout = 60 t0 = time.time() while True and time.time() - t0 < timeout: if db_name in db.list_database(): @@ -669,7 +669,7 @@ def test_cdc_sync_drop_database_request(self, upstream_host, upstream_port, down # check database in downstream connections.disconnect("default") connections.connect(host=downstream_host, port=downstream_port) - timeout = 30 + timeout = 60 t0 = time.time() while True and time.time() - t0 < timeout: if db_name in db.list_database(): @@ -689,7 +689,7 @@ def test_cdc_sync_drop_database_request(self, upstream_host, upstream_port, down # check database in downstream connections.disconnect("default") connections.connect(host=downstream_host, port=downstream_port) - timeout = 30 + timeout = 60 t0 = time.time() while True and time.time() - t0 < timeout: if db_name not in db.list_database():