From bc69e38ecd915953c1edb0ec9c43cc8d854a23d8 Mon Sep 17 00:00:00 2001 From: Dylan <64976002+galt-tr@users.noreply.github.com> Date: Fri, 26 Jan 2024 12:44:30 -0500 Subject: [PATCH] Add alert processing retry cron (#31) * Add alert processing retry cron * Fix ReadRaw func --- app/config/config.go | 44 +++++++++++---------- app/config/envs/local.json | 2 + app/config/envs/stn.json | 1 + app/config/load.go | 10 +++++ app/config/load_test.go | 2 + app/models/alert_message.go | 79 +++++++++++++++++++++++++++++-------- app/models/genesis_alert.go | 1 + app/p2p/server.go | 76 ++++++++++++++++++++++++++++++++--- app/p2p/thread.go | 2 + 9 files changed, 175 insertions(+), 42 deletions(-) diff --git a/app/config/config.go b/app/config/config.go index 85471ce..e2f1739 100644 --- a/app/config/config.go +++ b/app/config/config.go @@ -35,13 +35,15 @@ var ( // Application configuration constants var ( - ApplicationName = "alert_system" // Application name used in places where we need an application name space - DatabasePrefix = "alert_system" // Default database prefix - DefaultAlertSystemProtocolID = "/bitcoin/alert-system/0.0.1" // Default alert system protocol for libp2p syncing - DefaultTopicName = "alert_system" // Default alert system topic name for libp2p subscription - DefaultServerShutdown = 5 * time.Second // Default server shutdown delay time (to finish any requests or internal processes) - LocalPrivateKeyDefault = "alert_system_private_key" // Default local private key - LocalPrivateKeyDirectory = ".bitcoin" // Default local private key directory + ApplicationName = "alert_system" // Application name used in places where we need an application name space + DatabasePrefix = "alert_system" // Default database prefix + DefaultAlertSystemProtocolID = "/bitcoin/alert-system/0.0.1" // Default alert system protocol for libp2p syncing + DefaultTopicName = "alert_system" // Default alert system topic name for libp2p subscription + DefaultServerShutdown = 5 * time.Second // Default server shutdown delay time (to finish any requests or internal processes) + DefaultPeerDiscoveryInterval = 10 * time.Minute // Default peer discovery refresh interval + DefaultAlertProcessingInterval = 5 * time.Minute // Default alert processing retry interval + LocalPrivateKeyDefault = "alert_system_private_key" // Default local private key + LocalPrivateKeyDirectory = ".bitcoin" // Default local private key directory ) // The global configuration settings @@ -49,13 +51,14 @@ type ( // Config is the global configuration settings Config struct { - AlertWebhookURL string `json:"alert_webhook_url" mapstructure:"alert_webhook_url"` // AlertWebhookURL is the URL for the alert webhook - Datastore DatastoreConfig `json:"datastore" mapstructure:"datastore"` // Datastore's configuration - P2P P2PConfig `json:"p2p" mapstructure:"p2p"` // P2P is the configuration for the P2P server - RPCConnections []RPCConfig `json:"rpc_connections" mapstructure:"rpc_connections"` // RPCConnections is a list of RPC connections - RequestLogging bool `json:"request_logging" mapstructure:"request_logging"` // Toggle for verbose request logging (API requests) - Services Services `json:"-" mapstructure:"services"` // Services is the global services - WebServer WebServerConfig `json:"web_server" mapstructure:"web_server"` // WebServer is the configuration for the web HTTP Server + AlertWebhookURL string `json:"alert_webhook_url" mapstructure:"alert_webhook_url"` // AlertWebhookURL is the URL for the alert webhook + Datastore DatastoreConfig `json:"datastore" mapstructure:"datastore"` // Datastore's configuration + P2P P2PConfig `json:"p2p" mapstructure:"p2p"` // P2P is the configuration for the P2P server + RPCConnections []RPCConfig `json:"rpc_connections" mapstructure:"rpc_connections"` // RPCConnections is a list of RPC connections + RequestLogging bool `json:"request_logging" mapstructure:"request_logging"` // Toggle for verbose request logging (API requests) + Services Services `json:"-" mapstructure:"services"` // Services is the global services + WebServer WebServerConfig `json:"web_server" mapstructure:"web_server"` // WebServer is the configuration for the web HTTP Server + AlertProcessingInterval time.Duration `json:"alert_processing_interval" mapstructure:"alert_processing_interval"` // AlertProcessingInterval is the interval in which the system will go through all of the saved alerts and attempt to retry any unprocessed alerts } // DatastoreConfig is the configuration for the datastore @@ -84,12 +87,13 @@ type ( // P2PConfig is the configuration for the P2P server and connection P2PConfig struct { - AlertSystemProtocolID string `json:"alert_system_protocol_id" mapstructure:"alert_system_protocol_id"` // AlertSystemProtocolID is the protocol ID to use on the libp2p network for alert system communication - BootstrapPeer string `json:"bootstrap_peer" mapstructure:"bootstrap_peer"` // BootstrapPeer is the bootstrap peer for the libp2p network - IP string `json:"ip" mapstructure:"ip"` // IP is the IP address for the P2P server - Port string `json:"port" mapstructure:"port"` // Port is the port for the P2P server - PrivateKeyPath string `json:"private_key_path" mapstructure:"private_key_path"` // PrivateKeyPath is the path to the private key - TopicName string `json:"topic_name" mapstructure:"topic_name"` // TopicName is the name of the topic to subscribe to + AlertSystemProtocolID string `json:"alert_system_protocol_id" mapstructure:"alert_system_protocol_id"` // AlertSystemProtocolID is the protocol ID to use on the libp2p network for alert system communication + BootstrapPeer string `json:"bootstrap_peer" mapstructure:"bootstrap_peer"` // BootstrapPeer is the bootstrap peer for the libp2p network + IP string `json:"ip" mapstructure:"ip"` // IP is the IP address for the P2P server + Port string `json:"port" mapstructure:"port"` // Port is the port for the P2P server + PrivateKeyPath string `json:"private_key_path" mapstructure:"private_key_path"` // PrivateKeyPath is the path to the private key + TopicName string `json:"topic_name" mapstructure:"topic_name"` // TopicName is the name of the topic to subscribe to + PeerDiscoveryInterval time.Duration `json:"peer_discovery_interval" mapstructure:"peer_discovery_interval"` // PeerDiscoveryInterval is the interval in which we will refresh the peer table and check peers for missing messages } // RPCConfig is the configuration for the RPC client diff --git a/app/config/envs/local.json b/app/config/envs/local.json index 39e86ad..c03b5ef 100644 --- a/app/config/envs/local.json +++ b/app/config/envs/local.json @@ -1,6 +1,7 @@ { "alert_webhook_url": "", "request_logging": true, + "alert_processing_interval": "5m", "web_server": { "idle_timeout": "60s", "port": "3000", @@ -57,6 +58,7 @@ "alert_system_protocol_id": "/bitcoin/alert-system/0.0.1", "bootstrap_peer": "", "private_key_path": "", + "peer_discovery_interval": "10m", "topic_name": "alert_system" }, "rpc_connections": [ diff --git a/app/config/envs/stn.json b/app/config/envs/stn.json index ef8cc15..4c7c59e 100644 --- a/app/config/envs/stn.json +++ b/app/config/envs/stn.json @@ -1,6 +1,7 @@ { "alert_webhook_url": "", "request_logging": true, + "alert_processing_interval": "5m", "web_server": { "idle_timeout": "60s", "port": "3000", diff --git a/app/config/load.go b/app/config/load.go index b032fa0..1db2354 100644 --- a/app/config/load.go +++ b/app/config/load.go @@ -103,6 +103,11 @@ func requireP2P(_appConfig *Config) error { } } + // Load the peer discovery interval + if _appConfig.P2P.PeerDiscoveryInterval <= 0 { + _appConfig.P2P.PeerDiscoveryInterval = DefaultPeerDiscoveryInterval + } + // Load the p2p ip (local, ip address or domain name) // todo better validation of what is a valid IP, domain name or local address if len(_appConfig.P2P.IP) < 5 { @@ -205,6 +210,11 @@ func LoadConfigFile() (_appConfig *Config, err error) { Logger: gocore.Log(ApplicationName), } + // Set default alert processing interval if it doesn't exist + if _appConfig.AlertProcessingInterval <= 0 { + _appConfig.AlertProcessingInterval = DefaultAlertProcessingInterval + } + // Log the configuration that was detected and where it was loaded from _appConfig.Services.Log.Debug("loaded configuration from: " + viper.ConfigFileUsed()) diff --git a/app/config/load_test.go b/app/config/load_test.go index 61f76d7..39ede04 100644 --- a/app/config/load_test.go +++ b/app/config/load_test.go @@ -29,6 +29,8 @@ func TestLoadConfig_Success(t *testing.T) { assert.Equal(t, "/path/to/private/key", c.P2P.PrivateKeyPath) assert.Equal(t, "", c.P2P.BootstrapPeer) assert.Equal(t, DefaultAlertSystemProtocolID, c.P2P.AlertSystemProtocolID) + assert.Equal(t, DefaultPeerDiscoveryInterval, c.P2P.PeerDiscoveryInterval) + assert.Equal(t, DefaultAlertProcessingInterval, c.AlertProcessingInterval) assert.Equal(t, "192.168.1.1", c.P2P.IP) assert.Equal(t, "8000", c.P2P.Port) assert.Equal(t, "https://webhook.url", c.AlertWebhookURL) diff --git a/app/models/alert_message.go b/app/models/alert_message.go index 8e94dcd..de835f4 100644 --- a/app/models/alert_message.go +++ b/app/models/alert_message.go @@ -27,7 +27,7 @@ type AlertMessage struct { Hash string `json:"hash" toml:"hash" yaml:"hash" bson:"hash" gorm:"<-;type:char(64);index;comment:This is the hash"` SequenceNumber uint32 `json:"sequence_number" toml:"sequence_number" yaml:"sequence_number" bson:"sequence_number" gorm:"<-;type:int8;index;comment:This is the alert sequence number"` Raw string `json:"raw" toml:"raw" yaml:"raw" bson:"raw" gorm:"<-;type:text;comment:This is the raw alert message"` - Processed bool `json:"processed" toml:"processed" yaml:"processed" bson:"processed"` + Processed bool `json:"processed" toml:"processed" yaml:"processed" bson:"processed" gorm:"<-;type:boolean;comment:This determine if the alert was processed"` // Private fields (never to be exported) alertType AlertType @@ -247,14 +247,21 @@ func (m *AlertMessage) Timestamp() uint64 { return m.timestamp } -// NewAlertFromBytes creates a new alert from bytes -func NewAlertFromBytes(ak []byte, opts ...model.Options) (*AlertMessage, error) { +// ReadRaw sets the model fields based on the raw message +func (m *AlertMessage) ReadRaw() error { + if len(m.GetRawMessage()) == 0 { + ak, err := hex.DecodeString(m.Raw) + if err != nil { + return err + } + m.SetRawMessage(ak) + } - // Check if the alert is valid - if len(ak) < 16 { + if len(m.GetRawMessage()) < 16 { // todo DETERMINE ACTUAL PROPER LENGTH - return nil, fmt.Errorf("alert needs to be at least 16") + return fmt.Errorf("alert needs to be at least 16 bytes") } + ak := m.GetRawMessage() version := binary.LittleEndian.Uint32(ak[:4]) sequenceNumber := binary.LittleEndian.Uint32(ak[4:8]) timestamp := binary.LittleEndian.Uint64(ak[8:16]) @@ -274,7 +281,7 @@ func NewAlertFromBytes(ak []byte, opts ...model.Options) (*AlertMessage, error) // but possible. Regardless let's just error out now if this length is lower. At least // allows us to grab the expected signature. if len(alertAndSignature) < sigLen+2 { - return nil, fmt.Errorf("alert message is invalid - too short length") + return fmt.Errorf("alert message is invalid - too short length") } // Get alert message bytes @@ -292,17 +299,26 @@ func NewAlertFromBytes(ak []byte, opts ...model.Options) (*AlertMessage, error) dataLen := 20 + len(alert) - // Create the new alert + m.SetAlertType(AlertType(alertType)) + m.message = alert + m.SequenceNumber = sequenceNumber + m.timestamp = timestamp + m.version = version + m.data = ak[:dataLen] + m.signatures = sigs + _ = m.Serialize() + return nil +} + +// NewAlertFromBytes creates a new alert from bytes +func NewAlertFromBytes(ak []byte, opts ...model.Options) (*AlertMessage, error) { opts = append(opts, model.New()) newAlert := NewAlertMessage(opts...) - newAlert.SetAlertType(AlertType(alertType)) - newAlert.message = alert - newAlert.SequenceNumber = sequenceNumber - newAlert.timestamp = timestamp - newAlert.version = version - newAlert.data = ak[:dataLen] - newAlert.signatures = sigs - _ = newAlert.Serialize() + newAlert.SetRawMessage(ak) + err := newAlert.ReadRaw() + if err != nil { + return nil, err + } // Return alert return newAlert, nil @@ -359,3 +375,34 @@ func GetLatestAlert(ctx context.Context, metadata *model.Metadata, opts ...model // Return the first item (only item) return modelItems[0], nil } + +// GetAllUnprocessedAlerts will get all alerts that weren't successfully processed +func GetAllUnprocessedAlerts(ctx context.Context, metadata *model.Metadata, opts ...model.Options) ([]*AlertMessage, error) { + + // Set the conditions + conditions := &map[string]interface{}{ + utils.FieldDeletedAt: map[string]interface{}{ // IS NULL + utils.ExistsCondition: false, + }, + "processed": false, + } + + // Set the query params + queryParams := &datastore.QueryParams{ + OrderByField: utils.FieldSequenceNumber, + SortDirection: utils.SortAscending, + } + + // Get the record + modelItems := make([]*AlertMessage, 0) + if err := model.GetModelsByConditions( + ctx, model.NameAlertMessage, &modelItems, metadata, conditions, queryParams, opts..., + ); err != nil { + return nil, err + } else if len(modelItems) == 0 { + return nil, nil + } + + // Return the first item (only item) + return modelItems, nil +} diff --git a/app/models/genesis_alert.go b/app/models/genesis_alert.go index b359c10..29ba26f 100644 --- a/app/models/genesis_alert.go +++ b/app/models/genesis_alert.go @@ -59,6 +59,7 @@ func CreateGenesisAlert(ctx context.Context, opts ...model.Options) error { newAlert.SequenceNumber = 0 newAlert.timestamp = uint64(time.Date(2923, time.November, 1, 1, 1, 1, 1, time.UTC).Unix()) newAlert.version = 1 + newAlert.Processed = true // Serialize the data newAlert.SerializeData() diff --git a/app/p2p/server.go b/app/p2p/server.go index 727c53d..f170d63 100644 --- a/app/p2p/server.go +++ b/app/p2p/server.go @@ -114,6 +114,7 @@ func (s *Server) Start(ctx context.Context) error { } _ = s.RunPeerDiscovery(ctx, routingDiscovery) + _ = s.RunAlertProcessingCron(ctx) for !s.connected { time.Sleep(5 * time.Second) } @@ -187,19 +188,82 @@ func (s *Server) Stop(_ context.Context) error { return nil } -// RunPeerDiscovery starts a 5 min cron job to resync peers and update routable peers +// RunAlertProcessingCron starts a cron job to attempt to retry unprocessed alerts +func (s *Server) RunAlertProcessingCron(ctx context.Context) chan bool { + ticker := time.NewTicker(s.config.AlertProcessingInterval) + quit := make(chan bool, 1) + go func() { + for { + select { + case <-ticker.C: + err := s.processAlerts(ctx) + if err != nil { + s.config.Services.Log.Errorf("error processing alerts: %v", err.Error()) + } + case <-quit: + ticker.Stop() + } + } + }() + return quit +} + +// processAlerts performs the alert processing +func (s *Server) processAlerts(ctx context.Context) error { + alerts, err := models.GetAllUnprocessedAlerts(ctx, nil, model.WithAllDependencies(s.config)) + if err != nil { + return err + } + s.config.Services.Log.Infof("Attempting to process %d failed alerts", len(alerts)) + success := 0 + for _, alert := range alerts { + alert.SetOptions(model.WithAllDependencies(s.config)) + // Serialize the alert data and hash + err := alert.ReadRaw() + if err != nil { + continue + } + alert.SerializeData() + // Process the alert + ak := alert.ProcessAlertMessage() + if ak == nil { + continue + } + if err = ak.Read(alert.GetRawMessage()); err != nil { + return err + } + s.config.Services.Log.Debugf("attempting to process alert %d of type %d", alert.SequenceNumber, alert.GetAlertType()) + alert.Processed = true + if err = ak.Do(ctx); err != nil { + s.config.Services.Log.Errorf("failed to process alert %d; err: %v", alert.SequenceNumber, err.Error()) + alert.Processed = false + } + + if alert.Processed { + success++ + // Save the alert + if err = alert.Save(ctx); err != nil { + return err + } + } + } + s.config.Services.Log.Infof("Processed %d failed alerts", success) + return nil +} + +// RunPeerDiscovery starts a cron job to resync peers and update routable peers func (s *Server) RunPeerDiscovery(ctx context.Context, routingDiscovery *drouting.RoutingDiscovery) chan bool { - ticker := time.NewTicker(5 * time.Minute) + ticker := time.NewTicker(s.config.P2P.PeerDiscoveryInterval) quit := make(chan bool, 1) go func() { - err := s.discoverPeers(ctx, s.topicNames, routingDiscovery) + err := s.discoverPeers(ctx, routingDiscovery) if err != nil { s.config.Services.Log.Errorf("error discovering peers: %v", err.Error()) } for { select { case <-ticker.C: - err := s.discoverPeers(ctx, s.topicNames, routingDiscovery) + err := s.discoverPeers(ctx, routingDiscovery) if err != nil { s.config.Services.Log.Errorf("error discovering peers: %v", err.Error()) } @@ -262,13 +326,13 @@ func (s *Server) Topics() map[string]*pubsub.Topic { } // discoverPeers will discover peers -func (s *Server) discoverPeers(ctx context.Context, tn []string, routingDiscovery *drouting.RoutingDiscovery) error { +func (s *Server) discoverPeers(ctx context.Context, routingDiscovery *drouting.RoutingDiscovery) error { s.config.Services.Log.Infof("Running peer discovery at %s", time.Now().String()) // Look for others who have announced and attempt to connect to them connected := 0 for connected < 2 { - for _, topicName := range tn { + for _, topicName := range s.topicNames { s.config.Services.Log.Debugf("searching for peers for topic %s..\n", topicName) var peerChan <-chan peer.AddrInfo diff --git a/app/p2p/thread.go b/app/p2p/thread.go index 7ffbda9..04e132c 100644 --- a/app/p2p/thread.go +++ b/app/p2p/thread.go @@ -212,8 +212,10 @@ func (s *StreamThread) ProcessGotSequenceNumber(msg *SyncMessage) error { if err = ak.Read(a.GetRawMessage()); err != nil { return err } + a.Processed = true if err = ak.Do(s.ctx); err != nil { s.config.Services.Log.Errorf("failed to process alert %d; err: %v", a.SequenceNumber, err.Error()) + a.Processed = false } // Save the alert