diff --git a/datastore/.gitignore b/datastore/.gitignore index 8860380..5c20689 100644 --- a/datastore/.gitignore +++ b/datastore/.gitignore @@ -6,3 +6,5 @@ domotik/sensors.db domotik/sensors.db-shm domotik/sensors.db-wal domotik/state.json + +datastore \ No newline at end of file diff --git a/datastore/Dockerfile b/datastore/Dockerfile index 63894ac..5353189 100644 --- a/datastore/Dockerfile +++ b/datastore/Dockerfile @@ -6,7 +6,7 @@ RUN CGO_ENABLED=0 go build FROM gcr.io/distroless/static COPY --from=builder /domotik/datastore /datastore -ENV MQTT_HOST=mosquitto +ENV MQTT_HOST=tcp://mosquitto:1883 ENV DB_PATH=/database/ CMD ["/datastore"] diff --git a/datastore/domotik/application.go b/datastore/domotik/application.go deleted file mode 100644 index 26c730b..0000000 --- a/datastore/domotik/application.go +++ /dev/null @@ -1,38 +0,0 @@ -package main - -import ( - "log" - "time" - - "github.com/sylvek/domotik/datastore/broker" - "github.com/sylvek/domotik/datastore/compute" - "github.com/sylvek/domotik/datastore/database" -) - -func handleSensorLogs(rule compute.Rule, client broker.BrokerClient, database database.Database) { - for { - select { - case output := <-rule.Tick(): - client.Publish(broker.Log{Topic: "sensors", Name: "sumPerDay", Unit: "euro", Value: output.EuroSpentToday}) - client.Publish(broker.Log{Topic: "sensors", Name: "sumPerDay", Unit: "rate", Value: output.RatioLowTariffToday}) - client.Publish(broker.Log{Topic: "sensors", Name: "sumPerDay", Unit: "watt", Value: float64(output.WattConsumedToday)}) - client.Publish(broker.Log{Topic: "sensors", Name: "meanPerHour", Unit: "watt", Value: float64(output.WattPerHourForThisHour)}) - client.Publish(broker.Log{Topic: "sensors", Name: "meanPerMinute", Unit: "watt", Value: float64(output.WattPerHourForLastMinute)}) - case l := <-client.Logs(): - err := database.AddSeries(l.Topic, l.Name, l.Unit, l.Value) - if err != nil { - log.Printf(" - error - %s", err) - time.Sleep(time.Second) - client.Logs() <- l - } else { - if l.Name == "linky" && l.Unit == "indice" { - rule.SetIndice(int64(l.Value)) - } - - if l.Name == "linky" && l.Unit == "state" { - rule.SetLowTariffState(l.Value == 0.0) - } - } - } - } -} diff --git a/datastore/domotik/broker/client.go b/datastore/domotik/broker/client.go deleted file mode 100644 index 0469b95..0000000 --- a/datastore/domotik/broker/client.go +++ /dev/null @@ -1,15 +0,0 @@ -package broker - -type Log struct { - Topic string - Name string - Unit string - Value float64 -} - -type BrokerClient interface { - ConnectAndListen() - Logs() chan Log - Disconnect() - Publish(log Log) -} diff --git a/datastore/domotik/broker/mqtt.go b/datastore/domotik/broker/mqtt.go deleted file mode 100644 index 7e7d9d1..0000000 --- a/datastore/domotik/broker/mqtt.go +++ /dev/null @@ -1,84 +0,0 @@ -package broker - -import ( - "fmt" - "log" - "strconv" - "strings" - "time" - - mqtt "github.com/eclipse/paho.mqtt.golang" -) - -type MqttClient struct { - client mqtt.Client - mqttBroker string - topics []string - logs chan Log -} - -func (c *MqttClient) Logs() chan Log { - return c.logs -} - -func (c *MqttClient) Publish(l Log) { - token := c.client.Publish( - fmt.Sprintf("%s/%s/%s", l.Topic, l.Name, l.Unit), - 0, - true, - fmt.Sprintf("%.3f", l.Value)) - _ = token.Wait() - - if token.Error() != nil { - log.Println("unable to publish", l, token.Error()) - } -} - -// ConnectAndListen implements BrokerClient -func (c *MqttClient) ConnectAndListen() { - opts := mqtt.NewClientOptions() - opts.AddBroker(c.mqttBroker) - opts.SetOrderMatters(false) - opts.ConnectTimeout = time.Second // Minimal delays on connect - opts.WriteTimeout = time.Second // Minimal delays on writes - opts.KeepAlive = 10 // Keepalive every 10 seconds so we quickly detect network outages - opts.PingTimeout = time.Second // local broker so response should be quick - opts.ConnectRetry = true - opts.AutoReconnect = true - opts.OnConnect = func(client mqtt.Client) { - log.Println("status: connected") - for _, topic := range c.topics { - if token := client.Subscribe(topic, 1, nil); token.Wait() && token.Error() != nil { - panic(token.Error()) - } - log.Printf("topic [%s] subscribed", topic) - } - } - opts.OnConnectionLost = func(c mqtt.Client, err error) { log.Println("status: connection lost") } - opts.OnReconnecting = func(c mqtt.Client, co *mqtt.ClientOptions) { log.Println("status: reconnecting") } - opts.DefaultPublishHandler = func(client mqtt.Client, msg mqtt.Message) { - payload := string(msg.Payload()[:]) - elements := strings.Split(msg.Topic(), "/") - value, _ := strconv.ParseFloat(payload, 64) - c.logs <- Log{Topic: elements[0], Name: elements[1], Unit: elements[2], Value: value} - } - - c.client = mqtt.NewClient(opts) - if token := c.client.Connect(); token.Wait() && token.Error() != nil { - panic(token.Error()) - } - -} - -// Disconnect implements BrokerClient -func (c *MqttClient) Disconnect() { - c.client.Disconnect(250) -} - -func NewMQTTBrokerClient(mqttBroker string) BrokerClient { - return &MqttClient{ - mqttBroker: mqttBroker, - topics: []string{"sensors/+/+"}, - logs: make(chan Log, 10), - } -} diff --git a/datastore/domotik/compute/engine.go b/datastore/domotik/compute/engine.go deleted file mode 100644 index e8a0767..0000000 --- a/datastore/domotik/compute/engine.go +++ /dev/null @@ -1,158 +0,0 @@ -package compute - -import ( - "encoding/json" - "os" - "time" -) - -type input struct { - lowTariff bool - indice int64 -} - -type state struct { - LastIndice int64 - LastIndiceTS int64 - CurrentDay int - CurrentHour int - DailySumHigh int64 - DailySumLow int64 - HourlySum int64 - HourlyNbIndices int64 -} - -type RuleEngineClient struct { - path string - ticker *time.Ticker - input *input - state *state - output chan Output - done chan bool -} - -func NewRuleEngineClient(path string) Rule { - state := &state{ - LastIndice: 0, - CurrentDay: 0, - CurrentHour: 0, - DailySumHigh: 0, - DailySumLow: 0, - HourlySum: 0, - HourlyNbIndices: 0, - } - data, err := os.ReadFile(path + "/state.json") - if err == nil { - json.Unmarshal(data, &state) - } - - instance := &RuleEngineClient{ - path: path, - input: &input{lowTariff: false, indice: 0}, - ticker: time.NewTicker(time.Minute), - state: state, - output: make(chan Output), - done: make(chan bool)} - go instance.start() - return instance -} - -func (g *RuleEngineClient) start() { - for { - select { - case <-g.ticker.C: - if g.input.indice > 0 { - g.output <- g.updateStateAndGenerateOutput() - g.input.indice = 0 - g.synchronization() - } - case <-g.done: - return - } - } -} - -func (g *RuleEngineClient) synchronization() { - s, _ := json.Marshal(g.state) - os.WriteFile(g.path+"/state.json", s, 0644) -} - -func (g *RuleEngineClient) updateStateAndGenerateOutput() Output { - t := time.Now() - now := t.Unix() - - lastIndice := g.state.LastIndice - lastEpoch := g.state.LastIndiceTS - newIndice := g.input.indice - - // calculate watt consumed between 2 Ticks - wattConsumedDuringBuffering := int64(0) - if lastIndice > 0 { - wattConsumedDuringBuffering = newIndice - lastIndice - } - g.state.LastIndice = newIndice - - minutesSinceTheLastIndice := float64(1) - if wattConsumedDuringBuffering > 0 { - if lastEpoch > 0 { - minutesSinceTheLastIndice = float64(now-lastEpoch) / 60 - } - g.state.LastIndiceTS = now - } - - // if new day -> clear daily state - if t.Day() != g.state.CurrentDay { - g.state.CurrentDay = t.Day() - g.state.DailySumHigh = 0 - g.state.DailySumLow = 0 - } - - // if new hour -> clear hourly state - if t.Hour() != g.state.CurrentHour { - g.state.CurrentHour = t.Hour() - g.state.HourlyNbIndices = 0 - g.state.HourlySum = 0 - } - - // calculate mean per hour - g.state.HourlySum += wattConsumedDuringBuffering - g.state.HourlyNbIndices += 1 - - // calculate ratio low/high and € - if g.input.lowTariff { - g.state.DailySumLow += wattConsumedDuringBuffering - } else { - g.state.DailySumHigh += wattConsumedDuringBuffering - } - - ratioLowTariffToday := 1.0 - if g.state.DailySumHigh > 0 { - ratioLowTariffToday = float64(g.state.DailySumLow) / float64(g.state.DailySumHigh+g.state.DailySumLow) - } - - return Output{ - WattPerHourForLastMinute: float64(wattConsumedDuringBuffering) * 60 / minutesSinceTheLastIndice, - WattPerHourForThisHour: g.state.HourlySum * 60 / g.state.HourlyNbIndices, - WattConsumedToday: g.state.DailySumHigh + g.state.DailySumLow, - EuroSpentToday: float64(g.state.DailySumHigh)*0.0001963 + float64(g.state.DailySumLow)*0.0001457, - RatioLowTariffToday: ratioLowTariffToday, - } -} - -func (g *RuleEngineClient) Tick() chan Output { - return g.output -} - -func (g *RuleEngineClient) Stop() { - g.ticker.Stop() - g.synchronization() - g.done <- true -} - -func (g *RuleEngineClient) SetLowTariffState(state bool) { - g.input.lowTariff = state -} - -func (g *RuleEngineClient) SetIndice(indice int64) { - g.input.indice = indice -} diff --git a/datastore/domotik/database/client.go b/datastore/domotik/database/client.go deleted file mode 100644 index 1915dd6..0000000 --- a/datastore/domotik/database/client.go +++ /dev/null @@ -1,6 +0,0 @@ -package database - -type Database interface { - Close() - AddSeries(topic string, name string, unit string, value float64) error -} diff --git a/datastore/domotik/domain/application.go b/datastore/domotik/domain/application.go new file mode 100644 index 0000000..7c9bdb1 --- /dev/null +++ b/datastore/domotik/domain/application.go @@ -0,0 +1,129 @@ +package domain + +import ( + "time" + + "github.com/sylvek/domotik/datastore/domain/model" + "github.com/sylvek/domotik/datastore/port" +) + +type Application struct { + input model.Input + state model.State + stateRepository port.StateRepository + logRepository port.LogRepository + notificationRepository port.NotificationRepository +} + +func (a *Application) AddLog(l model.Log) error { + if err := a.logRepository.Store(l); err != nil { + return err + } + + if l.Name == "linky" && l.Unit == "indice" { + a.input.Indice = int64(l.Value) + } + + if l.Name == "linky" && l.Unit == "state" { + a.input.LowTariff = l.Value == 0.0 + } + + return nil +} + +func (a *Application) Process() error { + if a.input.Indice > 0 { + newState, output := execute(time.Now(), a.state, a.input) + a.input.Indice = 0 + a.state = newState + + if err := a.stateRepository.Store(newState); err != nil { + return err + } + if err := a.notificationRepository.Notify(output); err != nil { + return err + } + } + + return nil +} + +func NewApplication( + stateRepository port.StateRepository, + logRepository port.LogRepository, + notificationRepository port.NotificationRepository) (*Application, error) { + + state, err := stateRepository.Retrieve() + if err != nil { + return nil, err + } + + return &Application{ + input: model.Input{LowTariff: false, Indice: 0}, + state: state, + stateRepository: stateRepository, + logRepository: logRepository, + notificationRepository: notificationRepository, + }, nil +} + +func execute(t time.Time, state model.State, input model.Input) (model.State, model.Output) { + now := t.Unix() + + lastIndice := state.LastIndice + lastEpoch := state.LastIndiceTS + newIndice := input.Indice + + // calculate watt consumed between 2 Ticks + wattConsumedDuringBuffering := int64(0) + if lastIndice > 0 { + wattConsumedDuringBuffering = newIndice - lastIndice + } + state.LastIndice = newIndice + + minutesSinceTheLastIndice := float64(1) + if wattConsumedDuringBuffering > 0 { + if lastEpoch > 0 { + minutesSinceTheLastIndice = float64(now-lastEpoch) / 60 + } + state.LastIndiceTS = now + } + + // if new day -> clear daily state + if t.Day() != state.CurrentDay { + state.CurrentDay = t.Day() + state.DailySumHigh = 0 + state.DailySumLow = 0 + } + + // if new hour -> clear hourly state + if t.Hour() != state.CurrentHour { + state.CurrentHour = t.Hour() + state.HourlyNbIndices = 0 + state.HourlySum = 0 + } + + // calculate mean per hour + state.HourlySum += wattConsumedDuringBuffering + state.HourlyNbIndices += 1 + + // calculate ratio low/high and € + if input.LowTariff { + state.DailySumLow += wattConsumedDuringBuffering + } else { + state.DailySumHigh += wattConsumedDuringBuffering + } + + ratioLowTariffToday := 1.0 + if state.DailySumHigh > 0 { + ratioLowTariffToday = float64(state.DailySumLow) / float64(state.DailySumHigh+state.DailySumLow) + } + + return state, model.Output{ + WattPerHourForLastMinute: float64(wattConsumedDuringBuffering) * 60 / minutesSinceTheLastIndice, + WattPerHourForThisHour: state.HourlySum * 60 / state.HourlyNbIndices, + WattConsumedToday: state.DailySumHigh + state.DailySumLow, + EuroSpentToday: float64(state.DailySumHigh)*0.0001963 + float64(state.DailySumLow)*0.0001457, + RatioLowTariffToday: ratioLowTariffToday, + } +} diff --git a/datastore/domotik/domain/model/input.go b/datastore/domotik/domain/model/input.go new file mode 100644 index 0000000..c17f89a --- /dev/null +++ b/datastore/domotik/domain/model/input.go @@ -0,0 +1,6 @@ +package model + +type Input struct { + LowTariff bool + Indice int64 +} diff --git a/datastore/domotik/domain/model/log.go b/datastore/domotik/domain/model/log.go new file mode 100644 index 0000000..bf4a435 --- /dev/null +++ b/datastore/domotik/domain/model/log.go @@ -0,0 +1,8 @@ +package model + +type Log struct { + Topic string + Name string + Unit string + Value float64 +} diff --git a/datastore/domotik/compute/client.go b/datastore/domotik/domain/model/output.go similarity index 60% rename from datastore/domotik/compute/client.go rename to datastore/domotik/domain/model/output.go index 049a28c..81194a7 100644 --- a/datastore/domotik/compute/client.go +++ b/datastore/domotik/domain/model/output.go @@ -1,4 +1,4 @@ -package compute +package model type Output struct { WattPerHourForLastMinute float64 @@ -7,10 +7,3 @@ type Output struct { EuroSpentToday float64 RatioLowTariffToday float64 } - -type Rule interface { - Tick() chan Output - Stop() - SetLowTariffState(state bool) - SetIndice(indice int64) -} diff --git a/datastore/domotik/domain/model/state.go b/datastore/domotik/domain/model/state.go new file mode 100644 index 0000000..ba73783 --- /dev/null +++ b/datastore/domotik/domain/model/state.go @@ -0,0 +1,12 @@ +package model + +type State struct { + LastIndice int64 + LastIndiceTS int64 + CurrentDay int + CurrentHour int + DailySumHigh int64 + DailySumLow int64 + HourlySum int64 + HourlyNbIndices int64 +} diff --git a/datastore/domotik/infrastructure/local.repository.go b/datastore/domotik/infrastructure/local.repository.go new file mode 100644 index 0000000..2710c2a --- /dev/null +++ b/datastore/domotik/infrastructure/local.repository.go @@ -0,0 +1,49 @@ +package infrastructure + +import ( + "encoding/json" + "os" + + "github.com/sylvek/domotik/datastore/domain/model" + "github.com/sylvek/domotik/datastore/port" +) + +type LocalClient struct { + path string +} + +const FILE = "/state.json" + +// Retrieve implements port.StateRepository. +func (l *LocalClient) Retrieve() (model.State, error) { + state := model.State{ + LastIndice: 0, + CurrentDay: 0, + CurrentHour: 0, + DailySumHigh: 0, + DailySumLow: 0, + HourlySum: 0, + HourlyNbIndices: 0, + } + data, err := os.ReadFile(l.path + FILE) + if err == nil { + json.Unmarshal(data, &state) + } + + // if the file does not exist we return the empty state + return state, nil +} + +// Close implements port.StateRepository. +func (l *LocalClient) Close() { +} + +// Store implements port.StateRepository. +func (l *LocalClient) Store(state model.State) error { + s, _ := json.Marshal(state) + return os.WriteFile(l.path+FILE, s, 0644) +} + +func NewLocalClient(path string) port.StateRepository { + return &LocalClient{path: path} +} diff --git a/datastore/domotik/infrastructure/mqtt.repository.go b/datastore/domotik/infrastructure/mqtt.repository.go new file mode 100644 index 0000000..e864561 --- /dev/null +++ b/datastore/domotik/infrastructure/mqtt.repository.go @@ -0,0 +1,50 @@ +package infrastructure + +import ( + "fmt" + "log" + + mqtt "github.com/eclipse/paho.mqtt.golang" + "github.com/sylvek/domotik/datastore/domain/model" + "github.com/sylvek/domotik/datastore/port" +) + +type MqttClient struct { + client mqtt.Client + topics []string +} + +// Close implements port.NotificationRepository. +func (m *MqttClient) Close() { + m.client.Disconnect(250) +} + +// Notify implements port.NotificationRepository. +func (m *MqttClient) Notify(output model.Output) error { + m.publish("sensors", "sumPerDay", output.EuroSpentToday, "euro") + m.publish("sensors", "sumPerDay", output.RatioLowTariffToday, "rate") + m.publish("sensors", "sumPerDay", float64(output.WattConsumedToday), "watt") + m.publish("sensors", "meanPerHour", float64(output.WattPerHourForThisHour), "watt") + m.publish("sensors", "meanPerMinute", float64(output.WattPerHourForLastMinute), "watt") + + return nil +} + +func (m *MqttClient) publish(topic string, name string, value float64, unit string) { + token := m.client.Publish( + fmt.Sprintf("%s/%s/%s", topic, name, unit), + 0, + true, + fmt.Sprintf("%.3f", value)) + _ = token.Wait() + + if token.Error() != nil { + log.Println("unable to publish", topic, token.Error()) + } +} + +func NewMQTTClient(client mqtt.Client) port.NotificationRepository { + return &MqttClient{ + client: client, + topics: []string{"sensors/+/+"}} +} diff --git a/datastore/domotik/database/sqlite.go b/datastore/domotik/infrastructure/sqlite.repository.go similarity index 84% rename from datastore/domotik/database/sqlite.go rename to datastore/domotik/infrastructure/sqlite.repository.go index 3119ab7..ed73516 100644 --- a/datastore/domotik/database/sqlite.go +++ b/datastore/domotik/infrastructure/sqlite.repository.go @@ -1,4 +1,4 @@ -package database +package infrastructure import ( "database/sql" @@ -9,6 +9,9 @@ import ( "strconv" "time" + "github.com/sylvek/domotik/datastore/domain/model" + "github.com/sylvek/domotik/datastore/port" + _ "github.com/glebarez/go-sqlite" ) @@ -43,49 +46,54 @@ type Instance struct { volatile bool } -type SqliteClient struct { - databasePath string - parameters Parameters - instances map[string]*Instance -} - type Parameters struct { CurrentIndex int } -func NewSqliteClient(databasePath string) Database { - - parameters := Parameters{} - data, err := os.ReadFile(databasePath + "/database.json") - if err == nil { - json.Unmarshal(data, ¶meters) - } - - instances := make(map[string]*Instance) - instances["sensors"] = &Instance{ - db: prepareDatabase(databasePath + "/sensors.db"), - volatile: true, - dailyOperations: []Operation{ - {aggregate: AVG, from: "outside", to: "daily_temp_outside", unit: TEMP}, - {aggregate: AVG, from: "living", to: "daily_temp_inside", unit: TEMP}, - {aggregate: DELTA, from: "linky", to: "daily_power_consumption", unit: INDICE}, - {aggregate: LAST, from: "sumPerDay", to: "daily_rate_consumption", unit: RATE}}} - instances["history"] = &Instance{ - db: prepareDatabase(databasePath + "/history.db"), - volatile: false} - - return &SqliteClient{ - databasePath: databasePath, - parameters: parameters, - instances: instances} +type SqliteClient struct { + path string + parameters Parameters + instances map[string]*Instance } +// Close implements port.LogRepository. func (d *SqliteClient) Close() { for _, v := range d.instances { v.db.Close() } } +// Store implements port.LogRepository. +func (d *SqliteClient) Store(l model.Log) error { + t := time.Now() + + currentDayOfYear := t.YearDay() + if currentDayOfYear != d.parameters.CurrentIndex { + d.parameters.CurrentIndex = currentDayOfYear + + log.Printf("daily work is starting (%d)\n", currentDayOfYear) + + d.synchronization() + d.aggregation() + d.cleaning() + + log.Println("daily work is finished") + } + + db := d.instances[l.Topic].db + _, err := db.Exec("INSERT INTO data (ts, name, unit, value) VALUES (?, ?, ?, ?)", + // Grafana requires that data is stored in UTC + t.Unix(), + l.Name, + l.Unit, + l.Value) + if err != nil { + return err + } + + return nil +} + func (d *SqliteClient) cleaning() { for _, v := range d.instances { if v.volatile { @@ -132,42 +140,7 @@ func (d *SqliteClient) aggregation() { func (d *SqliteClient) synchronization() { p, _ := json.Marshal(d.parameters) - os.WriteFile(d.databasePath+"/database.json", p, 0644) -} - -func (d *SqliteClient) AddSeries( - topic string, - name string, - unit string, - value float64) error { - - t := time.Now() - - currentDayOfYear := t.YearDay() - if currentDayOfYear != d.parameters.CurrentIndex { - d.parameters.CurrentIndex = currentDayOfYear - - log.Printf("daily work is starting (%d)\n", currentDayOfYear) - - d.synchronization() - d.aggregation() - d.cleaning() - - log.Println("daily work is finished") - } - - db := d.instances[topic].db - _, err := db.Exec("INSERT INTO data (ts, name, unit, value) VALUES (?, ?, ?, ?)", - // Grafana requires that data is stored in UTC - t.Unix(), - name, - unit, - value) - if err != nil { - return err - } - - return nil + os.WriteFile(d.path+"/database.json", p, 0644) } func prepareDatabase(entity string) (db *sql.DB) { @@ -183,3 +156,29 @@ func prepareDatabase(entity string) (db *sql.DB) { return db } + +func NewSqliteDatabase(path string) port.LogRepository { + parameters := Parameters{} + data, err := os.ReadFile(path + "/database.json") + if err == nil { + json.Unmarshal(data, ¶meters) + } + + instances := make(map[string]*Instance) + instances["sensors"] = &Instance{ + db: prepareDatabase(path + "/sensors.db"), + volatile: true, + dailyOperations: []Operation{ + {aggregate: AVG, from: "outside", to: "daily_temp_outside", unit: TEMP}, + {aggregate: AVG, from: "living", to: "daily_temp_inside", unit: TEMP}, + {aggregate: DELTA, from: "linky", to: "daily_power_consumption", unit: INDICE}, + {aggregate: LAST, from: "sumPerDay", to: "daily_rate_consumption", unit: RATE}}} + instances["history"] = &Instance{ + db: prepareDatabase(path + "/history.db"), + volatile: false} + + return &SqliteClient{ + path: path, + parameters: parameters, + instances: instances} +} diff --git a/datastore/domotik/main.go b/datastore/domotik/main.go index 904a5e3..d3b29d0 100644 --- a/datastore/domotik/main.go +++ b/datastore/domotik/main.go @@ -1,38 +1,72 @@ package main import ( - "fmt" "log" "os" "os/signal" + "strconv" + "strings" "syscall" + "time" - "github.com/sylvek/domotik/datastore/broker" - "github.com/sylvek/domotik/datastore/compute" - "github.com/sylvek/domotik/datastore/database" + "github.com/sylvek/domotik/datastore/domain" + "github.com/sylvek/domotik/datastore/domain/model" + "github.com/sylvek/domotik/datastore/infrastructure" ) func main() { - mqttHost := os.Getenv("MQTT_HOST") - databasePath := os.Getenv("DB_PATH") - log.Printf("starting - MQTT_HOST:%s - DB_PATH:%s", mqttHost, databasePath) + host := os.Getenv("MQTT_HOST") + path := os.Getenv("DB_PATH") + log.Printf("starting - MQTT_HOST:%s - DB_PATH:%s", host, path) - sqlite := database.NewSqliteClient(databasePath) - broker := broker.NewMQTTBrokerClient(fmt.Sprintf("tcp://%s:1883", mqttHost)) - engine := compute.NewRuleEngineClient(databasePath) + ticker := time.NewTicker(time.Minute) + client := NewMQTTClient(host, []string{"sensors/+/+"}) - go broker.ConnectAndListen() - - go handleSensorLogs(engine, broker, sqlite) + localRepository := infrastructure.NewLocalClient(path) + mqttRepository := infrastructure.NewMQTTClient(client.GetClient()) + sqliteRepository := infrastructure.NewSqliteDatabase(path) defer func() { - engine.Stop() - sqlite.Close() - broker.Disconnect() + ticker.Stop() + mqttRepository.Close() + sqliteRepository.Close() + localRepository.Close() log.Println("ciao.") }() + application, err := domain.NewApplication(localRepository, sqliteRepository, mqttRepository) + if err != nil { + log.Fatal(err) + } + + stop := make(chan os.Signal, 1) + go func() { + client.Start() + for { + select { + case <-ticker.C: + if err := application.Process(); err != nil { + log.Fatal(err) + } + case msg := <-client.Message(): + payload := string(msg.Payload()[:]) + elements := strings.Split(msg.Topic(), "/") + value, _ := strconv.ParseFloat(payload, 64) + + err := application.AddLog(model.Log{Topic: elements[0], Name: elements[1], Unit: elements[2], Value: value}) + if err != nil { + log.Printf(" - error - %s", err) + time.Sleep(time.Second) + client.Message() <- msg + } + case <-stop: + log.Println("stopping") + return + } + } + }() + done := make(chan os.Signal, 1) signal.Notify(done, syscall.SIGINT, syscall.SIGTERM) - <-done + stop <- <-done } diff --git a/datastore/domotik/mqttclient.go b/datastore/domotik/mqttclient.go new file mode 100644 index 0000000..59287c3 --- /dev/null +++ b/datastore/domotik/mqttclient.go @@ -0,0 +1,59 @@ +package main + +import ( + "log" + "time" + + mqtt "github.com/eclipse/paho.mqtt.golang" +) + +type MqttClient struct { + client mqtt.Client + messages chan mqtt.Message +} + +func NewMQTTClient(broker string, topics []string) *MqttClient { + + messages := make(chan mqtt.Message, 10) + + opts := mqtt.NewClientOptions().AddBroker(broker).SetOrderMatters(false) + opts.ConnectTimeout = time.Second // Minimal delays on connect + opts.WriteTimeout = time.Second // Minimal delays on writes + opts.KeepAlive = 10 // Keepalive every 10 seconds so we quickly detect network outages + opts.PingTimeout = time.Second // local broker so response should be quick + opts.ConnectRetry = true + opts.AutoReconnect = true + opts.OnConnect = func(client mqtt.Client) { + log.Println("status: connected") + for _, topic := range topics { + if token := client.Subscribe(topic, 1, nil); token.Wait() && token.Error() != nil { + panic(token.Error()) + } + log.Printf("topic [%s] subscribed", topic) + } + } + opts.OnConnectionLost = func(c mqtt.Client, err error) { log.Println("status: connection lost") } + opts.OnReconnecting = func(c mqtt.Client, co *mqtt.ClientOptions) { log.Println("status: reconnecting") } + opts.DefaultPublishHandler = func(client mqtt.Client, msg mqtt.Message) { + messages <- msg + } + + return &MqttClient{client: mqtt.NewClient(opts), messages: messages} +} + +func (mqtt *MqttClient) GetClient() mqtt.Client { + return mqtt.client +} + +func (mqtt *MqttClient) Message() chan mqtt.Message { + return mqtt.messages +} + +func (mqtt *MqttClient) Start() error { + token := mqtt.client.Connect() + if token.Error() != nil { + return token.Error() + } + _ = token.Wait() + return nil +} diff --git a/datastore/domotik/port/log.repository.go b/datastore/domotik/port/log.repository.go new file mode 100644 index 0000000..0ddb35c --- /dev/null +++ b/datastore/domotik/port/log.repository.go @@ -0,0 +1,8 @@ +package port + +import "github.com/sylvek/domotik/datastore/domain/model" + +type LogRepository interface { + Repository + Store(model.Log) error +} diff --git a/datastore/domotik/port/notification.repository.go b/datastore/domotik/port/notification.repository.go new file mode 100644 index 0000000..6e6bfe2 --- /dev/null +++ b/datastore/domotik/port/notification.repository.go @@ -0,0 +1,8 @@ +package port + +import "github.com/sylvek/domotik/datastore/domain/model" + +type NotificationRepository interface { + Repository + Notify(model.Output) error +} diff --git a/datastore/domotik/port/repository.go b/datastore/domotik/port/repository.go new file mode 100644 index 0000000..46ba5ac --- /dev/null +++ b/datastore/domotik/port/repository.go @@ -0,0 +1,5 @@ +package port + +type Repository interface { + Close() +} diff --git a/datastore/domotik/port/state.repository.go b/datastore/domotik/port/state.repository.go new file mode 100644 index 0000000..812bcfe --- /dev/null +++ b/datastore/domotik/port/state.repository.go @@ -0,0 +1,9 @@ +package port + +import "github.com/sylvek/domotik/datastore/domain/model" + +type StateRepository interface { + Repository + Store(model.State) error + Retrieve() (model.State, error) +}