diff --git a/lib/telnet-device.js b/lib/telnet-device.js index 615f0fd..7501653 100644 --- a/lib/telnet-device.js +++ b/lib/telnet-device.js @@ -34,7 +34,7 @@ class TelnetDevice extends Device { }) } unload(){ - this.fl.unload(); + this.telnetService.unload() } } diff --git a/lib/telnet-service.js b/lib/telnet-service.js index e3c3826..36f207f 100644 --- a/lib/telnet-service.js +++ b/lib/telnet-service.js @@ -8,6 +8,32 @@ const { filter, map } = require('rxjs/operators'); const LFCR = '\r\n'; +class Queue { + constructor(_timeout) { + this.timeout = _timeout; + this.queue = new Array(); + } + _sleep(time) { + return new Promise(resolve => setTimeout(resolve, time)); + } + next() { + return new Promise((resolve, reject) => { + let wait_next = async () => { + let next + do { + await (this._sleep(this.timeout)) + next = this.queue.pop() + } while (!next) + resolve(next) + } + wait_next(); + }) + } + push(val) { + this.queue.push(val) + } +} + class TelnetClient extends events.EventEmitter { constructor(params) { super() @@ -15,8 +41,10 @@ class TelnetClient extends events.EventEmitter { this.socket = null; this.DataStream = new Subject() this.params = params; - this.params.timeout = parseInt(this.params.timeout) ? parseInt(this.params.timeout) : 1000*60 + this.params.timeout = parseInt(this.params.timeout) ? parseInt(this.params.timeout) : 1000 * 60 this.reconnectTry = null; + this.CmdQueue = new Queue(500); + this._solveWrite(); } connect() { return new Promise((resolve, reject) => { @@ -30,15 +58,16 @@ class TelnetClient extends events.EventEmitter { }, () => { this.socket .setNoDelay(true) - .setKeepAlive(true, 1000*2) + .setKeepAlive(true, 1000 * 2) this.emit('connect') + this.reconnectTry = null; resolve(); }) fromEvent(this.socket, 'data') .pipe(map((chunk) => chunk.toString().trim())) .pipe(filter((cmd) => typeof cmd != 'undefined')) - .pipe(filter((cmd) => cmd.length > 0 )) + .pipe(filter((cmd) => cmd.length > 0)) .subscribe((val) => this.DataStream.next(val)) @@ -58,10 +87,25 @@ class TelnetClient extends events.EventEmitter { }) }) } + write(command) { - this.socket.write(command + LFCR); + this.CmdQueue.push((_continue) => { + let write_fn = async () => { + while (this.reconnectTry != null) { + await new Promise(resolve => setTimeout(resolve, this.params.timeout)); + } + this.socket.write(command + LFCR); + _continue(); + + } + write_fn() + }) + } + _solveWrite(){ + this.CmdQueue.next() + .then((fn) => (fn(this._solveWrite.bind(this)))) } - close(){ + close() { this.socket.close(); setTimeout(() => { clearTimeout(this.reconnectTry) @@ -101,7 +145,7 @@ class TelnetService { .pipe(map((cmd) => { try { // TODO: Move to a config - let rex = /(^[A-Z]+[\d]??[A-Z]+)([\d]{1,})/g + let rex = /(^[A-Z]+[\d]??[A-Z]+)([\d]{1,})/g let res = rex.exec(cmd); let retVal = { name: res[1], @@ -123,16 +167,16 @@ class TelnetService { .pipe(filter((update) => { // Remove values which failed in regex // TODO: Log into file - if(update.error){ + if (update.error) { return false - }else{ + } else { return true; } })).subscribe(this.receiver) - this.conn.on('reconnected', () => { - console.log('reconnected') - }) + this.conn.on('reconnected', () => { + console.log('reconnected') + }) this.conn.on('error', err => { console.log('error:', err) }) @@ -164,7 +208,7 @@ class TelnetService { Observe(command) { return this.receiver.pipe(filter(cmd => cmd.placeholder == command)) } - unload(){ + unload() { this.conn.unload() } }