diff --git a/internal/app/appfx/module.go b/internal/app/appfx/module.go index 4499037b..64ec61a4 100644 --- a/internal/app/appfx/module.go +++ b/internal/app/appfx/module.go @@ -1,7 +1,6 @@ package appfx import ( - "github.com/bitmagnet-io/bitmagnet/internal/app/cmd/searchcmd" "github.com/bitmagnet-io/bitmagnet/internal/app/cmd/torrentcmd" "github.com/bitmagnet-io/bitmagnet/internal/blocking/blockingfx" "github.com/bitmagnet-io/bitmagnet/internal/boilerplate/app/boilerplateappfx" @@ -9,6 +8,7 @@ import ( "github.com/bitmagnet-io/bitmagnet/internal/classifier/classifierfx" "github.com/bitmagnet-io/bitmagnet/internal/database/databasefx" "github.com/bitmagnet-io/bitmagnet/internal/database/migrations" + "github.com/bitmagnet-io/bitmagnet/internal/database/search/warmer" "github.com/bitmagnet-io/bitmagnet/internal/dhtcrawler/dhtcrawlerfx" "github.com/bitmagnet-io/bitmagnet/internal/gql/gqlfx" "github.com/bitmagnet-io/bitmagnet/internal/importer/importerfx" @@ -43,10 +43,12 @@ func New() fx.Option { versionfx.New(), // cli commands: fx.Provide( - searchcmd.New, torrentcmd.New, ), fx.Provide(webui.New), fx.Decorate(migrations.NewDecorator), + fx.Decorate( + warmer.NewDecorator, + ), ) } diff --git a/internal/app/cmd/searchcmd/command.go b/internal/app/cmd/searchcmd/command.go deleted file mode 100644 index c916e503..00000000 --- a/internal/app/cmd/searchcmd/command.go +++ /dev/null @@ -1,111 +0,0 @@ -// This command isn't currently intended to be usable, it's more of a testbed for trying things out, but may become user-friendly in future. - -package searchcmd - -import ( - "encoding/json" - "github.com/bitmagnet-io/bitmagnet/internal/database/query" - "github.com/bitmagnet-io/bitmagnet/internal/database/search" - "github.com/urfave/cli/v2" - "go.uber.org/fx" - "go.uber.org/zap" -) - -type Params struct { - fx.In - Search search.Search - Logger *zap.SugaredLogger -} - -type Result struct { - fx.Out - Command *cli.Command `group:"commands"` -} - -func New(p Params) (Result, error) { - cmd := &cli.Command{ - Name: "search", - Subcommands: []*cli.Command{ - { - Name: "torrents", - Flags: []cli.Flag{ - &cli.StringFlag{ - Name: "query", - }, - &cli.UintFlag{ - Name: "limit", - Value: 10, - }, - &cli.UintFlag{ - Name: "offset", - }, - &cli.StringFlag{ - Name: "releaseDate", - }, - }, - Action: func(ctx *cli.Context) error { - result, searchErr := p.Search.TorrentContent( - ctx.Context, - search.TorrentContentDefaultOption(), - query.QueryString(ctx.String("query")), - //search.Where( - // search.ContentReleaseDateCriteriaString(ctx.String("releaseDate")), - //), - query.Limit(ctx.Uint("limit")), - query.Offset(ctx.Uint("offset")), - query.WithFacet( - //search.ReleaseYearFacet( - // query.FacetHasFilter(query.FacetFilter{ - // "2022": {}, - // //"null": {}, - // }), - // query.FacetIsAggregated(), - //), - //search.Video3dFacet( - // query.FacetIsAggregated(), - //), - //search.VideoCodecFacet( - // query.FacetIsAggregated(), - //), - //search.VideoModifierFacet( - // query.FacetIsAggregated(), - //), - //search.VideoResolutionFacet( - // query.FacetIsAggregated(), - //), - //search.VideoSourceFacet( - // query.FacetIsAggregated(), - //), - search.TorrentContentGenreFacet( - query.FacetHasFilter(query.FacetFilter{ - "tmdb:10751": {}, - "tmdb:14": {}, - }), - query.FacetIsAggregated(), - ), - ), - query.OrderByQueryStringRank(), - //query.Filter(query.FacetFilter{ - // search.MovieGenreAggregatorKey: { - // //"tmdb:10751": {}, - // //"tmdb:14": {}, - // //"tmdb:35": {}, - // }, - //}), - //search.OrderByColumn("torrents.created_at", true), - ) - if searchErr != nil { - return searchErr - } - jsonResult, jsonErr := json.Marshal(result) - if jsonErr != nil { - return jsonErr - } - p.Logger.Infof("Result: %v", string(jsonResult)) - return nil - }, - }, - }, - } - return Result{Command: cmd}, nil -} diff --git a/internal/app/cmd/torrentcmd/command.go b/internal/app/cmd/torrentcmd/command.go index f0a9bc00..beb8220a 100644 --- a/internal/app/cmd/torrentcmd/command.go +++ b/internal/app/cmd/torrentcmd/command.go @@ -1,6 +1,7 @@ package torrentcmd import ( + "github.com/bitmagnet-io/bitmagnet/internal/boilerplate/lazy" "github.com/bitmagnet-io/bitmagnet/internal/classifier" "github.com/bitmagnet-io/bitmagnet/internal/protocol" "github.com/bitmagnet-io/bitmagnet/internal/protocol/metainfo/metainforequester" @@ -13,7 +14,7 @@ import ( type Params struct { fx.In MetaInfoRequester metainforequester.Requester - Classifier classifier.Classifier + Classifier lazy.Lazy[classifier.Classifier] Logger *zap.SugaredLogger } @@ -34,11 +35,15 @@ func New(p Params) (Result, error) { }, }, Action: func(ctx *cli.Context) error { + c, err := p.Classifier.Get() + if err != nil { + return err + } infoHash, err := protocol.ParseID(ctx.String("infoHash")) if err != nil { return err } - return p.Classifier.Classify(ctx.Context, infoHash) + return c.Classify(ctx.Context, infoHash) }, }, { diff --git a/internal/blocking/factory.go b/internal/blocking/factory.go index a7ebe54d..3866d7a8 100644 --- a/internal/blocking/factory.go +++ b/internal/blocking/factory.go @@ -2,6 +2,7 @@ package blocking import ( "context" + "github.com/bitmagnet-io/bitmagnet/internal/boilerplate/lazy" "github.com/bitmagnet-io/bitmagnet/internal/database/dao" "github.com/bitmagnet-io/bitmagnet/internal/protocol" "go.uber.org/fx" @@ -10,27 +11,35 @@ import ( type Params struct { fx.In - Dao *dao.Query + Dao lazy.Lazy[*dao.Query] } type Result struct { fx.Out - Manager Manager + Manager lazy.Lazy[Manager] AppHook fx.Hook `group:"app_hooks"` } func New(params Params) Result { - m := &manager{ - dao: params.Dao, - buffer: make(map[protocol.ID]struct{}, 1000), - maxBufferSize: 1000, - maxFlushWait: time.Minute * 5, - } + lazyManager := lazy.New[Manager](func() (Manager, error) { + d, err := params.Dao.Get() + if err != nil { + return nil, err + } + return &manager{ + dao: d, + buffer: make(map[protocol.ID]struct{}, 1000), + maxBufferSize: 1000, + maxFlushWait: time.Minute * 5, + }, nil + }) return Result{ - Manager: m, + Manager: lazyManager, AppHook: fx.Hook{ OnStop: func(ctx context.Context) error { - return m.Flush(ctx) + return lazyManager.IfInitialized(func(m Manager) error { + return m.Flush(ctx) + }) }, }, } diff --git a/internal/boilerplate/httpserver/server.go b/internal/boilerplate/httpserver/server.go index c1f11d9f..4aae8286 100644 --- a/internal/boilerplate/httpserver/server.go +++ b/internal/boilerplate/httpserver/server.go @@ -24,53 +24,53 @@ type Params struct { type Result struct { fx.Out - Gin *gin.Engine - Server *http.Server Worker worker.Worker `group:"workers"` } -func New(p Params) (r Result, err error) { - gin.SetMode(p.Config.GinMode) - g := gin.New() - g.Use(ginzap.Ginzap(p.Logger.Named("gin"), time.RFC3339, true), gin.Recovery()) - options, optionsErr := resolveOptions(p.Config.Options, p.Options) - if optionsErr != nil { - err = optionsErr - return - } - for _, o := range options { - if buildErr := o.Apply(g); buildErr != nil { - err = buildErr - return - } - } - s := &http.Server{ - Addr: p.Config.LocalAddress, - Handler: g.Handler(), - } - r.Worker = worker.NewWorker( - "http_server", - fx.Hook{ - OnStart: func(ctx context.Context) error { - ln, listenErr := net.Listen("tcp", s.Addr) - if listenErr != nil { - return listenErr - } - go (func() { - serveErr := s.Serve(ln) - if !errors.Is(serveErr, http.ErrServerClosed) { - panic(serveErr) +func New(p Params) Result { + var s *http.Server + return Result{ + Worker: worker.NewWorker( + "http_server", + fx.Hook{ + OnStart: func(ctx context.Context) error { + gin.SetMode(p.Config.GinMode) + g := gin.New() + g.Use(ginzap.Ginzap(p.Logger.Named("gin"), time.RFC3339, true), gin.Recovery()) + options, optionsErr := resolveOptions(p.Config.Options, p.Options) + if optionsErr != nil { + return optionsErr } - })() - return nil - }, - OnStop: func(ctx context.Context) error { - return s.Shutdown(ctx) + for _, o := range options { + if buildErr := o.Apply(g); buildErr != nil { + return buildErr + } + } + s = &http.Server{ + Addr: p.Config.LocalAddress, + Handler: g.Handler(), + } + ln, listenErr := net.Listen("tcp", s.Addr) + if listenErr != nil { + return listenErr + } + go (func() { + serveErr := s.Serve(ln) + if !errors.Is(serveErr, http.ErrServerClosed) { + panic(serveErr) + } + })() + return nil + }, + OnStop: func(ctx context.Context) error { + if s == nil { + return nil + } + return s.Shutdown(ctx) + }, }, - }, - ) - r.Gin = g - return + ), + } } func resolveOptions(param []string, options []Option) ([]Option, error) { diff --git a/internal/boilerplate/lazy/lazy.go b/internal/boilerplate/lazy/lazy.go new file mode 100644 index 00000000..2bded523 --- /dev/null +++ b/internal/boilerplate/lazy/lazy.go @@ -0,0 +1,40 @@ +package lazy + +import "sync" + +type Lazy[T any] interface { + Get() (T, error) + // IfInitialized calls the given function if the value has been initialized (useful for shutdown logic) + IfInitialized(func(T) error) error +} + +func New[T any](fn func() (T, error)) Lazy[T] { + return &lazy[T]{fn: fn} +} + +type lazy[T any] struct { + fn func() (T, error) + mtx sync.Mutex + v T + err error + done bool +} + +func (l *lazy[T]) Get() (T, error) { + l.mtx.Lock() + defer l.mtx.Unlock() + if !l.done { + l.v, l.err = l.fn() + l.done = true + } + return l.v, l.err +} + +func (l *lazy[T]) IfInitialized(fn func(T) error) error { + l.mtx.Lock() + defer l.mtx.Unlock() + if l.done { + return fn(l.v) + } + return nil +} diff --git a/internal/boilerplate/worker/worker.go b/internal/boilerplate/worker/worker.go index 80452e2f..d9fc63e4 100644 --- a/internal/boilerplate/worker/worker.go +++ b/internal/boilerplate/worker/worker.go @@ -21,7 +21,7 @@ type RegistryResult struct { Registry Registry } -func NewRegistry(p RegistryParams) (RegistryResult, error) { +func NewRegistry(p RegistryParams) RegistryResult { r := ®istry{ mutex: &sync.RWMutex{}, workers: make(map[string]Worker), @@ -30,7 +30,7 @@ func NewRegistry(p RegistryParams) (RegistryResult, error) { for _, w := range p.Workers { r.workers[w.Key()] = w } - return RegistryResult{Registry: r}, nil + return RegistryResult{Registry: r} } type Registry interface { @@ -41,17 +41,21 @@ type Registry interface { DisableAll() Start(ctx context.Context) error Stop(ctx context.Context) error + Decorate(name string, fn Decorator) error } type Worker interface { Key() string Enabled() bool Started() bool + Decorate(Decorator) Worker _hook() fx.Hook setEnabled(enabled bool) setStarted(started bool) } +type Decorator func(fx.Hook) fx.Hook + type worker struct { key string hook fx.Hook @@ -78,6 +82,16 @@ func (w *worker) Started() bool { return w.started } +func (w *worker) Decorate(fn Decorator) Worker { + return &worker{ + key: w.key, + hook: fn(fx.Hook{ + OnStart: w.hook.OnStart, + OnStop: w.hook.OnStop, + }), + } +} + func (w *worker) _hook() fx.Hook { return w.hook } @@ -195,3 +209,13 @@ func (r *registry) Stop(ctx context.Context) error { } return nil } + +func (r *registry) Decorate(name string, fn Decorator) error { + r.mutex.Lock() + defer r.mutex.Unlock() + if w, ok := r.workers[name]; ok { + r.workers[name] = w.Decorate(fn) + return nil + } + return fmt.Errorf("worker %s not found", name) +} diff --git a/internal/classifier/asynq/consumer/consumer.go b/internal/classifier/asynq/consumer/consumer.go index 8f64a5a5..4a05484e 100644 --- a/internal/classifier/asynq/consumer/consumer.go +++ b/internal/classifier/asynq/consumer/consumer.go @@ -2,6 +2,7 @@ package consumer import ( "context" + "github.com/bitmagnet-io/bitmagnet/internal/boilerplate/lazy" "github.com/bitmagnet-io/bitmagnet/internal/classifier" "github.com/bitmagnet-io/bitmagnet/internal/classifier/asynq/message" "github.com/bitmagnet-io/bitmagnet/internal/queue/consumer" @@ -10,23 +11,29 @@ import ( type Params struct { fx.In - Classifier classifier.Classifier + Classifier lazy.Lazy[classifier.Classifier] } type Result struct { fx.Out - Consumer consumer.Consumer `group:"queue_consumers"` + Consumer lazy.Lazy[consumer.Consumer] `group:"queue_consumers"` } -func New(p Params) (Result, error) { +func New(p Params) Result { return Result{ - Consumer: consumer.New[message.ClassifyTorrentPayload]( - message.ClassifyTorrentTypename, - cns{ - p.Classifier, - }, - ), - }, nil + Consumer: lazy.New(func() (consumer.Consumer, error) { + cl, err := p.Classifier.Get() + if err != nil { + return nil, err + } + return consumer.New[message.ClassifyTorrentPayload]( + message.ClassifyTorrentTypename, + cns{ + cl, + }, + ), nil + }), + } } type cns struct { diff --git a/internal/classifier/asynq/publisher/publisher.go b/internal/classifier/asynq/publisher/publisher.go index 516e2f71..70360bd9 100644 --- a/internal/classifier/asynq/publisher/publisher.go +++ b/internal/classifier/asynq/publisher/publisher.go @@ -1,6 +1,7 @@ package publisher import ( + "github.com/bitmagnet-io/bitmagnet/internal/boilerplate/lazy" "github.com/bitmagnet-io/bitmagnet/internal/classifier/asynq/message" "github.com/bitmagnet-io/bitmagnet/internal/queue/producer" "github.com/bitmagnet-io/bitmagnet/internal/queue/publisher" @@ -10,17 +11,23 @@ import ( type Params struct { fx.In - Client *asynq.Client + Client lazy.Lazy[*asynq.Client] Producer producer.Producer[message.ClassifyTorrentPayload] } type Result struct { fx.Out - Publisher publisher.Publisher[message.ClassifyTorrentPayload] + Publisher lazy.Lazy[publisher.Publisher[message.ClassifyTorrentPayload]] } func New(p Params) Result { return Result{ - Publisher: publisher.New[message.ClassifyTorrentPayload](p.Client, p.Producer), + Publisher: lazy.New(func() (publisher.Publisher[message.ClassifyTorrentPayload], error) { + client, err := p.Client.Get() + if err != nil { + return nil, err + } + return publisher.New[message.ClassifyTorrentPayload](client, p.Producer), nil + }), } } diff --git a/internal/classifier/factory.go b/internal/classifier/factory.go index 8c74650a..4a31526e 100644 --- a/internal/classifier/factory.go +++ b/internal/classifier/factory.go @@ -1,24 +1,26 @@ package classifier import ( - "github.com/bitmagnet-io/bitmagnet/internal/database/dao" - "github.com/bitmagnet-io/bitmagnet/internal/database/search" - "github.com/prometheus/client_golang/prometheus" - "go.uber.org/fx" - "go.uber.org/zap" + "github.com/bitmagnet-io/bitmagnet/internal/boilerplate/lazy" + "github.com/bitmagnet-io/bitmagnet/internal/database/dao" + "github.com/bitmagnet-io/bitmagnet/internal/database/search" + "github.com/prometheus/client_golang/prometheus" + "go.uber.org/fx" + "go.uber.org/zap" + "sort" ) type Params struct { fx.In - Search search.Search - SubResolvers []SubResolver `group:"content_resolvers"` - Dao *dao.Query + Search lazy.Lazy[search.Search] + SubResolvers []lazy.Lazy[SubResolver] `group:"content_resolvers"` + Dao lazy.Lazy[*dao.Query] Logger *zap.SugaredLogger } type Result struct { fx.Out - Classifier Classifier + Classifier lazy.Lazy[Classifier] Duration prometheus.Collector `group:"prometheus_collectors"` SuccessTotal prometheus.Collector `group:"prometheus_collectors"` NoMatchTotal prometheus.Collector `group:"prometheus_collectors"` @@ -26,16 +28,37 @@ type Result struct { } func New(p Params) Result { - collector := newPrometheusCollectorResolver(resolver{ - subResolvers: p.SubResolvers, - logger: p.Logger.Named("content_classifier"), - }) + collector := newPrometheusCollector() return Result{ - Classifier: classifier{ - resolver: collector, - dao: p.Dao, - search: p.Search, - }, + Classifier: lazy.New(func() (Classifier, error) { + s, err := p.Search.Get() + if err != nil { + return classifier{}, err + } + d, err := p.Dao.Get() + if err != nil { + return classifier{}, err + } + subResolvers := make([]SubResolver, 0, len(p.SubResolvers)) + for _, subResolver := range p.SubResolvers { + r, err := subResolver.Get() + if err != nil { + return classifier{}, err + } + subResolvers = append(subResolvers, r) + } + sort.Slice(subResolvers, func(i, j int) bool { + return subResolvers[i].Config().Priority < subResolvers[j].Config().Priority + }) + return classifier{ + resolver: prometheusCollectorResolver{ + prometheusCollector: collector, + resolver: resolver{subResolvers, p.Logger}, + }, + dao: d, + search: s, + }, nil + }), Duration: collector.duration, SuccessTotal: collector.successTotal, NoMatchTotal: collector.noMatchTotal, diff --git a/internal/classifier/prometheus_collector.go b/internal/classifier/prometheus_collector.go index 49ef83dd..1e56beb5 100644 --- a/internal/classifier/prometheus_collector.go +++ b/internal/classifier/prometheus_collector.go @@ -8,8 +8,7 @@ import ( "time" ) -type prometheusCollectorResolver struct { - resolver Resolver +type prometheusCollector struct { duration *prometheus.HistogramVec successTotal *prometheus.CounterVec noMatchTotal prometheus.Counter @@ -22,9 +21,8 @@ const subsystem = "classifier" const labelContentType = "content_type" const labelContentSource = "content_source" -func newPrometheusCollectorResolver(resolver Resolver) prometheusCollectorResolver { - return prometheusCollectorResolver{ - resolver: resolver, +func newPrometheusCollector() prometheusCollector { + return prometheusCollector{ duration: prometheus.NewHistogramVec(prometheus.HistogramOpts{ Namespace: namespace, Subsystem: subsystem, @@ -53,6 +51,11 @@ func newPrometheusCollectorResolver(resolver Resolver) prometheusCollectorResolv } } +type prometheusCollectorResolver struct { + prometheusCollector + resolver Resolver +} + func (r prometheusCollectorResolver) Resolve(ctx context.Context, content model.TorrentContent) (model.TorrentContent, error) { start := time.Now() result, err := r.resolver.Resolve(ctx, content) diff --git a/internal/classifier/resolve.go b/internal/classifier/resolve.go index 28ee6e82..0714b75e 100644 --- a/internal/classifier/resolve.go +++ b/internal/classifier/resolve.go @@ -4,11 +4,10 @@ import ( "context" "errors" "github.com/bitmagnet-io/bitmagnet/internal/model" - "sort" ) func (r resolver) Resolve(ctx context.Context, content model.TorrentContent) (model.TorrentContent, error) { - for _, subResolver := range r.sortedSubResolvers() { + for _, subResolver := range r.subResolvers { preEnrichedContent, preEnrichedErr := subResolver.PreEnrich(content) if preEnrichedErr != nil { return model.TorrentContent{}, preEnrichedErr @@ -24,11 +23,3 @@ func (r resolver) Resolve(ctx context.Context, content model.TorrentContent) (mo } return model.TorrentContent{}, ErrNoMatch } - -func (r resolver) sortedSubResolvers() []SubResolver { - subResolvers := r.subResolvers - sort.Slice(subResolvers, func(i, j int) bool { - return subResolvers[i].Config().Priority < subResolvers[j].Config().Priority - }) - return subResolvers -} diff --git a/internal/classifier/video/factory.go b/internal/classifier/video/factory.go new file mode 100644 index 00000000..8089cf9f --- /dev/null +++ b/internal/classifier/video/factory.go @@ -0,0 +1,33 @@ +package video + +import ( + "github.com/bitmagnet-io/bitmagnet/internal/boilerplate/lazy" + "github.com/bitmagnet-io/bitmagnet/internal/classifier" + "github.com/bitmagnet-io/bitmagnet/internal/classifier/video/tmdb" + "go.uber.org/fx" +) + +type Params struct { + fx.In + TmdbClient lazy.Lazy[tmdb.Client] +} + +type Result struct { + fx.Out + Resolver lazy.Lazy[classifier.SubResolver] `group:"content_resolvers"` +} + +func New(p Params) Result { + return Result{ + Resolver: lazy.New(func() (classifier.SubResolver, error) { + tmdbClient, err := p.TmdbClient.Get() + if err != nil { + return nil, err + } + return videoResolver{ + config: classifier.SubResolverConfig{Key: "video", Priority: 1}, + tmdbClient: tmdbClient, + }, nil + }), + } +} diff --git a/internal/classifier/video/resolver.go b/internal/classifier/video/resolver.go index b7dd840a..25399ffe 100644 --- a/internal/classifier/video/resolver.go +++ b/internal/classifier/video/resolver.go @@ -8,30 +8,10 @@ import ( "github.com/bitmagnet-io/bitmagnet/internal/classifier/video/tmdb" "github.com/bitmagnet-io/bitmagnet/internal/model" "github.com/bitmagnet-io/bitmagnet/internal/regex" - "go.uber.org/fx" "strconv" "strings" ) -type Params struct { - fx.In - TmdbClient tmdb.Client -} - -type Result struct { - fx.Out - Resolver classifier.SubResolver `group:"content_resolvers"` -} - -func New(p Params) Result { - return Result{ - Resolver: videoResolver{ - config: classifier.SubResolverConfig{Key: "video", Priority: 1}, - tmdbClient: p.TmdbClient, - }, - } -} - type videoResolver struct { config classifier.SubResolverConfig tmdbClient tmdb.Client diff --git a/internal/classifier/video/tmdb/client.go b/internal/classifier/video/tmdb/client.go index 5d768058..f8f0db35 100644 --- a/internal/classifier/video/tmdb/client.go +++ b/internal/classifier/video/tmdb/client.go @@ -2,69 +2,15 @@ package tmdb import ( "errors" - "github.com/bitmagnet-io/bitmagnet/internal/boilerplate/httpclient/httplogger" - "github.com/bitmagnet-io/bitmagnet/internal/boilerplate/httpclient/httpratelimiter" "github.com/bitmagnet-io/bitmagnet/internal/database/search" "github.com/cyruzin/golang-tmdb" - "go.uber.org/fx" - "go.uber.org/zap" - "net/http" - "time" ) -type Params struct { - fx.In - Config Config - Logger *zap.SugaredLogger - Search search.Search -} - -type Result struct { - fx.Out - TmdbClient *tmdb.Client - Client Client -} - type Client interface { MovieClient TvShowClient } -func New(p Params) (r Result, err error) { - logger := p.Logger.Named("tmdb_client") - rateLimit := p.Config.RateLimit - rateLimitBurst := p.Config.RateLimitBurst - if p.Config.ApiKey == defaultTmdbApiKey { - rateLimit = time.Second - rateLimitBurst = 1 - logger.Warnln("you are using the default TMDB api key; TMDB requests will be limited to 1 per second; to remove this warning please configure a personal TMDB api key") - } - httpClient := http.Client{ - // need to set a non-zero value as the underlying client unfortunately sets 10 seconds as the default if none is provided; - // this does not work well with the rate limiter; a 30 second timeout fixes this assuming a concurrency of 10 on the queue - // (and a maximum of 2 TMDB requests per classification) - Timeout: time.Second * 30, - Transport: httpratelimiter.NewDecorator( - rateLimit, - rateLimitBurst, - )(httplogger.NewDecorator( - logger, - )(http.DefaultTransport)), - } - c, initErr := tmdb.Init(p.Config.ApiKey) - c.SetClientConfig(httpClient) - if initErr != nil { - err = initErr - return - } - r.Client = &client{ - c: c, - s: p.Search, - } - r.TmdbClient = c - return -} - type client struct { c *tmdb.Client s search.Search diff --git a/internal/classifier/video/tmdb/factory.go b/internal/classifier/video/tmdb/factory.go new file mode 100644 index 00000000..b8244e81 --- /dev/null +++ b/internal/classifier/video/tmdb/factory.go @@ -0,0 +1,65 @@ +package tmdb + +import ( + "github.com/bitmagnet-io/bitmagnet/internal/boilerplate/httpclient/httplogger" + "github.com/bitmagnet-io/bitmagnet/internal/boilerplate/httpclient/httpratelimiter" + "github.com/bitmagnet-io/bitmagnet/internal/boilerplate/lazy" + "github.com/bitmagnet-io/bitmagnet/internal/database/search" + tmdb "github.com/cyruzin/golang-tmdb" + "go.uber.org/fx" + "go.uber.org/zap" + "net/http" + "time" +) + +type Params struct { + fx.In + Config Config + Logger *zap.SugaredLogger + Search lazy.Lazy[search.Search] +} + +type Result struct { + fx.Out + Client lazy.Lazy[Client] +} + +func New(p Params) Result { + return Result{ + Client: lazy.New(func() (Client, error) { + s, err := p.Search.Get() + if err != nil { + return nil, err + } + logger := p.Logger.Named("tmdb_client") + rateLimit := p.Config.RateLimit + rateLimitBurst := p.Config.RateLimitBurst + if p.Config.ApiKey == defaultTmdbApiKey { + rateLimit = time.Second + rateLimitBurst = 1 + logger.Warnln("you are using the default TMDB api key; TMDB requests will be limited to 1 per second; to remove this warning please configure a personal TMDB api key") + } + httpClient := http.Client{ + // need to set a non-zero value as the underlying client unfortunately sets 10 seconds as the default if none is provided; + // this does not work well with the rate limiter; a 30 second timeout fixes this assuming a concurrency of 10 on the queue + // (and a maximum of 2 TMDB requests per classification) + Timeout: time.Second * 30, + Transport: httpratelimiter.NewDecorator( + rateLimit, + rateLimitBurst, + )(httplogger.NewDecorator( + logger, + )(http.DefaultTransport)), + } + c, initErr := tmdb.Init(p.Config.ApiKey) + c.SetClientConfig(httpClient) + if initErr != nil { + return nil, initErr + } + return &client{ + c: c, + s: s, + }, nil + }), + } +} diff --git a/internal/database/cache/decorator.go b/internal/database/cache/decorator.go index 617d2c35..eccb431e 100644 --- a/internal/database/cache/decorator.go +++ b/internal/database/cache/decorator.go @@ -1,6 +1,7 @@ package cache import ( + "github.com/bitmagnet-io/bitmagnet/internal/boilerplate/lazy" caches "github.com/mgdigital/gorm-cache/v2" "go.uber.org/fx" "gorm.io/gorm" @@ -9,18 +10,25 @@ import ( type DecoratorParams struct { fx.In Plugin *caches.Caches - DB *gorm.DB + DB lazy.Lazy[*gorm.DB] } type DecoratorResult struct { fx.Out - DB *gorm.DB + DB lazy.Lazy[*gorm.DB] } -func NewDecorator(p DecoratorParams) (DecoratorResult, error) { - db := p.DB - if err := db.Use(p.Plugin); err != nil { - return DecoratorResult{}, err +func NewDecorator(p DecoratorParams) DecoratorResult { + return DecoratorResult{ + DB: lazy.New(func() (*gorm.DB, error) { + db, err := p.DB.Get() + if err != nil { + return nil, err + } + if err := db.Use(p.Plugin); err != nil { + return nil, err + } + return db, nil + }), } - return DecoratorResult{DB: db}, nil } diff --git a/internal/database/dao/fx.go b/internal/database/dao/fx.go index 8e312711..bd7fc903 100644 --- a/internal/database/dao/fx.go +++ b/internal/database/dao/fx.go @@ -1,21 +1,29 @@ package dao import ( + "github.com/bitmagnet-io/bitmagnet/internal/boilerplate/lazy" "go.uber.org/fx" "gorm.io/gorm" ) type Params struct { fx.In - GormDb *gorm.DB + GormDb lazy.Lazy[*gorm.DB] } type Result struct { fx.Out - Dao *Query + Dao lazy.Lazy[*Query] } -func New(p Params) (r Result, err error) { - r.Dao = Use(p.GormDb) - return +func New(p Params) Result { + return Result{ + Dao: lazy.New(func() (*Query, error) { + db, err := p.GormDb.Get() + if err != nil { + return nil, err + } + return Use(db), nil + }), + } } diff --git a/internal/database/databasefx/module.go b/internal/database/databasefx/module.go index 4c0dd4f9..9d6b708a 100644 --- a/internal/database/databasefx/module.go +++ b/internal/database/databasefx/module.go @@ -26,7 +26,6 @@ func New() fx.Option { healthcheck.New, postgres.New, search.New, - warmer.New, ), fx.Decorate( cache.NewDecorator, diff --git a/internal/database/gorm.go b/internal/database/gorm.go index 5d121b0b..eeb3230f 100644 --- a/internal/database/gorm.go +++ b/internal/database/gorm.go @@ -2,6 +2,7 @@ package database import ( "database/sql" + "github.com/bitmagnet-io/bitmagnet/internal/boilerplate/lazy" gorm2 "github.com/bitmagnet-io/bitmagnet/internal/database/gorm" "go.uber.org/fx" "go.uber.org/zap" @@ -13,42 +14,43 @@ import ( type Params struct { fx.In Logger *zap.SugaredLogger - Lifecycle fx.Lifecycle Dialector gorm.Dialector } type Result struct { fx.Out - SqlDb *sql.DB - GormDb *gorm.DB + GormDb lazy.Lazy[*gorm.DB] + SqlDb lazy.Lazy[*sql.DB] } -func New(p Params) (r Result, err error) { - loggerResult, loggerErr := gorm2.New(gorm2.Params{ - ZapLogger: p.Logger, - Config: gorm2.Config{ - LogLevel: gormlogger.Info, - SlowThreshold: time.Second * 3, - }, +func New(p Params) Result { + gormDb := lazy.New(func() (*gorm.DB, error) { + gDb, dbErr := gorm.Open(p.Dialector, &gorm.Config{ + Logger: gorm2.New(gorm2.Params{ + ZapLogger: p.Logger, + Config: gorm2.Config{ + LogLevel: gormlogger.Info, + SlowThreshold: time.Second * 3, + }, + }).GormLogger, + }) + if dbErr != nil { + return nil, dbErr + } + return gDb, nil }) - if loggerErr != nil { - err = loggerErr - return + return Result{ + GormDb: gormDb, + SqlDb: lazy.New(func() (*sql.DB, error) { + gDb, gDbErr := gormDb.Get() + if gDbErr != nil { + return nil, gDbErr + } + sqlDb, sqlDbErr := gDb.DB() + if sqlDbErr != nil { + return nil, sqlDbErr + } + return sqlDb, nil + }), } - gDb, dbErr := gorm.Open(p.Dialector, &gorm.Config{ - Logger: loggerResult.GormLogger, - DisableAutomaticPing: true, - }) - if dbErr != nil { - err = dbErr - return - } - sqlDb, sqlDbErr := gDb.DB() - if sqlDbErr != nil { - err = sqlDbErr - return - } - r.GormDb = gDb - r.SqlDb = sqlDb - return } diff --git a/internal/database/gorm/logger.go b/internal/database/gorm/logger.go index 96e325d9..a623c564 100644 --- a/internal/database/gorm/logger.go +++ b/internal/database/gorm/logger.go @@ -27,14 +27,14 @@ type Result struct { GormLogger gormlogger.Interface } -func New(p Params) (Result, error) { +func New(p Params) Result { return Result{ GormLogger: &customLogger{ logLevel: p.Config.LogLevel, slowThreshold: p.Config.SlowThreshold, zap: p.ZapLogger.Named("gorm"), }, - }, nil + } } type customLogger struct { diff --git a/internal/database/healthcheck/healthcheck.go b/internal/database/healthcheck/healthcheck.go index 21f010d7..c7bdd77a 100644 --- a/internal/database/healthcheck/healthcheck.go +++ b/internal/database/healthcheck/healthcheck.go @@ -2,16 +2,17 @@ package healthcheck import ( "context" + "database/sql" "fmt" + "github.com/bitmagnet-io/bitmagnet/internal/boilerplate/lazy" "github.com/hellofresh/health-go/v5" "go.uber.org/fx" - "gorm.io/gorm" "time" ) type Params struct { fx.In - GormDb *gorm.DB + DB lazy.Lazy[*sql.DB] } type Result struct { @@ -25,7 +26,7 @@ func New(p Params) Result { Name: "postgres", Timeout: time.Second * 5, Check: func(ctx context.Context) error { - db, dbErr := p.GormDb.DB() + db, dbErr := p.DB.Get() if dbErr != nil { return fmt.Errorf("failed to get database connection: %w", dbErr) } diff --git a/internal/database/migrations/decorator.go b/internal/database/migrations/decorator.go index f7087883..0f0b3c71 100644 --- a/internal/database/migrations/decorator.go +++ b/internal/database/migrations/decorator.go @@ -2,6 +2,7 @@ package migrations import ( "context" + "github.com/bitmagnet-io/bitmagnet/internal/boilerplate/lazy" "go.uber.org/fx" "go.uber.org/zap" "gorm.io/gorm" @@ -9,36 +10,31 @@ import ( type DecoratorParams struct { fx.In - DB *gorm.DB - Logger *zap.SugaredLogger + DB lazy.Lazy[*gorm.DB] + Migrator lazy.Lazy[Migrator] + Logger *zap.SugaredLogger } type DecoratorResult struct { fx.Out - DB *gorm.DB + DB lazy.Lazy[*gorm.DB] } -func NewDecorator(p DecoratorParams) (result DecoratorResult, err error) { - result.DB = p.DB - sqlDb, dbErr := p.DB.DB() - if dbErr != nil { - err = dbErr - return +func NewDecorator(p DecoratorParams) DecoratorResult { + return DecoratorResult{ + DB: lazy.New(func() (*gorm.DB, error) { + db, err := p.DB.Get() + if err != nil { + return nil, err + } + m, err := p.Migrator.Get() + if err != nil { + return nil, err + } + if migrateErr := m.Up(context.TODO()); migrateErr != nil { + return nil, migrateErr + } + return db, nil + }), } - // avoid failing here on a non-connectable database - pingErr := sqlDb.Ping() - if pingErr != nil { - p.Logger.Errorf("failed to ping database: %v", pingErr) - return - } - m := New(Params{ - DB: sqlDb, - Logger: p.Logger, - }) - migrateErr := m.Up(context.TODO()) - if migrateErr != nil { - err = migrateErr - return - } - return } diff --git a/internal/database/migrations/migrator.go b/internal/database/migrations/migrator.go index 7e1d7b27..03b36dc9 100644 --- a/internal/database/migrations/migrator.go +++ b/internal/database/migrations/migrator.go @@ -3,22 +3,41 @@ package migrations import ( "context" "database/sql" + "github.com/bitmagnet-io/bitmagnet/internal/boilerplate/lazy" migrationssql "github.com/bitmagnet-io/bitmagnet/migrations" goose "github.com/pressly/goose/v3" "go.uber.org/fx" "go.uber.org/zap" + "gorm.io/gorm" ) type Params struct { fx.In - DB *sql.DB + DB lazy.Lazy[*gorm.DB] Logger *zap.SugaredLogger } -func New(p Params) Migrator { - initGoose(p.Logger) - return &migrator{ - db: p.DB, +type Result struct { + fx.Out + Migrator lazy.Lazy[Migrator] +} + +func New(p Params) Result { + return Result{ + Migrator: lazy.New(func() (Migrator, error) { + g, err := p.DB.Get() + if err != nil { + return nil, err + } + db, err := g.DB() + if err != nil { + return nil, err + } + initGoose(p.Logger) + return &migrator{ + db: db, + }, nil + }), } } diff --git a/internal/database/search/search.go b/internal/database/search/search.go index 694834ae..5cef8352 100644 --- a/internal/database/search/search.go +++ b/internal/database/search/search.go @@ -1,6 +1,7 @@ package search import ( + "github.com/bitmagnet-io/bitmagnet/internal/boilerplate/lazy" "github.com/bitmagnet-io/bitmagnet/internal/database/dao" "go.uber.org/fx" ) @@ -17,18 +18,24 @@ type search struct { type Params struct { fx.In - Query *dao.Query + Query lazy.Lazy[*dao.Query] } type Result struct { fx.Out - Search Search + Search lazy.Lazy[Search] } func New(params Params) Result { return Result{ - Search: &search{ - q: params.Query, - }, + Search: lazy.New(func() (Search, error) { + q, err := params.Query.Get() + if err != nil { + return nil, err + } + return &search{ + q: q, + }, nil + }), } } diff --git a/internal/database/search/warmer/decorator.go b/internal/database/search/warmer/decorator.go new file mode 100644 index 00000000..eb06ff91 --- /dev/null +++ b/internal/database/search/warmer/decorator.go @@ -0,0 +1,54 @@ +package warmer + +import ( + "context" + "github.com/bitmagnet-io/bitmagnet/internal/boilerplate/lazy" + "github.com/bitmagnet-io/bitmagnet/internal/boilerplate/worker" + "github.com/bitmagnet-io/bitmagnet/internal/database/search" + "go.uber.org/fx" + "go.uber.org/zap" +) + +type DecoratorParams struct { + fx.In + Config Config + Search lazy.Lazy[search.Search] + Registry worker.Registry + Logger *zap.SugaredLogger +} + +type DecoratorResult struct { + fx.Out + Registry worker.Registry +} + +func NewDecorator(params DecoratorParams) (DecoratorResult, error) { + var w warmer + err := params.Registry.Decorate("http_server", func(hook fx.Hook) fx.Hook { + return fx.Hook{ + OnStart: func(ctx context.Context) error { + s, err := params.Search.Get() + if err != nil { + return err + } + w = warmer{ + stopped: make(chan struct{}), + interval: params.Config.Interval, + search: s, + logger: params.Logger.Named("search_warmer"), + } + go w.start() + return hook.OnStart(ctx) + }, + OnStop: func(ctx context.Context) error { + if w.stopped != nil { + close(w.stopped) + } + return hook.OnStop(ctx) + }, + } + }) + return DecoratorResult{ + Registry: params.Registry, + }, err +} diff --git a/internal/database/search/warmer/warmer.go b/internal/database/search/warmer/warmer.go index 90973b4f..563d59f5 100644 --- a/internal/database/search/warmer/warmer.go +++ b/internal/database/search/warmer/warmer.go @@ -6,44 +6,10 @@ import ( "github.com/bitmagnet-io/bitmagnet/internal/database/search" "github.com/bitmagnet-io/bitmagnet/internal/maps" "github.com/bitmagnet-io/bitmagnet/internal/model" - "go.uber.org/fx" "go.uber.org/zap" "time" ) -type Params struct { - fx.In - Config Config - Search search.Search - Logger *zap.SugaredLogger -} - -type Result struct { - fx.Out - AppHook fx.Hook `group:"app_hooks"` -} - -func New(params Params) Result { - w := warmer{ - stopped: make(chan struct{}), - interval: params.Config.Interval, - search: params.Search, - logger: params.Logger.Named("search_warmer"), - } - return Result{ - AppHook: fx.Hook{ - OnStart: func(context.Context) error { - go w.start() - return nil - }, - OnStop: func(ctx context.Context) error { - close(w.stopped) - return nil - }, - }, - } -} - type warmer struct { stopped chan struct{} interval time.Duration diff --git a/internal/dev/app/cmd/gormcmd/command.go b/internal/dev/app/cmd/gormcmd/command.go index 52f7547d..f2e6126c 100644 --- a/internal/dev/app/cmd/gormcmd/command.go +++ b/internal/dev/app/cmd/gormcmd/command.go @@ -1,6 +1,7 @@ package gormcmd import ( + "github.com/bitmagnet-io/bitmagnet/internal/boilerplate/lazy" "github.com/bitmagnet-io/bitmagnet/internal/database/gen" "github.com/urfave/cli/v2" "go.uber.org/fx" @@ -9,7 +10,7 @@ import ( type Params struct { fx.In - DB *gorm.DB + DB lazy.Lazy[*gorm.DB] } type Result struct { @@ -24,7 +25,11 @@ func New(p Params) (r Result, err error) { { Name: "gen", Action: func(ctx *cli.Context) error { - g := gen.BuildGenerator(p.DB) + db, err := p.DB.Get() + if err != nil { + return err + } + g := gen.BuildGenerator(db) g.Execute() return nil }, diff --git a/internal/dev/app/cmd/migratecmd/command.go b/internal/dev/app/cmd/migratecmd/command.go index b79d96be..05042320 100644 --- a/internal/dev/app/cmd/migratecmd/command.go +++ b/internal/dev/app/cmd/migratecmd/command.go @@ -1,6 +1,7 @@ package migratecmd import ( + "github.com/bitmagnet-io/bitmagnet/internal/boilerplate/lazy" "github.com/bitmagnet-io/bitmagnet/internal/database/migrations" "github.com/urfave/cli/v2" "go.uber.org/fx" @@ -8,7 +9,7 @@ import ( type Params struct { fx.In - Migrator migrations.Migrator + Migrator lazy.Lazy[migrations.Migrator] } type Result struct { @@ -29,11 +30,15 @@ func New(p Params) (r Result, err error) { }, }, Action: func(ctx *cli.Context) error { + m, err := p.Migrator.Get() + if err != nil { + return err + } version := ctx.Int64("version") if version == 0 { - return p.Migrator.Up(ctx.Context) + return m.Up(ctx.Context) } else { - return p.Migrator.UpTo(ctx.Context, version) + return m.UpTo(ctx.Context, version) } }, }, @@ -46,11 +51,15 @@ func New(p Params) (r Result, err error) { }, }, Action: func(ctx *cli.Context) error { + m, err := p.Migrator.Get() + if err != nil { + return err + } version := ctx.Int64("version") if version == 0 { - return p.Migrator.Down(ctx.Context) + return m.Down(ctx.Context) } else { - return p.Migrator.DownTo(ctx.Context, version) + return m.DownTo(ctx.Context, version) } }, }, diff --git a/internal/dhtcrawler/crawler.go b/internal/dhtcrawler/crawler.go index 3fde2885..e99a1107 100644 --- a/internal/dhtcrawler/crawler.go +++ b/internal/dhtcrawler/crawler.go @@ -60,13 +60,6 @@ type crawler struct { } func (c *crawler) start() { - // wait for the server to be ready - select { - case <-c.stopped: - return - case <-c.client.Ready(): - break - } ctx, cancel := context.WithCancel(context.Background()) defer cancel() // start the various pipeline workers diff --git a/internal/dhtcrawler/factory.go b/internal/dhtcrawler/factory.go index 0978458c..047b26ad 100644 --- a/internal/dhtcrawler/factory.go +++ b/internal/dhtcrawler/factory.go @@ -3,6 +3,7 @@ package dhtcrawler import ( "context" "github.com/bitmagnet-io/bitmagnet/internal/blocking" + "github.com/bitmagnet-io/bitmagnet/internal/boilerplate/lazy" "github.com/bitmagnet-io/bitmagnet/internal/boilerplate/worker" "github.com/bitmagnet-io/bitmagnet/internal/classifier/asynq/message" "github.com/bitmagnet-io/bitmagnet/internal/concurrency" @@ -25,13 +26,13 @@ type Params struct { fx.In Config Config KTable ktable.Table - Client client.Client + Client lazy.Lazy[client.Client] MetainfoRequester metainforequester.Requester BanningChecker banning.Checker `name:"metainfo_banning_checker"` - Search search.Search - Dao *dao.Query - BlockingManager blocking.Manager - ClassifierPublisher publisher.Publisher[message.ClassifyTorrentPayload] + Search lazy.Lazy[search.Search] + Dao lazy.Lazy[*dao.Query] + BlockingManager lazy.Lazy[blocking.Manager] + ClassifierPublisher lazy.Lazy[publisher.Publisher[message.ClassifyTorrentPayload]] DiscoveredNodes concurrency.BatchingChannel[ktable.Node] `name:"dht_discovered_nodes"` Logger *zap.SugaredLogger } @@ -43,65 +44,84 @@ type Result struct { } func New(params Params) Result { - scalingFactor := int(params.Config.ScalingFactor) + var c crawler persistedTotal := prometheus.NewCounterVec(prometheus.CounterOpts{ Namespace: "bitmagnet", Subsystem: "dht_crawler", Name: "persisted_total", Help: "A counter of persisted database entities.", }, []string{"entity"}) - c := crawler{ - kTable: params.KTable, - client: params.Client, - metainfoRequester: params.MetainfoRequester, - banningChecker: params.BanningChecker, - bootstrapNodes: params.Config.BootstrapNodes, - reseedBootstrapNodesInterval: time.Minute * 10, - getOldestNodesInterval: time.Second * 10, - oldPeerThreshold: time.Minute * 15, - discoveredNodes: params.DiscoveredNodes, - nodesForPing: concurrency.NewBufferedConcurrentChannel[ktable.Node](scalingFactor, scalingFactor), - nodesForFindNode: concurrency.NewBufferedConcurrentChannel[ktable.Node](10*scalingFactor, 10*scalingFactor), - nodesForSampleInfoHashes: concurrency.NewBufferedConcurrentChannel[ktable.Node](10*scalingFactor, 10*scalingFactor), - infoHashTriage: concurrency.NewBatchingChannel[nodeHasPeersForHash](10*scalingFactor, 1000, 20*time.Second), - getPeers: concurrency.NewBufferedConcurrentChannel[nodeHasPeersForHash](10*scalingFactor, 20*scalingFactor), - scrape: concurrency.NewBufferedConcurrentChannel[nodeHasPeersForHash](10*scalingFactor, 20*scalingFactor), - requestMetaInfo: concurrency.NewBufferedConcurrentChannel[infoHashWithPeers](10*scalingFactor, 40*scalingFactor), - persistTorrents: concurrency.NewBatchingChannel[infoHashWithMetaInfo]( - 1000, - 1000, - time.Minute, - ), - persistSources: concurrency.NewBatchingChannel[infoHashWithScrape]( - 1000, - 1000, - time.Minute, - ), - saveFilesThreshold: params.Config.SaveFilesThreshold, - savePieces: params.Config.SavePieces, - rescrapeThreshold: params.Config.RescrapeThreshold, - dao: params.Dao, - classifierPublisher: params.ClassifierPublisher, - ignoreHashes: &ignoreHashes{ - bloom: boom.NewStableBloomFilter(10_000_000, 2, 0.001), - }, - blockingManager: params.BlockingManager, - soughtNodeID: &concurrency.AtomicValue[protocol.ID]{}, - stopped: make(chan struct{}), - persistedTotal: persistedTotal, - logger: params.Logger.Named("dht_crawler"), - } - c.soughtNodeID.Set(protocol.RandomNodeID()) return Result{ Worker: worker.NewWorker( "dht_crawler", fx.Hook{ OnStart: func(context.Context) error { + scalingFactor := int(params.Config.ScalingFactor) + cl, err := params.Client.Get() + if err != nil { + return err + } + query, err := params.Dao.Get() + if err != nil { + return err + } + classifierPublisher, err := params.ClassifierPublisher.Get() + if err != nil { + return err + } + blockingManager, err := params.BlockingManager.Get() + if err != nil { + return err + } + c = crawler{ + kTable: params.KTable, + client: cl, + metainfoRequester: params.MetainfoRequester, + banningChecker: params.BanningChecker, + bootstrapNodes: params.Config.BootstrapNodes, + reseedBootstrapNodesInterval: time.Minute * 10, + getOldestNodesInterval: time.Second * 10, + oldPeerThreshold: time.Minute * 15, + discoveredNodes: params.DiscoveredNodes, + nodesForPing: concurrency.NewBufferedConcurrentChannel[ktable.Node](scalingFactor, scalingFactor), + nodesForFindNode: concurrency.NewBufferedConcurrentChannel[ktable.Node](10*scalingFactor, 10*scalingFactor), + nodesForSampleInfoHashes: concurrency.NewBufferedConcurrentChannel[ktable.Node](10*scalingFactor, 10*scalingFactor), + infoHashTriage: concurrency.NewBatchingChannel[nodeHasPeersForHash](10*scalingFactor, 1000, 20*time.Second), + getPeers: concurrency.NewBufferedConcurrentChannel[nodeHasPeersForHash](10*scalingFactor, 20*scalingFactor), + scrape: concurrency.NewBufferedConcurrentChannel[nodeHasPeersForHash](10*scalingFactor, 20*scalingFactor), + requestMetaInfo: concurrency.NewBufferedConcurrentChannel[infoHashWithPeers](10*scalingFactor, 40*scalingFactor), + persistTorrents: concurrency.NewBatchingChannel[infoHashWithMetaInfo]( + 1000, + 1000, + time.Minute, + ), + persistSources: concurrency.NewBatchingChannel[infoHashWithScrape]( + 1000, + 1000, + time.Minute, + ), + saveFilesThreshold: params.Config.SaveFilesThreshold, + savePieces: params.Config.SavePieces, + rescrapeThreshold: params.Config.RescrapeThreshold, + dao: query, + classifierPublisher: classifierPublisher, + ignoreHashes: &ignoreHashes{ + bloom: boom.NewStableBloomFilter(10_000_000, 2, 0.001), + }, + blockingManager: blockingManager, + soughtNodeID: &concurrency.AtomicValue[protocol.ID]{}, + stopped: make(chan struct{}), + persistedTotal: persistedTotal, + logger: params.Logger.Named("dht_crawler"), + } + c.soughtNodeID.Set(protocol.RandomNodeID()) go c.start() return nil }, OnStop: func(context.Context) error { - close(c.stopped) + if c.stopped != nil { + close(c.stopped) + } return nil }, }, diff --git a/internal/gql/config/config.go b/internal/gql/config/config.go index 77a82088..25a9c45c 100644 --- a/internal/gql/config/config.go +++ b/internal/gql/config/config.go @@ -1,15 +1,22 @@ package config import ( + "github.com/bitmagnet-io/bitmagnet/internal/boilerplate/lazy" "github.com/bitmagnet-io/bitmagnet/internal/gql" "go.uber.org/fx" ) type Params struct { fx.In - ResolverRoot gql.ResolverRoot + ResolverRoot lazy.Lazy[gql.ResolverRoot] } -func New(p Params) gql.Config { - return gql.Config{Resolvers: p.ResolverRoot} +func New(p Params) lazy.Lazy[gql.Config] { + return lazy.New(func() (gql.Config, error) { + root, err := p.ResolverRoot.Get() + if err != nil { + return gql.Config{}, err + } + return gql.Config{Resolvers: root}, nil + }) } diff --git a/internal/gql/gqlfx/module.go b/internal/gql/gqlfx/module.go index 7a1bf84f..7d34807b 100644 --- a/internal/gql/gqlfx/module.go +++ b/internal/gql/gqlfx/module.go @@ -1,6 +1,10 @@ package gqlfx import ( + "github.com/99designs/gqlgen/graphql" + "github.com/bitmagnet-io/bitmagnet/internal/boilerplate/lazy" + "github.com/bitmagnet-io/bitmagnet/internal/database/dao" + "github.com/bitmagnet-io/bitmagnet/internal/database/search" "github.com/bitmagnet-io/bitmagnet/internal/gql" "github.com/bitmagnet-io/bitmagnet/internal/gql/config" "github.com/bitmagnet-io/bitmagnet/internal/gql/httpserver" @@ -13,9 +17,34 @@ func New() fx.Option { "graphql", fx.Provide( config.New, - gql.NewExecutableSchema, - resolvers.New, httpserver.New, + func( + ls lazy.Lazy[search.Search], + ld lazy.Lazy[*dao.Query], + ) lazy.Lazy[gql.ResolverRoot] { + return lazy.New(func() (gql.ResolverRoot, error) { + s, err := ls.Get() + if err != nil { + return nil, err + } + d, err := ld.Get() + if err != nil { + return nil, err + } + return resolvers.New(d, s), nil + }) + }, + func( + lcfg lazy.Lazy[gql.Config], + ) lazy.Lazy[graphql.ExecutableSchema] { + return lazy.New(func() (graphql.ExecutableSchema, error) { + cfg, err := lcfg.Get() + if err != nil { + return nil, err + } + return gql.NewExecutableSchema(cfg), nil + }) + }, ), ) } diff --git a/internal/gql/httpserver/httpserver.go b/internal/gql/httpserver/httpserver.go index 7267cbf3..ceac5582 100644 --- a/internal/gql/httpserver/httpserver.go +++ b/internal/gql/httpserver/httpserver.go @@ -5,6 +5,7 @@ import ( "github.com/99designs/gqlgen/graphql/handler" "github.com/99designs/gqlgen/graphql/playground" "github.com/bitmagnet-io/bitmagnet/internal/boilerplate/httpserver" + "github.com/bitmagnet-io/bitmagnet/internal/boilerplate/lazy" "github.com/gin-gonic/gin" "go.uber.org/fx" "go.uber.org/zap" @@ -12,7 +13,7 @@ import ( type Params struct { fx.In - Schema graphql.ExecutableSchema + Schema lazy.Lazy[graphql.ExecutableSchema] Logger *zap.SugaredLogger } @@ -22,23 +23,15 @@ type Result struct { } func New(p Params) Result { - gql := handler.NewDefaultServer(p.Schema) - pg := playground.Handler("GraphQL playground", "/graphql") return Result{ Option: &builder{ - gqlHandler: func(c *gin.Context) { - gql.ServeHTTP(c.Writer, c.Request) - }, - playgroundHandler: func(c *gin.Context) { - pg.ServeHTTP(c.Writer, c.Request) - }, + schema: p.Schema, }, } } type builder struct { - gqlHandler gin.HandlerFunc - playgroundHandler gin.HandlerFunc + schema lazy.Lazy[graphql.ExecutableSchema] } func (builder) Key() string { @@ -46,7 +39,17 @@ func (builder) Key() string { } func (b builder) Apply(e *gin.Engine) error { - e.POST("/graphql", b.gqlHandler) - e.GET("/graphql", b.playgroundHandler) + schema, err := b.schema.Get() + if err != nil { + return err + } + gql := handler.NewDefaultServer(schema) + e.POST("/graphql", func(c *gin.Context) { + gql.ServeHTTP(c.Writer, c.Request) + }) + pg := playground.Handler("GraphQL playground", "/graphql") + e.GET("/graphql", func(c *gin.Context) { + pg.ServeHTTP(c.Writer, c.Request) + }) return nil } diff --git a/internal/importer/factory.go b/internal/importer/factory.go new file mode 100644 index 00000000..ee1bb2b1 --- /dev/null +++ b/internal/importer/factory.go @@ -0,0 +1,42 @@ +package importer + +import ( + "github.com/bitmagnet-io/bitmagnet/internal/boilerplate/lazy" + "github.com/bitmagnet-io/bitmagnet/internal/classifier/asynq/message" + "github.com/bitmagnet-io/bitmagnet/internal/database/dao" + "github.com/bitmagnet-io/bitmagnet/internal/queue/publisher" + "go.uber.org/fx" + "time" +) + +type Params struct { + fx.In + Dao lazy.Lazy[*dao.Query] + ClassifyPublisher lazy.Lazy[publisher.Publisher[message.ClassifyTorrentPayload]] +} + +type Result struct { + fx.Out + Importer lazy.Lazy[Importer] +} + +func New(p Params) Result { + return Result{ + Importer: lazy.New(func() (Importer, error) { + d, err := p.Dao.Get() + if err != nil { + return nil, err + } + cp, err := p.ClassifyPublisher.Get() + if err != nil { + return nil, err + } + return importer{ + dao: d, + classifyPublisher: cp, + bufferSize: 100, + maxWaitTime: 500 * time.Millisecond, + }, nil + }), + } +} diff --git a/internal/importer/httpserver/httpserver.go b/internal/importer/httpserver/httpserver.go index 66c704e7..b02d0826 100644 --- a/internal/importer/httpserver/httpserver.go +++ b/internal/importer/httpserver/httpserver.go @@ -5,6 +5,7 @@ import ( "encoding/json" "fmt" "github.com/bitmagnet-io/bitmagnet/internal/boilerplate/httpserver" + "github.com/bitmagnet-io/bitmagnet/internal/boilerplate/lazy" "github.com/bitmagnet-io/bitmagnet/internal/importer" "github.com/gin-gonic/gin" "go.uber.org/fx" @@ -15,7 +16,7 @@ import ( type Params struct { fx.In - Importer importer.Importer + Importer lazy.Lazy[importer.Importer] Logger *zap.SugaredLogger } @@ -24,21 +25,45 @@ type Result struct { Option httpserver.Option `group:"http_server_options"` } -func New(p Params) (r Result, err error) { - r.Option = &builder{p.handler} - return +func New(p Params) Result { + return Result{ + Option: &builder{ + importer: p.Importer, + logger: p.Logger.Named("importer"), + }, + } } const ImportIdHeader = "x-import-id" -func (p Params) handler(ctx *gin.Context) { +type builder struct { + importer lazy.Lazy[importer.Importer] + logger *zap.SugaredLogger +} + +func (builder) Key() string { + return "import" +} + +func (b builder) Apply(e *gin.Engine) error { + i, err := b.importer.Get() + if err != nil { + return err + } + e.POST("/import", func(ctx *gin.Context) { + b.handle(ctx, i) + }) + return nil +} + +func (b builder) handle(ctx *gin.Context, i importer.Importer) { s := bufio.NewScanner(ctx.Request.Body) s.Split(bufio.ScanRunes) importId := ctx.Request.Header.Get(ImportIdHeader) if importId == "" { importId = strconv.FormatUint(uint64(time.Now().Unix()), 10) } - i := p.Importer.New(ctx, importer.Info{ + ai := i.New(ctx, importer.Info{ ID: importId, }) var currentLine []rune @@ -49,13 +74,13 @@ func (p Params) handler(ctx *gin.Context) { addItem := func() error { item := importer.Item{} if err := json.Unmarshal([]byte(string(currentLine)), &item); err != nil { - p.Logger.Errorw("error adding item", "error", err) + b.logger.Errorw("error adding item", "error", err) ctx.Status(400) _, _ = ctx.Writer.WriteString(err.Error()) return err } - if err := i.Import(item); err != nil { - p.Logger.Errorw("error importing item", "error", err) + if err := ai.Import(item); err != nil { + b.logger.Errorw("error importing item", "error", err) ctx.Status(400) _, _ = ctx.Writer.WriteString(err.Error()) return err @@ -86,9 +111,9 @@ func (p Params) handler(ctx *gin.Context) { return } } - i.Drain() - if err := i.Close(); err != nil { - p.Logger.Errorw("error closing import", "error", err) + ai.Drain() + if err := ai.Close(); err != nil { + b.logger.Errorw("error closing import", "error", err) ctx.Status(400) _, _ = ctx.Writer.WriteString(err.Error()) return @@ -96,16 +121,3 @@ func (p Params) handler(ctx *gin.Context) { ctx.Status(200) writeCount() } - -type builder struct { - handler gin.HandlerFunc -} - -func (builder) Key() string { - return "import" -} - -func (b builder) Apply(e *gin.Engine) error { - e.POST("/import", b.handler) - return nil -} diff --git a/internal/importer/importer.go b/internal/importer/importer.go index 0b411536..9781936a 100644 --- a/internal/importer/importer.go +++ b/internal/importer/importer.go @@ -9,38 +9,15 @@ import ( "github.com/bitmagnet-io/bitmagnet/internal/model" "github.com/bitmagnet-io/bitmagnet/internal/protocol" "github.com/bitmagnet-io/bitmagnet/internal/queue/publisher" - "go.uber.org/fx" "gorm.io/gorm/clause" "sync" "time" ) -type Params struct { - fx.In - Dao *dao.Query - ClassifyPublisher publisher.Publisher[message.ClassifyTorrentPayload] -} - -type Result struct { - fx.Out - Importer Importer -} - type Importer interface { New(ctx context.Context, info Info) ActiveImport } -func New(p Params) (Result, error) { - return Result{ - Importer: importer{ - dao: p.Dao, - classifyPublisher: p.ClassifyPublisher, - bufferSize: 100, - maxWaitTime: 500 * time.Millisecond, - }, - }, nil -} - type Item struct { Source string InfoHash protocol.ID diff --git a/internal/protocol/dht/client/factory.go b/internal/protocol/dht/client/factory.go index b5960a34..961804c4 100644 --- a/internal/protocol/dht/client/factory.go +++ b/internal/protocol/dht/client/factory.go @@ -1,6 +1,7 @@ package client import ( + "github.com/bitmagnet-io/bitmagnet/internal/boilerplate/lazy" "github.com/bitmagnet-io/bitmagnet/internal/protocol" "github.com/bitmagnet-io/bitmagnet/internal/protocol/dht/server" "go.uber.org/fx" @@ -12,26 +13,32 @@ import ( type Params struct { fx.In NodeID protocol.ID `name:"dht_node_id"` - Server server.Server + Server lazy.Lazy[server.Server] Logger *zap.SugaredLogger } type Result struct { fx.Out - Client Client + Client lazy.Lazy[Client] } func New(p Params) Result { return Result{ - Client: clientLogger{ - client: serverAdapter{ - nodeID: p.NodeID, - server: p.Server, - }, - // we make way to many queries to usefully log everything, but having a sample is helpful: - logger: p.Logger.WithOptions(zap.WrapCore(func(core zapcore.Core) zapcore.Core { - return zapcore.NewSamplerWithOptions(core, time.Minute, 10, 0) - })).Named("dht_client"), - }, + Client: lazy.New(func() (Client, error) { + s, err := p.Server.Get() + if err != nil { + return nil, err + } + return clientLogger{ + client: serverAdapter{ + nodeID: p.NodeID, + server: s, + }, + // we make way to many queries to usefully log everything, but having a sample is helpful: + logger: p.Logger.WithOptions(zap.WrapCore(func(core zapcore.Core) zapcore.Core { + return zapcore.NewSamplerWithOptions(core, time.Minute, 10, 0) + })).Named("dht_client"), + }, nil + }), } } diff --git a/internal/protocol/dht/client/interface.go b/internal/protocol/dht/client/interface.go index fce4b0f5..1e437f45 100644 --- a/internal/protocol/dht/client/interface.go +++ b/internal/protocol/dht/client/interface.go @@ -8,7 +8,6 @@ import ( ) type Client interface { - Ready() <-chan struct{} Ping(ctx context.Context, addr netip.AddrPort) (PingResult, error) FindNode(ctx context.Context, addr netip.AddrPort, target protocol.ID) (FindNodeResult, error) GetPeers(ctx context.Context, addr netip.AddrPort, infoHash protocol.ID) (GetPeersResult, error) diff --git a/internal/protocol/dht/client/logger.go b/internal/protocol/dht/client/logger.go index 39ae2789..fd92887e 100644 --- a/internal/protocol/dht/client/logger.go +++ b/internal/protocol/dht/client/logger.go @@ -14,10 +14,6 @@ type clientLogger struct { logger *zap.SugaredLogger } -func (l clientLogger) Ready() <-chan struct{} { - return l.client.Ready() -} - func (l clientLogger) Ping(ctx context.Context, addr netip.AddrPort) (PingResult, error) { start := time.Now() res, err := l.client.Ping(ctx, addr) diff --git a/internal/protocol/dht/client/server_adapter.go b/internal/protocol/dht/client/server_adapter.go index cf61ca4a..4424d2c2 100644 --- a/internal/protocol/dht/client/server_adapter.go +++ b/internal/protocol/dht/client/server_adapter.go @@ -14,10 +14,6 @@ type serverAdapter struct { server server.Server } -func (a serverAdapter) Ready() <-chan struct{} { - return a.server.Ready() -} - func (a serverAdapter) Ping(ctx context.Context, addr netip.AddrPort) (PingResult, error) { res, err := a.server.Query(ctx, addr, dht.QPing, dht.MsgArgs{ID: a.nodeID}) if err != nil { diff --git a/internal/protocol/dht/server/factory.go b/internal/protocol/dht/server/factory.go index 59e94e65..f7ef2add 100644 --- a/internal/protocol/dht/server/factory.go +++ b/internal/protocol/dht/server/factory.go @@ -3,6 +3,7 @@ package server import ( "context" "fmt" + "github.com/bitmagnet-io/bitmagnet/internal/boilerplate/lazy" "github.com/bitmagnet-io/bitmagnet/internal/concurrency" "github.com/bitmagnet-io/bitmagnet/internal/protocol/dht" "github.com/bitmagnet-io/bitmagnet/internal/protocol/dht/responder" @@ -23,7 +24,7 @@ type Params struct { type Result struct { fx.Out - Server Server + Server lazy.Lazy[Server] AppHook fx.Hook `group:"app_hooks"` QueryDuration prometheus.Collector `group:"prometheus_collectors"` QuerySuccessTotal prometheus.Collector `group:"prometheus_collectors"` @@ -35,35 +36,36 @@ const namespace = "bitmagnet" const subsystem = "dht_server" func New(p Params) Result { - s := &server{ - ready: make(chan struct{}), - stopped: make(chan struct{}), - localAddr: netip.AddrPortFrom(netip.IPv4Unspecified(), p.Config.Port), - socket: NewSocket(), - queries: make(map[string]chan dht.RecvMsg), - queryTimeout: p.Config.QueryTimeout, - responder: p.Responder, - responderTimeout: time.Second * 5, - idIssuer: &variantIdIssuer{}, - logger: p.Logger.Named(subsystem), - } - collector := newPrometheusCollector(s) - return Result{ - Server: queryLimiter{ - server: collector, + collector := newPrometheusCollector() + ls := lazy.New(func() (Server, error) { + s := queryLimiter{ + server: prometheusServerWrapper{ + prometheusCollector: collector, + server: &server{ + localAddr: netip.AddrPortFrom(netip.IPv4Unspecified(), p.Config.Port), + socket: NewSocket(), + queries: make(map[string]chan dht.RecvMsg), + queryTimeout: p.Config.QueryTimeout, + responder: p.Responder, + responderTimeout: time.Second * 5, + idIssuer: &variantIdIssuer{}, + logger: p.Logger.Named(subsystem), + }, + }, queryLimiter: concurrency.NewKeyedLimiter(rate.Every(time.Second), 4, 1000, time.Second*20), - }, + } + if err := s.start(); err != nil { + return nil, fmt.Errorf("could not start server: %w", err) + } + return s, nil + }) + return Result{ + Server: ls, AppHook: fx.Hook{ - OnStart: func(ctx context.Context) error { - if err := s.socket.Open(s.localAddr); err != nil { - return fmt.Errorf("could not open socket: %w", err) - } - go s.start() - return nil - }, - OnStop: func(ctx context.Context) error { - close(s.stopped) - return nil + OnStop: func(context.Context) error { + return ls.IfInitialized(func(s Server) error { + return s.stop() + }) }, }, QueryDuration: collector.queryDuration, diff --git a/internal/protocol/dht/server/limiter.go b/internal/protocol/dht/server/limiter.go index 87b5a2f2..a6ccc4c8 100644 --- a/internal/protocol/dht/server/limiter.go +++ b/internal/protocol/dht/server/limiter.go @@ -12,8 +12,12 @@ type queryLimiter struct { queryLimiter concurrency.KeyedLimiter } -func (s queryLimiter) Ready() <-chan struct{} { - return s.server.Ready() +func (s queryLimiter) start() error { + return s.server.start() +} + +func (s queryLimiter) stop() error { + return s.server.stop() } func (s queryLimiter) Query(ctx context.Context, addr netip.AddrPort, q string, args dht.MsgArgs) (r dht.RecvMsg, err error) { diff --git a/internal/protocol/dht/server/prometheus_collector.go b/internal/protocol/dht/server/prometheus_collector.go index d372f5dc..af02027c 100644 --- a/internal/protocol/dht/server/prometheus_collector.go +++ b/internal/protocol/dht/server/prometheus_collector.go @@ -9,7 +9,6 @@ import ( ) type prometheusCollector struct { - server Server queryDuration *prometheus.HistogramVec querySuccessTotal *prometheus.CounterVec queryErrorTotal *prometheus.CounterVec @@ -20,9 +19,8 @@ const labelQuery = "query" var labelNames = []string{labelQuery} -func newPrometheusCollector(server Server) prometheusCollector { +func newPrometheusCollector() prometheusCollector { return prometheusCollector{ - server: server, queryDuration: prometheus.NewHistogramVec(prometheus.HistogramOpts{ Namespace: namespace, Subsystem: subsystem, @@ -51,11 +49,20 @@ func newPrometheusCollector(server Server) prometheusCollector { } } -func (l prometheusCollector) Ready() <-chan struct{} { - return l.server.Ready() +type prometheusServerWrapper struct { + prometheusCollector + server Server } -func (l prometheusCollector) Query(ctx context.Context, addr netip.AddrPort, q string, args dht.MsgArgs) (dht.RecvMsg, error) { +func (l prometheusServerWrapper) start() error { + return l.server.start() +} + +func (l prometheusServerWrapper) stop() error { + return l.server.stop() +} + +func (l prometheusServerWrapper) Query(ctx context.Context, addr netip.AddrPort, q string, args dht.MsgArgs) (dht.RecvMsg, error) { labels := prometheus.Labels{labelQuery: q} l.queryConcurrency.With(labels).Inc() start := time.Now() diff --git a/internal/protocol/dht/server/server.go b/internal/protocol/dht/server/server.go index 3af8b767..77ed0f2f 100644 --- a/internal/protocol/dht/server/server.go +++ b/internal/protocol/dht/server/server.go @@ -14,14 +14,13 @@ import ( ) type Server interface { - Ready() <-chan struct{} + start() error + stop() error Query(ctx context.Context, addr netip.AddrPort, q string, args dht.MsgArgs) (dht.RecvMsg, error) } type server struct { mutex sync.Mutex - ready chan struct{} - stopped chan struct{} localAddr netip.AddrPort socket Socket queryTimeout time.Duration @@ -32,16 +31,18 @@ type server struct { logger *zap.SugaredLogger } -func (s *server) Ready() <-chan struct{} { - return s.ready -} - -func (s *server) start() { +func (s *server) start() error { + if err := s.socket.Open(s.localAddr); err != nil { + return fmt.Errorf("could not open socket: %w", err) + } ctx, cancel := context.WithCancel(context.Background()) defer cancel() go s.read(ctx) - close(s.ready) - <-s.stopped + return nil +} + +func (s *server) stop() error { + return s.socket.Close() } func (s *server) read(ctx context.Context) { diff --git a/internal/queue/client/client.go b/internal/queue/client/client.go index db905bd3..affa080a 100644 --- a/internal/queue/client/client.go +++ b/internal/queue/client/client.go @@ -1,6 +1,7 @@ package client import ( + "github.com/bitmagnet-io/bitmagnet/internal/boilerplate/lazy" "github.com/bitmagnet-io/bitmagnet/internal/queue/redis" "github.com/hibiken/asynq" "go.uber.org/fx" @@ -8,15 +9,18 @@ import ( type Params struct { fx.In - Redis *redis.Client + Redis lazy.Lazy[*redis.Client] } type Result struct { fx.Out - Client *asynq.Client + Client lazy.Lazy[*asynq.Client] } -func New(p Params) (Result, error) { - client := asynq.NewClient(redis.Wrapper{Redis: p.Redis}) - return Result{Client: client}, nil +func New(p Params) Result { + return Result{ + Client: lazy.New(func() (*asynq.Client, error) { + return asynq.NewClient(redis.Wrapper{Redis: p.Redis}), nil + }), + } } diff --git a/internal/queue/prometheus/metrics.go b/internal/queue/prometheus/metrics.go index 4525ec30..7b850fe3 100644 --- a/internal/queue/prometheus/metrics.go +++ b/internal/queue/prometheus/metrics.go @@ -1,6 +1,7 @@ package prometheus import ( + "github.com/bitmagnet-io/bitmagnet/internal/boilerplate/lazy" "github.com/bitmagnet-io/bitmagnet/internal/queue/redis" "github.com/hibiken/asynq" "github.com/hibiken/asynq/x/metrics" @@ -10,7 +11,7 @@ import ( type Params struct { fx.In - Redis *redis.Client + Redis lazy.Lazy[*redis.Client] } type Result struct { diff --git a/internal/queue/redis/redis.go b/internal/queue/redis/redis.go index 424a260a..5ced6e02 100644 --- a/internal/queue/redis/redis.go +++ b/internal/queue/redis/redis.go @@ -1,15 +1,20 @@ package redis import ( + "github.com/bitmagnet-io/bitmagnet/internal/boilerplate/lazy" r "github.com/redis/go-redis/v9" ) type Client = r.Client type Wrapper struct { - Redis *r.Client + Redis lazy.Lazy[*r.Client] } func (w Wrapper) MakeRedisClient() interface{} { - return w.Redis + redis, err := w.Redis.Get() + if err != nil { + return err + } + return redis } diff --git a/internal/queue/server/server.go b/internal/queue/server/server.go index 8cb234d5..d602f245 100644 --- a/internal/queue/server/server.go +++ b/internal/queue/server/server.go @@ -2,6 +2,7 @@ package server import ( "context" + "github.com/bitmagnet-io/bitmagnet/internal/boilerplate/lazy" "github.com/bitmagnet-io/bitmagnet/internal/boilerplate/worker" "github.com/bitmagnet-io/bitmagnet/internal/queue" "github.com/bitmagnet-io/bitmagnet/internal/queue/consumer" @@ -14,45 +15,48 @@ import ( type Params struct { fx.In Config queue.Config - Redis *redis.Client - Consumers []consumer.Consumer `group:"queue_consumers"` - Options []Option `group:"queue_server_options"` + Redis lazy.Lazy[*redis.Client] + Consumers []lazy.Lazy[consumer.Consumer] `group:"queue_consumers"` + Options []Option `group:"queue_server_options"` Logger *zap.SugaredLogger } type Result struct { fx.Out - Server *asynq.Server - ServeMux *asynq.ServeMux - Worker worker.Worker `group:"workers"` + Worker worker.Worker `group:"workers"` } func New(p Params) (Result, error) { - cfg := &asynq.Config{ - Concurrency: p.Config.Concurrency, - Logger: loggerWrapper{p.Logger.Named("asynq")}, - LogLevel: asynq.DebugLevel, - Queues: p.Config.Queues, - } - for _, opt := range p.Options { - opt.apply(cfg) - } - srv := asynq.NewServer(redis.Wrapper{Redis: p.Redis}, *cfg) - mux := asynq.NewServeMux() - for _, c := range p.Consumers { - mux.Handle(c.Pattern(), c) - } + var srv *asynq.Server return Result{ - Server: srv, - ServeMux: mux, Worker: worker.NewWorker( "queue_server", fx.Hook{ OnStart: func(ctx context.Context) error { + cfg := &asynq.Config{ + Concurrency: p.Config.Concurrency, + Logger: loggerWrapper{p.Logger.Named("asynq")}, + LogLevel: asynq.DebugLevel, + Queues: p.Config.Queues, + } + for _, opt := range p.Options { + opt.apply(cfg) + } + srv = asynq.NewServer(redis.Wrapper{Redis: p.Redis}, *cfg) + mux := asynq.NewServeMux() + for _, lc := range p.Consumers { + c, err := lc.Get() + if err != nil { + return err + } + mux.Handle(c.Pattern(), c) + } return srv.Start(mux) }, OnStop: func(ctx context.Context) error { - srv.Shutdown() + if srv != nil { + srv.Shutdown() + } return nil }, }, diff --git a/internal/redis/healthcheck/healthcheck.go b/internal/redis/healthcheck/healthcheck.go index c05e4080..bd6ac8d7 100644 --- a/internal/redis/healthcheck/healthcheck.go +++ b/internal/redis/healthcheck/healthcheck.go @@ -2,6 +2,7 @@ package healthcheck import ( "context" + "github.com/bitmagnet-io/bitmagnet/internal/boilerplate/lazy" "github.com/hellofresh/health-go/v5" "github.com/redis/go-redis/v9" "go.uber.org/fx" @@ -9,7 +10,7 @@ import ( type Params struct { fx.In - Redis *redis.Client + Redis lazy.Lazy[*redis.Client] } type Result struct { @@ -21,7 +22,11 @@ func New(p Params) (r Result, err error) { r.Option = health.WithChecks(health.Config{ Name: "redis", Check: func(ctx context.Context) error { - _, err := p.Redis.Ping(ctx).Result() + r, err := p.Redis.Get() + if err != nil { + return err + } + _, err = r.Ping(ctx).Result() return err }, }) diff --git a/internal/redis/redisfx/module.go b/internal/redis/redisfx/module.go index 249cb002..a91778bd 100644 --- a/internal/redis/redisfx/module.go +++ b/internal/redis/redisfx/module.go @@ -2,6 +2,7 @@ package redisfx import ( "github.com/bitmagnet-io/bitmagnet/internal/boilerplate/config/configfx" + "github.com/bitmagnet-io/bitmagnet/internal/boilerplate/lazy" "github.com/bitmagnet-io/bitmagnet/internal/redis/healthcheck" "github.com/bitmagnet-io/bitmagnet/internal/redis/redisconfig" redis "github.com/redis/go-redis/v9" @@ -13,8 +14,10 @@ func New() fx.Option { "redis", configfx.NewConfigModule[redisconfig.Config]("redis", redisconfig.NewDefaultConfig()), fx.Provide( - func(cfg redisconfig.Config) *redis.Client { - return redis.NewClient(cfg.RedisClientOptions()) + func(cfg redisconfig.Config) lazy.Lazy[*redis.Client] { + return lazy.New(func() (*redis.Client, error) { + return redis.NewClient(cfg.RedisClientOptions()), nil + }) }, healthcheck.New, ), diff --git a/internal/torznab/adapter/adapter.go b/internal/torznab/adapter/adapter.go index f73b7cf1..0666bdcd 100644 --- a/internal/torznab/adapter/adapter.go +++ b/internal/torznab/adapter/adapter.go @@ -1,6 +1,7 @@ package adapter import ( + "github.com/bitmagnet-io/bitmagnet/internal/boilerplate/lazy" "github.com/bitmagnet-io/bitmagnet/internal/database/search" "github.com/bitmagnet-io/bitmagnet/internal/torznab" "go.uber.org/fx" @@ -8,22 +9,28 @@ import ( type Params struct { fx.In - Search search.Search + Search lazy.Lazy[search.Search] } type Result struct { fx.Out - Client torznab.Client + Client lazy.Lazy[torznab.Client] } func New(p Params) Result { return Result{ - Client: adapter{ - title: "bitmagnet", - maxLimit: 100, - defaultLimit: 100, - search: p.Search, - }, + Client: lazy.New[torznab.Client](func() (torznab.Client, error) { + s, err := p.Search.Get() + if err != nil { + return nil, err + } + return adapter{ + title: "bitmagnet", + maxLimit: 100, + defaultLimit: 100, + search: s, + }, nil + }), } } diff --git a/internal/torznab/httpserver/httpserver.go b/internal/torznab/httpserver/httpserver.go index 2b6b14a7..86cb6409 100644 --- a/internal/torznab/httpserver/httpserver.go +++ b/internal/torznab/httpserver/httpserver.go @@ -4,6 +4,7 @@ import ( "errors" "fmt" "github.com/bitmagnet-io/bitmagnet/internal/boilerplate/httpserver" + "github.com/bitmagnet-io/bitmagnet/internal/boilerplate/lazy" "github.com/bitmagnet-io/bitmagnet/internal/model" "github.com/bitmagnet-io/bitmagnet/internal/torznab" "github.com/gin-gonic/gin" @@ -13,7 +14,7 @@ import ( type Params struct { fx.In - Client torznab.Client + Client lazy.Lazy[torznab.Client] } type Result struct { @@ -30,7 +31,7 @@ func New(p Params) Result { } type builder struct { - client torznab.Client + client lazy.Lazy[torznab.Client] } func (builder) Key() string { @@ -38,6 +39,10 @@ func (builder) Key() string { } func (b builder) Apply(e *gin.Engine) error { + client, err := b.client.Get() + if err != nil { + return err + } e.GET("/torznab/*any", func(c *gin.Context) { writeInternalError := func(err error) { _ = c.AbortWithError(500, err) @@ -70,7 +75,7 @@ func (b builder) Apply(e *gin.Engine) error { return } if tp == torznab.FunctionCaps { - caps, capsErr := b.client.Caps(c) + caps, capsErr := client.Caps(c) if capsErr != nil { writeErr(fmt.Errorf("failed to execute caps: %w", capsErr)) return @@ -99,7 +104,7 @@ func (b builder) Apply(e *gin.Engine) error { offset.Valid = true offset.Uint = uint(intOffset) } - result, searchErr := b.client.Search(c, torznab.SearchRequest{ + result, searchErr := client.Search(c, torznab.SearchRequest{ Query: c.Query(torznab.ParamQuery), Type: tp, Cats: cats,