Skip to content

Commit

Permalink
refactor: workers
Browse files Browse the repository at this point in the history
  • Loading branch information
TheDevMinerTV committed Sep 2, 2024
1 parent e119901 commit ab30919
Show file tree
Hide file tree
Showing 20 changed files with 743 additions and 461 deletions.
66 changes: 60 additions & 6 deletions config/config.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,24 @@
package config

import (
"git.devminer.xyz/devminer/unitel"
"github.com/joho/godotenv"
"github.com/rs/zerolog/log"
"net/url"
"os"
"regexp"
"slices"
"strconv"
"strings"

"git.devminer.xyz/devminer/unitel"
"github.com/joho/godotenv"
"github.com/rs/zerolog/log"
)

type Mode string

const (
ModeCombined Mode = "combined"
ModeWeb Mode = "web"
ModeConsumer Mode = "consumer"
)

type Config struct {
Expand All @@ -25,6 +36,9 @@ type Config struct {
NATSURI string
NATSStreamName string

Mode Mode
Consumers []string

DatabaseURI string

Telemetry unitel.Opts
Expand Down Expand Up @@ -62,6 +76,13 @@ func Load() {
Msg("Both VERSIA_TLS_KEY and VERSIA_TLS_CERT have to be set if you want to use in-process TLS termination.")
}

mode := getEnvStrOneOf("VERSIA_MODE", ModeCombined, ModeCombined, ModeWeb, ModeConsumer)

var consumers []string
if raw := optionalEnvStr("VERSIA_TQ_CUSTOMERS"); raw != nil {
consumers = strings.Split(*raw, ",")
}

C = Config{
Port: getEnvInt("VERSIA_PORT", 80),
TLSCert: tlsCert,
Expand All @@ -76,13 +97,15 @@ func Load() {

NATSURI: os.Getenv("NATS_URI"),
NATSStreamName: getEnvStr("NATS_STREAM_NAME", "versia-go"),
DatabaseURI: os.Getenv("DATABASE_URI"),

Mode: mode,
Consumers: consumers,

DatabaseURI: os.Getenv("DATABASE_URI"),

ForwardTracesTo: forwardTracesTo,
Telemetry: unitel.ParseOpts("versia-go"),
}

return
}

func optionalEnvStr(key string) *string {
Expand All @@ -93,6 +116,18 @@ func optionalEnvStr(key string) *string {
return &value
}

func getEnvBool(key string, default_ bool) bool {
if value, ok := os.LookupEnv(key); ok {
b, err := strconv.ParseBool(value)
if err != nil {
panic(err)
}
return b
}

return default_
}

func getEnvStr(key, default_ string) string {
if value, ok := os.LookupEnv(key); ok {
return value
Expand All @@ -113,3 +148,22 @@ func getEnvInt(key string, default_ int) int {

return default_
}

func getEnvStrOneOf[T ~string](key string, default_ T, enum ...T) T {
if value, ok := os.LookupEnv(key); ok {
if !slices.Contains(enum, T(value)) {
sb := strings.Builder{}
sb.WriteString(key)
sb.WriteString(" can only be one of ")
for _, v := range enum {
sb.WriteString(string(v))
}

panic(sb.String())
}

return T(value)
}

return default_
}
4 changes: 4 additions & 0 deletions internal/repository/repo_impls/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,10 @@ func (i *ManagerImpl) Atomic(ctx context.Context, fn func(ctx context.Context, t
return tx.Finish()
}

func (i *ManagerImpl) Ping() error {
return i.db.Ping()
}

func (i *ManagerImpl) Users() repository.UserRepository {
return i.users
}
Expand Down
1 change: 1 addition & 0 deletions internal/repository/repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ type InstanceMetadataRepository interface {

type Manager interface {
Atomic(ctx context.Context, fn func(ctx context.Context, tx Manager) error) error
Ping() error

Users() UserRepository
Notes() NoteRepository
Expand Down
3 changes: 2 additions & 1 deletion internal/service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package service

import (
"context"

"github.com/gofiber/fiber/v2"
"github.com/versia-pub/versia-go/internal/repository"
"github.com/versia-pub/versia-go/pkg/versia"
Expand Down Expand Up @@ -57,7 +58,7 @@ type InstanceMetadataService interface {
}

type TaskService interface {
ScheduleTask(ctx context.Context, type_ string, data any) error
ScheduleNoteTask(ctx context.Context, type_ string, data any) error
}

type RequestSigner interface {
Expand Down
7 changes: 4 additions & 3 deletions internal/service/svc_impls/note_service_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,18 @@ package svc_impls

import (
"context"
"slices"

"github.com/versia-pub/versia-go/internal/repository"
"github.com/versia-pub/versia-go/internal/service"
task_dtos "github.com/versia-pub/versia-go/internal/task/dtos"
"github.com/versia-pub/versia-go/pkg/versia"
"slices"

"git.devminer.xyz/devminer/unitel"
"github.com/go-logr/logr"
"github.com/google/uuid"
"github.com/versia-pub/versia-go/internal/api_schema"
"github.com/versia-pub/versia-go/internal/entity"
"github.com/versia-pub/versia-go/internal/tasks"
)

var _ service.NoteService = (*NoteServiceImpl)(nil)
Expand Down Expand Up @@ -69,7 +70,7 @@ func (i NoteServiceImpl) CreateNote(ctx context.Context, req api_schema.CreateNo
return err
}

if err := i.taskService.ScheduleTask(ctx, tasks.FederateNote, tasks.FederateNoteData{NoteID: n.ID}); err != nil {
if err := i.taskService.ScheduleNoteTask(ctx, task_dtos.FederateNote, task_dtos.FederateNoteData{NoteID: n.ID}); err != nil {
return err
}

Expand Down
12 changes: 7 additions & 5 deletions internal/service/svc_impls/task_service_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@ package svc_impls

import (
"context"

"github.com/versia-pub/versia-go/internal/service"
"github.com/versia-pub/versia-go/internal/task"

"git.devminer.xyz/devminer/unitel"
"github.com/go-logr/logr"
Expand All @@ -12,22 +14,22 @@ import (
var _ service.TaskService = (*TaskServiceImpl)(nil)

type TaskServiceImpl struct {
client *taskqueue.Client
manager task.Manager

telemetry *unitel.Telemetry
log logr.Logger
}

func NewTaskServiceImpl(client *taskqueue.Client, telemetry *unitel.Telemetry, log logr.Logger) *TaskServiceImpl {
func NewTaskServiceImpl(manager task.Manager, telemetry *unitel.Telemetry, log logr.Logger) *TaskServiceImpl {
return &TaskServiceImpl{
client: client,
manager: manager,

telemetry: telemetry,
log: log,
}
}

func (i TaskServiceImpl) ScheduleTask(ctx context.Context, type_ string, data any) error {
func (i TaskServiceImpl) ScheduleNoteTask(ctx context.Context, type_ string, data any) error {
s := i.telemetry.StartSpan(ctx, "function", "svc_impls/TaskServiceImpl.ScheduleTask")
defer s.End()
ctx = s.Context()
Expand All @@ -38,7 +40,7 @@ func (i TaskServiceImpl) ScheduleTask(ctx context.Context, type_ string, data an
return err
}

if err := i.client.Submit(ctx, t); err != nil {
if err := i.manager.Notes().Submit(ctx, t); err != nil {
i.log.Error(err, "Failed to schedule task", "type", type_, "taskID", t.ID)
return err
}
Expand Down
11 changes: 11 additions & 0 deletions internal/task/dtos/note_dtos.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package task_dtos

import "github.com/google/uuid"

const (
FederateNote = "federate_note"
)

type FederateNoteData struct {
NoteID uuid.UUID `json:"noteID"`
}
20 changes: 20 additions & 0 deletions internal/task/handler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package task

import (
"context"

"github.com/versia-pub/versia-go/pkg/taskqueue"
)

type Manager interface {
Notes() NoteHandler
}

type Handler interface {
Register(*taskqueue.Set)
Submit(context.Context, taskqueue.Task) error
}

type NoteHandler interface {
Submit(context.Context, taskqueue.Task) error
}
11 changes: 11 additions & 0 deletions internal/task/task_impls/base.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package task_impls

import "git.devminer.xyz/devminer/unitel"

type baseHandler struct {
telemetry *unitel.Telemetry
}

func newBaseHandler() *baseHandler {
return &baseHandler{}
}
29 changes: 29 additions & 0 deletions internal/task/task_impls/manager.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package task_impls

import (
"git.devminer.xyz/devminer/unitel"
"github.com/go-logr/logr"
"github.com/versia-pub/versia-go/internal/task"
)

var _ task.Manager = (*Manager)(nil)

type Manager struct {
notes *NoteHandler

telemetry *unitel.Telemetry
log logr.Logger
}

func NewManager(notes *NoteHandler, telemetry *unitel.Telemetry, log logr.Logger) *Manager {
return &Manager{
notes: notes,

telemetry: telemetry,
log: log,
}
}

func (m *Manager) Notes() task.NoteHandler {
return m.notes
}
97 changes: 97 additions & 0 deletions internal/task/task_impls/note_handler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
package task_impls

import (
"context"

"github.com/versia-pub/versia-go/internal/entity"
"github.com/versia-pub/versia-go/internal/repository"
"github.com/versia-pub/versia-go/internal/service"
"github.com/versia-pub/versia-go/internal/task"
task_dtos "github.com/versia-pub/versia-go/internal/task/dtos"
"github.com/versia-pub/versia-go/internal/utils"

"git.devminer.xyz/devminer/unitel"
"github.com/go-logr/logr"
"github.com/versia-pub/versia-go/pkg/taskqueue"
)

var _ task.Handler = (*NoteHandler)(nil)

type NoteHandler struct {
federationService service.FederationService

repositories repository.Manager

telemetry *unitel.Telemetry
log logr.Logger
set *taskqueue.Set
}

func NewNoteHandler(federationService service.FederationService, repositories repository.Manager, telemetry *unitel.Telemetry, log logr.Logger) *NoteHandler {
return &NoteHandler{
federationService: federationService,

repositories: repositories,

telemetry: telemetry,
log: log,
}
}

func (t *NoteHandler) Start(ctx context.Context) error {
consumer := t.set.Consumer("note-handler")

return consumer.Start(ctx)
}

func (t *NoteHandler) Register(s *taskqueue.Set) {
t.set = s
s.RegisterHandler(task_dtos.FederateNote, utils.ParseTask(t.FederateNote))
}

func (t *NoteHandler) Submit(ctx context.Context, task taskqueue.Task) error {
s := t.telemetry.StartSpan(ctx, "function", "task_impls/NoteHandler.Submit")
defer s.End()
ctx = s.Context()

return t.set.Submit(ctx, task)
}

func (t *NoteHandler) FederateNote(ctx context.Context, data task_dtos.FederateNoteData) error {
s := t.telemetry.StartSpan(ctx, "function", "task_impls/NoteHandler.FederateNote")
defer s.End()
ctx = s.Context()

var n *entity.Note
if err := t.repositories.Atomic(ctx, func(ctx context.Context, tx repository.Manager) error {
var err error
n, err = tx.Notes().GetByID(ctx, data.NoteID)
if err != nil {
return err
}
if n == nil {
t.log.V(-1).Info("Could not find note", "id", data.NoteID)
return nil
}

for _, uu := range n.Mentions {
if !uu.IsRemote {
t.log.V(2).Info("User is not remote", "user", uu.ID)
continue
}

res, err := t.federationService.SendToInbox(ctx, n.Author, &uu, n.ToVersia())
if err != nil {
t.log.Error(err, "Failed to send note to remote user", "res", res, "user", uu.ID)
} else {
t.log.V(2).Info("Sent note to remote user", "res", res, "user", uu.ID)
}
}

return nil
}); err != nil {
return err
}

return nil
}
Loading

0 comments on commit ab30919

Please sign in to comment.