Skip to content

Commit

Permalink
Merge pull request #2 from roderm/queue-cmds
Browse files Browse the repository at this point in the history
Queue cmds
  • Loading branch information
roderm authored Oct 26, 2018
2 parents 4d8cc37 + 73f8129 commit 9e96d61
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 13 deletions.
2 changes: 1 addition & 1 deletion lib/telnet-device.js
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ class TelnetDevice extends Device {
})
}
unload(){
this.fl.unload();
this.telnetService.unload()
}
}

Expand Down
68 changes: 56 additions & 12 deletions lib/telnet-service.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,43 @@ const { filter, map } = require('rxjs/operators');

const LFCR = '\r\n';

class Queue {
constructor(_timeout) {
this.timeout = _timeout;
this.queue = new Array();
}
_sleep(time) {
return new Promise(resolve => setTimeout(resolve, time));
}
next() {
return new Promise((resolve, reject) => {
let wait_next = async () => {
let next
do {
await (this._sleep(this.timeout))
next = this.queue.pop()
} while (!next)
resolve(next)
}
wait_next();
})
}
push(val) {
this.queue.push(val)
}
}

class TelnetClient extends events.EventEmitter {
constructor(params) {
super()
this.setMaxListeners(0)
this.socket = null;
this.DataStream = new Subject()
this.params = params;
this.params.timeout = parseInt(this.params.timeout) ? parseInt(this.params.timeout) : 1000*60
this.params.timeout = parseInt(this.params.timeout) ? parseInt(this.params.timeout) : 1000 * 60
this.reconnectTry = null;
this.CmdQueue = new Queue(500);
this._solveWrite();
}
connect() {
return new Promise((resolve, reject) => {
Expand All @@ -30,15 +58,16 @@ class TelnetClient extends events.EventEmitter {
}, () => {
this.socket
.setNoDelay(true)
.setKeepAlive(true, 1000*2)
.setKeepAlive(true, 1000 * 2)

this.emit('connect')
this.reconnectTry = null;
resolve();
})
fromEvent(this.socket, 'data')
.pipe(map((chunk) => chunk.toString().trim()))
.pipe(filter((cmd) => typeof cmd != 'undefined'))
.pipe(filter((cmd) => cmd.length > 0 ))
.pipe(filter((cmd) => cmd.length > 0))
.subscribe((val) => this.DataStream.next(val))


Expand All @@ -58,10 +87,25 @@ class TelnetClient extends events.EventEmitter {
})
})
}

write(command) {
this.socket.write(command + LFCR);
this.CmdQueue.push((_continue) => {
let write_fn = async () => {
while (this.reconnectTry != null) {
await new Promise(resolve => setTimeout(resolve, this.params.timeout));
}
this.socket.write(command + LFCR);
_continue();

}
write_fn()
})
}
_solveWrite(){
this.CmdQueue.next()
.then((fn) => (fn(this._solveWrite.bind(this))))
}
close(){
close() {
this.socket.close();
setTimeout(() => {
clearTimeout(this.reconnectTry)
Expand Down Expand Up @@ -101,7 +145,7 @@ class TelnetService {
.pipe(map((cmd) => {
try {
// TODO: Move to a config
let rex = /(^[A-Z]+[\d]??[A-Z]+)([\d]{1,})/g
let rex = /(^[A-Z]+[\d]??[A-Z]+)([\d]{1,})/g
let res = rex.exec(cmd);
let retVal = {
name: res[1],
Expand All @@ -123,16 +167,16 @@ class TelnetService {
.pipe(filter((update) => {
// Remove values which failed in regex
// TODO: Log into file
if(update.error){
if (update.error) {
return false
}else{
} else {
return true;
}
})).subscribe(this.receiver)

this.conn.on('reconnected', () => {
console.log('reconnected')
})
this.conn.on('reconnected', () => {
console.log('reconnected')
})
this.conn.on('error', err => {
console.log('error:', err)
})
Expand Down Expand Up @@ -164,7 +208,7 @@ class TelnetService {
Observe(command) {
return this.receiver.pipe(filter(cmd => cmd.placeholder == command))
}
unload(){
unload() {
this.conn.unload()
}
}
Expand Down

0 comments on commit 9e96d61

Please sign in to comment.