Skip to content

Commit

Permalink
Merge pull request #6 from GT3CH1/mqtt
Browse files Browse the repository at this point in the history
Add MQTT/Home Assistant functionality
  • Loading branch information
GT3CH1 authored Apr 30, 2022
2 parents 92b936a + df58efd commit 0859328
Show file tree
Hide file tree
Showing 6 changed files with 340 additions and 31 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,3 +17,4 @@ rppal = "0.12.0"
chrono = "0.3.0"
confy = "0.4.0"
lazy_static = "1.4.0"
paho-mqtt = { version = "0.11", features = ["bundled"] }
35 changes: 27 additions & 8 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,12 @@ sqlsprinkler-cli allows control over a SQLSprinkler endpoint via a unified progr
- Turn the given zone on or off
* `sqlsprinkler-cli sys <on,off,winterize,run,status>`
- Operate on the system.
* `sqlsprinkler-cli -ha`
- Starts the SQLSprinkler MQTT listener for home assistant integration.
* You can set the database username, password, and host in the `/etc/sqlsprinkler/sqlsprinkler.conf` configuration file.

## TODO
* [ ] Create tables and databases if they do not exist.
* [ ] Implement `sqlsprinkler-cli sys winterize (test?)`
* Run each system for 10 seconds, and then sleep for 3 minutess
* [ ] A and B days
* [ ] Make `sqlsprinkler zone ...` call the Web API to control turning zones on and off.
* [ ] Better error messages
Expand Down Expand Up @@ -67,10 +67,25 @@ sqlsprinkler-cli allows control over a SQLSprinkler endpoint via a unified progr
* [x] Delete zone → ` DELETE /zone` → `{
"id": 1 }`
* [x] Change zone ordering → `PUT /zone/order``{"order":[0,0,0]}`


* [x] Add support for MQTT and home assistant.
* Topics subscribed to:
* `sqlsprinkler_zone_<id><_enabled_state,_time,_auto_off_state>/command`
* Messages are just basic ON/OFF or numbers
* Topics published to:
* `homeassistant/switch/sqlsprinkler_zone_<id>/config`
* Basic zone on/off functionality.
* `homeassistant/number/sqlsprinkler_zone_<id>_time/config`
* Time to run zone (in minutes).
* `homeassistant/switch/sqlsprinkler_zone_<id>_auto_off/config`
* Auto off functionality.
* `homeassistant/switch/sqlsprinkler_system_<id>_enabled/config`
* Zone enabled/disabled functionality.
* `homeassistant/switch/sqlsprinkler_system/config`
* The master toggle switch for nightly runs.
* `sqlsprinkler_zone_<id><_enabled_state,_time,_auto_off_state>/status`
* The status of the functionality of the zone.
*
## Used libraries

* rust >= 1.53.0
* structopt 0.3.13
* mysql 16.1.0
Expand All @@ -81,13 +96,17 @@ sqlsprinkler-cli allows control over a SQLSprinkler endpoint via a unified progr
* chrono 0.3.0
* confy 0.4.0
* lazy_static 1.4.0

* paho-mqtt 0.11
* serde 1.0
## How-to-use

* Run the program once, as `sudo`, you will get a connection error.
* Set your username, password, host, and database in `/etc/sqlsprinkler/sqlsprinkler.conf`
* if you are using mqtt, then please set the `mqtt_host`, `mqtt_pass`, and `mqtt_user` in the configuration.
* run your wanted sqlsprinkler command, and enjoy!

## About the config
- The settings prefixed with `sqlsprinkler_` should be pretty self explanitory.
- `verbose` Possible values: true/false → enables verbose logging.
- `verbose` Possible values: true/false → enables verbose logging.
- `mqtt_host` The hostname of the mqtt broker.
- `mqtt_user` The username of the mqtt broker.
- `mqtt_pass` The password of the mqtt broker.
233 changes: 224 additions & 9 deletions src/main.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,24 @@
// Copyright 2021 Gavin Pease

use std::env;
mod sqlsprinkler;

use std::{env, thread};
use std::collections::HashMap;
use std::fmt::Debug;
use std::process::exit;
use std::str::FromStr;
use std::sync::RwLock;
use std::time::Duration;
use mysql::serde_json;

