Skip to content

Commit

Permalink
merged PR AlCalzone#10 and fixed the remaining bugs
Browse files Browse the repository at this point in the history
  • Loading branch information
AlCalzone committed Sep 23, 2017
2 parents e3549f3 + 9de3200 commit 3942caa
Show file tree
Hide file tree
Showing 9 changed files with 301 additions and 45 deletions.
3 changes: 3 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,9 @@ The target must be a string or url of the form `coap(s)://hostname:port` or an i

## Changelog

#### 0.4.0 (2017-09-XX)
* (AlCalzone) Limit the number of concurrent requests

#### 0.3.2 (2017-09-21)
* (AlCalzone) Update DTLS library: Alert protocol support

Expand Down
17 changes: 14 additions & 3 deletions build/CoapClient.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,12 @@ export declare class CoapClient {
private static pendingRequestsByToken;
private static pendingRequestsByMsgID;
private static pendingRequestsByUrl;
/** Array of the messages waiting to be sent */
private static sendQueue;
private static sendQueueHighPrioCount;
private static isSending;
/** Number of message we expect an answer for */
private static concurrency;
/**
* Sets the security params to be used for the given hostname
*/
Expand Down Expand Up @@ -91,9 +97,14 @@ export declare class CoapClient {
private static createMessage(type, code, messageId, token?, options?, payload?);
/**
* Send a CoAP message to the given endpoint
* @param connection
*/
private static send(connection, message);
* @param connection The connection to send the message on
* @param message The message to send
* @param highPriority Whether the message should be prioritized
*/
private static send(connection, message, highPriority?);
private static workOffSendQueue();
/** Calculates the current concurrency, i.e. how many parallel requests are being handled */
private static calculateConcurrency();
/**
* Remembers a request for resending lost messages and tracking responses and updates
* @param request
Expand Down
155 changes: 137 additions & 18 deletions build/CoapClient.js

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "node-coap-client",
"version": "0.3.2",
"version": "0.4.0",
"description": "Clientside implementation of the CoAP protocol with DTLS support.",
"keywords": [
"coap", "coaps", "dtls", "iot", "tradfri"
Expand Down
161 changes: 142 additions & 19 deletions src/CoapClient.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import * as crypto from "crypto";
import * as dgram from "dgram";
import { EventEmitter } from "events";
import { dtls } from "node-dtls-client";
import * as nodeUrl from "url";
import { ContentFormats } from "./ContentFormats";
Expand Down Expand Up @@ -42,7 +43,7 @@ interface ConnectionInfo {
lastMsgId: number;
}

interface PendingRequest {
interface IPendingRequest {
connection: ConnectionInfo;
url: string;
originalMessage: Message; // allows resending the message, includes token and message id
Expand All @@ -53,6 +54,50 @@ interface PendingRequest {
callback: (resp: CoapResponse) => void;
keepAlive: boolean;
observe: boolean;
concurrency: number;
}
class PendingRequest extends EventEmitter implements IPendingRequest {

constructor(initial?: IPendingRequest) {
super();
if (!initial) return;

this.connection = initial.connection;
this.url = initial.url;
this.originalMessage = initial.originalMessage;
this.retransmit = initial.retransmit;
this.promise = initial.promise;
this.callback = initial.callback;
this.keepAlive = initial.keepAlive;
this.observe = initial.observe;
this._concurrency = initial.concurrency;
}

public connection: ConnectionInfo;
public url: string;
public originalMessage: Message; // allows resending the message, includes token and message id
public retransmit: RetransmissionInfo;
// either (request):
public promise: Promise<CoapResponse>;
// or (observe)
public callback: (resp: CoapResponse) => void;
public keepAlive: boolean;
public observe: boolean;

private _concurrency: number;
public set concurrency(value: number) {
const changed = value !== this._concurrency;
this._concurrency = value;
if (changed) this.emit("concurrencyChanged", this);
}
public get concurrency(): number {
return this._concurrency;
}
}

interface QueuedMessage {
connection: ConnectionInfo;
message: Message;
}

export interface SecurityParameters {
Expand All @@ -72,19 +117,22 @@ const RETRANSMISSION_PARAMS = {
maxRetransmit: 4,
};
const TOKEN_LENGTH = 4;
/** How many concurrent messages are allowed. Should be 1 */
const MAX_CONCURRENCY = 1;

function incrementToken(token: Buffer): Buffer {
const len = token.length;
const ret = Buffer.alloc(len, token);
for (let i = len - 1; i >= 0; i--) {
if (token[i] < 0xff) {
token[i]++;
if (ret[i] < 0xff) {
ret[i]++;
break;
} else {
token[i] = 0;
ret[i] = 0;
// continue with the next digit
}
}
return token;
return ret;
}

function incrementMessageID(msgId: number): number {
Expand Down Expand Up @@ -113,8 +161,13 @@ export class CoapClient {
/** All pending requests, sorted by the token */
private static pendingRequestsByToken: { [token: string]: PendingRequest } = {};
private static pendingRequestsByMsgID: { [msgId: number]: PendingRequest } = {};
private static pendingRequestsByUrl: { [url: string]: PendingRequest } = {};

private static pendingRequestsByUrl: { [url: string]: PendingRequest } = {};
/** Array of the messages waiting to be sent */
private static sendQueue: QueuedMessage[] = [];
private static sendQueueHighPrioCount: number = 0;
private static isSending: boolean = false;
/** Number of message we expect an answer for */
private static concurrency: number = 0;
/**
* Sets the security params to be used for the given hostname
*/
Expand Down Expand Up @@ -225,7 +278,7 @@ export class CoapClient {
}

// remember the request
const req: PendingRequest = {
const req = new PendingRequest({
connection,
url: urlToString(url), // normalizedUrl
originalMessage: message,
Expand All @@ -234,7 +287,8 @@ export class CoapClient {
callback: null,
observe: false,
promise: response,
};
concurrency: 0,
});
// remember the request
CoapClient.rememberRequest(req);

Expand Down Expand Up @@ -279,7 +333,7 @@ export class CoapClient {
);

// remember the request
const req: PendingRequest = {
const req = new PendingRequest({
connection,
url: originString,
originalMessage: message,
Expand All @@ -288,7 +342,8 @@ export class CoapClient {
callback: null,
observe: false,
promise: response,
};
concurrency: 0,
});
// remember the request
CoapClient.rememberRequest(req);

Expand Down Expand Up @@ -425,7 +480,7 @@ export class CoapClient {
}

// remember the request
const req: PendingRequest = {
const req = new PendingRequest({
connection,
url: urlToString(url), // normalizedUrl
originalMessage: message,
Expand All @@ -434,7 +489,8 @@ export class CoapClient {
callback,
observe: true,
promise: null,
};
concurrency: 0,
});
// remember the request
CoapClient.rememberRequest(req);

Expand Down Expand Up @@ -469,6 +525,9 @@ export class CoapClient {
// see if we have a request for this message id
const request = CoapClient.findRequest({ msgID: coapMsg.messageId });
if (request != null) {
// reduce the request's concurrency, since it was handled on the server
request.concurrency = 0;
// handle the message
switch (coapMsg.type) {
case MessageType.ACK:
debug(`received ACK for ${coapMsg.messageId.toString(16)}, stopping retransmission...`);
Expand All @@ -492,7 +551,6 @@ export class CoapClient {
break;
}
}
// TODO handle non-piggybacked messages
} else if (coapMsg.code.isRequest()) {
// we are a client implementation, we should not get requests
// ignore them
Expand All @@ -508,6 +566,8 @@ export class CoapClient {
if (coapMsg.type === MessageType.ACK) {
debug(`received ACK for ${coapMsg.messageId.toString(16)}, stopping retransmission...`);
CoapClient.stopRetransmission(request);
// reduce the request's concurrency, since it was handled on the server
request.concurrency = 0;
}

// parse options
Expand Down Expand Up @@ -543,7 +603,7 @@ export class CoapClient {
MessageCodes.empty,
coapMsg.messageId,
);
CoapClient.send(request.connection, ACK);
CoapClient.send(request.connection, ACK, true);
}

} else { // request == null
Expand All @@ -561,7 +621,7 @@ export class CoapClient {
MessageCodes.empty,
coapMsg.messageId,
);
CoapClient.send(connection, RST);
CoapClient.send(connection, RST, true);
}
} // request != null?
} // (coapMsg.token && coapMsg.token.length)
Expand Down Expand Up @@ -594,16 +654,73 @@ export class CoapClient {

/**
* Send a CoAP message to the given endpoint
* @param connection
* @param connection The connection to send the message on
* @param message The message to send
* @param highPriority Whether the message should be prioritized
*/
private static send(
connection: ConnectionInfo,
message: Message,
highPriority: boolean = false,
): void {

// send the message
connection.socket.send(message.serialize(), connection.origin);
// Put the message in the queue
if (highPriority) {
// insert at the end of the high-priority queue
CoapClient.sendQueue.splice(CoapClient.sendQueueHighPrioCount, 0, {connection, message});
CoapClient.sendQueueHighPrioCount++;
} else {
// at the end
CoapClient.sendQueue.push({connection, message});
}
debug(`added message to send queue, new length = ${CoapClient.sendQueue.length} (high prio: ${CoapClient.sendQueueHighPrioCount})`);

// if there's a request for this message, listen for concurrency changes
const request = CoapClient.findRequest({msgID: message.messageId});
if (request != null) {
// and continue working off the queue when it drops
request.on("concurrencyChanged", (req) => {
if (request.concurrency === 0) CoapClient.workOffSendQueue();
});
}

// start working it off now (maybe)
CoapClient.workOffSendQueue();
}
private static workOffSendQueue() {

// check if there are messages to send
if (CoapClient.sendQueue.length === 0) {
debug(`workOffSendQueue > queue empty`);
return;
}

// check if we may send a message now
debug(`workOffSendQueue > concurrency = ${CoapClient.calculateConcurrency()} (MAX ${MAX_CONCURRENCY})`);
if (CoapClient.calculateConcurrency() < MAX_CONCURRENCY) {
// get the next message to send
const { connection, message } = CoapClient.sendQueue.shift();
debug(`concurrency low enough, sending message ${message.messageId.toString(16)}`);
// update the request's concurrency (it's now being handled)
const request = CoapClient.findRequest({ msgID: message.messageId });
if (request != null) request.concurrency = 1;
// update the high priority count
if (CoapClient.sendQueueHighPrioCount > 0) CoapClient.sendQueueHighPrioCount--;
// send the message
connection.socket.send(message.serialize(), connection.origin);
}

// to avoid any deadlocks we didn't think of, re-call this later
setTimeout(CoapClient.workOffSendQueue, 1000);
}

/** Calculates the current concurrency, i.e. how many parallel requests are being handled */
private static calculateConcurrency(): number {
return Object.keys(CoapClient.pendingRequestsByMsgID) // find all requests
.map(msgid => CoapClient.pendingRequestsByMsgID[msgid])
.map(req => req.concurrency) // extract their concurrency
.reduce((sum, item) => sum + item, 0) // and sum it up
;
}

/**
Expand Down Expand Up @@ -673,6 +790,11 @@ export class CoapClient {
delete CoapClient.pendingRequestsByUrl[request.url];
}

// Set concurrency to 0, so the send queue can continue
request.concurrency = 0;
// Clean up the event listeners
request.removeAllListeners();

// If this request doesn't have the keepAlive option,
// close the connection if it was the last one with the same origin
if (!request.keepAlive) {
Expand All @@ -683,6 +805,7 @@ export class CoapClient {
CoapClient.reset(origin);
}
}

}

/**
Expand Down
2 changes: 1 addition & 1 deletion src/ContentFormats.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
export enum ContentFormats {
export enum ContentFormats {
text_plain = 0,
application_linkFormat = 40,
application_xml = 41,
Expand Down
2 changes: 1 addition & 1 deletion src/Message.test.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { expect } from "chai";
import { expect } from "chai";

import { Message, MessageCode, MessageCodes, MessageType } from "./Message";

Expand Down
2 changes: 1 addition & 1 deletion src/Message.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { Option } from "./Option";
import { Option } from "./Option";

export enum MessageType {
CON = 0, // Confirmable
Expand Down
2 changes: 1 addition & 1 deletion src/lib/DeferredPromise.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
export interface DeferredPromise<T> extends Promise<T> {
export interface DeferredPromise<T> extends Promise<T> {
resolve(value?: T | PromiseLike<T>): void;
reject(reason?: any): void;
}
Expand Down

0 comments on commit 3942caa

Please sign in to comment.