Skip to content

Commit

Permalink
version 0.0.5
Browse files Browse the repository at this point in the history
  • Loading branch information
haio committed Nov 26, 2013
1 parent 194b55a commit 36ca8da
Show file tree
Hide file tree
Showing 6 changed files with 48 additions and 50 deletions.
10 changes: 3 additions & 7 deletions example/consumer.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,11 @@ var Client = kafka.Client;

var client = new Client();
var topics = [
{topic: 'topic2'},
{topic: 'topic1'},
{topic: 't2'},
{topic: 'topic3'}
],
options = { autoCommit: false, fromBeginning: false, fetchMaxWaitMs: 10000 };
options = { autoCommit: false, fromBeginning: false, fetchMaxWaitMs: 1000 };

function createConsumer() {
function createConsumer(topics) {
var consumer = new Consumer(client, topics, options);
var offset = new Offset(client);
consumer.on('message', function (message) {
Expand All @@ -25,7 +22,6 @@ function createConsumer() {
console.log('error', err);
});
consumer.on('offsetOutOfRange', function (topic) {
console.log(topic);
topic.maxNum = 2;
offset.fetch([topic], function (err, offsets) {
var min = Math.min.apply(null, offsets[topic.topic][topic.partition]);
Expand All @@ -34,4 +30,4 @@ function createConsumer() {
})
}

createConsumer();
createConsumer(topics);
12 changes: 7 additions & 5 deletions example/producer.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@ var kafka = require('../kafka'),
Client = kafka.Client,
client = new Client();

var argv = require('optimist').argv;
var topic = argv.topic || 'topic1';

var producer = new Producer(client);

var letters = 'abcdefghijklmnopqrstuvwxyz',
Expand All @@ -26,18 +29,17 @@ function createMsg() {

var count = 1, rets = 0;
producer.on('ready', function () {
setInterval(send, 1000);
//setInterval(send, 1000);
send();
});

function send() {
for (var i = 0; i < count; i++) {
producer.send([
{topic: 'topic1', messages: ['777777777777777' + 1 + 'coolmessage'] },
{topic: 'topic2', messages: ['777777777777777' + 2 + 'coolmessage'] }
{topic: topic, messages: ['777777777777777' + 2 + 'coolmessage'] }
], function (err, data) {
if (err) console.log(arguments);
else console.log(data);
//if (++rets === count) process.exit();
if (++rets === count) process.exit();
});
}
}
23 changes: 9 additions & 14 deletions lib/client.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ var Client = function (connectionString, clientId) {
this.connectionString = connectionString || 'localhost:2181/kafka0.8';
this.clientId = clientId || 'kafka-node-client';
this.brokers = {}
this.longPollingBrokers = {};
this.topicMetadata = {};
this.topicPartitions = {};
this.correlationId = 0;
Expand All @@ -38,9 +37,7 @@ Client.prototype.connect = function () {
});
zk.on('brokersChanged', function (brokerMetadata) {
self.refreshBrokers(brokerMetadata);
//setTimeout(function () {
self.emit('brokersChanged');
//}, 5000);
self.emit('brokersChanged');
});
}

Expand Down Expand Up @@ -72,7 +69,7 @@ Client.prototype.sendFetchRequest = function (consumer, payloads, fetchMaxWaitMs
Array.prototype.unshift.call(arguments, 'error')
consumer.emit.apply(consumer, arguments);
}
}, consumer.longpolling);
});
}

Client.prototype.sendProduceRequest = function (payloads, requireAcks, ackTimeoutMs, cb) {
Expand Down Expand Up @@ -173,7 +170,6 @@ Client.prototype.refreshBrokers = function (brokerMetadata) {
return !~_.values(brokerMetadata).map(function (b) { return b.host + ':' + b.port }).indexOf(k);
}).forEach(function (deadKey) {
delete this.brokers[deadKey];
delete this.longPollingBrokers[deadKey];
}.bind(this));
}

Expand All @@ -189,12 +185,12 @@ Client.prototype.refreshMetadata = function (topicNames, cb) {
});
}

Client.prototype.send = function (payloads, encoder, decoder, cb, longPolling) {
Client.prototype.send = function (payloads, encoder, decoder, cb) {
var self = this;
// payloads: [ [metadata exists], [metadta not exists] ]
payloads = this.checkMetadatas(payloads);
if (payloads[0].length && !payloads[1].length) {
this.sendToBroker(_.flatten(payloads), encoder, decoder, cb, longPolling);
this.sendToBroker(_.flatten(payloads), encoder, decoder, cb);
return;
}
if (payloads[1].length) {
Expand All @@ -204,17 +200,17 @@ Client.prototype.send = function (payloads, encoder, decoder, cb, longPolling) {
var error = resp[1].error;
if (error) return cb(error);
self.updateMetadatas(resp);
self.sendToBroker(payloads[1].concat(payloads[0]), encoder, decoder, cb, longPolling);
self.sendToBroker(payloads[1].concat(payloads[0]), encoder, decoder, cb);
});
}
}

