Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for multi-reply handlers #92

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
189 changes: 98 additions & 91 deletions lib/xcore.js
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ XClient.prototype.importRequestsFromTemplates = function(target, reqs)
var requestArguments = reqPack[1];

if (callback)
this.replies[this.seq_num] = [reqReplTemplate[1], callback];
this.replies[this.seq_num] = [reqReplTemplate[1], callback, reqReplTemplate[2]];

client.pack_stream.pack(format, requestArguments);
var b = client.pack_stream.write_queue[0];
Expand Down Expand Up @@ -225,7 +225,7 @@ XClient.prototype.importRequestsFromTemplates = function(target, reqs)
requestArguments.push(args[a]);

if (callback)
this.replies[this.seq_num] = [reqReplTemplate[1], callback];
this.replies[this.seq_num] = [reqReplTemplate[1], callback, reqReplTemplate[2]];

client.pack_stream.pack(format, requestArguments);
client.pack_stream.flush();
Expand Down Expand Up @@ -422,111 +422,118 @@ XClient.prototype.expectReplyHeader = function()

var client = this;
client.pack_stream.get( 8, function(headerBuf) {
var res = headerBuf.unpack('CCSL');
var type = res[0];
var seq_num = res[2];
var bad_value = res[3];
var res = headerBuf.unpack('CCSL');
var type = res[0];
var seq_num = res[2];
var bad_value = res[3];

if (type == 0)
{
var error_code = res[1];
var error = new Error();
error.error = error_code;
error.seq = seq_num;
if (client.options.debug) {
error.longstack = client.seq2stack[error.seq]
console.log(client.seq2stack[error.seq].stack);
}

// unpack error packet (32 bytes for all error types, 8 of them in CCSL header)
client.pack_stream.get(24, function(buf) {

var res = buf.unpack('SC');
error.message = xerrors.errorText[error_code];
error.badParam = bad_value;
error.minorOpcode = res[0];
error.majorOpcode = res[1];

var extUnpacker = client.errorParsers[error_code];
if (extUnpacker) {
extUnpacker(error, error_code, seq_num, bad_value, buf);
}
function unsubscribe() {
delete client.replies[seq_num];
}

var handler = client.replies[seq_num];
if (handler) {
var callback = handler[1];
var handled = callback(error);
if (!handled)
client.emit('error', error);
// TODO: should we delete seq2stack and reply even if there is no handler?
if (client.options.debug)
delete client.seq2stack[seq_num];
delete client.replies[seq_num];
} else
client.emit('error', error);
client.expectReplyHeader();
} );
return;
} else if (type > 1)
{
client.pack_stream.get(24, function(buf) {
var extra = res[3];
var code = res[1];
var ev = client.unpackEvent(type, seq_num, extra, code, buf, headerBuf);

// raw event 32-bytes packet (primarily for use in SendEvent);
// TODO: Event::pack based on event parameters, inverse to unpackEvent
ev.rawData = new Buffer(32);
headerBuf.copy(ev.rawData);
buf.copy(ev.rawData, 8);

client.emit('event', ev);
var ee = client.event_consumers[ev.wid];
if (ee) {
ee.emit('event', ev);
}
if (ev.parent) {
ee = client.event_consumers[ev.parent];
if (ee)
ee.emit('child-event', ev);
}
client.expectReplyHeader();
} );
return;
if (type == 0) {
var error_code = res[1];
var error = new Error();
error.error = error_code;
error.seq = seq_num;
if (client.options.debug) {
error.longstack = client.seq2stack[error.seq]
console.log(client.seq2stack[error.seq].stack);
}

var opt_data = res[1];
var length_total = res[3]; // in 4-bytes units, _including_ this header
var bodylength = 24 + length_total*4; // 24 is rest if 32-bytes header

client.pack_stream.get( bodylength, function( data ) {
// unpack error packet (32 bytes for all error types, 8 of them in CCSL header)
client.pack_stream.get(24, function(buf) {
var res = buf.unpack('SC');
error.message = xerrors.errorText[error_code];
error.badParam = bad_value;
error.minorOpcode = res[0];
error.majorOpcode = res[1];

var extUnpacker = client.errorParsers[error_code];
if (extUnpacker) {
extUnpacker(error, error_code, seq_num, bad_value, buf);
}

var handler = client.replies[seq_num];
if (handler) {
var unpack = handler[0];
if (client.pending_atoms[seq_num]) {
opt_data = seq_num;
}

var result = unpack.call(client, data, opt_data);
var callback = handler[1];
callback(null, result);
// TODO: add multiple replies flag and delete handler only after last reply (eg ListFontsWithInfo)
delete client.replies[seq_num];
var handled = callback(error);
if (!handled) {
client.emit('error', error);
}
// TODO: should we delete seq2stack and reply even if there is no handler?
if (client.options.debug) {
delete client.seq2stack[seq_num];
}
unsubscribe();
} else {
client.emit('error', error);
}
client.expectReplyHeader();
} );
return;
} else if (type > 1) {
client.pack_stream.get(24, function(buf) {
var extra = res[3];
var code = res[1];
var ev = client.unpackEvent(type, seq_num, extra, code, buf, headerBuf);

// raw event 32-bytes packet (primarily for use in SendEvent);
// TODO: Event::pack based on event parameters, inverse to unpackEvent
ev.rawData = new Buffer(32);
headerBuf.copy(ev.rawData);
buf.copy(ev.rawData, 8);

client.emit('event', ev);
var ee = client.event_consumers[ev.wid];
if (ee) {
ee.emit('event', ev);
}
if (ev.parent) {
ee = client.event_consumers[ev.parent];
if (ee) {
ee.emit('child-event', ev);
}
}
// wait for new packet from server
client.expectReplyHeader();
});
return;
}
);

var opt_data = res[1];
var length_total = res[3]; // in 4-bytes units, _including_ this header
var bodylength = 24 + length_total*4; // 24 is rest if 32-bytes header

client.pack_stream.get( bodylength, function( data ) {

var handler = client.replies[seq_num];
if (handler) {
var unpack = handler[0];
var callback = handler[1];
var multiReply = handler[2];
if (client.pending_atoms[seq_num]) {
opt_data = seq_num;
}
var args = [data, opt_data];
if (multiReply) {
args.push(unsubscribe);
} else {
unsubscribe();
}
var result = unpack.apply(client, args);
callback(null, result);
}
// wait for new packet from server
client.expectReplyHeader();
});
});
}

XClient.prototype.startHandshake = function() {
XClient.prototype.startHandshake = function()
{
var client = this;

handshake.writeClientHello(this.pack_stream, this.displayNum, this.authHost);
handshake.readServerHello(this.pack_stream, function(display)
{
handshake.readServerHello(this.pack_stream, function(display) {
// TODO: readServerHello can set error state in display
// emit error in that case
client.expectReplyHeader();
Expand Down