Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Mqtt support #950

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 23 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -99,6 +99,7 @@ Have any feedback or questions? [Create a discussion](https://github.com/TwiN/ga
- [Monitoring a WebSocket endpoint](#monitoring-a-websocket-endpoint)
- [Monitoring an endpoint using ICMP](#monitoring-an-endpoint-using-icmp)
- [Monitoring an endpoint using DNS queries](#monitoring-an-endpoint-using-dns-queries)
- [Monitoring an endpoint using MQTT](#monitoring-an-endpoint-using-mqtt)
- [Monitoring an endpoint using SSH](#monitoring-an-endpoint-using-ssh)
- [Monitoring an endpoint using STARTTLS](#monitoring-an-endpoint-using-starttls)
- [Monitoring an endpoint using TLS](#monitoring-an-endpoint-using-tls)
@@ -2037,6 +2038,28 @@ There are two placeholders that can be used in the conditions for endpoints of t
`NOERROR`, `FORMERR`, `SERVFAIL`, `NXDOMAIN`, etc.


### Monitoring an endpoint using MQTT
Defining an `mqtt` configuration in an endpoint will automatically mark said endpoint as an endpoint of type MQTT:
```yaml
endpoints:
- name: example-mqtt-query
url: "wss://example.com/mqtt"
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why are you using a websocket endpoint for monitoring mqtt? 🤔
shouldn't we be using mqtt?

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

wouldn't this cause confusion, given that a websocket endpoint may be defined as a websocket endpoint internally? See the TypeMQTT Type = "MQTT" change you made

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

mqtt:
topic: "my_topic"
username: "username" # Optional
password: "password" # Optional
body: "gatus check - {{ uuidv4 }}"
conditions:
- "[CONNECTED] == true"
```

The body can be plain text or a text/template. If it is a text/template, the following functions are available:
- `uuidv4` returns a UUID v4 universally unique ID

The following placeholders are supported for endpoints of type MQTT:
- `[CONNECTED]` resolves to `true` if the MQTT message was published and the same message was consumed, `false` otherwise


### Monitoring an endpoint using SSH
You can monitor endpoints using SSH by prefixing `endpoints[].url` with `ssh://`:
```yaml
62 changes: 62 additions & 0 deletions client/client.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package client

import (
"bytes"
"crypto/tls"
"crypto/x509"
"encoding/json"
@@ -11,10 +12,13 @@ import (
"net/smtp"
"runtime"
"strings"
"text/template"
"time"

"github.com/TwiN/gocache/v2"
"github.com/TwiN/whois"
"github.com/eclipse/paho.mqtt.golang"
"github.com/google/uuid"
"github.com/ishidawataru/sctp"
"github.com/miekg/dns"
ping "github.com/prometheus-community/pro-bing"
@@ -32,6 +36,12 @@ var (

whoisClient = whois.NewClient().WithReferralCache(true)
whoisExpirationDateCache = gocache.NewCache().WithMaxSize(10000).WithDefaultTTL(24 * time.Hour)

mqttTemplateEngine = template.New("base").Funcs(template.FuncMap{
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The only think I'm unsure about is the fact that this uses a templating engine, while everything else leverages placeholder and functions. Maybe this should be exposed as a placeholder instead? I feel supporting [UUID4] globally (and not just for MQTT) would be more beneficial, albeit more complicated.

The UUID4 placeholder should also be unique per endpoint, meaning that if it's in the url and the body, it should be the same. Let me know if that doesn't make sense, I can give you some examples

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Makes sense. I will look around and see if I can just figure it out.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Haven’t had time to look into this yet

"uuidv4": func() string {
return uuid.New().String()
},
})
)

// GetHTTPClient returns the shared HTTP client, or the client from the configuration passed
@@ -332,6 +342,58 @@ func QueryDNS(queryType, queryName, url string) (connected bool, dnsRcode string
return connected, dnsRcode, body, nil
}

func QueryMQTT(address, topic, username, password, body string, config *Config) (bool, error) {
bodyTemplate, err := mqttTemplateEngine.Parse(body)
if err != nil {
return false, fmt.Errorf("error parsing mqtt request body: %w", err)
}

var renderedBodyBuffer bytes.Buffer
err = bodyTemplate.Execute(&renderedBodyBuffer, nil)
if err != nil {
return false, fmt.Errorf("error rendering mqtt request body: %w", err)
}
renderedBody := renderedBodyBuffer.String()

opts := mqtt.NewClientOptions()
opts.AddBroker(address)
opts.SetClientID(fmt.Sprintf("gatus-client-%d", time.Now().UnixMilli()))
if len(username) > 0 {
opts.SetUsername(username)
}
if len(password) > 0 {
opts.SetPassword(password)
}
client := mqtt.NewClient(opts)
if token := client.Connect(); token.WaitTimeout(config.Timeout) && token.Error() != nil {
return false, fmt.Errorf("error connecting to mqtt: %w", token.Error())
}
defer client.Disconnect(1)

done := make(chan struct{})
defer close(done)

if token := client.Subscribe(topic, 0, func(client mqtt.Client, message mqtt.Message) {
message.Ack()
if string(message.Payload()) == renderedBody {
done <- struct{}{}
}
}); token.WaitTimeout(config.Timeout) && token.Error() != nil {
return false, fmt.Errorf("error subscribing to mqtt topic: %w", token.Error())
}

if token := client.Publish(topic, 0, false, renderedBody); token.WaitTimeout(config.Timeout) && token.Error() != nil {
return false, fmt.Errorf("error publishing to mqtt topic: %w", token.Error())
}

select {
case <-done:
return true, nil
case <-time.After(config.Timeout):
return false, fmt.Errorf("timout while waiting for mqtt message: %w")
}
}

// InjectHTTPClient is used to inject a custom HTTP client for testing purposes
func InjectHTTPClient(httpClient *http.Client) {
injectedHTTPClient = httpClient
17 changes: 17 additions & 0 deletions config/endpoint/endpoint.go
Original file line number Diff line number Diff line change
@@ -16,6 +16,7 @@ import (
"github.com/TwiN/gatus/v5/alerting/alert"
"github.com/TwiN/gatus/v5/client"
"github.com/TwiN/gatus/v5/config/endpoint/dns"
"github.com/TwiN/gatus/v5/config/endpoint/mqtt"
sshconfig "github.com/TwiN/gatus/v5/config/endpoint/ssh"
"github.com/TwiN/gatus/v5/config/endpoint/ui"
"golang.org/x/crypto/ssh"
@@ -46,6 +47,7 @@ const (
TypeHTTP Type = "HTTP"
TypeWS Type = "WEBSOCKET"
TypeSSH Type = "SSH"
TypeMQTT Type = "MQTT"
TypeUNKNOWN Type = "UNKNOWN"
)

@@ -110,6 +112,9 @@ type Endpoint struct {
// SSH is the configuration for SSH monitoring
SSHConfig *sshconfig.Config `yaml:"ssh,omitempty"`

// MQTT is the configuration for MQTT monitoring
MQTTConfig *mqtt.Config `yaml:"mqtt,omitempty"`

// ClientConfig is the configuration of the client used to communicate with the endpoint's target
ClientConfig *client.Config `yaml:"client,omitempty"`

@@ -136,6 +141,8 @@ func (e *Endpoint) Type() Type {
switch {
case e.DNSConfig != nil:
return TypeDNS
case e.MQTTConfig != nil:
return TypeMQTT
case strings.HasPrefix(e.URL, "tcp://"):
return TypeTCP
case strings.HasPrefix(e.URL, "sctp://"):
@@ -216,6 +223,9 @@ func (e *Endpoint) ValidateAndSetDefaults() error {
if e.SSHConfig != nil {
return e.SSHConfig.Validate()
}
if e.MQTTConfig != nil {
return e.MQTTConfig.Validate()
}
if e.Type() == TypeUNKNOWN {
return ErrUnknownEndpointType
}
@@ -375,6 +385,13 @@ func (e *Endpoint) call(result *Result) {
return
}
result.Duration = time.Since(startTime)
} else if endpointType == TypeMQTT {
result.Connected, err = client.QueryMQTT(e.URL, e.MQTTConfig.Topic, e.MQTTConfig.Username, e.MQTTConfig.Password, e.Body, e.ClientConfig)
if err != nil {
result.AddError(err.Error())
return
}
result.Duration = time.Since(startTime)
} else {
response, err = client.GetHTTPClient(e.ClientConfig).Do(request)
result.Duration = time.Since(startTime)
104 changes: 101 additions & 3 deletions config/endpoint/endpoint_test.go
Original file line number Diff line number Diff line change
@@ -5,6 +5,7 @@ import (
"crypto/tls"
"crypto/x509"
"errors"
"github.com/TwiN/gatus/v5/config/endpoint/mqtt"
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be separate from the standard library dependencies, namely, at the bottom. See other go files for example.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let me know/ping me when you resolved it 👀

"io"
"net/http"
"strings"
@@ -280,9 +281,10 @@ func TestEndpoint_IsEnabled(t *testing.T) {

func TestEndpoint_Type(t *testing.T) {
type args struct {
URL string
DNS *dns.Config
SSH *ssh.Config
URL string
DNS *dns.Config
SSH *ssh.Config
MQTT *mqtt.Config
}
tests := []struct {
args args
@@ -298,6 +300,15 @@ func TestEndpoint_Type(t *testing.T) {
},
want: TypeDNS,
},
{
args: args{
URL: "wss://example.com/mqtt",
MQTT: &mqtt.Config{
Topic: "my_topic",
},
},
want: TypeMQTT,
},
{
args: args{
URL: "tcp://127.0.0.1:6379",
@@ -540,6 +551,57 @@ func TestEndpoint_ValidateAndSetDefaultsWithSSH(t *testing.T) {
}
}

func TestEndpoint_ValidateAndSetDefaultsWithMQTT(t *testing.T) {
scenarios := []struct {
name string
topic string
username string
password string
expectedErr error
}{
{
name: "fail when has no topic",
topic: "",
username: "",
password: "",
expectedErr: mqtt.ErrEndpointWithoutMQTTTopic,
},
{
name: "success when only topic is set",
topic: "my_topic",
username: "",
password: "",
expectedErr: nil,
},
{
name: "success when all fields are set",
topic: "my_topic",
username: "username",
password: "password",
expectedErr: nil,
},
}

for _, scenario := range scenarios {
t.Run(scenario.name, func(t *testing.T) {
endpoint := &Endpoint{
Name: "mqtt-test",
URL: "https://example.com",
MQTTConfig: &mqtt.Config{
Topic: scenario.topic,
Username: scenario.username,
Password: scenario.password,
},
Conditions: []Condition{Condition("[STATUS] == 0")},
}
err := endpoint.ValidateAndSetDefaults()
if !errors.Is(err, scenario.expectedErr) {
t.Errorf("expected error %v, got %v", scenario.expectedErr, err)
}
})
}
}

func TestEndpoint_ValidateAndSetDefaultsWithSimpleErrors(t *testing.T) {
scenarios := []struct {
endpoint *Endpoint
@@ -787,6 +849,42 @@ func TestIntegrationEvaluateHealthForDNS(t *testing.T) {
}
}

func TestIntegrationEvaluateHealthForMQTT(t *testing.T) {
scenarios := []struct {
name string
endpoint Endpoint
conditions []Condition
success bool
}{
{
name: "mqtt-failure",
endpoint: Endpoint{
Name: "mqtt-failure",
URL: "wss://example.com/mqtt",
MQTTConfig: &mqtt.Config{
Topic: "my_topic",
Username: "gatus",
Password: "",
},
Body: "This is a test: {{ uuidv4 }}",
},
conditions: []Condition{Condition("[CONNECTED] == true")},
success: false,
},
}

for _, scenario := range scenarios {
t.Run(scenario.name, func(t *testing.T) {
scenario.endpoint.ValidateAndSetDefaults()
scenario.endpoint.Conditions = scenario.conditions
result := scenario.endpoint.EvaluateHealth()
if result.Success != scenario.success {
t.Errorf("Expected success to be %v, but was %v", scenario.success, result.Success)
}
})
}
}

func TestIntegrationEvaluateHealthForSSH(t *testing.T) {
scenarios := []struct {
name string
24 changes: 24 additions & 0 deletions config/endpoint/mqtt/mqtt.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package mqtt

import (
"errors"
)

var (
// ErrEndpointWithoutMQTTTopic is the error with which Gatus will panic if an endpoint with MQTT monitoring is configured without a topic.
ErrEndpointWithoutMQTTTopic = errors.New("you must specify a topic for each MQTT endpoint")
)

type Config struct {
Topic string `yaml:"topic,omitempty"`
Username string `yaml:"username,omitempty"`
Password string `yaml:"password,omitempty"`
}

// Validate the SSH configuration
func (cfg *Config) Validate() error {
if len(cfg.Topic) == 0 {
return ErrEndpointWithoutMQTTTopic
}
return nil
}
23 changes: 23 additions & 0 deletions config/endpoint/mqtt/mqtt_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package mqtt

import (
"errors"
"testing"
)

func TestMQTT_validate(t *testing.T) {
cfg := &Config{}
if err := cfg.Validate(); err == nil {
t.Error("expected an error")
} else if !errors.Is(err, ErrEndpointWithoutMQTTTopic) {
t.Errorf("expected error to be '%v', got '%v'", ErrEndpointWithoutMQTTTopic, err)
}
cfg.Username = "username"
if err := cfg.Validate(); err != nil {
t.Errorf("expected no error, got '%v'", err)
}
cfg.Password = "password"
if err := cfg.Validate(); err != nil {
t.Errorf("expected no error, got '%v'", err)
}
}
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
@@ -12,6 +12,7 @@ require (
github.com/TwiN/whois v1.1.9
github.com/aws/aws-sdk-go v1.55.5
github.com/coreos/go-oidc/v3 v3.11.0
github.com/eclipse/paho.mqtt.golang v1.5.0
github.com/gofiber/fiber/v2 v2.52.5
github.com/google/go-github/v48 v48.2.0
github.com/google/uuid v1.6.0
@@ -50,6 +51,7 @@ require (
github.com/google/s2a-go v0.1.8 // indirect
github.com/googleapis/enterprise-certificate-proxy v0.3.4 // indirect
github.com/googleapis/gax-go/v2 v2.14.0 // indirect
github.com/gorilla/websocket v1.5.3 // indirect
github.com/hashicorp/go-version v1.6.0 // indirect
github.com/hashicorp/golang-lru/v2 v2.0.7 // indirect
github.com/jmespath/go-jmespath v0.4.0 // indirect
Loading