Client.prototype.sendToBroker = function (payloads, encoder, decoder, cb, longPolling) {
Client.prototype.sendToBroker = function (payloads, encoder, decoder, cb) {
payloads = this.payloadsByLeader(payloads);
for (var leader in payloads) {
var correlationId = this.nextId();
var request = encoder.call(null, this.clientId, correlationId, payloads[leader]);
var broker = this.brokerForLeader(leader, longPolling);
var broker = this.brokerForLeader(leader);
if (broker.error) return cb('Leader not available', payloads[leader]);
this.cbqueue[correlationId] = [decoder, cb];
broker && broker.write(request);
Expand Down Expand Up @@ -268,8 +264,8 @@ Client.prototype.leaderByPartition = function (topic, partition) {
return this.topicMetadata[topic][partition].leader;
}

Client.prototype.brokerForLeader = function (leader, longPolling) {
var brokers = longPolling ? this.longPollingBrokers : this.brokers;
Client.prototype.brokerForLeader = function (leader) {
var brokers = this.brokers;
// If leader is not give, choose the first broker as leader
if (typeof leader === 'undefined') {
if (!_.isEmpty(brokers)) {
Expand Down Expand Up @@ -318,7 +314,6 @@ Client.prototype.createBroker = function connect(host, port) {

function retry(s) {
if(s.retrying) return;
console.log('retry', s.addr)
s.retrying = true;
s.error = true;
s.retryTimer = setTimeout(function () {
Expand Down
22 changes: 3 additions & 19 deletions lib/consumer.js
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ var Consumer = function (client, topics, options) {
this.options = _.defaults( (options||{}), DEFAULTS );
this.ready = false;
this.id = nextId();
this.longpolling = false;
this.payloads = this.buildPayloads(topics);
this.connect();
}
Expand Down Expand Up @@ -121,16 +120,11 @@ Consumer.prototype.init = function () {
* Update offset info in current payloads
*/
Consumer.prototype.updateOffsets = function (topics) {
var offline = !this.longpolling;
this.payloads.forEach(function (p) {
if (!_.isEmpty(topics[p.topic])) {
var offset = topics[p.topic][p.partition];
p.offset = offset + 1;
offline = (offset < p.offlineOffset);
}
if (!_.isEmpty(topics[p.topic]))
p.offset = topics[p.topic][p.partition] + 1;
});

this.longpolling = !offline;
if (this.options.autoCommit) this.autoCommit();
}

Expand All @@ -155,17 +149,7 @@ Consumer.prototype.commit = Consumer.prototype.autoCommit = autoCommit;

Consumer.prototype.fetch = function () {
if (!this.ready) return;
var maxBytes = null,
fetchMaxWaitMs = this.options.fetchMaxWaitMs,
payloads = this.payloads;

if (!this.longpolling) {
maxBytes = 1024*1024;
fetchMaxWaitMs = 100;
payloads = this.payloads.map(function (p) { return _.defaults({ maxBytes: maxBytes }, p) });
}

this.client.sendFetchRequest(this, payloads, fetchMaxWaitMs, this.options.fetchMinBytes);
this.client.sendFetchRequest(this, this.payloads, this.options.fetchMaxWaitMs, this.options.fetchMinBytes);
}

Consumer.prototype.fetchOffset = function (payloads, cb) {
Expand Down
5 changes: 3 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"name": "kafka-node",
"description": "node client for Apache kafka, only support kafka 0.8 and above",
"version": "0.0.4",
"version": "0.0.5",
"main": "kafka.js",
"dependencies": {
"buffermaker": "0.0.11",
Expand All @@ -13,7 +13,8 @@
"devDependencies": {
"mocha": "~1.12.0",
"should": "~1.2.2",
"line-by-line": "~0.1.1"
"line-by-line": "~0.1.1",
"optimist": "~0.6.0"
},
"repository": {
"type": "git",
Expand Down
26 changes: 23 additions & 3 deletions test/test.consumer.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,27 +10,41 @@ var client, consumer, producer, offset;

function noop() { console.log(arguments) }

function offsetOutOfRange (topic, consumer) {
topic.maxNum = 2;
offset.fetch([topic], function (err, offsets) {
var min = Math.min.apply(null, offsets[topic.topic][topic.partition]);
consumer.setOffset(topic.topic, topic.partition, min);
});
}

before(function (done) {
client = new Client();
producer = new Producer(client);
offset = new Offset(client);
producer.on('ready', function () {
producer.createTopics(['_exist_topic_1_test', '_exist_topic_2_test'], false, function (err, created) {
producer.send([{ topic: '_exist_topic_2_test', messages: 'hello kafka' }], function (err) {
producer.send([
{ topic: '_exist_topic_2_test', messages: 'hello kafka' }
], function (err) {
done(err);
});
});
});
});

describe('Consumer', function () {

describe('events', function () {
it ('should emit message when get new message', function (done) {
var topics = [ { topic: '_exist_topic_2_test' } ],
options = { autoCommit: false, groupId: '_groupId_1_test' };
var consumer = new Consumer(client, topics, options);
var count = 0;
consumer.on('error', noop);
consumer.on('offsetOutOfRange', function (topic) {
offsetOutOfRange.call(null, topic, this);
});
consumer.on('message', function (message) {
message.topic.should.equal('_exist_topic_2_test');
//message.value.should.equal('hello kafka');
Expand Down Expand Up @@ -128,15 +142,21 @@ describe('Consumer', function () {

describe('#commit', function () {
it('should commit offset of current topics', function (done) {
var options = { autoCommit: true, groupId: '_groupId_commit_test' },
topics = [{ topic: '_exist_topic_2_test' }];
var topics = [ { topic: '_exist_topic_2_test' } ],
options = { autoCommit: false, groupId: '_groupId_commit_test' };

var consumer = new Consumer(client, topics, options);
var count = 0;
consumer.on('error', noop);
consumer.on('offsetOutOfRange', function (topic) {
offsetOutOfRange.call(null, topic, this);
});
consumer.on('message', function (message) {
consumer.commit(true, function (err) {
if (!err && count++ === 0) done(err);
});
});

});
});

Expand Down

0 comments on commit 36ca8da

Please sign in to comment.