Skip to content

Commit

Permalink
Merge branch 'release/0.1.2'
Browse files Browse the repository at this point in the history
  • Loading branch information
wtolson committed Jul 8, 2014
2 parents a16ecdc + ab9d894 commit d44f0f4
Show file tree
Hide file tree
Showing 6 changed files with 61 additions and 11 deletions.
5 changes: 5 additions & 0 deletions HISTORY.rst
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,11 @@
History
-------

0.1.2 (2014-07-08)
~~~~~~~~~~~~~~~~~~

* Flush delfate buffer for each message.

0.1.1 (2014-07-07)
~~~~~~~~~~~~~~~~~~

Expand Down
10 changes: 5 additions & 5 deletions gnsq/reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -262,17 +262,16 @@ def close(self):
if not self.is_running:
return

self.logger.debug('closing')
self.state = CLOSED

self.logger.debug('closing %d worker(s)' % len(self.workers))
for worker in self.workers:
worker.kill(block=False)

self.logger.debug('closing %d connection(s)' % len(self.conns))
for conn in self.conns:
conn.close_stream()

self.logger.debug('workers: %r' % self.workers)

def join(self, timeout=None, raise_error=False):
"""Block until all connections have closed and workers stopped."""
gevent.joinall(self.workers, timeout, raise_error)
Expand Down Expand Up @@ -548,8 +547,6 @@ def _listen(self, conn):
try:
conn.listen()
except NSQException as error:
if self.state == CLOSED:
return
self.logger.warning('[%s] connection lost (%r)' % (conn, error))

self.handle_connection_failure(conn)
Expand All @@ -576,6 +573,9 @@ def handle_connection_failure(self, conn):
self.conn_workers.pop(conn, None)
conn.close_stream()

if self.state == CLOSED:
return

if conn.ready_count:
self.need_ready_redistributed = True

Expand Down
7 changes: 6 additions & 1 deletion gnsq/stream/defalte.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,12 @@ def __init__(self, socket, level):
super(DefalteSocket, self).__init__(socket)

def compress(self, data):
return self._compressor.compress(data)
data = self._compressor.compress(data)
return data + self._compressor.flush(zlib.Z_SYNC_FLUSH)

def decompress(self, data):
return self._decompressor.decompress(data)

def close(self):
self._socket.write(self._compressor.flush(zlib.Z_FINISH))
self._socket.close()
2 changes: 1 addition & 1 deletion gnsq/version.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
# -*- coding: utf-8 -*-
# also update in setup.py
__version__ = '0.1.1'
__version__ = '0.1.2'
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@

setup(
name='gnsq',
version='0.1.1',
version='0.1.2',
description='A gevent based NSQ driver for Python.',
long_description=readme + '\n\n' + history,
author='Trevor Olson',
Expand Down
46 changes: 43 additions & 3 deletions tests/test_nsqd.py
Original file line number Diff line number Diff line change
Expand Up @@ -327,7 +327,17 @@ def test_tls():
assert frame == nsq.FRAME_TYPE_RESPONSE
assert data == 'OK'

conn.close()
conn.subscribe('topic', 'channel')
frame, data = conn.read_response()
assert frame == nsq.FRAME_TYPE_RESPONSE
assert data == 'OK'

conn.ready(1)
frame, data = conn.read_response()
assert frame == nsq.FRAME_TYPE_MESSAGE
assert data.body == 'sup'

conn.close_stream()


@pytest.mark.slow
Expand All @@ -350,7 +360,22 @@ def test_deflate():
assert frame == nsq.FRAME_TYPE_RESPONSE
assert data == 'OK'

conn.close()
conn.publish('topic', 'sup')
frame, data = conn.read_response()
assert frame == nsq.FRAME_TYPE_RESPONSE
assert data == 'OK'

conn.subscribe('topic', 'channel')
frame, data = conn.read_response()
assert frame == nsq.FRAME_TYPE_RESPONSE
assert data == 'OK'

conn.ready(1)
frame, data = conn.read_response()
assert frame == nsq.FRAME_TYPE_MESSAGE
assert data.body == 'sup'

conn.close_stream()


@pytest.mark.slow
Expand All @@ -373,7 +398,22 @@ def test_snappy():
assert frame == nsq.FRAME_TYPE_RESPONSE
assert data == 'OK'

conn.close()
conn.publish('topic', 'sup')
frame, data = conn.read_response()
assert frame == nsq.FRAME_TYPE_RESPONSE
assert data == 'OK'

conn.subscribe('topic', 'channel')
frame, data = conn.read_response()
assert frame == nsq.FRAME_TYPE_RESPONSE
assert data == 'OK'

conn.ready(1)
frame, data = conn.read_response()
assert frame == nsq.FRAME_TYPE_MESSAGE
assert data.body == 'sup'

conn.close_stream()


@pytest.mark.slow
Expand Down

0 comments on commit d44f0f4

Please sign in to comment.