Skip to content

Commit

Permalink
Let ElasticsearchManager create its managed MongoDB collections dyn…
Browse files Browse the repository at this point in the history
…amically
  • Loading branch information
JamesChenX committed Jun 10, 2024
1 parent 8e75e94 commit 317f321
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -281,46 +281,51 @@ private Mono<Void> fullSyncIfEnabled(
return TurmsMongoClient.of(userUseCaseProperties.getMongo(), "elasticsearch")
.flatMap(mongoClient -> {
mongoClient.registerEntitiesByClasses(SyncLog.class);
return mongoClient.findMany(SyncLog.class,
Filter.newBuilder(1)
.inIfNotNullForEnumStrings(SyncLog.Fields.status,
List.of(SyncStatus.IN_PROGRESS,
SyncStatus.COMPLETED)))
.collect(CollectorUtil.toChunkedList())
.flatMap(syncLogs -> performFullSyncs(mongoClient,
syncLogs,
isUserDocsFullSyncEnabled,
isGroupDocsFullSyncEnabled));
return mongoClient.createCollectionIfNotExists(SyncLog.class)
.then(mongoClient.findMany(SyncLog.class,
Filter.newBuilder(1)
.inIfNotNullForEnumStrings(SyncLog.Fields.status,
List.of(SyncStatus.IN_PROGRESS,
SyncStatus.COMPLETED)))
.collect(CollectorUtil.toChunkedList())
.flatMap(syncLogs -> performFullSyncs(mongoClient,
syncLogs,
isUserDocsFullSyncEnabled,
isGroupDocsFullSyncEnabled)));
});
}

Mono<Void> performFullSyncForUserDocs = Mono.defer(() -> TurmsMongoClient
.of(userUseCaseProperties.getMongo(), "elasticsearch-for-user-docs")
.flatMap(mongoClient -> {
mongoClient.registerEntitiesByClasses(SyncLog.class);
return mongoClient.findMany(SyncLog.class,
Filter.newBuilder(1)
.inIfNotNullForEnumStrings(SyncLog.Fields.status,
List.of(SyncStatus.IN_PROGRESS, SyncStatus.COMPLETED)))
.collect(CollectorUtil.toChunkedList())
.flatMap(syncLogs -> performFullSyncs(mongoClient,
syncLogs,
true,
false));
return mongoClient.createCollectionIfNotExists(SyncLog.class)
.then(mongoClient.findMany(SyncLog.class,
Filter.newBuilder(1)
.inIfNotNullForEnumStrings(SyncLog.Fields.status,
List.of(SyncStatus.IN_PROGRESS,
SyncStatus.COMPLETED)))
.collect(CollectorUtil.toChunkedList())
.flatMap(syncLogs -> performFullSyncs(mongoClient,
syncLogs,
true,
false)));
}));
Mono<Void> performFullSyncForGroupDocs = Mono.defer(() -> TurmsMongoClient
.of(groupUseCaseProperties.getMongo(), "elasticsearch-for-group-docs")
.flatMap(mongoClient -> {
mongoClient.registerEntitiesByClasses(SyncLog.class);
return mongoClient.findMany(SyncLog.class,
Filter.newBuilder(1)
.inIfNotNullForEnumStrings(SyncLog.Fields.status,
List.of(SyncStatus.IN_PROGRESS, SyncStatus.COMPLETED)))
.collect(CollectorUtil.toChunkedList())
.flatMap(syncLogs -> performFullSyncs(mongoClient,
syncLogs,
false,
true));
return mongoClient.createCollectionIfNotExists(SyncLog.class)
.then(mongoClient.findMany(SyncLog.class,
Filter.newBuilder(1)
.inIfNotNullForEnumStrings(SyncLog.Fields.status,
List.of(SyncStatus.IN_PROGRESS,
SyncStatus.COMPLETED)))
.collect(CollectorUtil.toChunkedList())
.flatMap(syncLogs -> performFullSyncs(mongoClient,
syncLogs,
false,
true)));
}));
if (isUserDocsFullSyncEnabled && isGroupDocsFullSyncEnabled) {
return performFullSyncForUserDocs.then(performFullSyncForGroupDocs);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,6 @@
import im.turms.service.domain.user.po.UserRelationshipGroupMember;
import im.turms.service.domain.user.po.UserSettings;
import im.turms.service.domain.user.po.UserVersion;
import im.turms.service.storage.elasticsearch.mongo.SyncLog;

import static im.turms.server.common.infra.property.env.service.env.database.TieredStorageProperties.StorageTierProperties;

Expand All @@ -99,10 +98,11 @@ public class MongoCollectionInitializer implements IMongoCollectionInitializer {
private static final Logger LOGGER = LoggerFactory.getLogger(MongoCollectionInitializer.class);

private final TurmsMongoClient adminMongoClient;
private final TurmsMongoClient userMongoClient;
private final TurmsMongoClient groupMongoClient;
private final TurmsMongoClient conferenceMongoClient;
private final TurmsMongoClient conversationMongoClient;
private final TurmsMongoClient groupMongoClient;
private final TurmsMongoClient messageMongoClient;
private final TurmsMongoClient userMongoClient;
private final List<TurmsMongoClient> clients;

private final TurmsApplicationContext context;
Expand All @@ -118,20 +118,22 @@ public class MongoCollectionInitializer implements IMongoCollectionInitializer {
public MongoCollectionInitializer(
@Lazy Node node,
TurmsMongoClient adminMongoClient,
TurmsMongoClient userMongoClient,
TurmsMongoClient groupMongoClient,
TurmsMongoClient conferenceMongoClient,
TurmsMongoClient conversationMongoClient,
TurmsMongoClient groupMongoClient,
TurmsMongoClient messageMongoClient,
TurmsMongoClient userMongoClient,
PasswordManager passwordManager,
TaskManager taskManager,
TurmsApplicationContext context,
TurmsPropertiesManager propertiesManager) {
this.node = node;
this.adminMongoClient = adminMongoClient;
this.userMongoClient = userMongoClient;
this.groupMongoClient = groupMongoClient;
this.conferenceMongoClient = conferenceMongoClient;
this.conversationMongoClient = conversationMongoClient;
this.groupMongoClient = groupMongoClient;
this.messageMongoClient = messageMongoClient;
this.userMongoClient = userMongoClient;
clients = List.of(adminMongoClient,
userMongoClient,
groupMongoClient,
Expand Down Expand Up @@ -238,15 +240,17 @@ private Mono<Boolean> createCollectionsIfNotExist() {
groupMongoClient.createCollectionIfNotExists(GroupType.class),
groupMongoClient.createCollectionIfNotExists(GroupVersion.class),

messageMongoClient.createCollectionIfNotExists(Meeting.class),
conferenceMongoClient.createCollectionIfNotExists(Meeting.class),

messageMongoClient.createCollectionIfNotExists(Message.class),

conversationMongoClient.createCollectionIfNotExists(ConversationSettings.class),
conversationMongoClient.createCollectionIfNotExists(GroupConversation.class),
conversationMongoClient.createCollectionIfNotExists(PrivateConversation.class),

conversationMongoClient.createCollectionIfNotExists(SyncLog.class),
// ElasticsearchManager has its own logic to create collections dynamically,
// so we don't need to create collections for it.
// mongoClient.createCollectionIfNotExists(SyncLog.class),

userMongoClient.createCollectionIfNotExists(User.class),
userMongoClient.createCollectionIfNotExists(UserFriendRequest.class),
Expand Down

0 comments on commit 317f321

Please sign in to comment.