Skip to content

Commit

Permalink
patch: Add Segment Purge Sub-function To Utilities
Browse files Browse the repository at this point in the history
Add sub-function to the utilities application to purge segments in the object store which do not match any recording segment entries in the database.
  • Loading branch information
alwitt committed May 8, 2024
1 parent 93fb92f commit 7960a7f
Showing 1 changed file with 204 additions and 0 deletions.
204 changes: 204 additions & 0 deletions bin/util/main.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package main

import (
"context"
"encoding/json"
"fmt"
"net/url"
Expand All @@ -9,12 +10,16 @@ import (
"github.com/alwitt/goutils"
"github.com/alwitt/livemix/api"
"github.com/alwitt/livemix/common"
"github.com/alwitt/livemix/db"
"github.com/alwitt/livemix/utils"
"github.com/apex/log"
apexJSON "github.com/apex/log/handlers/json"
"github.com/go-playground/validator/v10"
"github.com/go-resty/resty/v2"
"github.com/oklog/ulid/v2"
"github.com/spf13/viper"
"github.com/urfave/cli/v2"
"gorm.io/gorm/logger"
)

type newVideoSourceList struct {
Expand All @@ -25,13 +30,22 @@ type provisionVideoSourcArgs struct {
DefinitionFile string `validate:"required,file"`
}

type ctrlNodeCliArgs struct {
ConfigFile string `validate:"required,file"`
DBPassword string
}

type cliArgs struct {
JSONLog bool
LogLevel string `validate:"required,oneof=debug info warn error"`
APIBaseURL string `validate:"required,url"`
RequestIDHeader string `validate:"required"`
}

var s3CredsArgs common.S3Credentials

var ctrlNodeArgs ctrlNodeCliArgs

var cmdArgs cliArgs

var logTags log.Fields
Expand Down Expand Up @@ -116,6 +130,50 @@ func main() {
},
Action: provisionVideoSources,
},
{
Name: "purge-unknown-segments-from-obj-store",
Aliases: []string{"purge-unknown"},
Usage: "Purge unknown segments in object store",
Description: "Purge segments in the object store which do not match any recording segment entries.",
Flags: []cli.Flag{
// Config file
&cli.StringFlag{
Name: "config-file",
Usage: "Application config file",
Aliases: []string{"c"},
EnvVars: []string{"CONFIG_FILE"},
Destination: &ctrlNodeArgs.ConfigFile,
Required: true,
},
// DB password
&cli.StringFlag{
Name: "db-password",
Usage: "Database user password",
Aliases: []string{"p"},
EnvVars: []string{"DB_USER_PASSWORD"},
Value: "",
DefaultText: "",
Destination: &ctrlNodeArgs.DBPassword,
Required: false,
},
// S3 Creds
&cli.StringFlag{
Name: "s3-access-key",
Usage: "S3 user access key",
EnvVars: []string{"AWS_ACCESS_KEY_ID"},
Destination: &s3CredsArgs.AccessKey,
Required: true,
},
&cli.StringFlag{
Name: "s3-secret-access-key",
Usage: "S3 user secret access key",
EnvVars: []string{"AWS_SECRET_ACCESS_KEY"},
Destination: &s3CredsArgs.SecretAccessKey,
Required: true,
},
},
Action: purgeUnknownSegmentsFromObjStore,
},
},
}

Expand Down Expand Up @@ -289,3 +347,149 @@ func provisionVideoSources(c *cli.Context) error {

return nil
}

func purgeUnknownSegmentsFromObjStore(c *cli.Context) error {
validate := validator.New()

// Validate general config
if err := validate.Struct(&cmdArgs); err != nil {
return err
}

setupLogging()

runtimeCtxt, cancel := context.WithCancel(context.Background())
defer cancel()

// ================================================================================
// Process system control node config
if err := validate.Struct(&ctrlNodeArgs); err != nil {
log.
WithError(err).
WithFields(logTags).
Error("Invalid parameters provided to start system control node")
return err
}

// Process the config file
common.InstallDefaultControlNodeConfigValues()
var configs common.ControlNodeConfig
viper.SetConfigFile(ctrlNodeArgs.ConfigFile)
if err := viper.ReadInConfig(); err != nil {
log.WithError(err).WithFields(logTags).Error("Failed to load system control node config")
return err
}
if err := viper.Unmarshal(&configs); err != nil {
log.WithError(err).WithFields(logTags).Error("Failed to parse system control node config")
return err
}

// Validate system control node config
if err := validate.Struct(&configs); err != nil {
log.WithError(err).WithFields(logTags).Error("System control node config file is not valid")
return err
}

storageConfig := configs.VODConfig.RecordingStorage
storageConfig.S3.Creds = &s3CredsArgs

// ================================================================================
// Prepare DB connectors

sqlDSN, err := db.GetPostgresDialector(configs.Postgres, ctrlNodeArgs.DBPassword)
if err != nil {
log.WithError(err).WithFields(logTags).Error("Failed to define Postgres connection DSN")
return err
}

dbConns, err := db.NewSQLConnection(sqlDSN, logger.Error)
if err != nil {
log.WithError(err).WithFields(logTags).Error("Failed to define SQL connection manager")
return err
}

// ================================================================================
// Prepare S3 client

s3Client, err := utils.NewS3Client(storageConfig.S3)
if err != nil {
log.
WithError(err).
WithFields(logTags).
Error("Failed to create S3 client")
return err
}

// ================================================================================
// Get the current set of all known segments

knownRecordingSegments := map[string]common.VideoSegment{}
{
db := dbConns.NewPersistanceManager()

entries, err := db.ListAllRecordingSegments(runtimeCtxt)
if err != nil {
log.WithError(err).WithFields(logTags).Error("Failed to list all recording segments")
return err
}

db.Close()

for _, entry := range entries {
knownRecordingSegments[entry.URI] = entry
}
}

// ================================================================================
// Get the current set of all segments in the object store

objectsInStorage := map[string]string{}
{
objects, err := s3Client.ListObjects(
runtimeCtxt, storageConfig.StorageBucket, &storageConfig.StorageObjectPrefix,
)
if err != nil {
log.WithError(err).WithFields(logTags).Error("Failed to list all objects in storage")
return err
}

for _, objectKey := range objects {
fullURI := fmt.Sprintf("s3://%s/%s", storageConfig.StorageBucket, objectKey)
objectsInStorage[fullURI] = objectKey
}
}

// ================================================================================
// Get list of all objects in storage that doesn't match an recording segment entry

toDeleteObjKeys := []string{}
for objFullURI, objKey := range objectsInStorage {
if _, ok := knownRecordingSegments[objFullURI]; !ok {
// Object is unknown, must remove
toDeleteObjKeys = append(toDeleteObjKeys, objKey)
}
}
log.
WithFields(logTags).
WithField("purge-objects", toDeleteObjKeys).
Debug("Purging unknown segments")

// ================================================================================
// Remove the unknown objects from storage

if err := s3Client.DeleteObjects(
runtimeCtxt, storageConfig.StorageBucket, toDeleteObjKeys,
); err != nil {
errByObject := map[string]string{}
for _, oneErr := range err {
errByObject[oneErr.Object] = oneErr.Error()
}
log.
WithFields(logTags).
WithField("errors", errByObject).
Error("Failed to delete unknown objects")
return fmt.Errorf("failed to delete unknown objects")
}

return nil
}

0 comments on commit 7960a7f

Please sign in to comment.