diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDBDatabaseSync.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDBDatabaseSync.java index 3526c075a..ca034ac34 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDBDatabaseSync.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDBDatabaseSync.java @@ -68,6 +68,18 @@ public class MongoDBDatabaseSync extends DatabaseSync { .defaultValue(0.2) .withDescription("mongo cdc sample percent"); + public static final ConfigOption MONGO_CDC_MIN_SAMPLE_SIZE = + ConfigOptions.key("schema.min-sample-size") + .longType() + .defaultValue(1000L) + .withDescription("mongo cdc min sample size"); + + public static final ConfigOption MONGO_CDC_MAX_SAMPLE_SIZE = + ConfigOptions.key("schema.max-sample-size") + .longType() + .defaultValue(100000L) + .withDescription("mongo cdc max sample size"); + public static final ConfigOption TABLE_NAME = ConfigOptions.key("table-name") .stringType() @@ -101,6 +113,8 @@ public List getSchemaList() throws Exception { MongoClientSettings settings = settingsBuilder.build(); Double samplePercent = config.get(MONGO_CDC_CREATE_SAMPLE_PERCENT); + Long minSampleSize = config.get(MONGO_CDC_MIN_SAMPLE_SIZE); + Long maxSampleSize = config.get(MONGO_CDC_MAX_SAMPLE_SIZE); try (MongoClient mongoClient = MongoClients.create(settings)) { MongoDatabase mongoDatabase = mongoClient.getDatabase(databaseName); MongoIterable collectionNames = mongoDatabase.listCollectionNames(); @@ -115,7 +129,10 @@ public List getSchemaList() throws Exception { } long totalDocuments = collection.estimatedDocumentCount(); - long sampleSize = (long) Math.ceil(totalDocuments * samplePercent); + long sampleSize = + calculateSampleSize( + totalDocuments, samplePercent, minSampleSize, maxSampleSize); + ArrayList documents = sampleData(collection, sampleSize); MongoDBSchema mongoDBSchema = new MongoDBSchema(documents, databaseName, collectionName, null); @@ -127,6 +144,19 @@ public List getSchemaList() throws Exception { return schemaList; } + public long calculateSampleSize( + long totalDocuments, Double samplePercent, Long minSampleSize, Long maxSampleSize) { + if (totalDocuments < minSampleSize) { + return totalDocuments; + } + long sampleSize = (long) Math.ceil(totalDocuments * samplePercent); + // If the number of samples is less than the minimum threshold, the minimum threshold is + // used, while ensuring that the number of samples does not exceed the maximum threshold + sampleSize = Math.max(sampleSize, minSampleSize); + sampleSize = Math.min(sampleSize, maxSampleSize); + return sampleSize; + } + private ArrayList sampleData(MongoCollection collection, Long sampleNum) { ArrayList query = new ArrayList<>(); query.add(new Document("$sample", new Document("size", sampleNum))); diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDBDatabaseSyncTest.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDBDatabaseSyncTest.java new file mode 100644 index 000000000..94c6c51c9 --- /dev/null +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDBDatabaseSyncTest.java @@ -0,0 +1,48 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.flink.tools.cdc.mongodb; + +import org.junit.Before; +import org.junit.Test; + +import java.sql.SQLException; + +import static org.junit.Assert.assertEquals; + +public class MongoDBDatabaseSyncTest { + private MongoDBDatabaseSync mongoDBDatabaseSync; + + @Before + public void init() throws SQLException { + mongoDBDatabaseSync = new MongoDBDatabaseSync(); + } + + @Test + public void testCalculateSampleSize() { + long sampleSize1 = mongoDBDatabaseSync.calculateSampleSize(100L, 0.2, 1000L, 100000L); + long sampleSize2 = mongoDBDatabaseSync.calculateSampleSize(1000L, 0.2, 1000L, 100000L); + long sampleSize3 = mongoDBDatabaseSync.calculateSampleSize(2000L, 0.2, 1000L, 100000L); + long sampleSize4 = mongoDBDatabaseSync.calculateSampleSize(10000L, 0.2, 1000L, 100000L); + long sampleSize5 = mongoDBDatabaseSync.calculateSampleSize(1000000L, 0.2, 1000L, 100000L); + assertEquals(100, sampleSize1); + assertEquals(1000, sampleSize2); + assertEquals(1000, sampleSize3); + assertEquals(2000, sampleSize4); + assertEquals(100000, sampleSize5); + } +}