From 2cca1006f8f390a5c0e1f4f7bd39f02c0e421628 Mon Sep 17 00:00:00 2001 From: miguelreiswildlife <108534663+miguelreiswildlife@users.noreply.github.com> Date: Thu, 25 Apr 2024 17:11:15 -0300 Subject: [PATCH] Use MQTT client and configure timeout (#20) * Use Mqtt client instead of http * Do no check for http error * Better logging + get timeout from config * Use mqtt client and configure timeout --- api/sendmqtt.go | 25 ++++++++----------------- mqttclient/mqttclient.go | 25 ++++++++++++++++++------- 2 files changed, 26 insertions(+), 24 deletions(-) diff --git a/api/sendmqtt.go b/api/sendmqtt.go index 393e2b2..858b845 100644 --- a/api/sendmqtt.go +++ b/api/sendmqtt.go @@ -16,7 +16,6 @@ import ( "github.com/labstack/echo" log "github.com/sirupsen/logrus" - "github.com/topfreegames/arkadiko/httpclient" ) // SendMqttHandler is the handler responsible for sending messages to mqtt @@ -52,6 +51,7 @@ func SendMqttHandler(app *App) func(c echo.Context) error { return FailWith(400, err.Error(), c) } + // Default should_moderate to false so messages sent from the server side are not moderated if _, exists := msgPayload["should_moderate"]; !exists { msgPayload["should_moderate"] = false } @@ -75,34 +75,24 @@ func SendMqttHandler(app *App) func(c echo.Context) error { }) var mqttLatency time.Duration - var beforeMqttTime, afterMqttTime time.Time + var beforeMqttTime time.Time err = WithSegment("mqtt", c, func() error { beforeMqttTime = time.Now() - httpError := app.HttpClient.SendMessage( - c.Request().Context(), topic, string(b), retained, - ) - afterMqttTime = time.Now() - return httpError + sendMqttErr := app.MqttClient.PublishMessage(c.Request().Context(), topic, string(b), retained) + mqttLatency = time.Now().Sub(beforeMqttTime) + + return sendMqttErr }) - status := 200 - if err != nil { - lg.WithError(err).Error("failed to send mqtt message") - status = 500 - if e, ok := err.(*httpclient.HTTPError); ok { - status = e.StatusCode - } - } tags := []string{ fmt.Sprintf("error:%t", err != nil), - fmt.Sprintf("status:%d", status), fmt.Sprintf("retained:%t", retained), } if source != "" { tags = append(tags, fmt.Sprintf("requestor:%s", source)) } - mqttLatency = afterMqttTime.Sub(beforeMqttTime) + app.DDStatsD.Timing("mqtt_latency", mqttLatency, tags...) lg = lg.WithField("mqttLatency", mqttLatency.Nanoseconds()) lg.Debug("sent mqtt message") @@ -111,6 +101,7 @@ func SendMqttHandler(app *App) func(c echo.Context) error { c.Set("retained", retained) if err != nil { + lg.WithError(err).Error("failed to send mqtt message") return FailWith(500, err.Error(), c) } return c.String(http.StatusOK, workingString) diff --git a/mqttclient/mqttclient.go b/mqttclient/mqttclient.go index f48b8e5..bf9c9a0 100644 --- a/mqttclient/mqttclient.go +++ b/mqttclient/mqttclient.go @@ -30,6 +30,7 @@ import ( type MqttClient struct { MqttServerHost string MqttServerPort int + Timeout time.Duration ConfigPath string Config *viper.Viper Logger log.FieldLogger @@ -62,30 +63,38 @@ func GetMqttClient(configPath string, onConnectHandler mqtt.OnConnectHandler, l // SendMessage sends the message with the given payload to topic func (mc *MqttClient) SendMessage(ctx context.Context, topic string, message string) error { - return mc.publishMessage(ctx, topic, message, false) + return mc.PublishMessage(ctx, topic, message, false) } // SendRetainedMessage sends the message with the given payload to topic func (mc *MqttClient) SendRetainedMessage(ctx context.Context, topic string, message string) error { - return mc.publishMessage(ctx, topic, message, true) + return mc.PublishMessage(ctx, topic, message, true) } -func (mc *MqttClient) publishMessage(ctx context.Context, topic string, message string, retained bool) error { +func (mc *MqttClient) PublishMessage(ctx context.Context, topic string, message string, retained bool) error { + l := mc.Logger.WithFields( + log.Fields{ + "method": "PublishMessage", + "topic": topic, + "message": message, + "retained": retained, + }, + ) token := mc.MqttClient.WithContext(ctx).Publish(topic, 2, retained, message) - result := token.WaitTimeout(time.Second * 5) + result := token.WaitTimeout(mc.Timeout) if result == false || token.Error() != nil { err := token.Error() if err == nil { err = errors.New("timeoutError") } - mc.Logger.WithError(err).Error() + l.WithError(err).Error("Error publishing message to mqtt") return err } return nil } -//WaitForConnection to mqtt server +// WaitForConnection to mqtt server func (mc *MqttClient) WaitForConnection(timeout int) error { start := time.Now() timedOut := func() bool { @@ -102,7 +111,7 @@ func (mc *MqttClient) WaitForConnection(timeout int) error { } func (mc *MqttClient) configure(l log.FieldLogger) { - mc.Logger = l + mc.Logger = l.WithField("source", "MqttClient") mc.setConfigurationDefaults() mc.loadConfiguration() @@ -115,6 +124,7 @@ func (mc *MqttClient) setConfigurationDefaults() { mc.Config.SetDefault("mqttserver.user", "admin") mc.Config.SetDefault("mqttserver.pass", "admin") mc.Config.SetDefault("mqttserver.ca_cert_file", "") + mc.Config.SetDefault("mqttserver.timeout", 5*time.Second) } func (mc *MqttClient) loadConfiguration() { @@ -135,6 +145,7 @@ func (mc *MqttClient) loadConfiguration() { func (mc *MqttClient) configureClient() { mc.MqttServerHost = mc.Config.GetString("mqttserver.host") mc.MqttServerPort = mc.Config.GetInt("mqttserver.port") + mc.Timeout = mc.Config.GetDuration("mqttserver.timeout") } func (mc *MqttClient) start(onConnectHandler mqtt.OnConnectHandler) {