Skip to content

Commit f0bd655

Browse files
authored
feat: implemented an in-memory store for data plane (frain-dev#1932)
* feat: completed memorystore refactor * fix: removed debug lines * fix: ingest is working * fix: use context to synchronise and cancel all running consumers * fix: fixed loader bug * fix: fixed linter error: check for err * fix: use rlock for reading table * fix: fixed sqs to use ctx * fix: removed unused variable * add table tests
1 parent 3ac6302 commit f0bd655

21 files changed

+821
-1118
lines changed

cmd/ingest/ingest.go

+17-7
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,10 @@
11
package ingest
22

33
import (
4-
"context"
5-
64
"github.com/frain-dev/convoy/config"
75
"github.com/frain-dev/convoy/database/postgres"
86
"github.com/frain-dev/convoy/internal/pkg/cli"
7+
"github.com/frain-dev/convoy/internal/pkg/memorystore"
98
"github.com/frain-dev/convoy/internal/pkg/pubsub"
109
"github.com/frain-dev/convoy/internal/pkg/server"
1110
"github.com/frain-dev/convoy/pkg/log"
@@ -56,13 +55,24 @@ func AddIngestCommand(a *cli.App) *cobra.Command {
5655

5756
lo.SetLevel(lvl)
5857

59-
sourcePool := pubsub.NewSourcePool(lo)
60-
sourceLoader := pubsub.NewSourceLoader(endpointRepo, sourceRepo, projectRepo, a.Queue, sourcePool, lo)
58+
sourceLoader := pubsub.NewSourceLoader(endpointRepo, sourceRepo, projectRepo, lo)
59+
sourceTable := memorystore.NewTable(memorystore.OptionSyncer(sourceLoader))
60+
61+
err = memorystore.DefaultStore.Register("sources", sourceTable)
62+
if err != nil {
63+
return err
64+
}
65+
66+
go memorystore.DefaultStore.Sync(cmd.Context(), interval)
67+
68+
ingest, err := pubsub.NewIngest(cmd.Context(), sourceTable, a.Queue, lo)
69+
if err != nil {
70+
return err
71+
}
6172

62-
stop := make(chan struct{})
63-
go sourceLoader.Run(context.Background(), interval, stop)
73+
go ingest.Run()
6474

65-
srv := server.NewServer(cfg.Server.HTTP.IngestPort, func() { stop <- struct{}{} })
75+
srv := server.NewServer(cfg.Server.HTTP.IngestPort, func() {})
6676
srv.SetHandler(chi.NewMux())
6777

6878
srv.Listen()

docs/docs.go

+1-130
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)