-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathsave_data.js
127 lines (108 loc) · 3.79 KB
/
save_data.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
/* Management script that:
- Listens to the MQTT broker
- On each message:
- Analyses the message type (sensor, jetpack, ConfigSys, RtcConfig...)
- Requests a save to MongoDB
- Requests a save to InfluxDB
- If Internet is connected, sends the message to the cloud MQTT broker
- Uses a Winston logger to keep track of the last actions (debugging aid)
*/
// When true, the functions below write progress/diagnostic messages to the console.
var DEBUG = true;
// JSON file helper; used below to read the configuration file.
var jsonfile = require('jsonfile')
// NOTE(review): presumably the indentation jsonfile uses when writing files;
// nothing in the visible code writes JSON through jsonfile — confirm it is still needed.
jsonfile.spaces = 4;
var mqtt = require('mqtt'); // MQTT library; used below through mqtt.connect() as a client
// NOTE(review): moment is not referenced anywhere in the visible code.
var moment = require('moment')
// Project-local storage wrappers. TamataInfluxDB is instantiated in insertEvent();
// TamataMongoDB is only referenced from commented-out code there.
const TamataInfluxDB = require('./actions/components/TamataInflux')
const TamataMongoDB = require('./actions/components/TamataMongo')
// Parsed configuration object; assigned once by the readFile callback below
// and read by begin() and insertEvent().
var jsonConfig;
/* Path of the JSON configuration file */
// configFile = "/home/pi/node/config/config.json";
var configFile = "../config/config.json";
var mqttTopicIn = "";
var mqttTopicOut = "";
// Declared explicitly so the assignments in the callback below do not create
// implicit globals (which would be a ReferenceError under strict mode / ESM).
var mqttTopic = "";
var mqttServer = "";
var mqttAWS = "";
var user;
// Storage handles; `influx` is (re)assigned by insertEvent() for each saved message.
var influx;
var mongo;
//---------------------
// Load the configuration, then start the MQTT listener.
//---------------------
jsonfile.readFile(configFile, function (err, data) {
    // Check the error before touching `data`: it is not usable when the read failed.
    // Without a configuration the script cannot run, so the failure is fatal.
    if (err) throw err;
    if (!data || !data.system) {
        throw new Error('Invalid configuration file ' + configFile + ' : missing "system" section');
    }
    jsonConfig = data;
    mqttTopic = jsonConfig.system.mqttTopic + '/update';
    mqttServer = jsonConfig.system.mqttServer;
    mqttAWS = jsonConfig.system.mqttAWS;
    user = jsonConfig.system.user;
    begin();
});
/**
 * Connects to the MQTT broker named in the loaded configuration, subscribes to
 * "<mqttTopic>/update" and routes every received message to insertEvent().
 *
 * Reads the module-level `jsonConfig`, so it must only be called once the
 * configuration file has been loaded.
 *
 * @returns {object} The MQTT client created by mqtt.connect().
 */
function begin() {
    const system = jsonConfig.system;
    if (DEBUG) {
        // console.log('Function Begin(jsonConfig) : '+JSON.stringify(jsonConfig) );
        console.log('Config');
        console.log('MqttServer ='+ system.mqttServer);
        console.log('MqttTopic ='+ system.mqttTopic);
        console.log('Mongo Config ='+ JSON.stringify(system.mongoDB) ) ;
        console.log('Influx Config ='+ JSON.stringify(system.influxDB) ) ;
    }
    // Kept in a local binding so that no implicit global is created.
    const client = mqtt.connect('mqtt://' + system.mqttServer);
    client.subscribe(system.mqttTopic + "/update");
    //client.subscribe("dev/update");
    client.on('connect', () => {
        console.log('Mqtt connected to ' + system.mqttServer + "/ Topic : " + system.mqttTopic );
    });
    // An 'error' event with no listener is thrown by the emitter and would
    // terminate the process; log it instead.
    client.on('error', (err) => {
        console.error('Mqtt client error : ', err);
    });
    client.on('message', insertEvent);
    return client;
}
/**
 * MQTT 'message' handler: parses the payload as JSON, determines its
 * measurement type with getMeasurement() and, unless the type is "unManaged",
 * saves the message through TamataInfluxDB.
 *
 * Failures are logged rather than thrown, so that one bad message cannot
 * bring the listener down.
 *
 * @param {string} topic - Topic the message was received on.
 * @param {Buffer|string} message - Raw payload, expected to contain JSON text.
 * @returns {Promise<void>} Settles (never rejects) once the message has been handled.
 */
function insertEvent(topic, message) {
    var parsedMessage;
    try {
        parsedMessage = JSON.parse(message);
    } catch (err) {
        // The payload comes from the network: malformed JSON is dropped, not fatal.
        console.error('Mqtt message ignored (invalid JSON) on topic ' + topic + ' : ' + err.message);
        return Promise.resolve();
    }
    if (DEBUG) console.log('************************');
    if (DEBUG) console.log('Mqtt Message received : ');
    if (DEBUG) console.log('Insert Message : ' + JSON.stringify(parsedMessage) ) ;
    // Classify the message, then save it to Influx (Mongo saving is currently disabled).
    return getMeasurement(parsedMessage)
        .then((measurement) => {
            if (measurement === "unManaged") {
                if (DEBUG) console.log('UnManaged measurement = ' + measurement );
                return undefined;
            }
            if (DEBUG) console.log('Begin saving... measurement = ' + measurement );
            influx = new TamataInfluxDB( jsonConfig.system.influxDB, measurement );
            // mongo = new TamataMongoDB( jsonConfig.system.mongoDB, measurement );
            // mongo.save(parsedMessage, measurement);
            // If save() returns a promise the chain waits for it; any other
            // return value passes straight through.
            return influx.save( parsedMessage, measurement );
        })
        // Then Internet Check & Close DBs
        .then(() => {
            if (DEBUG) console.log('Last actions... ');
        })
        .catch((err) => {
            console.error('Mqtt message could not be processed : ', err);
        });
} // End Insert Event
/**
 * Determines the measurement type of a parsed MQTT message from its
 * `state.reported` section.
 *
 * - "sensor"    when `state.reported.user` is "TamataSpiru"
 * - "jetpack"   when `state.reported.Act0` is present (neither null nor undefined)
 * - "unManaged" otherwise, including when `state.reported` is missing
 *
 * @param {object} parsedMessage - Message already parsed from JSON.
 * @returns {Promise<string>} Resolves with the measurement type.
 */
function getMeasurement(parsedMessage) {
    return Promise.resolve().then( () => {
        if (DEBUG) {console.log('Function getMeasurement start... ');}
        var reported = parsedMessage && parsedMessage.state && parsedMessage.state.reported;
        if (!reported) {
            // Nothing to classify on: report it as unmanaged rather than rejecting.
            return "unManaged";
        }
        if ( reported.user === "TamataSpiru" ) {
            if (DEBUG) { console.log('sensor message detected') }
            return "sensor";
        }
        // `!= null` also excludes undefined: an absent Act0 must not count as a
        // jetpack message, otherwise the "unManaged" branch can never be reached.
        if ( reported.Act0 != null ) {
            if (DEBUG) { console.log('jetpack message detected') }
            return "jetpack";
        }
        return "unManaged";
    });
}