use serde::{Deserialize, Serialize};
use structopt::StructOpt;
use paho_mqtt as mqtt;

use sqlsprinkler::daemon;
use crate::sqlsprinkler::zone::Zone;
use crate::sqlsprinkler::system::{get_system_status, set_system_status, turn_off_all_zones, winterize};

mod sqlsprinkler;
use crate::sqlsprinkler::system::{get_system_status, get_zones, set_system_status, turn_off_all_zones, winterize};
use crate::sqlsprinkler::mqttsprinkler;

#[macro_use]
extern crate lazy_static;
Expand All @@ -26,6 +32,9 @@ pub struct Opts {
#[structopt(short = "w", long = "daemon", about = "Launches the SQLSprinkler API web daemon.")]
daemon_mode: bool,

#[structopt(short = "ha", long = "home-assistant", about = "Broadcasts the current system to home assistant.")]
home_assistant: bool,

#[structopt(subcommand)]
commands: Option<Cli>,
}
Expand Down Expand Up @@ -87,22 +96,32 @@ struct MyConfig {
sqlsprinkler_pass: String,
sqlsprinkler_host: String,
sqlsprinkler_db: String,
mqtt_host: String,
mqtt_user: String,
mqtt_pass: String,
mqtt_enabled: bool,
verbose: bool,
}

lazy_static! {
static ref SETTINGS: RwLock<MyConfig> = RwLock::new(MyConfig::default());
static ref ZONES: RwLock<HashMap<String, Zone>> = RwLock::new(HashMap::new());
}

const SETTINGS_FILE_PATH: &str = "/etc/sqlsprinkler/sqlsprinkler.conf";

