Skip to content

Commit

Permalink
[improve](cdc) Optimize MongoCDC sampleSize calculation logic (#542)
Browse files Browse the repository at this point in the history
  • Loading branch information
xuqinghuang authored Jan 13, 2025
1 parent 1a92f2a commit 067eaf1
Show file tree
Hide file tree
Showing 2 changed files with 79 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,18 @@ public class MongoDBDatabaseSync extends DatabaseSync {
.defaultValue(0.2)
.withDescription("mongo cdc sample percent");

public static final ConfigOption<Long> MONGO_CDC_MIN_SAMPLE_SIZE =
ConfigOptions.key("schema.min-sample-size")
.longType()
.defaultValue(1000L)
.withDescription("mongo cdc min sample size");

public static final ConfigOption<Long> MONGO_CDC_MAX_SAMPLE_SIZE =
ConfigOptions.key("schema.max-sample-size")
.longType()
.defaultValue(100000L)
.withDescription("mongo cdc max sample size");

public static final ConfigOption<String> TABLE_NAME =
ConfigOptions.key("table-name")
.stringType()
Expand Down Expand Up @@ -101,6 +113,8 @@ public List<SourceSchema> getSchemaList() throws Exception {

MongoClientSettings settings = settingsBuilder.build();
Double samplePercent = config.get(MONGO_CDC_CREATE_SAMPLE_PERCENT);
Long minSampleSize = config.get(MONGO_CDC_MIN_SAMPLE_SIZE);
Long maxSampleSize = config.get(MONGO_CDC_MAX_SAMPLE_SIZE);
try (MongoClient mongoClient = MongoClients.create(settings)) {
MongoDatabase mongoDatabase = mongoClient.getDatabase(databaseName);
MongoIterable<String> collectionNames = mongoDatabase.listCollectionNames();
Expand All @@ -115,7 +129,10 @@ public List<SourceSchema> getSchemaList() throws Exception {
}

long totalDocuments = collection.estimatedDocumentCount();
long sampleSize = (long) Math.ceil(totalDocuments * samplePercent);
long sampleSize =
calculateSampleSize(
totalDocuments, samplePercent, minSampleSize, maxSampleSize);

ArrayList<Document> documents = sampleData(collection, sampleSize);
MongoDBSchema mongoDBSchema =
new MongoDBSchema(documents, databaseName, collectionName, null);
Expand All @@ -127,6 +144,19 @@ public List<SourceSchema> getSchemaList() throws Exception {
return schemaList;
}

public long calculateSampleSize(
long totalDocuments, Double samplePercent, Long minSampleSize, Long maxSampleSize) {
if (totalDocuments < minSampleSize) {
return totalDocuments;
}
long sampleSize = (long) Math.ceil(totalDocuments * samplePercent);
// If the number of samples is less than the minimum threshold, the minimum threshold is
// used, while ensuring that the number of samples does not exceed the maximum threshold
sampleSize = Math.max(sampleSize, minSampleSize);
sampleSize = Math.min(sampleSize, maxSampleSize);
return sampleSize;
}

private ArrayList<Document> sampleData(MongoCollection<Document> collection, Long sampleNum) {
ArrayList<Document> query = new ArrayList<>();
query.add(new Document("$sample", new Document("size", sampleNum)));
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.flink.tools.cdc.mongodb;

import org.junit.Before;
import org.junit.Test;

import java.sql.SQLException;

import static org.junit.Assert.assertEquals;

public class MongoDBDatabaseSyncTest {
private MongoDBDatabaseSync mongoDBDatabaseSync;

@Before
public void init() throws SQLException {
mongoDBDatabaseSync = new MongoDBDatabaseSync();
}

@Test
public void testCalculateSampleSize() {
long sampleSize1 = mongoDBDatabaseSync.calculateSampleSize(100L, 0.2, 1000L, 100000L);
long sampleSize2 = mongoDBDatabaseSync.calculateSampleSize(1000L, 0.2, 1000L, 100000L);
long sampleSize3 = mongoDBDatabaseSync.calculateSampleSize(2000L, 0.2, 1000L, 100000L);
long sampleSize4 = mongoDBDatabaseSync.calculateSampleSize(10000L, 0.2, 1000L, 100000L);
long sampleSize5 = mongoDBDatabaseSync.calculateSampleSize(1000000L, 0.2, 1000L, 100000L);
assertEquals(100, sampleSize1);
assertEquals(1000, sampleSize2);
assertEquals(1000, sampleSize3);
assertEquals(2000, sampleSize4);
assertEquals(100000, sampleSize5);
}
}

0 comments on commit 067eaf1

Please sign in to comment.