From 8ff0ae91ca77406b2c50e5b37f1af98d8c128c35 Mon Sep 17 00:00:00 2001 From: Kristaps Fabians Geikins Date: Thu, 23 Jan 2025 12:06:54 +0200 Subject: [PATCH 1/4] chore(server): refactor activityStream invocations - batch #3 - branches --- .../activitystream/events/branchListeners.ts | 97 +++++++++++++ .../server/modules/activitystream/index.ts | 6 + .../activitystream/services/branchActivity.ts | 128 ------------------ .../modules/cli/commands/download/project.ts | 7 +- .../modules/core/domain/branches/events.ts | 25 +++- .../modules/core/graph/resolvers/branches.ts | 24 +--- .../modules/core/graph/resolvers/models.ts | 23 +--- .../core/services/branch/management.ts | 93 ++++++++++--- .../modules/core/tests/branches.spec.ts | 15 +- .../server/modules/core/tests/commits.spec.ts | 7 +- .../core/tests/integration/subs.graph.spec.ts | 15 +- .../server/modules/core/tests/streams.spec.ts | 6 +- .../server/modules/cross-server-sync/index.ts | 7 +- packages/server/modules/fileuploads/index.ts | 11 +- .../fileuploads/services/resultListener.ts | 30 +++- .../test/speckle-helpers/branchHelper.ts | 10 +- 16 files changed, 256 insertions(+), 248 deletions(-) create mode 100644 packages/server/modules/activitystream/events/branchListeners.ts delete mode 100644 packages/server/modules/activitystream/services/branchActivity.ts diff --git a/packages/server/modules/activitystream/events/branchListeners.ts b/packages/server/modules/activitystream/events/branchListeners.ts new file mode 100644 index 0000000000..193e9d736f --- /dev/null +++ b/packages/server/modules/activitystream/events/branchListeners.ts @@ -0,0 +1,97 @@ +import { + AddBranchCreatedActivity, + AddBranchDeletedActivity, + AddBranchUpdatedActivity, + SaveActivity +} from '@/modules/activitystream/domain/operations' +import { ActionTypes, ResourceTypes } from '@/modules/activitystream/helpers/types' +import { ModelEvents } from '@/modules/core/domain/branches/events' +import { isBranchDeleteInput, isBranchUpdateInput } from '@/modules/core/helpers/branch' +import { EventBusListen } from '@/modules/shared/services/eventBus' + +/** + * Save "branch created" activity + */ +const addBranchCreatedActivityFactory = + ({ saveActivity }: { saveActivity: SaveActivity }): AddBranchCreatedActivity => + async (params) => { + const { branch } = params + + await saveActivity({ + streamId: branch.streamId, + resourceType: ResourceTypes.Branch, + resourceId: branch.id, + actionType: ActionTypes.Branch.Create, + userId: branch.authorId, + info: { branch }, + message: `Branch created: ${branch.name} (${branch.id})` + }) + } + +const addBranchUpdatedActivityFactory = + ({ saveActivity }: { saveActivity: SaveActivity }): AddBranchUpdatedActivity => + async (params) => { + const { update, userId, oldBranch } = params + + const streamId = isBranchUpdateInput(update) ? update.streamId : update.projectId + await saveActivity({ + streamId, + resourceType: ResourceTypes.Branch, + resourceId: update.id, + actionType: ActionTypes.Branch.Update, + userId, + info: { old: oldBranch, new: update }, + message: `Branch metadata changed for branch ${update.id}` + }) + } + +const addBranchDeletedActivityFactory = + ({ saveActivity }: { saveActivity: SaveActivity }): AddBranchDeletedActivity => + async (params) => { + const { input, userId, branchName } = params + + const streamId = isBranchDeleteInput(input) ? input.streamId : input.projectId + await Promise.all([ + saveActivity({ + streamId, + resourceType: ResourceTypes.Branch, + resourceId: input.id, + actionType: ActionTypes.Branch.Delete, + userId, + info: { branch: { ...input, name: branchName } }, + message: `Branch deleted: '${branchName}' (${input.id})` + }) + ]) + } + +export const reportBranchActivityFactory = + (deps: { eventListen: EventBusListen; saveActivity: SaveActivity }) => () => { + const addBranchCreatedActivity = addBranchCreatedActivityFactory(deps) + const addBranchUpdatedActivity = addBranchUpdatedActivityFactory(deps) + const addBranchDeletedActivity = addBranchDeletedActivityFactory(deps) + + const quitters = [ + deps.eventListen(ModelEvents.Created, async (payload) => { + await addBranchCreatedActivity({ branch: payload.payload.model }) + }), + deps.eventListen(ModelEvents.Updated, async ({ payload }) => { + await addBranchUpdatedActivity({ + update: payload.update, + userId: payload.userId, + oldBranch: payload.oldModel, + newBranch: payload.newModel + }) + }), + deps.eventListen(ModelEvents.Deleted, async ({ payload }) => { + await addBranchDeletedActivity({ + userId: payload.userId, + input: payload.input, + branchName: payload.model.name + }) + }) + ] + + return () => { + quitters.forEach((quit) => quit()) + } + } diff --git a/packages/server/modules/activitystream/index.ts b/packages/server/modules/activitystream/index.ts index 820d99d42a..7d8cc049cd 100644 --- a/packages/server/modules/activitystream/index.ts +++ b/packages/server/modules/activitystream/index.ts @@ -28,6 +28,7 @@ import { ServerInvitesEvents } from '@/modules/serverinvites/domain/events' import { ProjectEvents } from '@/modules/core/domain/projects/events' import { reportUserActivityFactory } from '@/modules/activitystream/events/userListeners' import { reportAccessRequestActivityFactory } from '@/modules/activitystream/events/accessRequestListeners' +import { reportBranchActivityFactory } from '@/modules/activitystream/events/branchListeners' let scheduledTask: ReturnType | null = null let quitEventListeners: Optional<() => void> = undefined @@ -52,10 +53,15 @@ const initializeEventListeners = ({ eventListen: eventBus.listen, saveActivity }) + const reportBranchActivity = reportBranchActivityFactory({ + eventListen: eventBus.listen, + saveActivity + }) const quitCbs = [ reportUserActivity(), reportAccessRequestActivity(), + reportBranchActivity(), eventBus.listen(ServerInvitesEvents.Created, async ({ payload }) => { if (!isProjectResourceTarget(payload.invite.resource)) return await onServerInviteCreatedFactory({ diff --git a/packages/server/modules/activitystream/services/branchActivity.ts b/packages/server/modules/activitystream/services/branchActivity.ts deleted file mode 100644 index 22ef8c0a23..0000000000 --- a/packages/server/modules/activitystream/services/branchActivity.ts +++ /dev/null @@ -1,128 +0,0 @@ -import { ActionTypes, ResourceTypes } from '@/modules/activitystream/helpers/types' -import { - BranchSubscriptions as BranchPubsubEvents, - PublishSubscription -} from '@/modules/shared/utils/subscriptions' -import { ProjectModelsUpdatedMessageType } from '@/modules/core/graph/generated/graphql' -import { ProjectSubscriptions } from '@/modules/shared/utils/subscriptions' -import { isBranchDeleteInput, isBranchUpdateInput } from '@/modules/core/helpers/branch' -import { - AddBranchCreatedActivity, - AddBranchDeletedActivity, - AddBranchUpdatedActivity, - SaveActivity -} from '@/modules/activitystream/domain/operations' - -/** - * Save "branch created" activity - */ -export const addBranchCreatedActivityFactory = - ({ - saveActivity, - publish - }: { - saveActivity: SaveActivity - publish: PublishSubscription - }): AddBranchCreatedActivity => - async (params) => { - const { branch } = params - - await Promise.all([ - saveActivity({ - streamId: branch.streamId, - resourceType: ResourceTypes.Branch, - resourceId: branch.id, - actionType: ActionTypes.Branch.Create, - userId: branch.authorId, - info: { branch }, - message: `Branch created: ${branch.name} (${branch.id})` - }), - publish(BranchPubsubEvents.BranchCreated, { - branchCreated: { ...branch }, - streamId: branch.streamId - }), - publish(ProjectSubscriptions.ProjectModelsUpdated, { - projectId: branch.streamId, - projectModelsUpdated: { - id: branch.id, - type: ProjectModelsUpdatedMessageType.Created, - model: branch - } - }) - ]) - } - -export const addBranchUpdatedActivityFactory = - ({ - saveActivity, - publish - }: { - saveActivity: SaveActivity - publish: PublishSubscription - }): AddBranchUpdatedActivity => - async (params) => { - const { update, userId, oldBranch, newBranch } = params - - const streamId = isBranchUpdateInput(update) ? update.streamId : update.projectId - await Promise.all([ - saveActivity({ - streamId, - resourceType: ResourceTypes.Branch, - resourceId: update.id, - actionType: ActionTypes.Branch.Update, - userId, - info: { old: oldBranch, new: update }, - message: `Branch metadata changed for branch ${update.id}` - }), - publish(BranchPubsubEvents.BranchUpdated, { - branchUpdated: { ...update }, - streamId, - branchId: update.id - }), - publish(ProjectSubscriptions.ProjectModelsUpdated, { - projectId: streamId, - projectModelsUpdated: { - model: newBranch, - id: newBranch.id, - type: ProjectModelsUpdatedMessageType.Updated - } - }) - ]) - } - -export const addBranchDeletedActivityFactory = - ({ - saveActivity, - publish - }: { - saveActivity: SaveActivity - publish: PublishSubscription - }): AddBranchDeletedActivity => - async (params) => { - const { input, userId, branchName } = params - - const streamId = isBranchDeleteInput(input) ? input.streamId : input.projectId - await Promise.all([ - saveActivity({ - streamId, - resourceType: ResourceTypes.Branch, - resourceId: input.id, - actionType: ActionTypes.Branch.Delete, - userId, - info: { branch: { ...input, name: branchName } }, - message: `Branch deleted: '${branchName}' (${input.id})` - }), - publish(BranchPubsubEvents.BranchDeleted, { - branchDeleted: input, - streamId - }), - publish(ProjectSubscriptions.ProjectModelsUpdated, { - projectId: streamId, - projectModelsUpdated: { - id: input.id, - type: ProjectModelsUpdatedMessageType.Deleted, - model: null - } - }) - ]) - } diff --git a/packages/server/modules/cli/commands/download/project.ts b/packages/server/modules/cli/commands/download/project.ts index 8789100a28..ae6fd5d74a 100644 --- a/packages/server/modules/cli/commands/download/project.ts +++ b/packages/server/modules/cli/commands/download/project.ts @@ -62,7 +62,6 @@ import { publish } from '@/modules/shared/utils/subscriptions' import { addCommitCreatedActivityFactory } from '@/modules/activitystream/services/commitActivity' import { getUserFactory } from '@/modules/core/repositories/users' import { createObjectFactory } from '@/modules/core/services/objects/management' -import { addBranchCreatedActivityFactory } from '@/modules/activitystream/services/branchActivity' import { authorizeResolver } from '@/modules/shared' import { Roles } from '@speckle/shared' import { getDefaultRegionFactory } from '@/modules/workspaces/repositories/regions' @@ -247,10 +246,8 @@ const command: CommandModule< createBranchAndNotify: createBranchAndNotifyFactory({ getStreamBranchByName, createBranch: createBranchFactory({ db: projectDb }), - addBranchCreatedActivity: addBranchCreatedActivityFactory({ - saveActivity: saveActivityFactory({ db: mainDb }), - publish - }) + publishSub: publish, + eventEmit: getEventBus().emit }) }) await downloadProject({ ...argv, regionKey }, { logger: cliLogger }) diff --git a/packages/server/modules/core/domain/branches/events.ts b/packages/server/modules/core/domain/branches/events.ts index a76bc7cda2..a1880d32eb 100644 --- a/packages/server/modules/core/domain/branches/events.ts +++ b/packages/server/modules/core/domain/branches/events.ts @@ -1,11 +1,32 @@ import { Model } from '@/modules/core/domain/branches/types' +import { + BranchDeleteInput, + BranchUpdateInput, + DeleteModelInput, + UpdateModelInput +} from '@/modules/core/graph/generated/graphql' export const modelEventsNamespace = 'models' as const export const ModelEvents = { - Deleted: `${modelEventsNamespace}.deleted` + Deleted: `${modelEventsNamespace}.deleted`, + Created: `${modelEventsNamespace}.created`, + Updated: `${modelEventsNamespace}.updated` } as const export type ModelEventsPayloads = { - [ModelEvents.Deleted]: { projectId: string; modelId: string; model: Model } + [ModelEvents.Deleted]: { + projectId: string + modelId: string + model: Model + userId: string + input: BranchDeleteInput | DeleteModelInput + } + [ModelEvents.Created]: { projectId: string; model: Model } + [ModelEvents.Updated]: { + update: BranchUpdateInput | UpdateModelInput + userId: string + oldModel: Model + newModel: Model + } } diff --git a/packages/server/modules/core/graph/resolvers/branches.ts b/packages/server/modules/core/graph/resolvers/branches.ts index 1da138c841..7708307266 100644 --- a/packages/server/modules/core/graph/resolvers/branches.ts +++ b/packages/server/modules/core/graph/resolvers/branches.ts @@ -4,7 +4,6 @@ import { updateBranchAndNotifyFactory, deleteBranchAndNotifyFactory } from '@/modules/core/services/branch/management' - import { Roles } from '@speckle/shared' import { getBranchByIdFactory, @@ -16,11 +15,6 @@ import { getStreamBranchCountFactory } from '@/modules/core/repositories/branches' import { db } from '@/db/knex' -import { - addBranchCreatedActivityFactory, - addBranchDeletedActivityFactory, - addBranchUpdatedActivityFactory -} from '@/modules/activitystream/services/branchActivity' import { getStreamFactory, markBranchStreamUpdatedFactory @@ -28,7 +22,6 @@ import { import { legacyGetUserFactory } from '@/modules/core/repositories/users' import { Resolvers } from '@/modules/core/graph/generated/graphql' import { getPaginatedStreamBranchesFactory } from '@/modules/core/services/branch/retrieval' -import { saveActivityFactory } from '@/modules/activitystream/repositories' import { filteredSubscribe, publish } from '@/modules/shared/utils/subscriptions' import { getProjectDbClient } from '@/modules/multiregion/utils/dbSelector' import { getEventBus } from '@/modules/shared/services/eventBus' @@ -89,10 +82,8 @@ export = { const createBranchAndNotify = createBranchAndNotifyFactory({ getStreamBranchByName, createBranch: createBranchFactory({ db: projectDB }), - addBranchCreatedActivity: addBranchCreatedActivityFactory({ - saveActivity: saveActivityFactory({ db }), - publish - }) + publishSub: publish, + eventEmit: getEventBus().emit }) const { id } = await createBranchAndNotify(args.branch, context.userId!) @@ -112,10 +103,8 @@ export = { const updateBranchAndNotify = updateBranchAndNotifyFactory({ getBranchById, updateBranch: updateBranchFactory({ db: projectDB }), - addBranchUpdatedActivity: addBranchUpdatedActivityFactory({ - saveActivity: saveActivityFactory({ db }), - publish - }) + publishSub: publish, + eventEmit: getEventBus().emit }) const newBranch = await updateBranchAndNotify(args.branch, context.userId!) return !!newBranch @@ -137,10 +126,7 @@ export = { getBranchById: getBranchByIdFactory({ db: projectDB }), emitEvent: getEventBus().emit, markBranchStreamUpdated, - addBranchDeletedActivity: addBranchDeletedActivityFactory({ - saveActivity: saveActivityFactory({ db }), - publish - }), + publishSub: publish, deleteBranchById: deleteBranchByIdFactory({ db: projectDB }) }) const deleted = await deleteBranchAndNotify(args.branch, context.userId!) diff --git a/packages/server/modules/core/graph/resolvers/models.ts b/packages/server/modules/core/graph/resolvers/models.ts index 0b50815533..84bb4edcab 100644 --- a/packages/server/modules/core/graph/resolvers/models.ts +++ b/packages/server/modules/core/graph/resolvers/models.ts @@ -50,16 +50,10 @@ import { legacyGetPaginatedStreamCommitsPageFactory } from '@/modules/core/repositories/commits' import { db } from '@/db/knex' -import { - addBranchCreatedActivityFactory, - addBranchDeletedActivityFactory, - addBranchUpdatedActivityFactory -} from '@/modules/activitystream/services/branchActivity' import { getStreamFactory, markBranchStreamUpdatedFactory } from '@/modules/core/repositories/streams' -import { saveActivityFactory } from '@/modules/activitystream/repositories' import { getProjectDbClient, getRegisteredRegionClients @@ -312,10 +306,8 @@ export = { const createBranchAndNotify = createBranchAndNotifyFactory({ getStreamBranchByName: getStreamBranchByNameFactory({ db: projectDB }), createBranch: createBranchFactory({ db: projectDB }), - addBranchCreatedActivity: addBranchCreatedActivityFactory({ - saveActivity: saveActivityFactory({ db }), - publish - }) + publishSub: publish, + eventEmit: getEventBus().emit }) return await createBranchAndNotify(args.input, ctx.userId!) }, @@ -330,10 +322,8 @@ export = { const updateBranchAndNotify = updateBranchAndNotifyFactory({ getBranchById: getBranchByIdFactory({ db: projectDB }), updateBranch: updateBranchFactory({ db: projectDB }), - addBranchUpdatedActivity: addBranchUpdatedActivityFactory({ - saveActivity: saveActivityFactory({ db }), - publish - }) + publishSub: publish, + eventEmit: getEventBus().emit }) return await updateBranchAndNotify(args.input, ctx.userId!) }, @@ -352,10 +342,7 @@ export = { getBranchById: getBranchByIdFactory({ db: projectDB }), emitEvent: getEventBus().emit, markBranchStreamUpdated, - addBranchDeletedActivity: addBranchDeletedActivityFactory({ - saveActivity: saveActivityFactory({ db }), - publish - }), + publishSub: publish, deleteBranchById: deleteBranchByIdFactory({ db: projectDB }) }) return await deleteBranchAndNotify(args.input, ctx.userId!) diff --git a/packages/server/modules/core/services/branch/management.ts b/packages/server/modules/core/services/branch/management.ts index 3b7aa76415..1e8c4a7c82 100644 --- a/packages/server/modules/core/services/branch/management.ts +++ b/packages/server/modules/core/services/branch/management.ts @@ -10,6 +10,7 @@ import { BranchUpdateInput, CreateModelInput, DeleteModelInput, + ProjectModelsUpdatedMessageType, UpdateModelInput } from '@/modules/core/graph/generated/graphql' import { BranchRecord } from '@/modules/core/helpers/types' @@ -29,13 +30,13 @@ import { GetStream, MarkBranchStreamUpdated } from '@/modules/core/domain/streams/operations' -import { - AddBranchCreatedActivity, - AddBranchDeletedActivity, - AddBranchUpdatedActivity -} from '@/modules/activitystream/domain/operations' import { EventBusEmit } from '@/modules/shared/services/eventBus' import { ModelEvents } from '@/modules/core/domain/branches/events' +import { + ProjectSubscriptions, + PublishSubscription +} from '@/modules/shared/utils/subscriptions' +import { BranchPubsubEvents } from '@/modules/shared' const isBranchCreateInput = ( i: BranchCreateInput | CreateModelInput @@ -45,7 +46,8 @@ export const createBranchAndNotifyFactory = (deps: { getStreamBranchByName: GetStreamBranchByName createBranch: StoreBranch - addBranchCreatedActivity: AddBranchCreatedActivity + eventEmit: EventBusEmit + publishSub: PublishSubscription }): CreateBranchAndNotify => async (input: BranchCreateInput | CreateModelInput, creatorId: string) => { const streamId = isBranchCreateInput(input) ? input.streamId : input.projectId @@ -60,7 +62,26 @@ export const createBranchAndNotifyFactory = streamId: isBranchCreateInput(input) ? input.streamId : input.projectId, authorId: creatorId }) - await deps.addBranchCreatedActivity({ branch }) + + await Promise.all([ + deps.eventEmit({ + eventName: ModelEvents.Created, + payload: { model: branch, projectId: branch.streamId } + }), + // TODO: Move to event bus listeners + deps.publishSub(BranchPubsubEvents.BranchCreated, { + branchCreated: { ...branch }, + streamId: branch.streamId + }), + deps.publishSub(ProjectSubscriptions.ProjectModelsUpdated, { + projectId: branch.streamId, + projectModelsUpdated: { + id: branch.id, + type: ProjectModelsUpdatedMessageType.Created, + model: branch + } + }) + ]) return branch } @@ -69,7 +90,8 @@ export const updateBranchAndNotifyFactory = (deps: { getBranchById: GetBranchById updateBranch: UpdateBranch - addBranchUpdatedActivity: AddBranchUpdatedActivity + publishSub: PublishSubscription + eventEmit: EventBusEmit }): UpdateBranchAndNotify => async (input: BranchUpdateInput | UpdateModelInput, userId: string) => { const streamId = isBranchUpdateInput(input) ? input.streamId : input.projectId @@ -113,12 +135,31 @@ export const updateBranchAndNotifyFactory = } if (newBranch) { - await deps.addBranchUpdatedActivity({ - update: input, - userId, - oldBranch: existingBranch, - newBranch - }) + await Promise.all([ + deps.eventEmit({ + eventName: ModelEvents.Updated, + payload: { + update: input, + userId, + oldModel: existingBranch, + newModel: newBranch + } + }), + // TODO: Move to event bus listeners + deps.publishSub(BranchPubsubEvents.BranchUpdated, { + branchUpdated: { ...input }, + streamId, + branchId: input.id + }), + deps.publishSub(ProjectSubscriptions.ProjectModelsUpdated, { + projectId: streamId, + projectModelsUpdated: { + model: newBranch, + id: newBranch.id, + type: ProjectModelsUpdatedMessageType.Updated + } + }) + ]) } return newBranch @@ -130,8 +171,8 @@ export const deleteBranchAndNotifyFactory = getBranchById: GetBranchById emitEvent: EventBusEmit markBranchStreamUpdated: MarkBranchStreamUpdated - addBranchDeletedActivity: AddBranchDeletedActivity deleteBranchById: DeleteBranchById + publishSub: PublishSubscription }): DeleteBranchAndNotify => async (input: BranchDeleteInput | DeleteModelInput, userId: string) => { const streamId = isBranchDeleteInput(input) ? input.streamId : input.projectId @@ -167,18 +208,28 @@ export const deleteBranchAndNotifyFactory = const isDeleted = !!(await deps.deleteBranchById(existingBranch.id)) if (isDeleted) { await Promise.all([ - deps.addBranchDeletedActivity({ - input, - userId, - branchName: existingBranch.name - }), deps.markBranchStreamUpdated(input.id), deps.emitEvent({ eventName: ModelEvents.Deleted, payload: { modelId: existingBranch.id, model: existingBranch, - projectId: streamId + projectId: streamId, + input, + userId + } + }), + // TODO: Move to event bus listeners + deps.publishSub(BranchPubsubEvents.BranchDeleted, { + branchDeleted: input, + streamId + }), + deps.publishSub(ProjectSubscriptions.ProjectModelsUpdated, { + projectId: streamId, + projectModelsUpdated: { + id: input.id, + type: ProjectModelsUpdatedMessageType.Deleted, + model: null } }) ]) diff --git a/packages/server/modules/core/tests/branches.spec.ts b/packages/server/modules/core/tests/branches.spec.ts index a0fc7e2235..f5717d71a1 100644 --- a/packages/server/modules/core/tests/branches.spec.ts +++ b/packages/server/modules/core/tests/branches.spec.ts @@ -23,10 +23,6 @@ import { getPaginatedStreamBranchesPageFactory, getStreamBranchCountFactory } from '@/modules/core/repositories/branches' -import { - addBranchUpdatedActivityFactory, - addBranchDeletedActivityFactory -} from '@/modules/activitystream/services/branchActivity' import { getStreamFactory, createStreamFactory, @@ -104,20 +100,15 @@ const createBranch = createBranchFactory({ db: knex }) const updateBranchAndNotify = updateBranchAndNotifyFactory({ getBranchById: getBranchByIdFactory({ db: knex }), updateBranch: updateBranchFactory({ db: knex }), - addBranchUpdatedActivity: addBranchUpdatedActivityFactory({ - saveActivity: saveActivityFactory({ db }), - publish - }) + publishSub: publish, + eventEmit: getEventBus().emit }) const deleteBranchAndNotify = deleteBranchAndNotifyFactory({ getStream, getBranchById: getBranchByIdFactory({ db: knex }), emitEvent: getEventBus().emit, markBranchStreamUpdated, - addBranchDeletedActivity: addBranchDeletedActivityFactory({ - saveActivity: saveActivityFactory({ db }), - publish - }), + publishSub: publish, deleteBranchById: deleteBranchByIdFactory({ db: knex }) }) diff --git a/packages/server/modules/core/tests/commits.spec.ts b/packages/server/modules/core/tests/commits.spec.ts index 5d22b2605c..0622be85aa 100644 --- a/packages/server/modules/core/tests/commits.spec.ts +++ b/packages/server/modules/core/tests/commits.spec.ts @@ -91,7 +91,6 @@ import { getPaginatedBranchCommitsItemsByNameFactory } from '@/modules/core/services/commit/retrieval' import { createObjectFactory } from '@/modules/core/services/objects/management' -import { addBranchCreatedActivityFactory } from '@/modules/activitystream/services/branchActivity' import { ensureError } from '@speckle/shared' import { VersionEvents } from '@/modules/core/domain/commits/events' @@ -105,10 +104,8 @@ const createBranch = createBranchFactory({ db }) const createBranchAndNotify = createBranchAndNotifyFactory({ createBranch, getStreamBranchByName: getStreamBranchByNameFactory({ db }), - addBranchCreatedActivity: addBranchCreatedActivityFactory({ - saveActivity: saveActivityFactory({ db }), - publish - }) + publishSub: publish, + eventEmit: getEventBus().emit }) const getCommit = getCommitFactory({ db }) const deleteCommitAndNotify = deleteCommitAndNotifyFactory({ diff --git a/packages/server/modules/core/tests/integration/subs.graph.spec.ts b/packages/server/modules/core/tests/integration/subs.graph.spec.ts index 2ea9839401..83bb1dd0b3 100644 --- a/packages/server/modules/core/tests/integration/subs.graph.spec.ts +++ b/packages/server/modules/core/tests/integration/subs.graph.spec.ts @@ -1,10 +1,6 @@ /* eslint-disable @typescript-eslint/no-explicit-any */ import { db } from '@/db/knex' import { saveActivityFactory } from '@/modules/activitystream/repositories' -import { - addBranchDeletedActivityFactory, - addBranchUpdatedActivityFactory -} from '@/modules/activitystream/services/branchActivity' import { addCommitDeletedActivityFactory, addCommitUpdatedActivityFactory @@ -159,10 +155,8 @@ const buildUpdateModel = async (params: { projectId: string }) => { const updateBranchAndNotify = updateBranchAndNotifyFactory({ getBranchById: getBranchByIdFactory({ db: projectDB }), updateBranch: updateBranchFactory({ db: projectDB }), - addBranchUpdatedActivity: addBranchUpdatedActivityFactory({ - saveActivity: saveActivityFactory({ db }), - publish - }) + publishSub: publish, + eventEmit: getEventBus().emit }) return updateBranchAndNotify } @@ -179,10 +173,7 @@ const buildDeleteModel = async (params: { projectId: string }) => { getBranchById: getBranchByIdFactory({ db: projectDB }), emitEvent: getEventBus().emit, markBranchStreamUpdated, - addBranchDeletedActivity: addBranchDeletedActivityFactory({ - saveActivity: saveActivityFactory({ db }), - publish - }), + publishSub: publish, deleteBranchById: deleteBranchByIdFactory({ db: projectDB }) }) return deleteBranchAndNotify diff --git a/packages/server/modules/core/tests/streams.spec.ts b/packages/server/modules/core/tests/streams.spec.ts index 69f3588f4d..d864cdd4e8 100644 --- a/packages/server/modules/core/tests/streams.spec.ts +++ b/packages/server/modules/core/tests/streams.spec.ts @@ -100,7 +100,6 @@ import { import { changeUserRoleFactory } from '@/modules/core/services/users/management' import { getServerInfoFactory } from '@/modules/core/repositories/server' import { createObjectFactory } from '@/modules/core/services/objects/management' -import { addBranchDeletedActivityFactory } from '@/modules/activitystream/services/branchActivity' const getServerInfo = getServerInfoFactory({ db }) const getUser = getUserFactory({ db }) @@ -115,10 +114,7 @@ const deleteBranchAndNotify = deleteBranchAndNotifyFactory({ getBranchById: getBranchByIdFactory({ db }), emitEvent: getEventBus().emit, markBranchStreamUpdated, - addBranchDeletedActivity: addBranchDeletedActivityFactory({ - saveActivity: saveActivityFactory({ db }), - publish - }), + publishSub: publish, deleteBranchById: deleteBranchByIdFactory({ db }) }) diff --git a/packages/server/modules/cross-server-sync/index.ts b/packages/server/modules/cross-server-sync/index.ts index 99c428765c..9c33a7a514 100644 --- a/packages/server/modules/cross-server-sync/index.ts +++ b/packages/server/modules/cross-server-sync/index.ts @@ -1,7 +1,6 @@ import { db } from '@/db/knex' import { moduleLogger, crossServerSyncLogger } from '@/logging/logging' import { saveActivityFactory } from '@/modules/activitystream/repositories' -import { addBranchCreatedActivityFactory } from '@/modules/activitystream/services/branchActivity' import { addCommentCreatedActivityFactory, addReplyAddedActivityFactory @@ -196,10 +195,8 @@ const crossServerSyncModule: SpeckleModule = { createBranchAndNotify: createBranchAndNotifyFactory({ createBranch: createBranchFactory({ db }), getStreamBranchByName, - addBranchCreatedActivity: addBranchCreatedActivityFactory({ - saveActivity: saveActivityFactory({ db }), - publish - }) + publishSub: publish, + eventEmit: getEventBus().emit }) }), markOnboardingBaseStream diff --git a/packages/server/modules/fileuploads/index.ts b/packages/server/modules/fileuploads/index.ts index 619f2cc2f4..0966ec919b 100644 --- a/packages/server/modules/fileuploads/index.ts +++ b/packages/server/modules/fileuploads/index.ts @@ -19,11 +19,10 @@ import { streamWritePermissionsPipelineFactory } from '@/modules/shared/authz' import { getRolesFactory } from '@/modules/shared/repositories/roles' import { getStreamBranchByNameFactory } from '@/modules/core/repositories/branches' import { getStreamFactory } from '@/modules/core/repositories/streams' -import { addBranchCreatedActivityFactory } from '@/modules/activitystream/services/branchActivity' -import { saveActivityFactory } from '@/modules/activitystream/repositories' import { getPort } from '@/modules/shared/helpers/envHelper' import { getProjectDbClient } from '@/modules/multiregion/utils/dbSelector' import { listenFor } from '@/modules/core/utils/dbNotificationListener' +import { getEventBus } from '@/modules/shared/services/eventBus' export const init: SpeckleModule['init'] = async (app, isInitial) => { if (process.env.DISABLE_FILE_UPLOADS) { @@ -120,6 +119,9 @@ export const init: SpeckleModule['init'] = async (app, isInitial) => { } ) + // FIXME: Fix the type issue + // eslint-disable-next-line @typescript-eslint/ban-ts-comment + // @ts-expect-error req.pipe(pipedReq) } ) @@ -133,10 +135,7 @@ export const init: SpeckleModule['init'] = async (app, isInitial) => { getFileInfo: getFileInfoFactory({ db: projectDb }), publish, getStreamBranchByName: getStreamBranchByNameFactory({ db: projectDb }), - addBranchCreatedActivity: addBranchCreatedActivityFactory({ - publish, - saveActivity: saveActivityFactory({ db }) - }) + eventEmit: getEventBus().emit })(parsedMessage) }) listenFor('file_import_started', async (msg) => { diff --git a/packages/server/modules/fileuploads/services/resultListener.ts b/packages/server/modules/fileuploads/services/resultListener.ts index 8dfbad4a8a..5e859efde1 100644 --- a/packages/server/modules/fileuploads/services/resultListener.ts +++ b/packages/server/modules/fileuploads/services/resultListener.ts @@ -1,22 +1,26 @@ import { FileImportSubscriptions, + ProjectSubscriptions, publish, type PublishSubscription } from '@/modules/shared/utils/subscriptions' import { ProjectFileImportUpdatedMessageType, + ProjectModelsUpdatedMessageType, ProjectPendingModelsUpdatedMessageType, ProjectPendingVersionsUpdatedMessageType } from '@/modules/core/graph/generated/graphql' import { GetFileInfo } from '@/modules/fileuploads/domain/operations' import { GetStreamBranchByName } from '@/modules/core/domain/branches/operations' -import { AddBranchCreatedActivity } from '@/modules/activitystream/domain/operations' +import { EventBusEmit } from '@/modules/shared/services/eventBus' +import { ModelEvents } from '@/modules/core/domain/branches/events' +import { BranchPubsubEvents } from '@/modules/shared' type OnFileImportProcessedDeps = { getFileInfo: GetFileInfo getStreamBranchByName: GetStreamBranchByName publish: PublishSubscription - addBranchCreatedActivity: AddBranchCreatedActivity + eventEmit: EventBusEmit } type ParsedMessage = { @@ -55,7 +59,27 @@ export const onFileImportProcessedFactory = projectId: upload.streamId }) - if (branch) await deps.addBranchCreatedActivity({ branch }) + if (branch) { + await Promise.all([ + deps.eventEmit({ + eventName: ModelEvents.Created, + payload: { model: branch, projectId: branch.streamId } + }), + // TODO: Move to event bus listeners + deps.publish(BranchPubsubEvents.BranchCreated, { + branchCreated: { ...branch }, + streamId: branch.streamId + }), + deps.publish(ProjectSubscriptions.ProjectModelsUpdated, { + projectId: branch.streamId, + projectModelsUpdated: { + id: branch.id, + type: ProjectModelsUpdatedMessageType.Created, + model: branch + } + }) + ]) + } } else { await deps.publish(FileImportSubscriptions.ProjectPendingVersionsUpdated, { projectPendingVersionsUpdated: { diff --git a/packages/server/test/speckle-helpers/branchHelper.ts b/packages/server/test/speckle-helpers/branchHelper.ts index f21f7b9be1..5377f19013 100644 --- a/packages/server/test/speckle-helpers/branchHelper.ts +++ b/packages/server/test/speckle-helpers/branchHelper.ts @@ -1,12 +1,10 @@ -import { db } from '@/db/knex' -import { saveActivityFactory } from '@/modules/activitystream/repositories' -import { addBranchCreatedActivityFactory } from '@/modules/activitystream/services/branchActivity' import { createBranchFactory, getStreamBranchByNameFactory } from '@/modules/core/repositories/branches' import { createBranchAndNotifyFactory } from '@/modules/core/services/branch/management' import { getProjectDbClient } from '@/modules/multiregion/utils/dbSelector' +import { getEventBus } from '@/modules/shared/services/eventBus' import { publish } from '@/modules/shared/utils/subscriptions' import { BasicTestUser } from '@/test/authHelper' import { BasicTestStream } from '@/test/speckle-helpers/streamHelper' @@ -44,10 +42,8 @@ export async function createTestBranch(params: { const createBranchAndNotify = createBranchAndNotifyFactory({ getStreamBranchByName: getStreamBranchByNameFactory({ db: projectDb }), createBranch: createBranchFactory({ db: projectDb }), - addBranchCreatedActivity: addBranchCreatedActivityFactory({ - saveActivity: saveActivityFactory({ db }), - publish - }) + publishSub: publish, + eventEmit: getEventBus().emit }) const id = ( From a8c9f1edf1401e90e9bce50c5ac1c33d87d5541e Mon Sep 17 00:00:00 2001 From: Kristaps Fabians Geikins Date: Thu, 23 Jan 2025 14:13:58 +0200 Subject: [PATCH 2/4] undo ts check --- packages/server/modules/fileuploads/index.ts | 3 --- 1 file changed, 3 deletions(-) diff --git a/packages/server/modules/fileuploads/index.ts b/packages/server/modules/fileuploads/index.ts index 0966ec919b..fed58356df 100644 --- a/packages/server/modules/fileuploads/index.ts +++ b/packages/server/modules/fileuploads/index.ts @@ -119,9 +119,6 @@ export const init: SpeckleModule['init'] = async (app, isInitial) => { } ) - // FIXME: Fix the type issue - // eslint-disable-next-line @typescript-eslint/ban-ts-comment - // @ts-expect-error req.pipe(pipedReq) } ) From d9bdc545992bdf1fbda668ab83776a6b55e2148b Mon Sep 17 00:00:00 2001 From: Kristaps Fabians Geikins Date: Fri, 24 Jan 2025 16:49:55 +0200 Subject: [PATCH 3/4] chore(server): moving out branch sub reporting to separate listeners --- .../modules/cli/commands/download/project.ts | 1 - .../core/events/subscriptionListeners.ts | 87 +++++++++++++++++++ .../modules/core/graph/resolvers/branches.ts | 5 +- .../modules/core/graph/resolvers/models.ts | 6 +- packages/server/modules/core/index.ts | 9 ++ .../core/services/branch/management.ts | 79 +++-------------- .../modules/core/tests/branches.spec.ts | 2 - .../server/modules/core/tests/commits.spec.ts | 1 - .../core/tests/integration/subs.graph.spec.ts | 2 - .../server/modules/core/tests/streams.spec.ts | 1 - .../server/modules/cross-server-sync/index.ts | 1 - .../fileuploads/services/resultListener.ts | 26 +----- .../server/modules/shared/helpers/factory.ts | 8 ++ .../test/speckle-helpers/branchHelper.ts | 2 - 14 files changed, 123 insertions(+), 107 deletions(-) create mode 100644 packages/server/modules/core/events/subscriptionListeners.ts create mode 100644 packages/server/modules/shared/helpers/factory.ts diff --git a/packages/server/modules/cli/commands/download/project.ts b/packages/server/modules/cli/commands/download/project.ts index ae6fd5d74a..b614137efb 100644 --- a/packages/server/modules/cli/commands/download/project.ts +++ b/packages/server/modules/cli/commands/download/project.ts @@ -246,7 +246,6 @@ const command: CommandModule< createBranchAndNotify: createBranchAndNotifyFactory({ getStreamBranchByName, createBranch: createBranchFactory({ db: projectDb }), - publishSub: publish, eventEmit: getEventBus().emit }) }) diff --git a/packages/server/modules/core/events/subscriptionListeners.ts b/packages/server/modules/core/events/subscriptionListeners.ts new file mode 100644 index 0000000000..6c693b11e4 --- /dev/null +++ b/packages/server/modules/core/events/subscriptionListeners.ts @@ -0,0 +1,87 @@ +import { ModelEvents } from '@/modules/core/domain/branches/events' +import { ProjectModelsUpdatedMessageType } from '@/modules/core/graph/generated/graphql' +import { BranchPubsubEvents } from '@/modules/shared' +import { EventBusListen, EventPayload } from '@/modules/shared/services/eventBus' +import { + ProjectSubscriptions, + PublishSubscription +} from '@/modules/shared/utils/subscriptions' + +const reportModelCreatedFactory = + (deps: { publish: PublishSubscription }) => + async (payload: EventPayload) => { + const { model } = payload.payload + + await Promise.all([ + deps.publish(BranchPubsubEvents.BranchCreated, { + branchCreated: { ...model }, + streamId: model.streamId + }), + deps.publish(ProjectSubscriptions.ProjectModelsUpdated, { + projectId: model.streamId, + projectModelsUpdated: { + id: model.id, + type: ProjectModelsUpdatedMessageType.Created, + model + } + }) + ]) + } + +const reportModelUpdatedFactory = + (deps: { publish: PublishSubscription }) => + async (payload: EventPayload) => { + const { newModel, update } = payload.payload + + await Promise.all([ + deps.publish(BranchPubsubEvents.BranchUpdated, { + branchUpdated: { ...update }, + streamId: newModel.streamId, + branchId: newModel.id + }), + deps.publish(ProjectSubscriptions.ProjectModelsUpdated, { + projectId: newModel.streamId, + projectModelsUpdated: { + model: newModel, + id: newModel.id, + type: ProjectModelsUpdatedMessageType.Updated + } + }) + ]) + } + +const reportModelDeletedFactory = + (deps: { publish: PublishSubscription }) => + async (payload: EventPayload) => { + const { input, projectId } = payload.payload + + await Promise.all([ + deps.publish(BranchPubsubEvents.BranchDeleted, { + branchDeleted: input, + streamId: projectId + }), + deps.publish(ProjectSubscriptions.ProjectModelsUpdated, { + projectId, + projectModelsUpdated: { + id: input.id, + type: ProjectModelsUpdatedMessageType.Deleted, + model: null + } + }) + ]) + } + +export const reportSubscriptionEventsFactory = + (deps: { eventListen: EventBusListen; publish: PublishSubscription }) => () => { + const reportModelCreated = reportModelCreatedFactory(deps) + const reportModelUpdated = reportModelUpdatedFactory(deps) + const reportModelDeleted = reportModelDeletedFactory(deps) + + const quitCbs = [ + deps.eventListen(ModelEvents.Created, reportModelCreated), + deps.eventListen(ModelEvents.Updated, reportModelUpdated), + deps.eventListen(ModelEvents.Deleted, reportModelDeleted) + ] + + return () => quitCbs.forEach((quit) => quit()) + } diff --git a/packages/server/modules/core/graph/resolvers/branches.ts b/packages/server/modules/core/graph/resolvers/branches.ts index 7708307266..509c39566d 100644 --- a/packages/server/modules/core/graph/resolvers/branches.ts +++ b/packages/server/modules/core/graph/resolvers/branches.ts @@ -22,7 +22,7 @@ import { import { legacyGetUserFactory } from '@/modules/core/repositories/users' import { Resolvers } from '@/modules/core/graph/generated/graphql' import { getPaginatedStreamBranchesFactory } from '@/modules/core/services/branch/retrieval' -import { filteredSubscribe, publish } from '@/modules/shared/utils/subscriptions' +import { filteredSubscribe } from '@/modules/shared/utils/subscriptions' import { getProjectDbClient } from '@/modules/multiregion/utils/dbSelector' import { getEventBus } from '@/modules/shared/services/eventBus' @@ -82,7 +82,6 @@ export = { const createBranchAndNotify = createBranchAndNotifyFactory({ getStreamBranchByName, createBranch: createBranchFactory({ db: projectDB }), - publishSub: publish, eventEmit: getEventBus().emit }) const { id } = await createBranchAndNotify(args.branch, context.userId!) @@ -103,7 +102,6 @@ export = { const updateBranchAndNotify = updateBranchAndNotifyFactory({ getBranchById, updateBranch: updateBranchFactory({ db: projectDB }), - publishSub: publish, eventEmit: getEventBus().emit }) const newBranch = await updateBranchAndNotify(args.branch, context.userId!) @@ -126,7 +124,6 @@ export = { getBranchById: getBranchByIdFactory({ db: projectDB }), emitEvent: getEventBus().emit, markBranchStreamUpdated, - publishSub: publish, deleteBranchById: deleteBranchByIdFactory({ db: projectDB }) }) const deleted = await deleteBranchAndNotify(args.branch, context.userId!) diff --git a/packages/server/modules/core/graph/resolvers/models.ts b/packages/server/modules/core/graph/resolvers/models.ts index 84bb4edcab..20e829db5f 100644 --- a/packages/server/modules/core/graph/resolvers/models.ts +++ b/packages/server/modules/core/graph/resolvers/models.ts @@ -20,8 +20,7 @@ import { } from '@/modules/core/services/commit/retrieval' import { filteredSubscribe, - ProjectSubscriptions, - publish + ProjectSubscriptions } from '@/modules/shared/utils/subscriptions' import { createBranchFactory, @@ -306,7 +305,6 @@ export = { const createBranchAndNotify = createBranchAndNotifyFactory({ getStreamBranchByName: getStreamBranchByNameFactory({ db: projectDB }), createBranch: createBranchFactory({ db: projectDB }), - publishSub: publish, eventEmit: getEventBus().emit }) return await createBranchAndNotify(args.input, ctx.userId!) @@ -322,7 +320,6 @@ export = { const updateBranchAndNotify = updateBranchAndNotifyFactory({ getBranchById: getBranchByIdFactory({ db: projectDB }), updateBranch: updateBranchFactory({ db: projectDB }), - publishSub: publish, eventEmit: getEventBus().emit }) return await updateBranchAndNotify(args.input, ctx.userId!) @@ -342,7 +339,6 @@ export = { getBranchById: getBranchByIdFactory({ db: projectDB }), emitEvent: getEventBus().emit, markBranchStreamUpdated, - publishSub: publish, deleteBranchById: deleteBranchByIdFactory({ db: projectDB }) }) return await deleteBranchAndNotify(args.input, ctx.userId!) diff --git a/packages/server/modules/core/index.ts b/packages/server/modules/core/index.ts index e45cc5569b..700b250097 100644 --- a/packages/server/modules/core/index.ts +++ b/packages/server/modules/core/index.ts @@ -19,6 +19,9 @@ import db from '@/db/knex' import { registerOrUpdateRole } from '@/modules/shared/repositories/roles' import { isTestEnv } from '@/modules/shared/helpers/envHelper' import { HooksConfig, Hook, ExecuteHooks } from '@/modules/core/hooks' +import { reportSubscriptionEventsFactory } from '@/modules/core/events/subscriptionListeners' +import { getEventBus } from '@/modules/shared/services/eventBus' +import { publish } from '@/modules/shared/utils/subscriptions' let stopTestSubs: (() => void) | undefined = undefined @@ -75,6 +78,12 @@ const coreModule: SpeckleModule<{ const { startEmittingTestSubs } = await import('@/test/graphqlHelper') stopTestSubs = await startEmittingTestSubs() } + + // Setup GQL sub emits + reportSubscriptionEventsFactory({ + eventListen: getEventBus().listen, + publish + })() } }, async shutdown() { diff --git a/packages/server/modules/core/services/branch/management.ts b/packages/server/modules/core/services/branch/management.ts index 1e8c4a7c82..144313d1cc 100644 --- a/packages/server/modules/core/services/branch/management.ts +++ b/packages/server/modules/core/services/branch/management.ts @@ -10,7 +10,6 @@ import { BranchUpdateInput, CreateModelInput, DeleteModelInput, - ProjectModelsUpdatedMessageType, UpdateModelInput } from '@/modules/core/graph/generated/graphql' import { BranchRecord } from '@/modules/core/helpers/types' @@ -32,11 +31,6 @@ import { } from '@/modules/core/domain/streams/operations' import { EventBusEmit } from '@/modules/shared/services/eventBus' import { ModelEvents } from '@/modules/core/domain/branches/events' -import { - ProjectSubscriptions, - PublishSubscription -} from '@/modules/shared/utils/subscriptions' -import { BranchPubsubEvents } from '@/modules/shared' const isBranchCreateInput = ( i: BranchCreateInput | CreateModelInput @@ -47,7 +41,6 @@ export const createBranchAndNotifyFactory = getStreamBranchByName: GetStreamBranchByName createBranch: StoreBranch eventEmit: EventBusEmit - publishSub: PublishSubscription }): CreateBranchAndNotify => async (input: BranchCreateInput | CreateModelInput, creatorId: string) => { const streamId = isBranchCreateInput(input) ? input.streamId : input.projectId @@ -63,25 +56,10 @@ export const createBranchAndNotifyFactory = authorId: creatorId }) - await Promise.all([ - deps.eventEmit({ - eventName: ModelEvents.Created, - payload: { model: branch, projectId: branch.streamId } - }), - // TODO: Move to event bus listeners - deps.publishSub(BranchPubsubEvents.BranchCreated, { - branchCreated: { ...branch }, - streamId: branch.streamId - }), - deps.publishSub(ProjectSubscriptions.ProjectModelsUpdated, { - projectId: branch.streamId, - projectModelsUpdated: { - id: branch.id, - type: ProjectModelsUpdatedMessageType.Created, - model: branch - } - }) - ]) + await deps.eventEmit({ + eventName: ModelEvents.Created, + payload: { model: branch, projectId: branch.streamId } + }) return branch } @@ -90,7 +68,6 @@ export const updateBranchAndNotifyFactory = (deps: { getBranchById: GetBranchById updateBranch: UpdateBranch - publishSub: PublishSubscription eventEmit: EventBusEmit }): UpdateBranchAndNotify => async (input: BranchUpdateInput | UpdateModelInput, userId: string) => { @@ -135,31 +112,15 @@ export const updateBranchAndNotifyFactory = } if (newBranch) { - await Promise.all([ - deps.eventEmit({ - eventName: ModelEvents.Updated, - payload: { - update: input, - userId, - oldModel: existingBranch, - newModel: newBranch - } - }), - // TODO: Move to event bus listeners - deps.publishSub(BranchPubsubEvents.BranchUpdated, { - branchUpdated: { ...input }, - streamId, - branchId: input.id - }), - deps.publishSub(ProjectSubscriptions.ProjectModelsUpdated, { - projectId: streamId, - projectModelsUpdated: { - model: newBranch, - id: newBranch.id, - type: ProjectModelsUpdatedMessageType.Updated - } - }) - ]) + await deps.eventEmit({ + eventName: ModelEvents.Updated, + payload: { + update: input, + userId, + oldModel: existingBranch, + newModel: newBranch + } + }) } return newBranch @@ -172,7 +133,6 @@ export const deleteBranchAndNotifyFactory = emitEvent: EventBusEmit markBranchStreamUpdated: MarkBranchStreamUpdated deleteBranchById: DeleteBranchById - publishSub: PublishSubscription }): DeleteBranchAndNotify => async (input: BranchDeleteInput | DeleteModelInput, userId: string) => { const streamId = isBranchDeleteInput(input) ? input.streamId : input.projectId @@ -218,19 +178,6 @@ export const deleteBranchAndNotifyFactory = input, userId } - }), - // TODO: Move to event bus listeners - deps.publishSub(BranchPubsubEvents.BranchDeleted, { - branchDeleted: input, - streamId - }), - deps.publishSub(ProjectSubscriptions.ProjectModelsUpdated, { - projectId: streamId, - projectModelsUpdated: { - id: input.id, - type: ProjectModelsUpdatedMessageType.Deleted, - model: null - } }) ]) } diff --git a/packages/server/modules/core/tests/branches.spec.ts b/packages/server/modules/core/tests/branches.spec.ts index f5717d71a1..a791822b74 100644 --- a/packages/server/modules/core/tests/branches.spec.ts +++ b/packages/server/modules/core/tests/branches.spec.ts @@ -100,7 +100,6 @@ const createBranch = createBranchFactory({ db: knex }) const updateBranchAndNotify = updateBranchAndNotifyFactory({ getBranchById: getBranchByIdFactory({ db: knex }), updateBranch: updateBranchFactory({ db: knex }), - publishSub: publish, eventEmit: getEventBus().emit }) const deleteBranchAndNotify = deleteBranchAndNotifyFactory({ @@ -108,7 +107,6 @@ const deleteBranchAndNotify = deleteBranchAndNotifyFactory({ getBranchById: getBranchByIdFactory({ db: knex }), emitEvent: getEventBus().emit, markBranchStreamUpdated, - publishSub: publish, deleteBranchById: deleteBranchByIdFactory({ db: knex }) }) diff --git a/packages/server/modules/core/tests/commits.spec.ts b/packages/server/modules/core/tests/commits.spec.ts index 0622be85aa..d56ad37615 100644 --- a/packages/server/modules/core/tests/commits.spec.ts +++ b/packages/server/modules/core/tests/commits.spec.ts @@ -104,7 +104,6 @@ const createBranch = createBranchFactory({ db }) const createBranchAndNotify = createBranchAndNotifyFactory({ createBranch, getStreamBranchByName: getStreamBranchByNameFactory({ db }), - publishSub: publish, eventEmit: getEventBus().emit }) const getCommit = getCommitFactory({ db }) diff --git a/packages/server/modules/core/tests/integration/subs.graph.spec.ts b/packages/server/modules/core/tests/integration/subs.graph.spec.ts index 83bb1dd0b3..29dd504fbf 100644 --- a/packages/server/modules/core/tests/integration/subs.graph.spec.ts +++ b/packages/server/modules/core/tests/integration/subs.graph.spec.ts @@ -155,7 +155,6 @@ const buildUpdateModel = async (params: { projectId: string }) => { const updateBranchAndNotify = updateBranchAndNotifyFactory({ getBranchById: getBranchByIdFactory({ db: projectDB }), updateBranch: updateBranchFactory({ db: projectDB }), - publishSub: publish, eventEmit: getEventBus().emit }) return updateBranchAndNotify @@ -173,7 +172,6 @@ const buildDeleteModel = async (params: { projectId: string }) => { getBranchById: getBranchByIdFactory({ db: projectDB }), emitEvent: getEventBus().emit, markBranchStreamUpdated, - publishSub: publish, deleteBranchById: deleteBranchByIdFactory({ db: projectDB }) }) return deleteBranchAndNotify diff --git a/packages/server/modules/core/tests/streams.spec.ts b/packages/server/modules/core/tests/streams.spec.ts index d864cdd4e8..0b560e925d 100644 --- a/packages/server/modules/core/tests/streams.spec.ts +++ b/packages/server/modules/core/tests/streams.spec.ts @@ -114,7 +114,6 @@ const deleteBranchAndNotify = deleteBranchAndNotifyFactory({ getBranchById: getBranchByIdFactory({ db }), emitEvent: getEventBus().emit, markBranchStreamUpdated, - publishSub: publish, deleteBranchById: deleteBranchByIdFactory({ db }) }) diff --git a/packages/server/modules/cross-server-sync/index.ts b/packages/server/modules/cross-server-sync/index.ts index 9c33a7a514..40b990252a 100644 --- a/packages/server/modules/cross-server-sync/index.ts +++ b/packages/server/modules/cross-server-sync/index.ts @@ -195,7 +195,6 @@ const crossServerSyncModule: SpeckleModule = { createBranchAndNotify: createBranchAndNotifyFactory({ createBranch: createBranchFactory({ db }), getStreamBranchByName, - publishSub: publish, eventEmit: getEventBus().emit }) }), diff --git a/packages/server/modules/fileuploads/services/resultListener.ts b/packages/server/modules/fileuploads/services/resultListener.ts index 5e859efde1..7f3b6aa57b 100644 --- a/packages/server/modules/fileuploads/services/resultListener.ts +++ b/packages/server/modules/fileuploads/services/resultListener.ts @@ -1,12 +1,10 @@ import { FileImportSubscriptions, - ProjectSubscriptions, publish, type PublishSubscription } from '@/modules/shared/utils/subscriptions' import { ProjectFileImportUpdatedMessageType, - ProjectModelsUpdatedMessageType, ProjectPendingModelsUpdatedMessageType, ProjectPendingVersionsUpdatedMessageType } from '@/modules/core/graph/generated/graphql' @@ -14,7 +12,6 @@ import { GetFileInfo } from '@/modules/fileuploads/domain/operations' import { GetStreamBranchByName } from '@/modules/core/domain/branches/operations' import { EventBusEmit } from '@/modules/shared/services/eventBus' import { ModelEvents } from '@/modules/core/domain/branches/events' -import { BranchPubsubEvents } from '@/modules/shared' type OnFileImportProcessedDeps = { getFileInfo: GetFileInfo @@ -60,25 +57,10 @@ export const onFileImportProcessedFactory = }) if (branch) { - await Promise.all([ - deps.eventEmit({ - eventName: ModelEvents.Created, - payload: { model: branch, projectId: branch.streamId } - }), - // TODO: Move to event bus listeners - deps.publish(BranchPubsubEvents.BranchCreated, { - branchCreated: { ...branch }, - streamId: branch.streamId - }), - deps.publish(ProjectSubscriptions.ProjectModelsUpdated, { - projectId: branch.streamId, - projectModelsUpdated: { - id: branch.id, - type: ProjectModelsUpdatedMessageType.Created, - model: branch - } - }) - ]) + await deps.eventEmit({ + eventName: ModelEvents.Created, + payload: { model: branch, projectId: branch.streamId } + }) } } else { await deps.publish(FileImportSubscriptions.ProjectPendingVersionsUpdated, { diff --git a/packages/server/modules/shared/helpers/factory.ts b/packages/server/modules/shared/helpers/factory.ts new file mode 100644 index 0000000000..fe7dead6dc --- /dev/null +++ b/packages/server/modules/shared/helpers/factory.ts @@ -0,0 +1,8 @@ +/* eslint-disable @typescript-eslint/no-explicit-any */ +type Factory = ( + deps: Deps +) => (...args: Args[]) => ReturnType + +export type DependenciesOf = F extends Factory + ? Deps + : never diff --git a/packages/server/test/speckle-helpers/branchHelper.ts b/packages/server/test/speckle-helpers/branchHelper.ts index 5377f19013..35a673d781 100644 --- a/packages/server/test/speckle-helpers/branchHelper.ts +++ b/packages/server/test/speckle-helpers/branchHelper.ts @@ -5,7 +5,6 @@ import { import { createBranchAndNotifyFactory } from '@/modules/core/services/branch/management' import { getProjectDbClient } from '@/modules/multiregion/utils/dbSelector' import { getEventBus } from '@/modules/shared/services/eventBus' -import { publish } from '@/modules/shared/utils/subscriptions' import { BasicTestUser } from '@/test/authHelper' import { BasicTestStream } from '@/test/speckle-helpers/streamHelper' import { omit } from 'lodash' @@ -42,7 +41,6 @@ export async function createTestBranch(params: { const createBranchAndNotify = createBranchAndNotifyFactory({ getStreamBranchByName: getStreamBranchByNameFactory({ db: projectDb }), createBranch: createBranchFactory({ db: projectDb }), - publishSub: publish, eventEmit: getEventBus().emit }) From 5d1a46d541078f452b953d123be20b33cfa2deb2 Mon Sep 17 00:00:00 2001 From: Kristaps Fabians Geikins Date: Fri, 24 Jan 2025 16:55:11 +0200 Subject: [PATCH 4/4] minor adjustment --- .../modules/core/events/subscriptionListeners.ts | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/packages/server/modules/core/events/subscriptionListeners.ts b/packages/server/modules/core/events/subscriptionListeners.ts index 6c693b11e4..68c76c9390 100644 --- a/packages/server/modules/core/events/subscriptionListeners.ts +++ b/packages/server/modules/core/events/subscriptionListeners.ts @@ -1,6 +1,7 @@ import { ModelEvents } from '@/modules/core/domain/branches/events' import { ProjectModelsUpdatedMessageType } from '@/modules/core/graph/generated/graphql' import { BranchPubsubEvents } from '@/modules/shared' +import { DependenciesOf } from '@/modules/shared/helpers/factory' import { EventBusListen, EventPayload } from '@/modules/shared/services/eventBus' import { ProjectSubscriptions, @@ -72,7 +73,15 @@ const reportModelDeletedFactory = } export const reportSubscriptionEventsFactory = - (deps: { eventListen: EventBusListen; publish: PublishSubscription }) => () => { + ( + deps: { + eventListen: EventBusListen + publish: PublishSubscription + } & DependenciesOf & + DependenciesOf & + DependenciesOf + ) => + () => { const reportModelCreated = reportModelCreatedFactory(deps) const reportModelUpdated = reportModelUpdatedFactory(deps) const reportModelDeleted = reportModelDeletedFactory(deps)