impl ::std::default::Default for MyConfig {

impl Default for MyConfig {
fn default() -> Self {
Self {
sqlsprinkler_user: "".to_string(),
sqlsprinkler_pass: "".to_string(),
sqlsprinkler_host: "".to_string(),
sqlsprinkler_db: "".to_string(),
mqtt_host: "".to_string(),
mqtt_user: "".to_string(),
mqtt_pass: "".to_string(),
mqtt_enabled: false,
verbose: false,
}
}
Expand All @@ -115,6 +134,10 @@ impl Clone for MyConfig {
sqlsprinkler_pass: self.sqlsprinkler_pass.clone(),
sqlsprinkler_host: self.sqlsprinkler_host.clone(),
sqlsprinkler_db: self.sqlsprinkler_db.clone(),
mqtt_host: self.mqtt_host.clone(),
mqtt_user: self.mqtt_user.clone(),
mqtt_pass: self.mqtt_pass.clone(),
mqtt_enabled: self.mqtt_enabled,
verbose: self.verbose,
}
}
Expand All @@ -133,8 +156,11 @@ fn get_settings() -> MyConfig {

fn main() {
let cli = Opts::from_args();
println!("{:?}", cli);

let daemon_mode = cli.daemon_mode;
let version_mode = cli.version_mode;
let home_assistant = cli.home_assistant;

match read_settings() {
Ok(..) => (),
Expand All @@ -153,8 +179,127 @@ fn main() {
turn_off_all_zones();
daemon::run();
}

if home_assistant {

let mqtt_host = get_settings().mqtt_host;
let mqtt_user = get_settings().mqtt_user;
let mqtt_pass = get_settings().mqtt_pass;

// create the mqtt client
let mqtt_client = mqtt::AsyncClient::new(mqtt_host.to_string()).unwrap();
let opts = mqtt::ConnectOptionsBuilder::new()
.user_name(mqtt_user)
.password(mqtt_pass)
.finalize();

mqtt_client.connect_with_callbacks(opts, on_connect_success, on_connect_failure);

// create a hashset of topics and zone

// Set a closure to be called whenever the client connection is established.
mqtt_client.set_connected_callback(|cli: &mqtt::AsyncClient| {
println!("Connected.");
});

// Set a closure to be called whenever the client loses the connection.
// It will attempt to reconnect, and set up function callbacks to keep
// retrying until the connection is re-established.
mqtt_client.set_connection_lost_callback(|cli: &mqtt::AsyncClient| {
println!("Connection lost. Attempting reconnect.");
thread::sleep(Duration::from_millis(2500));
cli.reconnect_with_callbacks(on_connect_success, on_connect_failure);
});

// Start listening for mqtt messages
mqtt_client.set_message_callback(|_cli, msg| {
if let Some(msg) = msg {
let topic = msg.topic();
let payload_str = msg.payload_str();
println!("{} - {}", topic, payload_str);
// Iterate through the zones and turn on the zones that match the topic
for zone in ZONES.read().unwrap().iter() {
if topic == zone.0 {
// check if the payload matches sqlsprinkler_zone_<zone_id>
if topic == format!("sqlsprinkler_zone_{}/command", zone.1.id) {
turn_off_all_zones();
if payload_str.clone() == "ON" {
zone.1.turn_on();
}
}
// check if the payload matches sqlsprinkler_zone_<zone_id>_time
if topic == format!("sqlsprinkler_zone_{}_time/command", zone.1.id) {
let time = payload_str.parse::<u64>().unwrap();
let mut new_zone = zone.1.clone();
new_zone.time = time;
zone.1.update_zone(new_zone);

}
// check if payload matches sqlsprinkler_zone_<zone_id>_auto_off_state
if topic == format!("sqlsprinkler_zone_{}_auto_off_state/command", zone.1.id) {
let auto_off_state = payload_str.parse::<bool>().unwrap();
let mut new_zone = zone.1.clone();
new_zone.auto_off = auto_off_state;
zone.1.update_zone(new_zone);
}
// check if payload matches sqlsprinkler_zone_<zone_id>_enabled_state
if topic == format!("sqlsprinkler_zone_{}_enabled_state/command", zone.1.id) {
let enabled_state = payload_str.parse::<bool>().unwrap();
let mut new_zone = zone.1.clone();
new_zone.enabled = enabled_state;
zone.1.update_zone(new_zone);
}
}
}
// Check if topic is sqlsprinkler_system/command
if topic == "sqlsprinkler_system/command" {
if payload_str.clone() == "ON" {
set_system_status(true)
} else if payload_str.clone() == "OFF" {
set_system_status(false)
}
}
}
});

loop {
thread::sleep(Duration::from_millis(5000));
// Send current status of all zones
for zone in get_zones().zones {
// send current status
let mut topic = format!("sqlsprinkler_zone_{}/status", zone.id);
let mut payload = format!("{}", if zone.get_zone_with_state().state { "ON" } else { "OFF" });
let mut msg = mqtt::Message::new(topic, payload, 0);
mqtt_client.publish(msg);

// send current time
topic = format!("sqlsprinkler_zone_{}_time/status", zone.id);
payload = format!("{}", zone.time);
msg = mqtt::Message::new(topic, payload, 0);
mqtt_client.publish(msg);

// send current auto off state
topic = format!("sqlsprinkler_zone_{}_auto_off_state/status", zone.id);
payload = format!("{}", if zone.auto_off { "ON" } else { "OFF" });
msg = mqtt::Message::new(topic, payload, 0);
mqtt_client.publish(msg);

// send current enabled state
topic = format!("sqlsprinkler_zone_{}_enabled_state/status", zone.id);
payload = format!("{}", if zone.enabled { "ON" } else { "OFF" });
msg = mqtt::Message::new(topic, payload, 0);
mqtt_client.publish(msg);
}
// send current status of the system switch
let topic = format!("sqlsprinkler_system/status");
let payload = format!("{}", if get_system_status() { "ON" } else { "OFF" });
let msg = mqtt::Message::new(topic, payload, 0);
mqtt_client.publish(msg);
}

}
if let Some(subcommand) = cli.commands {
let zone_list = sqlsprinkler::system::get_zones();
let zone_list = get_zones();
match subcommand {
// `sqlsprinkler zone ...`
Cli::Zone(zone_state) => {
Expand All @@ -163,8 +308,8 @@ fn main() {
let my_zone: &Zone = match _zone_list.zones.get(id) {
Some(z) => z,
None => {
println!("Zone id not found.");
exit(-1);
// Return the default zone.
panic!("Zone {} not found.", id);
}
};
match ZoneOptsArgs::from(zone_state.state.parse().unwrap()) {
Expand Down Expand Up @@ -197,7 +342,7 @@ fn main() {
set_system_status(false);
}
SysOpts::Run => {
if sqlsprinkler::system::get_system_status() {
if get_system_status() {
if get_settings().verbose {
println!("Running the system schedule.");
}
Expand Down Expand Up @@ -233,4 +378,74 @@ fn main() {
}
}
}
}

fn on_connect_success(cli: &mqtt::AsyncClient, _msgid: u16) {
println!("Connection succeeded");

//broadcast all of the zones.
for zone in get_zones().zones {
// Broadcast the zone discovery message to the MQTT broker (as a switch)
let mut zone_topic = format!("homeassistant/switch/sqlsprinkler_zone_{}/config", &zone.id);
let mut mqtt_sprinkler = mqttsprinkler::MqttSprinkler::sprinkler(&zone);
let mut payload = serde_json::to_string(&mqtt_sprinkler).unwrap();
let mut msg = mqtt::Message::new(zone_topic.clone(), payload.clone(), 0);
println!("Sending MQTT message: {}", payload.clone());
cli.publish(msg);
ZONES.write().unwrap().insert(mqtt_sprinkler.cmd_t, zone.clone());

// Broadcast the zone time to the mqtt broker (as a number)
zone_topic = format!("homeassistant/number/sqlsprinkler_zone_{}_time/config", &zone.id);
mqtt_sprinkler = mqttsprinkler::MqttSprinkler::zone_time(&zone);
payload = serde_json::to_string(&mqtt_sprinkler).unwrap();
msg = mqtt::Message::new(zone_topic.clone(), payload.clone(), 0);
println!("Sending MQTT message: {}", payload.clone());
cli.publish(msg);
ZONES.write().unwrap().insert(mqtt_sprinkler.cmd_t, zone.clone());

// Broadcast the zone auto off state to the mqtt broker (as a switch)
zone_topic = format!("homeassistant/switch/sqlsprinkler_zone_{}_auto_off/config", &zone.id);
mqtt_sprinkler = mqttsprinkler::MqttSprinkler::zone_auto_off(&zone);
payload = serde_json::to_string(&mqtt_sprinkler).unwrap();
msg = mqtt::Message::new(zone_topic.clone(), payload.clone(), 0);
println!("Sending MQTT message: {}", payload.clone());
cli.publish(msg);
ZONES.write().unwrap().insert(mqtt_sprinkler.cmd_t, zone.clone());

// Broadcast the zone enabled state to the mqtt broker (as a switch)
zone_topic = format!("homeassistant/switch/sqlsprinkler_zone_{}_enabled/config", &zone.id);
mqtt_sprinkler = mqttsprinkler::MqttSprinkler::zone_enabled(&zone);
payload = serde_json::to_string(&mqtt_sprinkler).unwrap();
msg = mqtt::Message::new(zone_topic.clone(), payload.clone(), 0);
println!("Sending MQTT message: {}", payload.clone());
cli.publish(msg);
ZONES.write().unwrap().insert(mqtt_sprinkler.cmd_t, zone.clone());
}
// broadcast the system toggle
let topic = format!("homeassistant/switch/sqlsprinkler_system/config");
let system = mqttsprinkler::MqttSprinkler::system();
let payload = serde_json::to_string(&system).unwrap();
let msg = mqtt::Message::new(topic, payload.clone(), 0);
ZONES.write().unwrap().insert(system.cmd_t, Zone::default());
println!("Sending MQTT message: {}", payload.clone());
cli.publish(msg);
// Subscrive to all zone topics
for zone in ZONES.read().unwrap().keys() {
cli.subscribe(zone, 0);
println!("Subscribed to {}", zone);
}

}

// Callback for a failed attempt to connect to the server.
// We simply sleep and then try again.
//
// Note that normally we don't want to do a blocking operation or sleep
// from within a callback. But in th`is case, we know that the client is
// *not* conected, and thus not doing anything important. So we don't worry
// too much about stopping its callback thread.
fn on_connect_failure(cli: &mqtt::AsyncClient, _msgid: u16, rc: i32) {
println!("Connection attempt failed with error code {}.\n", rc);
thread::sleep(Duration::from_millis(2500));
cli.reconnect_with_callbacks(on_connect_success, on_connect_failure);
}
Loading

0 comments on commit 0859328

Please sign in to comment.