diff --git a/.gitignore b/.gitignore index 0d20b64..74ffd4e 100644 --- a/.gitignore +++ b/.gitignore @@ -1 +1,43 @@ -*.pyc +*.py[cod] + +# C extensions +*.so + +# Packages +*.egg +*.egg-info +dist +build +eggs +parts +bin +var +sdist +develop-eggs +.installed.cfg +lib +lib64 + +# Installer logs +pip-log.txt + +# Unit test / coverage reports +.coverage +.tox +nosetests.xml +htmlcov + +# Translations +*.mo + +# Mr Developer +.mr.developer.cfg +.project +.pydevproject + +# Complexity +output/*.html +output/*/index.html + +# Sphinx +docs/_build diff --git a/.travis.yml b/.travis.yml new file mode 100644 index 0000000..75d6015 --- /dev/null +++ b/.travis.yml @@ -0,0 +1,16 @@ +# Config file for automatic testing at travis-ci.org + +language: python + +python: + - "2.7" + - "2.6" + +install: + - sudo apt-get install libsnappy-dev libevent-dev + - pip install -r requirements.txt + - wget https://s3.amazonaws.com/bitly-downloads/nsq/nsq-0.2.28.linux-amd64.go1.2.1.tar.gz + - tar zxvf nsq-0.2.28.linux-amd64.go1.2.1.tar.gz + - sudo cp nsq-0.2.28.linux-amd64.go1.2.1/bin/nsqd nsq-0.2.28.linux-amd64.go1.2.1/bin/nsqlookupd /usr/local/bin + +script: py.test --runslow diff --git a/AUTHORS.rst b/AUTHORS.rst new file mode 100644 index 0000000..8d64416 --- /dev/null +++ b/AUTHORS.rst @@ -0,0 +1,13 @@ +======= +Credits +======= + +Development Lead +---------------- + +* Trevor Olson + +Contributors +------------ + +None yet. Why not be the first? \ No newline at end of file diff --git a/CONTRIBUTING.rst b/CONTRIBUTING.rst new file mode 100644 index 0000000..283e507 --- /dev/null +++ b/CONTRIBUTING.rst @@ -0,0 +1,111 @@ +============ +Contributing +============ + +Contributions are welcome, and they are greatly appreciated! Every +little bit helps, and credit will always be given. + +You can contribute in many ways: + +Types of Contributions +---------------------- + +Report Bugs +~~~~~~~~~~~ + +Report bugs at https://github.com/wtolson/gnsq/issues. 
+ +If you are reporting a bug, please include: + +* Your operating system name and version. +* Any details about your local setup that might be helpful in troubleshooting. +* Detailed steps to reproduce the bug. + +Fix Bugs +~~~~~~~~ + +Look through the GitHub issues for bugs. Anything tagged with "bug" +is open to whoever wants to implement it. + +Implement Features +~~~~~~~~~~~~~~~~~~ + +Look through the GitHub issues for features. Anything tagged with "feature" +is open to whoever wants to implement it. + +Write Documentation +~~~~~~~~~~~~~~~~~~~ + +gnsq could always use more documentation, whether as part of the +official gnsq docs, in docstrings, or even on the web in blog posts, +articles, and such. + +Submit Feedback +~~~~~~~~~~~~~~~ + +The best way to send feedback is to file an issue at https://github.com/wtolson/gnsq/issues. + +If you are proposing a feature: + +* Explain in detail how it would work. +* Keep the scope as narrow as possible, to make it easier to implement. +* Remember that this is a volunteer-driven project, and that contributions + are welcome :) + +Get Started! +------------ + +Ready to contribute? Here's how to set up `gnsq` for local development. + +1. Fork the `gnsq` repo on GitHub. +2. Clone your fork locally:: + + $ git clone git@github.com:your_name_here/gnsq.git + +3. Install your local copy into a virtualenv. Assuming you have virtualenvwrapper installed, this is how you set up your fork for local development:: + + $ mkvirtualenv gnsq + $ cd gnsq/ + $ pip install -r requirements.txt + +4. Create a branch for local development:: + + $ git checkout -b name-of-your-bugfix-or-feature + + Now you can make your changes locally. + +5. When you're done making changes, check that your changes pass flake8 and the tests, including testing other Python versions with tox:: + + $ flake8 gnsq tests + $ py.test + $ tox + + To get flake8 and tox, just pip install them into your virtualenv. + +6. 
Commit your changes and push your branch to GitHub:: + + $ git add . + $ git commit -m "Your detailed description of your changes." + $ git push origin name-of-your-bugfix-or-feature + +7. Submit a pull request through the GitHub website. + +Pull Request Guidelines +----------------------- + +Before you submit a pull request, check that it meets these guidelines: + +1. The pull request should include tests. +2. If the pull request adds functionality, the docs should be updated. Put + your new functionality into a function with a docstring, and add the + feature to the list in README.rst. +3. The pull request should work for Python 2.6, 2.7, and 3.3, 3.4, and for PyPy. Check + https://travis-ci.org/wtolson/gnsq/pull_requests + and make sure that the tests pass for all supported Python versions. + +Tips +---- + +To run a subset of tests:: + + $ py.test tests/test_basic.py diff --git a/HISTORY.rst b/HISTORY.rst new file mode 100644 index 0000000..a072281 --- /dev/null +++ b/HISTORY.rst @@ -0,0 +1,9 @@ +.. :changelog: + +History +------- + +0.1.0 (2014-07-07) +~~~~~~~~~~~~~~~~~~ + +* First release on PyPI. diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000..7a983bd --- /dev/null +++ b/LICENSE @@ -0,0 +1,12 @@ +Copyright (c) 2014, Trevor Olson +All rights reserved. + +Redistribution and use in source and binary forms, with or without modification, are permitted provided that the following conditions are met: + +* Redistributions of source code must retain the above copyright notice, this list of conditions and the following disclaimer. + +* Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the following disclaimer in the documentation and/or other materials provided with the distribution. + +* Neither the name of gnsq nor the names of its contributors may be used to endorse or promote products derived from this software without specific prior written permission. 
+ +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. \ No newline at end of file diff --git a/MANIFEST.in b/MANIFEST.in new file mode 100644 index 0000000..6fd9409 --- /dev/null +++ b/MANIFEST.in @@ -0,0 +1,11 @@ +include AUTHORS.rst +include CONTRIBUTING.rst +include HISTORY.rst +include LICENSE +include README.rst + +recursive-include tests * +recursive-exclude * __pycache__ +recursive-exclude * *.py[co] + +recursive-include docs *.rst conf.py Makefile make.bat \ No newline at end of file diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..eaa6eba --- /dev/null +++ b/Makefile @@ -0,0 +1,67 @@ +.PHONY: clean-pyc clean-build clean-pyc clean-docs clean-coverage clean-tox docs clean + +help: + @echo "clean-build - remove build artifacts" + @echo "clean-pyc - remove Python file artifacts" + @echo "lint - check style with flake8" + @echo "test - run tests quickly with the default Python" + @echo "test-all - run tests on every Python version with tox" + @echo "coverage - check code coverage quickly with the default Python" + @echo "docs - generate Sphinx HTML documentation, including API docs" + @echo "release - package and upload a release" + @echo "dist - package" + +clean: clean-build clean-pyc clean-docs clean-coverage clean-tox + +clean-build: + rm -fr 
build/ + rm -fr dist/ + rm -fr *.egg-info + +clean-pyc: + find . -type f -name "*.py[co]" -delete + find . -type f -name '*~' -delete + find . -type d -name "__pycache__" -delete + +clean-docs: + rm -f docs/gnsq.rst + rm -f docs/gnsq.stream.rst + rm -f docs/modules.rst + $(MAKE) -C docs clean + +clean-coverage: + rm -f .coverage + rm -fr htmlcov/ + +clean-tox: + rm -fr .tox + +lint: + flake8 gnsq tests + +test: + py.test --runslow + +test-fast: + py.test + +test-all: + tox + +coverage: + py.test --runslow --cov gnsq --cov-report html tests + open htmlcov/index.html + +docs: clean-docs + sphinx-apidoc -o docs/ gnsq + $(MAKE) -C docs html + open docs/_build/html/index.html + +release: clean + python setup.py sdist upload + python setup.py bdist_wheel upload + +dist: clean + python setup.py sdist + python setup.py bdist_wheel + ls -l dist diff --git a/README.rst b/README.rst new file mode 100644 index 0000000..3d5cac8 --- /dev/null +++ b/README.rst @@ -0,0 +1,48 @@ +=============================== +gnsq +=============================== + +.. image:: https://badge.fury.io/py/gnsq.svg + :target: http://badge.fury.io/py/gnsq + +.. image:: https://travis-ci.org/wtolson/gnsq.svg?branch=master + :target: https://travis-ci.org/wtolson/gnsq + +.. image:: https://pypip.in/d/gnsq/badge.png + :target: https://pypi.python.org/pypi/gnsq + + +A `gevent`_ based `NSQ`_ driver for Python. + +* Free software: BSD license +* Documentation: http://gnsq.readthedocs.org. + +Installation +------------ + +At the command line:: + + $ easy_install gnsq + +Or even better, if you have virtualenvwrapper installed:: + + $ mkvirtualenv gnsq + $ pip install gnsq + +Usage +----- + +To use gnsq in a project:: + + import gnsq + reader = gnsq.Reader('topic', 'channel', 'localhost:4150') + + @reader.on_message.connect + def handler(reader, message): + do_work(message.body) + + reader.start() + + +.. _gevent: http://gevent.org/ +.. 
_NSQ: http://nsq.io/ diff --git a/__init__.py b/__init__.py deleted file mode 100644 index b3073e8..0000000 --- a/__init__.py +++ /dev/null @@ -1,5 +0,0 @@ -from .reader import Reader -from .nsqd import Nsqd -from .lookupd import Lookupd -from .message import Message -from .backofftimer import BackoffTimer diff --git a/docs/Makefile b/docs/Makefile new file mode 100644 index 0000000..caad866 --- /dev/null +++ b/docs/Makefile @@ -0,0 +1,177 @@ +# Makefile for Sphinx documentation +# + +# You can set these variables from the command line. +SPHINXOPTS = +SPHINXBUILD = sphinx-build +PAPER = +BUILDDIR = _build + +# User-friendly check for sphinx-build +ifeq ($(shell which $(SPHINXBUILD) >/dev/null 2>&1; echo $$?), 1) +$(error The '$(SPHINXBUILD)' command was not found. Make sure you have Sphinx installed, then set the SPHINXBUILD environment variable to point to the full path of the '$(SPHINXBUILD)' executable. Alternatively you can add the directory with the executable to your PATH. If you don't have Sphinx installed, grab it from http://sphinx-doc.org/) +endif + +# Internal variables. +PAPEROPT_a4 = -D latex_paper_size=a4 +PAPEROPT_letter = -D latex_paper_size=letter +ALLSPHINXOPTS = -d $(BUILDDIR)/doctrees $(PAPEROPT_$(PAPER)) $(SPHINXOPTS) . +# the i18n builder cannot share the environment and doctrees with the others +I18NSPHINXOPTS = $(PAPEROPT_$(PAPER)) $(SPHINXOPTS) . 
+ +.PHONY: help clean html dirhtml singlehtml pickle json htmlhelp qthelp devhelp epub latex latexpdf text man changes linkcheck doctest gettext + +help: + @echo "Please use \`make ' where is one of" + @echo " html to make standalone HTML files" + @echo " dirhtml to make HTML files named index.html in directories" + @echo " singlehtml to make a single large HTML file" + @echo " pickle to make pickle files" + @echo " json to make JSON files" + @echo " htmlhelp to make HTML files and a HTML help project" + @echo " qthelp to make HTML files and a qthelp project" + @echo " devhelp to make HTML files and a Devhelp project" + @echo " epub to make an epub" + @echo " latex to make LaTeX files, you can set PAPER=a4 or PAPER=letter" + @echo " latexpdf to make LaTeX files and run them through pdflatex" + @echo " latexpdfja to make LaTeX files and run them through platex/dvipdfmx" + @echo " text to make text files" + @echo " man to make manual pages" + @echo " texinfo to make Texinfo files" + @echo " info to make Texinfo files and run them through makeinfo" + @echo " gettext to make PO message catalogs" + @echo " changes to make an overview of all changed/added/deprecated items" + @echo " xml to make Docutils-native XML files" + @echo " pseudoxml to make pseudoxml-XML files for display purposes" + @echo " linkcheck to check all external links for integrity" + @echo " doctest to run all doctests embedded in the documentation (if enabled)" + +clean: + rm -rf $(BUILDDIR) + +html: + $(SPHINXBUILD) -b html $(ALLSPHINXOPTS) $(BUILDDIR)/html + @echo + @echo "Build finished. The HTML pages are in $(BUILDDIR)/html." + +dirhtml: + $(SPHINXBUILD) -b dirhtml $(ALLSPHINXOPTS) $(BUILDDIR)/dirhtml + @echo + @echo "Build finished. The HTML pages are in $(BUILDDIR)/dirhtml." + +singlehtml: + $(SPHINXBUILD) -b singlehtml $(ALLSPHINXOPTS) $(BUILDDIR)/singlehtml + @echo + @echo "Build finished. The HTML page is in $(BUILDDIR)/singlehtml." 
+ +pickle: + $(SPHINXBUILD) -b pickle $(ALLSPHINXOPTS) $(BUILDDIR)/pickle + @echo + @echo "Build finished; now you can process the pickle files." + +json: + $(SPHINXBUILD) -b json $(ALLSPHINXOPTS) $(BUILDDIR)/json + @echo + @echo "Build finished; now you can process the JSON files." + +htmlhelp: + $(SPHINXBUILD) -b htmlhelp $(ALLSPHINXOPTS) $(BUILDDIR)/htmlhelp + @echo + @echo "Build finished; now you can run HTML Help Workshop with the" \ + ".hhp project file in $(BUILDDIR)/htmlhelp." + +qthelp: + $(SPHINXBUILD) -b qthelp $(ALLSPHINXOPTS) $(BUILDDIR)/qthelp + @echo + @echo "Build finished; now you can run "qcollectiongenerator" with the" \ + ".qhcp project file in $(BUILDDIR)/qthelp, like this:" + @echo "# qcollectiongenerator $(BUILDDIR)/qthelp/complexity.qhcp" + @echo "To view the help file:" + @echo "# assistant -collectionFile $(BUILDDIR)/qthelp/complexity.qhc" + +devhelp: + $(SPHINXBUILD) -b devhelp $(ALLSPHINXOPTS) $(BUILDDIR)/devhelp + @echo + @echo "Build finished." + @echo "To view the help file:" + @echo "# mkdir -p $$HOME/.local/share/devhelp/complexity" + @echo "# ln -s $(BUILDDIR)/devhelp $$HOME/.local/share/devhelp/complexity" + @echo "# devhelp" + +epub: + $(SPHINXBUILD) -b epub $(ALLSPHINXOPTS) $(BUILDDIR)/epub + @echo + @echo "Build finished. The epub file is in $(BUILDDIR)/epub." + +latex: + $(SPHINXBUILD) -b latex $(ALLSPHINXOPTS) $(BUILDDIR)/latex + @echo + @echo "Build finished; the LaTeX files are in $(BUILDDIR)/latex." + @echo "Run \`make' in that directory to run these through (pdf)latex" \ + "(use \`make latexpdf' here to do that automatically)." + +latexpdf: + $(SPHINXBUILD) -b latex $(ALLSPHINXOPTS) $(BUILDDIR)/latex + @echo "Running LaTeX files through pdflatex..." + $(MAKE) -C $(BUILDDIR)/latex all-pdf + @echo "pdflatex finished; the PDF files are in $(BUILDDIR)/latex." + +latexpdfja: + $(SPHINXBUILD) -b latex $(ALLSPHINXOPTS) $(BUILDDIR)/latex + @echo "Running LaTeX files through platex and dvipdfmx..." 
+ $(MAKE) -C $(BUILDDIR)/latex all-pdf-ja + @echo "pdflatex finished; the PDF files are in $(BUILDDIR)/latex." + +text: + $(SPHINXBUILD) -b text $(ALLSPHINXOPTS) $(BUILDDIR)/text + @echo + @echo "Build finished. The text files are in $(BUILDDIR)/text." + +man: + $(SPHINXBUILD) -b man $(ALLSPHINXOPTS) $(BUILDDIR)/man + @echo + @echo "Build finished. The manual pages are in $(BUILDDIR)/man." + +texinfo: + $(SPHINXBUILD) -b texinfo $(ALLSPHINXOPTS) $(BUILDDIR)/texinfo + @echo + @echo "Build finished. The Texinfo files are in $(BUILDDIR)/texinfo." + @echo "Run \`make' in that directory to run these through makeinfo" \ + "(use \`make info' here to do that automatically)." + +info: + $(SPHINXBUILD) -b texinfo $(ALLSPHINXOPTS) $(BUILDDIR)/texinfo + @echo "Running Texinfo files through makeinfo..." + make -C $(BUILDDIR)/texinfo info + @echo "makeinfo finished; the Info files are in $(BUILDDIR)/texinfo." + +gettext: + $(SPHINXBUILD) -b gettext $(I18NSPHINXOPTS) $(BUILDDIR)/locale + @echo + @echo "Build finished. The message catalogs are in $(BUILDDIR)/locale." + +changes: + $(SPHINXBUILD) -b changes $(ALLSPHINXOPTS) $(BUILDDIR)/changes + @echo + @echo "The overview file is in $(BUILDDIR)/changes." + +linkcheck: + $(SPHINXBUILD) -b linkcheck $(ALLSPHINXOPTS) $(BUILDDIR)/linkcheck + @echo + @echo "Link check complete; look for any errors in the above output " \ + "or in $(BUILDDIR)/linkcheck/output.txt." + +doctest: + $(SPHINXBUILD) -b doctest $(ALLSPHINXOPTS) $(BUILDDIR)/doctest + @echo "Testing of doctests in the sources finished, look at the " \ + "results in $(BUILDDIR)/doctest/output.txt." + +xml: + $(SPHINXBUILD) -b xml $(ALLSPHINXOPTS) $(BUILDDIR)/xml + @echo + @echo "Build finished. The XML files are in $(BUILDDIR)/xml." + +pseudoxml: + $(SPHINXBUILD) -b pseudoxml $(ALLSPHINXOPTS) $(BUILDDIR)/pseudoxml + @echo + @echo "Build finished. The pseudo-XML files are in $(BUILDDIR)/pseudoxml." 
diff --git a/docs/authors.rst b/docs/authors.rst new file mode 100644 index 0000000..94292d0 --- /dev/null +++ b/docs/authors.rst @@ -0,0 +1 @@ +.. include:: ../AUTHORS.rst \ No newline at end of file diff --git a/docs/conf.py b/docs/conf.py new file mode 100755 index 0000000..58e2678 --- /dev/null +++ b/docs/conf.py @@ -0,0 +1,275 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +# +# complexity documentation build configuration file, created by +# sphinx-quickstart on Tue Jul 9 22:26:36 2013. +# +# This file is execfile()d with the current directory set to its +# containing dir. +# +# Note that not all possible configuration values are present in this +# autogenerated file. +# +# All configuration values have a default; values that are commented out +# serve to show the default. + +import sys +import os + +# If extensions (or modules to document with autodoc) are in another +# directory, add these directories to sys.path here. If the directory is +# relative to the documentation root, use os.path.abspath to make it +# absolute, like shown here. +#sys.path.insert(0, os.path.abspath('.')) + +# Get the project root dir, which is the parent dir of this +cwd = os.getcwd() +project_root = os.path.dirname(cwd) + +# Insert the project root dir as the first element in the PYTHONPATH. +# This lets us ensure that the source package is imported, and that its +# version is used. +sys.path.insert(0, project_root) + +import gnsq + +# -- General configuration --------------------------------------------- + +# If your documentation needs a minimal Sphinx version, state it here. +#needs_sphinx = '1.0' + +# Add any Sphinx extension module names here, as strings. They can be +# extensions coming with Sphinx (named 'sphinx.ext.*') or your custom ones. +extensions = ['sphinx.ext.autodoc', 'sphinx.ext.viewcode'] + +# Add any paths that contain templates here, relative to this directory. +templates_path = ['_templates'] + +# The suffix of source filenames. 
+source_suffix = '.rst' + +# The encoding of source files. +#source_encoding = 'utf-8-sig' + +# The master toctree document. +master_doc = 'index' + +# General information about the project. +project = u'gnsq' +copyright = u'2014, Trevor Olson' + +# The version info for the project you're documenting, acts as replacement +# for |version| and |release|, also used in various other places throughout +# the built documents. +# +# The short X.Y version. +version = gnsq.__version__ +# The full version, including alpha/beta/rc tags. +release = gnsq.__version__ + +# The language for content autogenerated by Sphinx. Refer to documentation +# for a list of supported languages. +#language = None + +# There are two options for replacing |today|: either, you set today to +# some non-false value, then it is used: +#today = '' +# Else, today_fmt is used as the format for a strftime call. +#today_fmt = '%B %d, %Y' + +# List of patterns, relative to source directory, that match files and +# directories to ignore when looking for source files. +exclude_patterns = ['_build'] + +# The reST default role (used for this markup: `text`) to use for all +# documents. +#default_role = None + +# If true, '()' will be appended to :func: etc. cross-reference text. +#add_function_parentheses = True + +# If true, the current module name will be prepended to all description +# unit titles (such as .. function::). +#add_module_names = True + +# If true, sectionauthor and moduleauthor directives will be shown in the +# output. They are ignored by default. +#show_authors = False + +# The name of the Pygments (syntax highlighting) style to use. +pygments_style = 'sphinx' + +# A list of ignored prefixes for module index sorting. +#modindex_common_prefix = [] + +# If true, keep warnings as "system message" paragraphs in the built +# documents. +#keep_warnings = False + + +# -- Options for HTML output ------------------------------------------- + +# The theme to use for HTML and HTML Help pages. 
See the documentation for +# a list of builtin themes. +html_theme = 'default' + +# Theme options are theme-specific and customize the look and feel of a +# theme further. For a list of options available for each theme, see the +# documentation. +#html_theme_options = {} + +# Add any paths that contain custom themes here, relative to this directory. +#html_theme_path = [] + +# The name for this set of Sphinx documents. If None, it defaults to +# " v documentation". +#html_title = None + +# A shorter title for the navigation bar. Default is the same as +# html_title. +#html_short_title = None + +# The name of an image file (relative to this directory) to place at the +# top of the sidebar. +#html_logo = None + +# The name of an image file (within the static path) to use as favicon +# of the docs. This file should be a Windows icon file (.ico) being +# 16x16 or 32x32 pixels large. +#html_favicon = None + +# Add any paths that contain custom static files (such as style sheets) +# here, relative to this directory. They are copied after the builtin +# static files, so a file named "default.css" will overwrite the builtin +# "default.css". +html_static_path = ['_static'] + +# If not '', a 'Last updated on:' timestamp is inserted at every page +# bottom, using the given strftime format. +#html_last_updated_fmt = '%b %d, %Y' + +# If true, SmartyPants will be used to convert quotes and dashes to +# typographically correct entities. +#html_use_smartypants = True + +# Custom sidebar templates, maps document names to template names. +#html_sidebars = {} + +# Additional templates that should be rendered to pages, maps page names +# to template names. +#html_additional_pages = {} + +# If false, no module index is generated. +#html_domain_indices = True + +# If false, no index is generated. +#html_use_index = True + +# If true, the index is split into individual pages for each letter. +#html_split_index = False + +# If true, links to the reST sources are added to the pages. 
+#html_show_sourcelink = True + +# If true, "Created using Sphinx" is shown in the HTML footer. +# Default is True. +#html_show_sphinx = True + +# If true, "(C) Copyright ..." is shown in the HTML footer. +# Default is True. +#html_show_copyright = True + +# If true, an OpenSearch description file will be output, and all pages +# will contain a tag referring to it. The value of this option +# must be the base URL from which the finished HTML is served. +#html_use_opensearch = '' + +# This is the file name suffix for HTML files (e.g. ".xhtml"). +#html_file_suffix = None + +# Output file base name for HTML help builder. +htmlhelp_basename = 'gnsqdoc' + + +# -- Options for LaTeX output ------------------------------------------ + +latex_elements = { + # The paper size ('letterpaper' or 'a4paper'). + #'papersize': 'letterpaper', + + # The font size ('10pt', '11pt' or '12pt'). + #'pointsize': '10pt', + + # Additional stuff for the LaTeX preamble. + #'preamble': '', +} + +# Grouping the document tree into LaTeX files. List of tuples +# (source start file, target name, title, author, documentclass +# [howto/manual]). +latex_documents = [ + ('index', 'gnsq.tex', + u'gnsq Documentation', + u'Trevor Olson', 'manual'), +] + +# The name of an image file (relative to this directory) to place at +# the top of the title page. +#latex_logo = None + +# For "manual" documents, if this is true, then toplevel headings +# are parts, not chapters. +#latex_use_parts = False + +# If true, show page references after internal links. +#latex_show_pagerefs = False + +# If true, show URL addresses after external links. +#latex_show_urls = False + +# Documents to append as an appendix to all manuals. +#latex_appendices = [] + +# If false, no module index is generated. +#latex_domain_indices = True + + +# -- Options for manual page output ------------------------------------ + +# One entry per manual page. List of tuples +# (source start file, name, description, authors, manual section). 
+man_pages = [ + ('index', 'gnsq', + u'gnsq Documentation', + [u'Trevor Olson'], 1) +] + +# If true, show URL addresses after external links. +#man_show_urls = False + + +# -- Options for Texinfo output ---------------------------------------- + +# Grouping the document tree into Texinfo files. List of tuples +# (source start file, target name, title, author, +# dir menu entry, description, category) +texinfo_documents = [ + ('index', 'gnsq', + u'gnsq Documentation', + u'Trevor Olson', + 'gnsq', + 'One line description of project.', + 'Miscellaneous'), +] + +# Documents to append as an appendix to all manuals. +#texinfo_appendices = [] + +# If false, no module index is generated. +#texinfo_domain_indices = True + +# How to display URL addresses: 'footnote', 'no', or 'inline'. +#texinfo_show_urls = 'footnote' + +# If true, do not generate a @detailmenu in the "Top" node's menu. +#texinfo_no_detailmenu = False \ No newline at end of file diff --git a/docs/contributing.rst b/docs/contributing.rst new file mode 100644 index 0000000..3bdd7dc --- /dev/null +++ b/docs/contributing.rst @@ -0,0 +1 @@ +.. include:: ../CONTRIBUTING.rst \ No newline at end of file diff --git a/docs/history.rst b/docs/history.rst new file mode 100644 index 0000000..bec23d8 --- /dev/null +++ b/docs/history.rst @@ -0,0 +1 @@ +.. include:: ../HISTORY.rst \ No newline at end of file diff --git a/docs/index.rst b/docs/index.rst new file mode 100644 index 0000000..28e1ec8 --- /dev/null +++ b/docs/index.rst @@ -0,0 +1,31 @@ +.. complexity documentation master file, created by + sphinx-quickstart on Tue Jul 9 22:26:36 2013. + You can adapt this file completely to your liking, but it should at least + contain the root `toctree` directive. + +Welcome to gnsq's documentation! +================================ + +.. include:: ../README.rst + +Contents +======== + +.. 
toctree:: + :maxdepth: 2 + + reader + nsqd + lookupd + message + signals + contributing + authors + history + +Indices and tables +================== + +* :ref:`genindex` +* :ref:`modindex` +* :ref:`search` diff --git a/docs/lookupd.rst b/docs/lookupd.rst new file mode 100644 index 0000000..e665ba1 --- /dev/null +++ b/docs/lookupd.rst @@ -0,0 +1,6 @@ +Nsqlookupd client +----------------- + +.. autoclass:: gnsq.Lookupd + :members: + :inherited-members: diff --git a/docs/make.bat b/docs/make.bat new file mode 100644 index 0000000..2b44764 --- /dev/null +++ b/docs/make.bat @@ -0,0 +1,242 @@ +@ECHO OFF + +REM Command file for Sphinx documentation + +if "%SPHINXBUILD%" == "" ( + set SPHINXBUILD=sphinx-build +) +set BUILDDIR=_build +set ALLSPHINXOPTS=-d %BUILDDIR%/doctrees %SPHINXOPTS% . +set I18NSPHINXOPTS=%SPHINXOPTS% . +if NOT "%PAPER%" == "" ( + set ALLSPHINXOPTS=-D latex_paper_size=%PAPER% %ALLSPHINXOPTS% + set I18NSPHINXOPTS=-D latex_paper_size=%PAPER% %I18NSPHINXOPTS% +) + +if "%1" == "" goto help + +if "%1" == "help" ( + :help + echo.Please use `make ^` where ^ is one of + echo. html to make standalone HTML files + echo. dirhtml to make HTML files named index.html in directories + echo. singlehtml to make a single large HTML file + echo. pickle to make pickle files + echo. json to make JSON files + echo. htmlhelp to make HTML files and a HTML help project + echo. qthelp to make HTML files and a qthelp project + echo. devhelp to make HTML files and a Devhelp project + echo. epub to make an epub + echo. latex to make LaTeX files, you can set PAPER=a4 or PAPER=letter + echo. text to make text files + echo. man to make manual pages + echo. texinfo to make Texinfo files + echo. gettext to make PO message catalogs + echo. changes to make an overview over all changed/added/deprecated items + echo. xml to make Docutils-native XML files + echo. pseudoxml to make pseudoxml-XML files for display purposes + echo. linkcheck to check all external links for integrity + echo. 
doctest to run all doctests embedded in the documentation if enabled + goto end +) + +if "%1" == "clean" ( + for /d %%i in (%BUILDDIR%\*) do rmdir /q /s %%i + del /q /s %BUILDDIR%\* + goto end +) + + +%SPHINXBUILD% 2> nul +if errorlevel 9009 ( + echo. + echo.The 'sphinx-build' command was not found. Make sure you have Sphinx + echo.installed, then set the SPHINXBUILD environment variable to point + echo.to the full path of the 'sphinx-build' executable. Alternatively you + echo.may add the Sphinx directory to PATH. + echo. + echo.If you don't have Sphinx installed, grab it from + echo.http://sphinx-doc.org/ + exit /b 1 +) + +if "%1" == "html" ( + %SPHINXBUILD% -b html %ALLSPHINXOPTS% %BUILDDIR%/html + if errorlevel 1 exit /b 1 + echo. + echo.Build finished. The HTML pages are in %BUILDDIR%/html. + goto end +) + +if "%1" == "dirhtml" ( + %SPHINXBUILD% -b dirhtml %ALLSPHINXOPTS% %BUILDDIR%/dirhtml + if errorlevel 1 exit /b 1 + echo. + echo.Build finished. The HTML pages are in %BUILDDIR%/dirhtml. + goto end +) + +if "%1" == "singlehtml" ( + %SPHINXBUILD% -b singlehtml %ALLSPHINXOPTS% %BUILDDIR%/singlehtml + if errorlevel 1 exit /b 1 + echo. + echo.Build finished. The HTML pages are in %BUILDDIR%/singlehtml. + goto end +) + +if "%1" == "pickle" ( + %SPHINXBUILD% -b pickle %ALLSPHINXOPTS% %BUILDDIR%/pickle + if errorlevel 1 exit /b 1 + echo. + echo.Build finished; now you can process the pickle files. + goto end +) + +if "%1" == "json" ( + %SPHINXBUILD% -b json %ALLSPHINXOPTS% %BUILDDIR%/json + if errorlevel 1 exit /b 1 + echo. + echo.Build finished; now you can process the JSON files. + goto end +) + +if "%1" == "htmlhelp" ( + %SPHINXBUILD% -b htmlhelp %ALLSPHINXOPTS% %BUILDDIR%/htmlhelp + if errorlevel 1 exit /b 1 + echo. + echo.Build finished; now you can run HTML Help Workshop with the ^ +.hhp project file in %BUILDDIR%/htmlhelp. + goto end +) + +if "%1" == "qthelp" ( + %SPHINXBUILD% -b qthelp %ALLSPHINXOPTS% %BUILDDIR%/qthelp + if errorlevel 1 exit /b 1 + echo. 
+ echo.Build finished; now you can run "qcollectiongenerator" with the ^ +.qhcp project file in %BUILDDIR%/qthelp, like this: + echo.^> qcollectiongenerator %BUILDDIR%\qthelp\complexity.qhcp + echo.To view the help file: + echo.^> assistant -collectionFile %BUILDDIR%\qthelp\complexity.ghc + goto end +) + +if "%1" == "devhelp" ( + %SPHINXBUILD% -b devhelp %ALLSPHINXOPTS% %BUILDDIR%/devhelp + if errorlevel 1 exit /b 1 + echo. + echo.Build finished. + goto end +) + +if "%1" == "epub" ( + %SPHINXBUILD% -b epub %ALLSPHINXOPTS% %BUILDDIR%/epub + if errorlevel 1 exit /b 1 + echo. + echo.Build finished. The epub file is in %BUILDDIR%/epub. + goto end +) + +if "%1" == "latex" ( + %SPHINXBUILD% -b latex %ALLSPHINXOPTS% %BUILDDIR%/latex + if errorlevel 1 exit /b 1 + echo. + echo.Build finished; the LaTeX files are in %BUILDDIR%/latex. + goto end +) + +if "%1" == "latexpdf" ( + %SPHINXBUILD% -b latex %ALLSPHINXOPTS% %BUILDDIR%/latex + cd %BUILDDIR%/latex + make all-pdf + cd %BUILDDIR%/.. + echo. + echo.Build finished; the PDF files are in %BUILDDIR%/latex. + goto end +) + +if "%1" == "latexpdfja" ( + %SPHINXBUILD% -b latex %ALLSPHINXOPTS% %BUILDDIR%/latex + cd %BUILDDIR%/latex + make all-pdf-ja + cd %BUILDDIR%/.. + echo. + echo.Build finished; the PDF files are in %BUILDDIR%/latex. + goto end +) + +if "%1" == "text" ( + %SPHINXBUILD% -b text %ALLSPHINXOPTS% %BUILDDIR%/text + if errorlevel 1 exit /b 1 + echo. + echo.Build finished. The text files are in %BUILDDIR%/text. + goto end +) + +if "%1" == "man" ( + %SPHINXBUILD% -b man %ALLSPHINXOPTS% %BUILDDIR%/man + if errorlevel 1 exit /b 1 + echo. + echo.Build finished. The manual pages are in %BUILDDIR%/man. + goto end +) + +if "%1" == "texinfo" ( + %SPHINXBUILD% -b texinfo %ALLSPHINXOPTS% %BUILDDIR%/texinfo + if errorlevel 1 exit /b 1 + echo. + echo.Build finished. The Texinfo files are in %BUILDDIR%/texinfo. 
+ goto end +) + +if "%1" == "gettext" ( + %SPHINXBUILD% -b gettext %I18NSPHINXOPTS% %BUILDDIR%/locale + if errorlevel 1 exit /b 1 + echo. + echo.Build finished. The message catalogs are in %BUILDDIR%/locale. + goto end +) + +if "%1" == "changes" ( + %SPHINXBUILD% -b changes %ALLSPHINXOPTS% %BUILDDIR%/changes + if errorlevel 1 exit /b 1 + echo. + echo.The overview file is in %BUILDDIR%/changes. + goto end +) + +if "%1" == "linkcheck" ( + %SPHINXBUILD% -b linkcheck %ALLSPHINXOPTS% %BUILDDIR%/linkcheck + if errorlevel 1 exit /b 1 + echo. + echo.Link check complete; look for any errors in the above output ^ +or in %BUILDDIR%/linkcheck/output.txt. + goto end +) + +if "%1" == "doctest" ( + %SPHINXBUILD% -b doctest %ALLSPHINXOPTS% %BUILDDIR%/doctest + if errorlevel 1 exit /b 1 + echo. + echo.Testing of doctests in the sources finished, look at the ^ +results in %BUILDDIR%/doctest/output.txt. + goto end +) + +if "%1" == "xml" ( + %SPHINXBUILD% -b xml %ALLSPHINXOPTS% %BUILDDIR%/xml + if errorlevel 1 exit /b 1 + echo. + echo.Build finished. The XML files are in %BUILDDIR%/xml. + goto end +) + +if "%1" == "pseudoxml" ( + %SPHINXBUILD% -b pseudoxml %ALLSPHINXOPTS% %BUILDDIR%/pseudoxml + if errorlevel 1 exit /b 1 + echo. + echo.Build finished. The pseudo-XML files are in %BUILDDIR%/pseudoxml. + goto end +) + +:end \ No newline at end of file diff --git a/docs/message.rst b/docs/message.rst new file mode 100644 index 0000000..d08d312 --- /dev/null +++ b/docs/message.rst @@ -0,0 +1,6 @@ +NSQ Message +----------- + +.. autoclass:: gnsq.Message + :members: + :inherited-members: diff --git a/docs/nsqd.rst b/docs/nsqd.rst new file mode 100644 index 0000000..a189002 --- /dev/null +++ b/docs/nsqd.rst @@ -0,0 +1,6 @@ +Nsqd client +----------- + +.. 
autoclass:: gnsq.Nsqd + :members: + :inherited-members: diff --git a/docs/reader.rst b/docs/reader.rst new file mode 100644 index 0000000..2d123ac --- /dev/null +++ b/docs/reader.rst @@ -0,0 +1,6 @@ +Reader: high-level consumer +--------------------------- + +.. autoclass:: gnsq.Reader + :members: + :inherited-members: diff --git a/docs/signals.rst b/docs/signals.rst new file mode 100644 index 0000000..b25dff5 --- /dev/null +++ b/docs/signals.rst @@ -0,0 +1,28 @@ +Signals +------- + +Both :doc:`Reader ` and :doc:`Nsqd ` classes expose various +signals provided by the `Blinker`_ library. + +Subscribing to signals +~~~~~~~~~~~~~~~~~~~~~~ + +To subscribe to a signal, you can use the +:meth:`~blinker.base.Signal.connect` method of a signal. The first +argument is the function that should be called when the signal is emitted, +the optional second argument specifies a sender. To unsubscribe from a +signal, you can use the :meth:`~blinker.base.Signal.disconnect` method. :: + + def error_handler(reader, error): + print 'Got on error:', error + + reader.on_error.connect(error_handler) + +You can also easily subscribe to signals by using the +:meth:`~blinker.base.NamedSignal.connect` decorator:: + + @reader.on_giving_up.connect + def handle_giving_up(reader, message): + print 'Giving up on:', message.id + +.. 
_Blinker: https://pypi.python.org/pypi/blinker diff --git a/gnsq/__init__.py b/gnsq/__init__.py new file mode 100644 index 0000000..37fd507 --- /dev/null +++ b/gnsq/__init__.py @@ -0,0 +1,21 @@ +# -*- coding: utf-8 -*- +from __future__ import absolute_import + +from .reader import Reader +from .nsqd import Nsqd +from .lookupd import Lookupd +from .message import Message +from .backofftimer import BackoffTimer +from .version import __version__ + +__author__ = 'Trevor Olson' +__email__ = 'trevor@heytrevor.com' +__version__ = __version__ + +__all__ = [ + 'Reader', + 'Nsqd', + 'Lookupd', + 'Message', + 'BackoffTimer', +] diff --git a/backofftimer.py b/gnsq/backofftimer.py similarity index 73% rename from backofftimer.py rename to gnsq/backofftimer.py index 1f3911c..7e8a60a 100644 --- a/backofftimer.py +++ b/gnsq/backofftimer.py @@ -1,19 +1,25 @@ -from random import randint +# -*- coding: utf-8 -*- +from __future__ import absolute_import +import random + class BackoffTimer(object): def __init__(self, ratio=1, max_interval=None, min_interval=None): - self.c = 0 + self.c = 0 self.ratio = ratio self.max_interval = max_interval self.min_interval = min_interval + def is_reset(self): + return self.c == 0 + def reset(self): self.c = 0 return self def success(self): - self.c = max(self.c-1, 0) + self.c = max(self.c - 1, 0) return self def failure(self): @@ -22,7 +28,7 @@ def failure(self): def get_interval(self): k = pow(2, self.c) - 1 - interval = randint(0, k) * self.ratio + interval = random.random() * k * self.ratio if self.max_interval is not None: interval = min(interval, self.max_interval) diff --git a/errors.py b/gnsq/errors.py similarity index 60% rename from errors.py rename to gnsq/errors.py index 2424882..8e9b91d 100644 --- a/errors.py +++ b/gnsq/errors.py @@ -1,80 +1,106 @@ +# -*- coding: utf-8 -*- +from __future__ import absolute_import import socket + class NSQException(Exception): pass + class NSQRequeueMessage(NSQException): pass + class 
class NSQNoConnections(NSQException):
    """Raised when no nsqd connections are available."""
    pass


class NSQHttpError(NSQException):
    """Raised for non-200 responses from the nsqd/nsqlookupd HTTP API."""
    pass


class NSQSocketError(socket.error, NSQException):
    """Socket-level failure, catchable as either socket.error or NSQException."""
    pass


class NSQFrameError(NSQException):
    """Raised when a response frame cannot be parsed."""
    pass


class NSQErrorCode(NSQException):
    """Base class for protocol error codes returned by nsqd in error frames.

    ``fatal`` indicates whether the connection must be closed after this
    error (per-message failures like FIN/REQ/TOUCH are non-fatal).
    """
    fatal = True


class NSQInvalid(NSQErrorCode):
    """E_INVALID"""
    pass


class NSQBadBody(NSQErrorCode):
    """E_BAD_BODY"""
    pass


class NSQBadTopic(NSQErrorCode):
    """E_BAD_TOPIC"""
    pass


class NSQBadChannel(NSQErrorCode):
    """E_BAD_CHANNEL"""
    pass


class NSQBadMessage(NSQErrorCode):
    """E_BAD_MESSAGE"""
    pass


class NSQPutFailed(NSQErrorCode):
    """E_PUT_FAILED"""
    pass


class NSQPubFailed(NSQErrorCode):
    """E_PUB_FAILED"""
    pass


class NSQMPubFailed(NSQErrorCode):
    """E_MPUB_FAILED"""
    pass


class NSQFinishFailed(NSQErrorCode):
    """E_FIN_FAILED"""
    fatal = False


class NSQRequeueFailed(NSQErrorCode):
    """E_REQ_FAILED"""
    fatal = False


class NSQTouchFailed(NSQErrorCode):
    """E_TOUCH_FAILED"""
    fatal = False


# Maps the first token of an error frame to an exception class. Both old
# (E_FINISH_FAILED/E_REQUEUE_FAILED) and new (E_FIN_FAILED/E_REQ_FAILED)
# spellings are accepted.
ERROR_CODES = {
    'E_INVALID': NSQInvalid,
    'E_BAD_BODY': NSQBadBody,
    'E_BAD_TOPIC': NSQBadTopic,
    'E_BAD_CHANNEL': NSQBadChannel,
    'E_BAD_MESSAGE': NSQBadMessage,
    'E_PUT_FAILED': NSQPutFailed,
    'E_PUB_FAILED': NSQPubFailed,
    'E_MPUB_FAILED': NSQMPubFailed,
    'E_FINISH_FAILED': NSQFinishFailed,
    'E_FIN_FAILED': NSQFinishFailed,
    'E_REQUEUE_FAILED': NSQRequeueFailed,
    'E_REQ_FAILED': NSQRequeueFailed,
    'E_TOUCH_FAILED': NSQTouchFailed
}


def make_error(error_code):
    """Convert an error frame body (e.g. ``'E_INVALID cannot SUB ...'``) into
    an exception instance.

    The first whitespace-delimited token selects the exception class; the
    remainder (or the whole string, if there is no payload) becomes the
    exception message. Unknown codes fall back to :class:`NSQErrorCode`.
    """
    parts = error_code.split(None, 1)
    if not parts:
        # Guard: an empty error body would otherwise raise IndexError on
        # parts[0]; treat it as a generic error code instead.
        return NSQErrorCode(error_code)
    return ERROR_CODES.get(parts[0], NSQErrorCode)(parts[-1])
class HTTPClient(object):
    """Mixin providing a lazily-built urllib3 connection pool plus helpers
    for talking to the nsqd/nsqlookupd HTTP API.

    Subclasses must supply :attr:`base_url`.
    """
    base_url = None
    __http = None

    @property
    def http(self):
        """Connection pool for :attr:`base_url`, created on first access."""
        if self.__http is None:
            self.__http = urllib3.connection_from_url(url=self.base_url)
        return self.__http

    def http_request(self, method, url, **kwargs):
        """Issue a request and unwrap the response.

        JSON responses are decoded and their ``data`` payload returned;
        anything else comes back as the raw body. Non-200 statuses raise
        :class:`NSQHttpError`.
        """
        response = self.http.request_encode_url(method, url, **kwargs)
        content_type = response.getheader('content-type', '')

        if 'application/json' in content_type:
            return self._http_check_json(response)

        return self._http_check(response)

    def _http_check(self, response):
        # Raw (non-JSON) response: only the status code can be verified.
        if response.status != 200:
            raise NSQHttpError('http error <%s>' % response.status)
        return response.data

    def _http_check_json(self, response):
        try:
            data = json.loads(response.data)
        except ValueError:
            # Content-type said JSON but the body wasn't; fall back to the
            # raw status-only check.
            return self._http_check(response)

        if response.status != 200:
            status_txt = data.get('status_txt', 'http error')
            raise NSQHttpError('%s <%s>' % (status_txt, response.status))

        return data['data']

    def http_get(self, url, **kwargs):
        """Shorthand for a GET request."""
        return self.http_request('GET', url, **kwargs)

    def http_post(self, url, **kwargs):
        """Shorthand for a POST request."""
        return self.http_request('POST', url, **kwargs)
class Lookupd(HTTPClient):
    """Low level client for nsqlookupd.

    :param address: nsqlookupd http address (default: http://localhost:4161/)
    """
    def __init__(self, address='http://localhost:4161/'):
        self.address = self.base_url = address

    def lookup(self, topic):
        """Return the producers for `topic`."""
        nsq.assert_valid_topic_name(topic)
        return self.http_get('/lookup', fields={'topic': topic})

    def topics(self):
        """Return all known topics."""
        return self.http_get('/topics')

    def channels(self, topic):
        """Return all known channels of `topic`."""
        nsq.assert_valid_topic_name(topic)
        return self.http_get('/channels', fields={'topic': topic})

    def nodes(self):
        """Return all known nsqd nodes."""
        return self.http_get('/nodes')

    def delete_topic(self, topic):
        """Delete an existing topic."""
        nsq.assert_valid_topic_name(topic)
        return self.http_post('/delete_topic', fields={'topic': topic})

    def delete_channel(self, topic, channel):
        """Delete an existing channel of an existing topic."""
        nsq.assert_valid_topic_name(topic)
        nsq.assert_valid_channel_name(channel)
        fields = {'topic': topic, 'channel': channel}
        return self.http_post('/delete_channel', fields=fields)

    def tombstone_topic_producer(self, topic, node):
        """Tombstone a specific producer of an existing topic."""
        nsq.assert_valid_topic_name(topic)
        fields = {'topic': topic, 'node': node}
        return self.http_post('/tombstone_topic_producer', fields=fields)

    def ping(self):
        """Monitoring endpoint.

        :returns: should return `"OK"`, otherwise raises an exception.
        """
        return self.http_get('/ping')

    def info(self):
        """Return version information."""
        return self.http_get('/info')
class Message(object):
    """A single message delivered by nsqd.

    **Signals:**

    .. data:: on_finish(reader, message)
        :noindex:

        Sent when successfully finished.

    .. data:: on_requeue(reader, message, timeout)
        :noindex:

        Sent when requeued.

    .. data:: on_touch(reader, message)
        :noindex:

        Sent when touched.
    """

    def __init__(self, timestamp, attempts, id, body):
        self.timestamp = timestamp
        self.attempts = attempts
        self.id = id
        self.body = body

        # A message may be responded to (finish/requeue) at most once; touch
        # is allowed any number of times before that.
        self._has_responded = False
        self.on_finish = blinker.Signal()
        self.on_requeue = blinker.Signal()
        self.on_touch = blinker.Signal()

    def has_responded(self):
        """Return whether this message has already been responded to."""
        return self._has_responded

    def _ensure_unresponded(self):
        # Shared guard for finish/requeue/touch.
        if self._has_responded:
            raise NSQException('already responded')

    def finish(self):
        """
        Respond to nsqd that you’ve processed this message successfully
        (or would like to silently discard it).
        """
        self._ensure_unresponded()
        self._has_responded = True
        self.on_finish.send(self)

    def requeue(self, time_ms=0):
        """
        Respond to nsqd that you’ve failed to process this message
        successfully (and would like it to be requeued).
        """
        self._ensure_unresponded()
        self._has_responded = True
        self.on_requeue.send(self, timeout=time_ms)

    def touch(self):
        """Respond to nsqd that you need more time to process the message."""
        self._ensure_unresponded()
        self.on_touch.send(self)
import errors + +from .message import Message +from .httpclient import HTTPClient +from .states import INIT, CONNECTED, DISCONNECTED +from .stream import Stream +from .version import __version__ + +HOSTNAME = socket.gethostname() +SHORTNAME = HOSTNAME.split('.')[0] +USERAGENT = 'gnsq/%s' % __version__ + + +class Nsqd(HTTPClient): + """Low level object representing a TCP or HTTP connection to nsqd. + + **Signals:** + + .. data:: on_message(conn, message) + :noindex: + + Sent when a message frame is received. + + .. data:: on_finish(conn, message_id) + :noindex: + + Sent when a message is successfully finished. + + .. data:: on_requeue(conn, message_id, timeout) + :noindex: + + Sent when a message is has been requeued. + + .. data:: on_response(conn, response) + :noindex: + + Sent when a response frame is received. + + .. data:: on_error(conn, error) + :noindex: + + Sent when an error frame is received. + + .. data:: on_close(conn) + :noindex: + + Sent when the stream is closed. + + :param address: the host or ip address of the nsqd + + :param tcp_port: the nsqd tcp port to connect to + + :param http_port: the nsqd http port to connect to + + :param timeout: the timeout for read/write operations (in seconds) + + :param client_id: an identifier used to disambiguate this client (defaults + to the first part of the hostname) + + :param hostname: the hostname where the client is deployed (defaults to the + clients hostname) + + :param heartbeat_interval: the amount of time in seconds to negotiate with + the connected producers to send heartbeats (requires nsqd 0.2.19+) + + :param output_buffer_size: size of the buffer (in bytes) used by nsqd for + buffering writes to this connection + + :param output_buffer_timeout: timeout (in ms) used by nsqd before flushing + buffered writes (set to 0 to disable). Warning: configuring clients with + an extremely low (< 25ms) output_buffer_timeout has a significant effect + on nsqd CPU usage (particularly with > 50 clients connected). 
+ + :param tls_v1: enable TLS v1 encryption (requires nsqd 0.2.22+) + + :param tls_options: dictionary of options to pass to `ssl.wrap_socket() + `_ + + :param snappy: enable Snappy stream compression (requires nsqd 0.2.23+) + + :param deflate: enable deflate stream compression (requires nsqd 0.2.23+) + + :param deflate_level: configure the deflate compression level for this + connection (requires nsqd 0.2.23+) + + :param sample_rate: take only a sample of the messages being sent to the + client. Not setting this or setting it to 0 will ensure you get all the + messages destined for the client. Sample rate can be greater than 0 or + less than 100 and the client will receive that percentage of the message + traffic. (requires nsqd 0.2.25+) + + :param user_agent: a string identifying the agent for this client in the + spirit of HTTP (default: ``/``) (requires + nsqd 0.2.25+) + """ + def __init__( + self, + address='127.0.0.1', + tcp_port=4150, + http_port=4151, + timeout=60.0, + client_id=None, + hostname=None, + heartbeat_interval=30, + output_buffer_size=16 * 1024, + output_buffer_timeout=250, + tls_v1=False, + tls_options=None, + snappy=False, + deflate=False, + deflate_level=6, + sample_rate=0, + user_agent=USERAGENT, + ): + self.address = address + self.tcp_port = tcp_port + self.http_port = http_port + self.timeout = timeout + + self.client_id = client_id or SHORTNAME + self.hostname = hostname or HOSTNAME + self.heartbeat_interval = 1000 * heartbeat_interval + self.output_buffer_size = output_buffer_size + self.output_buffer_timeout = output_buffer_timeout + self.tls_v1 = tls_v1 + self.tls_options = tls_options + self.snappy = snappy + self.deflate = deflate + self.deflate_level = deflate_level + self.sample_rate = sample_rate + self.user_agent = user_agent + + self.state = INIT + self.last_response = time.time() + self.last_message = time.time() + self.last_ready = 0 + self.ready_count = 0 + self.in_flight = 0 + self.max_ready_count = 2500 + + 
self.on_response = blinker.Signal() + self.on_error = blinker.Signal() + self.on_message = blinker.Signal() + self.on_finish = blinker.Signal() + self.on_requeue = blinker.Signal() + self.on_close = blinker.Signal() + + self._frame_handlers = { + nsq.FRAME_TYPE_RESPONSE: self.handle_response, + nsq.FRAME_TYPE_ERROR: self.handle_error, + nsq.FRAME_TYPE_MESSAGE: self.handle_message + } + + @property + def is_connected(self): + """Check if the client is currently connected.""" + return self.state == CONNECTED + + def connect(self): + """Initialize connection to the nsqd.""" + if self.state not in (INIT, DISCONNECTED): + return + + stream = Stream(self.address, self.tcp_port, self.timeout) + stream.connect() + + self.stream = stream + self.state = CONNECTED + self.send(nsq.MAGIC_V2) + + def close_stream(self): + """Close the underlying socket.""" + if not self.is_connected: + return + + self.stream.close() + self.state = DISCONNECTED + self.on_close.send(self) + + def send(self, data, async=False): + try: + return self.stream.send(data, async) + except Exception: + self.close_stream() + raise + + def _read_response(self): + try: + size = nsq.unpack_size(self.stream.read(4)) + return self.stream.read(size) + except Exception: + self.close_stream() + raise + + def read_response(self): + """Read an individual response from nsqd. + + :returns: tuple of the frame type and the processed data. 
    def handle_message(self, data):
        """Unpack a message frame, wire up its response callbacks and emit
        :data:`on_message`.

        :returns: the parsed :class:`Message`
        """
        self.last_message = time.time()
        # One message consumed from our RDY allowance and now in flight.
        self.ready_count -= 1
        self.in_flight += 1

        message = Message(*nsq.unpack_message(data))
        # Route the message's finish/requeue/touch responses back through
        # this connection.
        message.on_finish.connect(self.handle_finish)
        message.on_requeue.connect(self.handle_requeue)
        message.on_touch.connect(self.handle_touch)

        self.on_message.send(self, message=message)
        return message
    def identify(self):
        """Update client metadata on the server and negotiate features.

        :returns: nsqd response data if there was feature negotiation,
            otherwise `None`
        """
        self.send(nsq.identify({
            # nsqd <0.2.28
            'short_id': self.client_id,
            'long_id': self.hostname,

            # nsqd 0.2.28+
            'client_id': self.client_id,
            'hostname': self.hostname,

            # nsqd 0.2.19+
            'feature_negotiation': True,
            'heartbeat_interval': self.heartbeat_interval,

            # nsqd 0.2.21+
            'output_buffer_size': self.output_buffer_size,
            'output_buffer_timeout': self.output_buffer_timeout,

            # nsqd 0.2.22+
            'tls_v1': self.tls_v1,

            # nsqd 0.2.23+
            'snappy': self.snappy,
            'deflate': self.deflate,
            'deflate_level': self.deflate_level,

            # nsqd 0.2.25+
            'sample_rate': self.sample_rate,
            'user_agent': self.user_agent,
        }))

        frame, data = self.read_response()

        if frame == nsq.FRAME_TYPE_ERROR:
            # Error frames are already converted to exception instances by
            # handle_error before read_response returns them.
            raise data

        if data == 'OK':
            # Server did not negotiate features (feature_negotiation
            # unsupported); nothing more to do.
            return

        try:
            data = json.loads(data)

        except ValueError:
            self.close_stream()
            msg = 'failed to parse IDENTIFY response JSON from nsqd: %r'
            raise errors.NSQException(msg % data)

        self.max_ready_count = data.get('max_rdy_count', self.max_ready_count)

        # At most one stream upgrade is applied: TLS first, else snappy,
        # else deflate (mirrors the elif chain below).
        if self.tls_v1 and data.get('tls_v1'):
            self.upgrade_to_tls()

        elif self.snappy and data.get('snappy'):
            self.upgrade_to_snappy()

        elif self.deflate and data.get('deflate'):
            self.deflate_level = data.get('deflate_level', self.deflate_level)
            # NOTE(review): "defalte" is misspelled, but the name matches the
            # Stream method it wraps — confirm both sites before renaming.
            self.upgrade_to_defalte()

        return data
self.send(nsq.ready(count)) + + def finish(self, message_id): + """Finish a message (indicate successful processing).""" + self.send(nsq.finish(message_id)) + self.finish_inflight() + self.on_finish.send(self, message_id=message_id) + + def requeue(self, message_id, timeout=0): + """Re-queue a message (indicate failure to process).""" + self.send(nsq.requeue(message_id, timeout)) + self.finish_inflight() + self.on_requeue.send(self, message_id=message_id, timeout=timeout) + + def touch(self, message_id): + """Reset the timeout for an in-flight message.""" + self.send(nsq.touch(message_id)) + + def close(self): + """Indicate no more messages should be sent.""" + self.send(nsq.close()) + + def nop(self): + """Send no-op to nsqd. Used to keep connection alive.""" + self.send(nsq.nop()) + + @property + def base_url(self): + return 'http://%s:%s/' % (self.address, self.http_port) + + def publish_http(self, topic, data): + """Publish a message to the given topic over http.""" + nsq.assert_valid_topic_name(topic) + return self.http_post('/put', fields={'topic': topic}, body=data) + + def _validate_http_mpub(self, message): + if '\n' not in message: + return message + + error = 'newlines are not allowed in http multipublish' + raise errors.NSQException(error) + + def multipublish_http(self, topic, messages): + """Publish an iterable of messages to the given topic over http.""" + nsq.assert_valid_topic_name(topic) + return self.http_post( + url='/mput', + fields={'topic': topic}, + body='\n'.join(self._validate_http_mpub(m) for m in messages) + ) + + def create_topic(self, topic): + """Create a topic.""" + nsq.assert_valid_topic_name(topic) + return self.http_post('/create_topic', fields={'topic': topic}) + + def delete_topic(self, topic): + """Delete a topic.""" + nsq.assert_valid_topic_name(topic) + return self.http_post('/delete_topic', fields={'topic': topic}) + + def create_channel(self, topic, channel): + """Create a channel for an existing topic.""" + 
nsq.assert_valid_topic_name(topic) + nsq.assert_valid_channel_name(channel) + return self.http_post( + url='/create_channel', + fields={'topic': topic, 'channel': channel}, + ) + + def delete_channel(self, topic, channel): + """Delete an existing channel for an existing topic.""" + nsq.assert_valid_topic_name(topic) + nsq.assert_valid_channel_name(channel) + return self.http_post( + url='/delete_channel', + fields={'topic': topic, 'channel': channel}, + ) + + def empty_topic(self, topic): + """Empty all the queued messages for an existing topic.""" + nsq.assert_valid_topic_name(topic) + return self.http_post('/empty_topic', fields={'topic': topic}) + + def empty_channel(self, topic, channel): + """Empty all the queued messages for an existing channel.""" + nsq.assert_valid_topic_name(topic) + nsq.assert_valid_channel_name(channel) + return self.http_post( + url='/empty_channel', + fields={'topic': topic, 'channel': channel}, + ) + + def pause_channel(self, topic, channel): + """Pause message flow to all channels on an existing topic. + + Messages will queue at topic. + """ + nsq.assert_valid_topic_name(topic) + nsq.assert_valid_channel_name(channel) + return self.http_post( + url='/pause_channel', + fields={'topic': topic, 'channel': channel}, + ) + + def unpause_channel(self, topic, channel): + """Resume message flow to channels of an existing, paused, topic.""" + nsq.assert_valid_topic_name(topic) + nsq.assert_valid_channel_name(channel) + return self.http_post( + url='/unpause_channel', + fields={'topic': topic, 'channel': channel}, + ) + + def stats(self): + """Return internal instrumented statistics.""" + return self.http_get('/stats', fields={'format': 'json'}) + + def ping(self): + """Monitoring endpoint. + + :returns: should return `"OK"`, otherwise raises an exception. + """ + return self.http_get('/ping') + + def info(self): + """Returns version information.""" + return self.http_get('/info') + + def publish(self, topic, data): + """Publish a message. 
# Legal name characters; channels may additionally carry an "#ephemeral"
# suffix.
TOPIC_NAME_RE = re.compile(r'^[\.a-zA-Z0-9_-]+$')
CHANNEL_NAME_RE = re.compile(r'^[\.a-zA-Z0-9_-]+(#ephemeral)?$')


def valid_topic_name(topic):
    """Return True if `topic` is a legal nsq topic name: 1-32 characters
    drawn from [.a-zA-Z0-9_-]."""
    return 0 < len(topic) < 33 and TOPIC_NAME_RE.match(topic) is not None


def valid_channel_name(channel):
    """Return True if `channel` is a legal nsq channel name: like a topic
    name, optionally suffixed with "#ephemeral" (total length still < 33)."""
    return 0 < len(channel) < 33 and CHANNEL_NAME_RE.match(channel) is not None
def unpack_size(data):
    """Decode a 4-byte big-endian signed size/frame-type prefix."""
    (size,) = struct.unpack('>l', data)
    return size


def unpack_response(data):
    """Split a raw response into ``(frame_type, payload)``."""
    return unpack_size(data[:4]), data[4:]


def unpack_message(data):
    """Decode a message frame body.

    Layout: 8-byte big-endian timestamp, 2-byte attempt count, 16-byte
    message id, then the message body.

    :returns: ``(timestamp, attempts, message_id, body)``
    """
    timestamp, attempts = struct.unpack('>qh', data[:10])
    return timestamp, attempts, data[10:26], data[26:]
def ready(count):
    """Build a RDY command advertising readiness for `count` messages.

    :raises TypeError: if `count` is not an integer
    :raises ValueError: if `count` is negative
    """
    if isinstance(count, int):
        if count < 0:
            raise ValueError('ready count cannot be negative')
        return _command('RDY', None, str(count))
    raise TypeError('ready count must be an integer')
+ + The Reader will handle backing off of failed messages up to a configurable + `max_interval` as well as automatically reconnecting to dropped connections. + + **Signals:** + + .. data:: on_message(reader, message) + :noindex: + + Sent when the reader receives a message. The `message_handler` param is + connected to this signal. + + .. data:: on_finish(reader, message_id) + :noindex: + + Sent when a message is successfully finished. + + .. data:: on_requeue(reader, message_id, timeout) + :noindex: + + Sent when a message is requeued. + + .. data:: on_giving_up(reader, message) + :noindex: + + Sent when a message has exceeded the maximum number of attempts + (`max_tries`) and will no longer be requeued. + + .. data:: on_response(reader, response) + :noindex: + + Sent when the reader receives a response frame from a connection. + + .. data:: on_error(reader, error) + :noindex: + + Sent when the reader receives an error frame from a connection. + + .. data:: on_exception(reader, message, error) + :noindex: + + Sent when an exception is caught while handling a message. + + + :param topic: specifies the desired NSQ topic + + :param channel: specifies the desired NSQ channel + + :param nsqd_tcp_addresses: a sequence of string addresses of the nsqd + instances this reader should connect to + + :param lookupd_http_addresses: a sequence of string addresses of the + nsqlookupd instances this reader should query for producers of the + specified topic + + :param name: a string that is used for logging messages (defaults to + 'gnsq.reader.topic.channel') + + :param message_handler: the callable that will be executed for each message + received + + :param async: consider the message handling to be async. 
The message will + not automatically be finished after the handler returns and must + manually be called + + :param max_tries: the maximum number of attempts the reader will make to + process a message after which messages will be automatically discarded + + :param max_in_flight: the maximum number of messages this reader will + pipeline for processing. this value will be divided evenly amongst the + configured/discovered nsqd producers + + :param max_concurrency: the maximum number of messages that will be handled + concurrently. Defaults to the number of nsqd connections. Setting + `max_concurrency` to `-1` will use the systems cpu count. + + :param requeue_delay: the default delay to use when requeueing a failed + message + + :param lookupd_poll_interval: the amount of time in seconds between querying + all of the supplied nsqlookupd instances. a random amount of time based + on thie value will be initially introduced in order to add jitter when + multiple readers are running + + :param lookupd_poll_jitter: The maximum fractional amount of jitter to add + to the lookupd pool loop. This helps evenly distribute requests even if + multiple consumers restart at the same time. + + :param low_ready_idle_timeout: the amount of time in seconds to wait for a + message from a producer when in a state where RDY counts are + re-distributed (ie. 
max_in_flight < num_producers) + + :param max_backoff_duration: the maximum time we will allow a backoff state + to last in seconds + + :param \*\*kwargs: passed to :class:`gnsq.Nsqd` initialization + """ + def __init__( + self, + topic, + channel, + nsqd_tcp_addresses=[], + lookupd_http_addresses=[], + name=None, + message_handler=None, + async=False, + max_tries=5, + max_in_flight=1, + max_concurrency=0, + requeue_delay=0, + lookupd_poll_interval=60, + lookupd_poll_jitter=0.3, + low_ready_idle_timeout=10, + max_backoff_duration=128, + **kwargs + ): + if not nsqd_tcp_addresses and not lookupd_http_addresses: + raise ValueError('must specify at least on nsqd or lookupd') + + nsqd_tcp_addresses = self._get_nsqds(nsqd_tcp_addresses) + lookupd_http_addresses = self._get_lookupds(lookupd_http_addresses) + random.shuffle(lookupd_http_addresses) + + self.nsqd_tcp_addresses = nsqd_tcp_addresses + self.lookupds = [Lookupd(a) for a in lookupd_http_addresses] + self.iterlookupds = cycle(self.lookupds) + + self.topic = topic + self.channel = channel + self.async = async + self.max_tries = max_tries + self.max_in_flight = max_in_flight + self.requeue_delay = requeue_delay + self.lookupd_poll_interval = lookupd_poll_interval + self.lookupd_poll_jitter = lookupd_poll_jitter + self.low_ready_idle_timeout = low_ready_idle_timeout + self.max_backoff_duration = max_backoff_duration + self.conn_kwargs = kwargs + + if name: + self.name = name + else: + self.name = '%s.%s.%s' % (__name__, self.topic, self.channel) + + self._need_ready_redistributed = False + self.last_random_ready = time.time() + self.state = INIT + + self.logger = logging.getLogger(self.name) + self.conn_backoffs = defaultdict(self.create_backoff) + self.backoff = self.create_backoff() + + self.on_response = blinker.Signal() + self.on_error = blinker.Signal() + self.on_message = blinker.Signal() + self.on_finish = blinker.Signal() + self.on_requeue = blinker.Signal() + self.on_giving_up = blinker.Signal() + 
self.on_exception = blinker.Signal() + + if message_handler is not None: + self.on_message.connect(message_handler) + + if max_concurrency < 0: + self.max_concurrency = cpu_count() + else: + self.max_concurrency = max_concurrency + + if max_concurrency: + self.queue = Queue() + else: + self.queue = None + + self.conns = set() + self.pending = set() + + self.workers = [] + self.conn_workers = {} + + def _get_nsqds(self, nsqd_tcp_addresses): + if isinstance(nsqd_tcp_addresses, basestring): + return set([nsqd_tcp_addresses]) + + elif isinstance(nsqd_tcp_addresses, (list, tuple, set)): + return set(nsqd_tcp_addresses) + + raise TypeError('nsqd_tcp_addresses must be a list, set or tuple') + + def _get_lookupds(self, lookupd_http_addresses): + if isinstance(lookupd_http_addresses, basestring): + return [lookupd_http_addresses] + + elif isinstance(lookupd_http_addresses, (list, tuple)): + lookupd_http_addresses = list(lookupd_http_addresses) + return lookupd_http_addresses + + msg = 'lookupd_http_addresses must be a list, set or tuple' + raise TypeError(msg) + + def start(self, block=True): + """Start discovering and listing to connections.""" + if self.state != INIT: + self.logger.warn('%s all ready started' % self.name) + return + + self.logger.debug('starting %s...' 
% self.name) + self.state = RUNNING + self.query_nsqd() + + if self.lookupds: + self.query_lookupd() + self.workers.append(gevent.spawn(self._poll_lookupd)) + + self.workers.append(gevent.spawn(self._poll_ready)) + + for _ in xrange(self.max_concurrency): + self.workers.append(gevent.spawn(self._run)) + + if block: + self.join() + + def close(self): + """Immediately close all connections and stop workers.""" + if not self.is_running: + return + + self.logger.debug('closing') + self.state = CLOSED + + for worker in self.workers: + worker.kill(block=False) + + for conn in self.conns: + conn.close_stream() + + self.logger.debug('workers: %r' % self.workers) + + def join(self, timeout=None, raise_error=False): + """Block until all connections have closed and workers stopped.""" + gevent.joinall(self.workers, timeout, raise_error) + gevent.joinall(self.conn_workers.values(), timeout, raise_error) + + @property + def is_running(self): + """Check if reader is currently running.""" + return self.state in (RUNNING, BACKOFF, THROTTLED) + + @property + def is_starved(self): + """Check if reader is currently starved for messages.""" + for conn in self.conns: + if conn.in_flight >= max(conn.last_ready * 0.85, 1): + return True + return False + + @property + def connection_max_in_flight(self): + return max(1, self.max_in_flight / max(1, len(self.conns))) + + @property + def total_ready_count(self): + return sum(c.ready_count for c in self.conns) + + @property + def total_in_flight(self): + return sum(c.in_flight for c in self.conns) + + @property + def total_in_flight_or_ready(self): + return self.total_in_flight + self.total_ready_count + + def send_ready(self, conn, count): + if self.state in (BACKOFF, CLOSED): + return + + if self.state == THROTTLED and self.total_in_flight_or_ready: + return + + if (self.total_in_flight_or_ready + count) > self.max_in_flight: + if not (conn.ready_count or conn.in_flight): + gevent.spawn_later(5, self.send_ready, conn, count) + return + + 
conn.ready(count) + + def query_nsqd(self): + self.logger.debug('querying nsqd...') + for address in self.nsqd_tcp_addresses: + address, port = address.split(':') + conn = Nsqd(address, int(port), **self.conn_kwargs) + self.connect_to_nsqd(conn) + + def query_lookupd(self): + self.logger.debug('querying lookupd...') + lookupd = self.iterlookupds.next() + + try: + producers = lookupd.lookup(self.topic)['producers'] + self.logger.debug('found %d producers' % len(producers)) + + except Exception as error: + msg = 'Failed to lookup %s on %s (%s)' + self.logger.warn(msg % (self.topic, lookupd.address, error)) + return + + for producer in producers: + conn = Nsqd( + producer.get('broadcast_address') or producer['address'], + producer['tcp_port'], + producer['http_port'], + **self.conn_kwargs + ) + self.connect_to_nsqd(conn) + + def create_backoff(self): + return BackoffTimer(max_interval=self.max_backoff_duration) + + def start_backoff(self): + self.state = BACKOFF + + for conn in self.conns: + conn.ready(0) + + interval = self.backoff.get_interval() + self.logger.info('backing off for %s seconds' % interval) + gevent.sleep(interval) + + self.state = THROTTLED + if not self.conns: + return + + conn = self.random_connection() + self.logger.info('[%s] testing backoff state with RDY 1' % conn) + self.send_ready(conn, 1) + + def complete_backoff(self): + self.state = RUNNING + self.logger.info('backoff complete, resuming normal operation') + + count = self.connection_max_in_flight + for conn in self.conns: + self.send_ready(conn, count) + + def _poll_lookupd(self): + delay = self.lookupd_poll_interval * self.lookupd_poll_jitter + gevent.sleep(random.random() * delay) + + while True: + gevent.sleep(self.lookupd_poll_interval) + self.query_lookupd() + + def _poll_ready(self): + while True: + gevent.sleep(5) + if not self.need_ready_redistributed: + continue + self.redistribute_ready_state() + + @property + def need_ready_redistributed(self): + if self.state == BACKOFF: + 
return False + + if self._need_ready_redistributed: + return True + + if len(self.conns) > self.max_in_flight: + return True + + if self.state == THROTTLED and len(self.conns) > 1: + return True + + @need_ready_redistributed.setter + def need_ready_redistributed(self, value): + self._need_ready_redistributed = value + + def redistribute_ready_state(self): + self.need_ready_redistributed = False + + # first set RDY 0 to all connections that have not received a message + # within a configurable timeframe (low_ready_idle_timeout). + for conn in self.conns: + if conn.ready_count == 0: + continue + + if (time.time() - conn.last_message) < self.low_ready_idle_timeout: + continue + + self.logger.info('[%s] idle connection, giving up RDY count' % conn) + conn.ready(0) + + if self.state == THROTTLED: + max_in_flight = 1 - self.total_in_flight_or_ready + else: + max_in_flight = self.max_in_flight - self.total_in_flight_or_ready + + if max_in_flight <= 0: + return + + # randomly walk the list of possible connections and send RDY 1 (up to + # our calculate "max_in_flight"). We only need to send RDY 1 because in + # both cases described above your per connection RDY count would never + # be higher. + # + # We also don't attempt to avoid the connections who previously might + # have had RDY 1 because it would be overly complicated and not actually + # worth it (ie. given enough redistribution rounds it doesn't matter). 
+ conns = list(self.conns) + conns = random.sample(conns, min(max_in_flight, len(self.conns))) + + for conn in conns: + self.logger.info('[%s] redistributing RDY' % conn) + self.send_ready(conn, 1) + + def random_ready_conn(self, conn): + # if all connections aren't getting RDY + # occsionally randomize which connection gets RDY + if len(self.conns) <= self.max_in_flight: + return conn + + if (time.time() - self.last_random_ready) < 30: + return conn + + self.last_random_ready = time.time() + return random.choice([c for c in self.conns if not c.ready_count]) + + def update_ready(self, conn): + if self.state in (BACKOFF, THROTTLED): + return + + conn = self.random_ready_conn(conn) + if conn.ready_count < max(conn.last_ready * 0.25, 2): + self.send_ready(conn, self.connection_max_in_flight) + + def random_connection(self): + if not self.conns: + return None + return random.choice(list(self.conns)) + + def publish(self, topic, message): + """Publish a message to a random connection.""" + conn = self.random_connection() + if conn is None: + raise NSQNoConnections() + + conn.publish(topic, message) + + def connect_to_nsqd(self, conn): + if not self.is_running: + return + + if conn in self.conns: + self.logger.debug('[%s] already connected' % conn) + return + + if conn in self.pending: + self.logger.debug('[%s] already pending' % conn) + return + + self.logger.debug('[%s] connecting...' 
% conn) + + conn.on_response.connect(self.handle_response) + conn.on_error.connect(self.handle_error) + conn.on_finish.connect(self.handle_finish) + conn.on_requeue.connect(self.handle_requeue) + + if self.max_concurrency: + conn.on_message.connect(self.queue_message) + else: + conn.on_message.connect(self.handle_message) + + self.pending.add(conn) + + try: + conn.connect() + conn.identify() + + if conn.max_ready_count < self.max_in_flight: + msg = ' '.join([ + '[%s] max RDY count %d < reader max in flight %d,', + 'truncation possible' + ]) + + self.logger.warning(msg % ( + conn, + conn.max_ready_count, + self.max_in_flight + )) + + conn.subscribe(self.topic, self.channel) + self.send_ready(conn, 1) + + except NSQException as error: + self.logger.debug('[%s] connection failed (%r)' % (conn, error)) + self.handle_connection_failure(conn) + return + + finally: + self.pending.remove(conn) + + # Check if we've closed since we started + if not self.is_running: + conn.close_stream() + return + + self.logger.info('[%s] connection successful' % conn) + self.handle_connection_success(conn) + + def _listen(self, conn): + try: + conn.listen() + except NSQException as error: + if self.state == CLOSED: + return + self.logger.warning('[%s] connection lost (%r)' % (conn, error)) + + self.handle_connection_failure(conn) + + def _run(self): + for conn, message in self.queue: + self.handle_message(conn, message) + + def queue_message(self, conn, message): + self.logger.debug('[%s] queueing message: %s' % (conn, message.id)) + self.queue.put((conn, message)) + + def handle_connection_success(self, conn): + self.conns.add(conn) + self.conn_workers[conn] = gevent.spawn(self._listen, conn) + + if str(conn) not in self.nsqd_tcp_addresses: + return + + self.conn_backoffs[conn].success() + + def handle_connection_failure(self, conn): + self.conns.discard(conn) + self.conn_workers.pop(conn, None) + conn.close_stream() + + if conn.ready_count: + self.need_ready_redistributed = True + + if 
str(conn) not in self.nsqd_tcp_addresses: + return + + seconds = self.conn_backoffs[conn].failure().get_interval() + self.logger.debug('[%s] retrying in %ss' % (conn, seconds)) + gevent.spawn_later(seconds, self.connect_to_nsqd, conn) + + def handle_response(self, conn, response): + self.logger.debug('[%s] response: %s' % (conn, response)) + self.on_response.send(self, response=response) + + def handle_error(self, conn, error): + self.logger.debug('[%s] error: %s' % (conn, error)) + self.on_error.send(self, error=error) + + def handle_message(self, conn, message): + self.logger.debug('[%s] got message: %s' % (conn, message.id)) + + if self.max_tries and message.attempts > self.max_tries: + msg = "giving up on message '%s' after max tries %d" + self.logger.warning(msg % (message.id, self.max_tries)) + self.on_giving_up.send(self, message=message) + return message.finish() + + try: + self.on_message.send(self, message=message) + if self.is_running and not self.async: + message.finish() + return + + except NSQRequeueMessage: + pass + + except Exception as error: + msg = '[%s] caught exception while handling message' % conn + self.logger.exception(msg) + self.on_exception.send(self, message=message, error=error) + + if self.is_running: + message.requeue(self.requeue_delay) + + def handle_finish(self, conn, message_id): + self.logger.debug('[%s] finished message: %s' % (conn, message_id)) + self.backoff.success() + self.update_ready(conn) + self.handle_backoff() + self.on_finish.send(self, message_id=message_id) + + def handle_requeue(self, conn, message_id, timeout): + msg = '[%s] requeued message: %s (%s)' + self.logger.debug(msg % (conn, message_id, timeout)) + self.backoff.failure() + self.update_ready(conn) + self.handle_backoff() + self.on_requeue.send(self, message_id=message_id, timeout=timeout) + + def handle_backoff(self): + if self.state in (BACKOFF, CLOSED): + return + + if self.state == THROTTLED and self.backoff.is_reset(): + return self.complete_backoff() 
+ + if not self.backoff.is_reset(): + return self.start_backoff() diff --git a/gnsq/states.py b/gnsq/states.py new file mode 100644 index 0000000..24a42a0 --- /dev/null +++ b/gnsq/states.py @@ -0,0 +1,10 @@ +# -*- coding: utf-8 -*- +"""Connection states.""" + +INIT = 0 +CONNECTED = 1 +DISCONNECTED = 2 +RUNNING = 3 +BACKOFF = 4 +THROTTLED = 5 +CLOSED = 6 diff --git a/gnsq/stream/__init__.py b/gnsq/stream/__init__.py new file mode 100644 index 0000000..8842acf --- /dev/null +++ b/gnsq/stream/__init__.py @@ -0,0 +1,8 @@ +# -*- coding: utf-8 -*- +from __future__ import absolute_import +from .stream import Stream + + +__all__ = [ + 'Stream' +] diff --git a/gnsq/stream/compression.py b/gnsq/stream/compression.py new file mode 100644 index 0000000..89149f8 --- /dev/null +++ b/gnsq/stream/compression.py @@ -0,0 +1,36 @@ +# -*- coding: utf-8 -*- +from __future__ import absolute_import +from errno import EWOULDBLOCK +from gnsq.errors import NSQSocketError + + +class CompressionSocket(object): + def __init__(self, socket): + self._socket = socket + self._bootstrapped = None + + def __getattr__(self, name): + return getattr(self._socket, name) + + def bootstrap(self, data): + if not data: + return + self._bootstrapped = self.decompress(data) + + def recv(self, size): + if self._bootstrapped: + data = self._bootstrapped + self._bootstrapped = None + return data + + chunk = self._socket.recv(size) + if chunk: + uncompressed = self.decompress(chunk) + + if not uncompressed: + raise NSQSocketError(EWOULDBLOCK, 'Operation would block') + + return uncompressed + + def send(self, data): + self._socket.send(self.compress(data)) diff --git a/gnsq/stream/defalte.py b/gnsq/stream/defalte.py new file mode 100644 index 0000000..8940e2c --- /dev/null +++ b/gnsq/stream/defalte.py @@ -0,0 +1,18 @@ +# -*- coding: utf-8 -*- +from __future__ import absolute_import +import zlib +from .compression import CompressionSocket + + +class DefalteSocket(CompressionSocket): + def __init__(self, socket, 
level): + wbits = -zlib.MAX_WBITS + self._decompressor = zlib.decompressobj(wbits) + self._compressor = zlib.compressobj(level, zlib.DEFLATED, wbits) + super(DefalteSocket, self).__init__(socket) + + def compress(self, data): + return self._compressor.compress(data) + + def decompress(self, data): + return self._decompressor.decompress(data) diff --git a/gnsq/stream/snappy.py b/gnsq/stream/snappy.py new file mode 100644 index 0000000..a3042c8 --- /dev/null +++ b/gnsq/stream/snappy.py @@ -0,0 +1,17 @@ +# -*- coding: utf-8 -*- +from __future__ import absolute_import +import snappy +from .compression import CompressionSocket + + +class SnappySocket(CompressionSocket): + def __init__(self, socket): + self._decompressor = snappy.StreamDecompressor() + self._compressor = snappy.StreamCompressor() + super(SnappySocket, self).__init__(socket) + + def compress(self, data): + return self._compressor.add_chunk(data, compress=True) + + def decompress(self, data): + return self._decompressor.decompress(data) diff --git a/gnsq/stream/stream.py b/gnsq/stream/stream.py new file mode 100644 index 0000000..5fe8f7c --- /dev/null +++ b/gnsq/stream/stream.py @@ -0,0 +1,157 @@ +# -*- coding: utf-8 -*- +from __future__ import absolute_import + +from resource import getpagesize +from errno import ENOTCONN + +import gevent +from gevent import socket +from gevent.queue import Queue +from gevent.event import AsyncResult + +try: + from gevent.ssl import SSLSocket, PROTOCOL_TLSv1, CERT_NONE +except ImportError: + SSLSocket = None # pyflakes.ignore + +from gnsq.states import INIT, CONNECTED, DISCONNECTED +from gnsq.errors import NSQSocketError + +try: + from .snappy import SnappySocket +except ImportError: + SnappySocket = None # pyflakes.ignore + +from .defalte import DefalteSocket + + +class Stream(object): + def __init__(self, address, port, timeout, buffer_size=getpagesize()): + self.address = address + self.port = port + self.timeout = timeout + + self.buffer = '' + self.buffer_size = 
buffer_size + + self.socket = None + self.worker = None + self.queue = Queue() + self.state = INIT + + @property + def is_connected(self): + return self.state == CONNECTED + + def ensure_connection(self): + if self.is_connected: + return + raise NSQSocketError(ENOTCONN, 'Socket is not connected') + + def connect(self): + if self.state not in (INIT, DISCONNECTED): + return + + self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + self.socket.settimeout(self.timeout) + + try: + self.socket.connect((self.address, self.port)) + except socket.error as error: + raise NSQSocketError(*error) + + self.state = CONNECTED + self.worker = gevent.spawn(self.send_loop) + + def read(self, size): + while len(self.buffer) < size: + self.ensure_connection() + + try: + packet = self.socket.recv(self.buffer_size) + except socket.error as error: + raise NSQSocketError(*error) + + if not packet: + self.close() + + self.buffer += packet + + data = self.buffer[:size] + self.buffer = self.buffer[size:] + + return data + + def send(self, data, async=False): + self.ensure_connection() + + result = AsyncResult() + self.queue.put((data, result)) + + if async: + return result + + result.get() + + def consume_buffer(self): + data = self.buffer + self.buffer = '' + return data + + def close(self): + if not self.is_connected: + return + + self.state = DISCONNECTED + self.queue.put(StopIteration) + self.socket.close() + + def send_loop(self): + for data, result in self.queue: + if not self.is_connected: + error = NSQSocketError(ENOTCONN, 'Socket is not connected') + result.set_exception(error) + + try: + self.socket.send(data) + result.set() + + except socket.error as error: + result.set_exception(NSQSocketError(*error)) + + except Exception as error: + result.set_exception(error) + + def upgrade_to_tls( + self, + keyfile=None, + certfile=None, + cert_reqs=CERT_NONE, + ca_certs=None + ): + if SSLSocket is None: + msg = 'tls_v1 requires Python 2.6+ or Python 2.5 w/ pip install ssl' + 
raise RuntimeError(msg) + + self.ensure_connection() + self.socket = SSLSocket( + self.socket, + keyfile=keyfile, + certfile=certfile, + cert_reqs=cert_reqs, + ca_certs=ca_certs, + ssl_version=PROTOCOL_TLSv1, + ) + + def upgrade_to_snappy(self): + if SnappySocket is None: + raise RuntimeError('snappy requires the python-snappy package') + + self.ensure_connection() + self.socket = SnappySocket(self.socket) + self.socket.bootstrap(self.consume_buffer()) + + def upgrade_to_defalte(self, level): + self.ensure_connection() + self.socket = DefalteSocket(self.socket, level) + self.socket.bootstrap(self.consume_buffer()) diff --git a/gnsq/version.py b/gnsq/version.py new file mode 100644 index 0000000..6865456 --- /dev/null +++ b/gnsq/version.py @@ -0,0 +1,3 @@ +# -*- coding: utf-8 -*- +# also update in setup.py +__version__ = '0.1.0' diff --git a/httpclient.py b/httpclient.py deleted file mode 100644 index 83a4d0b..0000000 --- a/httpclient.py +++ /dev/null @@ -1,41 +0,0 @@ -import requests -from . 
import errors - -class HTTPClient(object): - base_url = None - _session = None - - @property - def session(self): - if self._session is None: - self._session = requests.Session() - return self._session - - def url(self, *parts): - return self.base_url + '/'.join(parts) - - def _check_connection(self): - pass - - def _check_api(self, *args, **kwargs): - self._check_connection() - - resp = self.session.post(*args, **kwargs) - if resp.status_code != 200: - raise errors.NSQException(resp.status_code, 'api error') - - return resp.text - - def _json_api(self, *args, **kwargs): - self._check_connection() - - resp = self.session.post(*args, **kwargs) - if resp.status_code != 200: - try: - msg = resp.json()['status_txt'] - except: - msg = 'api error' - - raise errors.NSQException(resp.status_code, msg) - - return resp.json()['data'] diff --git a/lookupd.py b/lookupd.py deleted file mode 100644 index 25e38ab..0000000 --- a/lookupd.py +++ /dev/null @@ -1,57 +0,0 @@ -from .httpclient import HTTPClient -from . 
import protocal as nsq - -class Lookupd(HTTPClient): - def __init__(self, address='http://localhost:4161/'): - if not address.endswith('/'): - address += '/' - - self.address = self.base_url = address - - def lookup(self, topic): - nsq.assert_valid_topic_name(topic) - return self._json_api( - self.url('lookup'), - params = {'topic': topic} - ) - - def topics(self): - return self._json_api(self.url('topics')) - - def channels(self, topic): - nsq.assert_valid_topic_name(topic) - return self._json_api( - self.url('channels'), - params = {'topic': topic} - ) - - def nodes(self): - return self._json_api(self.url('nodes')) - - def delete_topic(self, topic): - nsq.assert_valid_topic_name(topic) - return self._json_api( - self.url('delete_topic'), - params = {'topic': topic} - ) - - def delete_channel(self, topic, channel): - nsq.assert_valid_topic_name(topic) - nsq.assert_valid_channel_name(channel) - return self._json_api( - self.url('delete_channel'), - params = {'topic': topic, 'channel': channel} - ) - - def tombstone_topic_producer(self, topic, node): - nsq.assert_valid_topic_name(topic) - return self._json_api( - self.url('tombstone_topic_producer'), - params = {'topic': topic, 'node': node} - ) - - def ping(self): - return self._check_api(self.url('ping')) - - def info(self): - return self._json_api(self.url('info')) diff --git a/message.py b/message.py deleted file mode 100644 index 2ed7f24..0000000 --- a/message.py +++ /dev/null @@ -1,25 +0,0 @@ -class Message(object): - def __init__(self, conn, timestamp, attempts, id, body): - self._has_responded = False - self.conn = conn - self.timestamp = timestamp - self.attempts = attempts - self.id = id - self.body = body - - def has_responded(self): - return self._has_responded - - def finish(self): - assert not self._has_responded - self._has_responded = True - self.conn.finish(self.id) - - def requeue(self, time_ms=0): - assert not self._has_responded - self._has_responded = True - self.conn.requeue(self.id, time_ms) - 
- def touch(self): - assert not self._has_responded - self.conn.touch(self.id) diff --git a/nsqd.py b/nsqd.py deleted file mode 100644 index e64da16..0000000 --- a/nsqd.py +++ /dev/null @@ -1,368 +0,0 @@ -import re -import blinker - -import gevent -from gevent import socket -from gevent.queue import Queue -from gevent.event import AsyncResult -from Queue import Empty - -from . import protocal as nsq -from . import errors - -from .message import Message -from .httpclient import HTTPClient - -HOSTNAME = socket.gethostname() -SHORTNAME = HOSTNAME.split('.')[0] - -class Nsqd(HTTPClient): - def __init__(self, - address = '127.0.0.1', - tcp_port = 4150, - http_port = 4151, - timeout = 60.0 - ): - self.address = address - self.tcp_port = tcp_port - self.http_port = http_port - self.timeout = timeout - - self.on_response = blinker.Signal() - self.on_error = blinker.Signal() - self.on_message = blinker.Signal() - self.on_finish = blinker.Signal() - self.on_requeue = blinker.Signal() - - self._send_worker = None - self._send_queue = Queue() - - self._frame_handlers = { - nsq.FRAME_TYPE_RESPONSE: self.handle_response, - nsq.FRAME_TYPE_ERROR: self.handle_error, - nsq.FRAME_TYPE_MESSAGE: self.handle_message - } - - self.reset() - - @property - def is_connected(self): - return self._socket is not None - - def connect(self): - if self.is_connected: - raise NSQException('already connected') - - self.reset() - s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - s.settimeout(self.timeout) - - try: - s.connect((self.address, self.tcp_port)) - except socket.error as error: - raise errors.NSQSocketError(*error) - - self._socket = s - self._send_worker = gevent.spawn(self._send) - self.send(nsq.MAGIC_V2) - - def _empty_send_queue(self): - while 1: - try: - data, result = self._send_queue.get_nowait() - except Empty: - return - - result.set_exception(errors.NSQException(-1, 'not connected')) - - def join(self, timeout=None): - if self._send_worker is None: - return - 
self._send_worker.join(timeout) - - def kill(self): - self._socket = None - - if self._send_worker: - worker, self._send_worker = self._send_worker, None - worker.kill() - - self._empty_send_queue() - - def send(self, data, async=False): - if not self.is_connected: - raise errors.NSQException(-1, 'not connected') - - result = AsyncResult() - self._send_queue.put((data, result)) - - if async: - return result - - result.get() - - def _send(self): - while 1: - data, result = self._send_queue.get() - if not self.is_connected: - result.set_exception(errors.NSQException(-1, 'not connected')) - break - - try: - self._socket.send(data) - result.set(True) - - except socket.error as error: - result.set_exception(errors.NSQSocketError(*error)) - - except Exception as error: - result.set_exception(error) - - self._empty_send_queue() - - def reset(self): - self.ready_count = 0 - self.in_flight = 0 - self._buffer = '' - self._socket = None - self._on_next_response = None - - def _readn(self, size): - while len(self._buffer) < size: - if not self.is_connected: - raise errors.NSQException(-1, 'not connected') - - try: - packet = self._socket.recv(4096) - except socket.error as error: - raise errors.NSQSocketError(*error) - - if not packet: - raise errors.NSQSocketError(-1, 'failed to read %d' % size) - - self._buffer += packet - - data = self._buffer[:size] - self._buffer = self._buffer[size:] - return data - - def _read_response(self): - size = nsq.unpack_size(self._readn(4)) - return self._readn(size) - - def read_response(self): - response = self._read_response() - frame, data = nsq.unpack_response(response) - - if frame not in self._frame_handlers: - raise errors.NSQFrameError('unknown frame %s' % frame) - - frame_handler = self._frame_handlers[frame] - processed_data = frame_handler(data) - self._on_next_response = None - - return frame, processed_data - - def handle_response(self, data): - if data == '_heartbeat_': - self.nop() - - elif self._on_next_response is not None: - 
self._on_next_response(self, response=data) - - self.on_response.send(self, response=data) - return data - - def handle_error(self, data): - error = errors.make_error(data) - - if self._on_next_response is not None: - self._on_next_response(self, response=error) - - self.on_error.send(self, error=error) - return error - - def handle_message(self, data): - self.ready_count -= 1 - self.in_flight += 1 - message = Message(self, *nsq.unpack_message(data)) - self.on_message.send(self, message=message) - return message - - def listen(self): - while self.is_connected: - self.read_response() - - def identify(self, - short_id = SHORTNAME, - long_id = HOSTNAME, - heartbeat_interval = None - ): - self.send(nsq.identify({ - 'short_id': short_id, - 'long_id': long_id, - 'heartbeat_interval': heartbeat_interval - })) - - def subscribe(self, topic, channel): - self.send(nsq.subscribe(topic, channel)) - - def publish_tcp(self, topic, data): - self.send(nsq.publish(topic, data)) - - def multipublish_tcp(self, topic, messages): - self.send(nsq.multipublish(topic, messages)) - - def ready(self, count): - self.ready_count = count - self.send(nsq.ready(count)) - - def finish(self, message_id): - self.send(nsq.finish(message_id)) - self.in_flight -= 1 - self.on_finish.send(self, message_id=message_id) - - def requeue(self, message_id, timeout=0): - self.send(nsq.requeue(message_id, timeout)) - self.in_flight -= 1 - self.on_requeue.send(self, message_id=message_id, timeout=timeout) - - def touch(self, message_id): - self.send(nsq.touch(message_id)) - - def close(self): - self.send(nsq.close()) - - def nop(self): - self.send(nsq.nop()) - - @property - def base_url(self): - return 'http://%s:%s/' % (self.address, self.http_port) - - def _check_connection(self): - if self.http_port: - return - raise errors.NSQException(-1, 'no http port') - - def publish_http(self, topic, data): - nsq.assert_valid_topic_name(topic) - return self._check_api( - self.url('put'), - params = {'topic': topic}, - 
data = data - ) - - def multipublish_http(self, topic, messages): - nsq.assert_valid_topic_name(topic) - - for message in messages: - if '\n' not in message: - continue - - err = 'newlines are not allowed in http multipublish' - raise errors.NSQException(-1, err) - - return self._check_api( - self.url('mput'), - params = {'topic': topic}, - data = '\n'.join(messages) - ) - - def create_topic(self, topic): - nsq.assert_valid_topic_name(topic) - return self._json_api( - self.url('create_topic'), - params = {'topic': topic} - ) - - def delete_topic(self, topic): - nsq.assert_valid_topic_name(topic) - return self._json_api( - self.url('delete_topic'), - params = {'topic': topic} - ) - - def create_channel(self, topic, channel): - nsq.assert_valid_topic_name(topic) - nsq.assert_valid_channel_name(channel) - return self._json_api( - self.url('create_channel'), - params = {'topic': topic, 'channel': channel} - ) - - def delete_channel(self, topic, channel): - nsq.assert_valid_topic_name(topic) - nsq.assert_valid_channel_name(channel) - return self._json_api( - self.url('delete_channel'), - params = {'topic': topic, 'channel': channel} - ) - - def empty_topic(self, topic): - nsq.assert_valid_topic_name(topic) - return self._json_api( - self.url('empty_topic'), - params = {'topic': topic} - ) - - def empty_channel(self, topic, channel): - nsq.assert_valid_topic_name(topic) - nsq.assert_valid_channel_name(channel) - return self._json_api( - self.url('empty_channel'), - params = {'topic': topic, 'channel': channel} - ) - - def pause_channel(self, topic, channel): - nsq.assert_valid_topic_name(topic) - nsq.assert_valid_channel_name(channel) - return self._json_api( - self.url('pause_channel'), - params = {'topic': topic, 'channel': channel} - ) - - def unpause_channel(self, topic, channel): - nsq.assert_valid_topic_name(topic) - nsq.assert_valid_channel_name(channel) - return self._json_api( - self.url('unpause_channel'), - params = {'topic': topic, 'channel': channel} - ) - - 
def stats(self): - return self._json_api(self.url('stats'), params={'format': 'json'}) - - def ping(self): - return self._check_api(self.url('ping')) - - def info(self): - return self._json_api(self.url('info')) - - def publish(self, topic, data): - if self.is_connected: - return self.publish_tcp(topic, data) - else: - return self.publish_http(topic, data) - - def multipublish(self, topic, messages): - if self.is_connected: - return self.multipublish_tcp(topic, messages) - else: - return self.multipublish_http(topic, messages) - - def __str__(self): - return self.address + ':' + str(self.tcp_port) - - def __hash__(self): - return hash((self.address, self.tcp_port)) - - def __eq__(self, other): - return ( - isinstance(other, Nsqd) and - self.address == other.address and - self.tcp_port == other.tcp_port - ) - - def __cmp__(self, other): - return hash(self) - hash(other) diff --git a/reader.py b/reader.py deleted file mode 100644 index 8112387..0000000 --- a/reader.py +++ /dev/null @@ -1,245 +0,0 @@ -import logging -import random -import gevent -import blinker - -from .lookupd import Lookupd -from .nsqd import Nsqd -from .util import assert_list - -from .errors import ( - NSQException, - NSQNoConnections, - NSQRequeueMessage -) - - -class Reader(object): - def __init__(self, - topic, - channel, - nsqd_tcp_addresses = [], - lookupd_http_addresses = [], - async = False, - max_tries = 5, - max_in_flight = 1, - lookupd_poll_interval = 120, - requeue_delay = 0 - ): - lookupd_http_addresses = assert_list(lookupd_http_addresses) - self.lookupds = [Lookupd(a) for a in lookupd_http_addresses] - self.nsqd_tcp_addresses = assert_list(nsqd_tcp_addresses) - assert self.nsqd_tcp_addresses or self.lookupds - - self.topic = topic - self.channel = channel - self.async = async - self.max_tries = max_tries - self.max_in_flight = max_in_flight - self.lookupd_poll_interval = lookupd_poll_interval - self.requeue_delay = requeue_delay - self.logger = logging.getLogger(__name__) - - 
self.on_response = blinker.Signal() - self.on_error = blinker.Signal() - self.on_message = blinker.Signal() - self.on_finish = blinker.Signal() - self.on_requeue = blinker.Signal() - - self.conns = set() - self.stats = {} - - def start(self): - self.query_nsqd() - self.query_lookupd() - self.update_stats() - self._poll() - - def connection_max_in_flight(self): - return max(1, self.max_in_flight / max(1, len(self.conns))) - - def query_nsqd(self): - self.logger.debug('querying nsqd...') - for address in self.nsqd_tcp_addresses: - address, port = address.split(':') - self.connect_to_nsqd(address, int(port)) - - def _query_lookupd(self, lookupd): - try: - producers = lookupd.lookup(self.topic)['producers'] - - except Exception as error: - template = 'Failed to lookup %s on %s (%s)' - data = (self.topic, lookupd.address, error) - self.logger.warn(template % data) - return - - for producer in producers: - self.connect_to_nsqd( - producer.get('address') or producer['hostname'], - producer['tcp_port'], - producer['http_port'] - ) - - def query_lookupd(self): - self.logger.debug('querying lookupd...') - for lookupd in self.lookupds: - self._query_lookupd(lookupd) - - def _poll(self): - gevent.sleep(random.random() * self.lookupd_poll_interval * 0.1) - while 1: - gevent.sleep(self.lookupd_poll_interval) - self.query_nsqd() - self.query_lookupd() - self.update_stats() - - def update_stats(self): - stats = {} - for conn in self.conns: - stats[conn] = self.get_stats(conn) - - self.stats = stats - - def get_stats(self, conn): - try: - stats = conn.stats() - except Exception as error: - self.logger.warn('[%s] stats lookup failed (%r)' % (conn, error)) - return None - - if stats is None: - return None - - for topic in stats['topics']: - if topic['topic_name'] != self.topic: - continue - - for channel in topic['channels']: - if channel['channel_name'] != self.channel: - continue - - return channel - - return None - - def smallest_depth(self): - if len(conn) == 0: - return None - - 
stats = self.stats - depths = [(stats.get(c, {}).get('depth'), c) for c in self.conns] - - return max(depths)[1] - - def random_connection(self): - if not self.conns: - return None - return random.choice(list(self.conns)) - - def publish(self, topic, message): - conn = self.random_connection() - if conn is None: - raise NSQNoConnections() - - conn.publish(topic, message) - - def connect_to_nsqd(self, address, tcp_port, http_port=None): - assert isinstance(address, (str, unicode)) - assert isinstance(tcp_port, int) - assert isinstance(http_port, int) or http_port is None - - conn = Nsqd(address, tcp_port, http_port) - if conn in self.conns: - self.logger.debug('[%s] already connected' % conn) - return - - self.logger.debug('[%s] connecting...' % conn) - - conn.on_response.connect(self.handle_response) - conn.on_error.connect(self.handle_error) - conn.on_message.connect(self.handle_message) - conn.on_finish.connect(self.handle_finish) - conn.on_requeue.connect(self.handle_requeue) - - try: - conn.connect() - conn.identify() - conn.subscribe(self.topic, self.channel) - conn.ready(self.connection_max_in_flight()) - except NSQException as error: - self.logger.debug('[%s] connection failed (%r)' % (conn, error)) - return - - self.logger.info('[%s] connection successful' % conn) - self.conns.add(conn) - conn.worker = gevent.spawn(self._listen, conn) - - def _listen(self, conn): - try: - conn.listen() - except NSQException as error: - self.logger.warning('[%s] connection lost (%r)' % (conn, error)) - - self.conns.remove(conn) - conn.kill() - - def handle_response(self, conn, response): - self.logger.debug('[%s] response: %s' % (conn, response)) - self.on_response.send(self, conn=conn, response=response) - - def handle_error(self, conn, error): - self.logger.debug('[%s] error: %s' % (conn, error)) - self.on_error.send(self, conn=conn, error=error) - - def handle_message(self, conn, message): - self.logger.debug('[%s] got message: %s' % (conn, message.id)) - - if 
self.max_tries and message.attempts > self.max_tries: - template = "giving up on message '%s' after max tries %d" - self.logger.warning(template, message.id, self.max_tries) - return message.finish() - - try: - self.on_message.send(self, conn=conn, message=message) - if not self.async: - message.finish() - return - - except NSQRequeueMessage: - pass - - except Exception: - template = '[%s] caught exception while handling message' - self.logger.exception(template % conn) - - message.requeue(self.requeue_delay) - - def update_ready(self, conn): - max_in_flight = self.connection_max_in_flight() - if conn.ready_count < (0.25 * max_in_flight): - conn.ready(max_in_flight) - - def handle_finish(self, conn, message_id): - self.logger.debug('[%s] finished message: %s' % (conn, message_id)) - self.on_finish.send(self, conn=conn, message_id=message_id) - self.update_ready(conn) - - def handle_requeue(self, conn, message_id, timeout): - template = '[%s] requeued message: %s (%s)' - self.logger.debug(template % (conn, message_id, timeout)) - self.on_requeue.send( - self, - conn = conn, - message_id = message_id, - timeout = timeout - ) - self.update_ready(conn) - - def close(self): - for conn in self.conns: - conn.close() - - def join(self, timeout=None, raise_error=False): - workers = [c._send_worker for c in self.conns if c._send_worker] - gevent.joinall(workers, timeout, raise_error) diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..a0c8ef3 --- /dev/null +++ b/requirements.txt @@ -0,0 +1,9 @@ +# Install gnsq itself +-e . 
+ +# Install our development requirements +flake8 +pytest +python-snappy +tox +wheel>=0.22 diff --git a/setup.cfg b/setup.cfg new file mode 100644 index 0000000..0a8df87 --- /dev/null +++ b/setup.cfg @@ -0,0 +1,2 @@ +[wheel] +universal = 1 \ No newline at end of file diff --git a/setup.py b/setup.py new file mode 100755 index 0000000..5126b1b --- /dev/null +++ b/setup.py @@ -0,0 +1,41 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +try: + from setuptools import setup +except ImportError: + from distutils.core import setup + + +readme = open('README.rst').read() +history = open('HISTORY.rst').read().replace('.. :changelog:', '') + + +setup( + name='gnsq', + version='0.1.0', + description='A gevent based NSQ driver for Python.', + long_description=readme + '\n\n' + history, + author='Trevor Olson', + author_email='trevor@heytrevor.com', + url='https://github.com/wtolson/gnsq', + packages=['gnsq'], + package_dir={'gnsq': 'gnsq'}, + include_package_data=True, + install_requires=[ + 'gevent', + 'blinker', + 'urllib3', + ], + license="BSD", + zip_safe=False, + keywords='gnsq', + classifiers=[ + 'Development Status :: 4 - Beta', + 'Intended Audience :: Developers', + 'License :: OSI Approved :: BSD License', + 'Natural Language :: English', + 'Programming Language :: Python :: 2.6', + 'Programming Language :: Python :: 2.7', + ] +) diff --git a/tests/cert.pem b/tests/cert.pem new file mode 100644 index 0000000..ed73acf --- /dev/null +++ b/tests/cert.pem @@ -0,0 +1,26 @@ +-----BEGIN CERTIFICATE----- +MIIEbjCCA1agAwIBAgIJAK6x7y6AwBmLMA0GCSqGSIb3DQEBBQUAMIGAMQswCQYD +VQQGEwJVUzERMA8GA1UECBMITmV3IFlvcmsxFjAUBgNVBAcTDU5ldyBZb3JrIENp +dHkxDDAKBgNVBAoTA05TUTETMBEGA1UEAxMKdGVzdC5sb2NhbDEjMCEGCSqGSIb3 +DQEJARYUbXJlaWZlcnNvbkBnbWFpbC5jb20wHhcNMTMwNjI4MDA0MzQ4WhcNMTYw +NDE3MDA0MzQ4WjCBgDELMAkGA1UEBhMCVVMxETAPBgNVBAgTCE5ldyBZb3JrMRYw +FAYDVQQHEw1OZXcgWW9yayBDaXR5MQwwCgYDVQQKEwNOU1ExEzARBgNVBAMTCnRl +c3QubG9jYWwxIzAhBgkqhkiG9w0BCQEWFG1yZWlmZXJzb25AZ21haWwuY29tMIIB 
+IjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAnX0KB+svwy+yHU2qggz/EaGg +craKShagKo+9M9y5HLM852ngk5c+t+tJJbx3N954Wr1FXBuGIv1ltU05rU4zhvBS +25tVP1UIEnT5pBt2TeetLkl199Y7fxh1hKmnwJMG3fy3VZdNXEndBombXMmtXpQY +shuEJHKeUNDbQKz5X+GjEdkTPO/HY/VMHsxS23pbSimQozMg3hvLIdgv0aS3QECz +ydZBgTPThy3uDtHIuCpxCwXd/vDF68ATlYgo3h3lh2vxNwM/pjklIUhzMh4XaKQF +7m3/0KbtUcXfy0QHueeuMr11E9MAFNyRN4xf9Fk1yB97KJ3PJBTC5WD/m1nW+QID +AQABo4HoMIHlMB0GA1UdDgQWBBR3HMBws4lmYYSIgwoZsfW+bbgaMjCBtQYDVR0j +BIGtMIGqgBR3HMBws4lmYYSIgwoZsfW+bbgaMqGBhqSBgzCBgDELMAkGA1UEBhMC +VVMxETAPBgNVBAgTCE5ldyBZb3JrMRYwFAYDVQQHEw1OZXcgWW9yayBDaXR5MQww +CgYDVQQKEwNOU1ExEzARBgNVBAMTCnRlc3QubG9jYWwxIzAhBgkqhkiG9w0BCQEW +FG1yZWlmZXJzb25AZ21haWwuY29tggkArrHvLoDAGYswDAYDVR0TBAUwAwEB/zAN +BgkqhkiG9w0BAQUFAAOCAQEANOYTbanW2iyV1v4oYpcM/y3TWcQKzSME8D2SGFZb +dbMYU81hH3TTlQdvyeh3FAcdjhKE8Xi/RfNNjEslTBscdKXePGpZg6eXRNJzPP5K +KZPf5u6tcpAeUOKrMqbGwbE+h2QixxG1EoVQtE421szsU2P7nHRTdHzKFRnOerfl +Phm3NocR0P40Rv7WKdxpOvqc+XKf0onTruoVYoPWGpwcLixCG0zu4ZQ23/L/Dy18 +4u70Hbq6O/6kq9FBFaDNp3IhiEdu2Cq6ZplU6bL9XDF27KIEErHwtuqBHVlMG+zB +oH/k9vZvwH7OwAjHdKp+1yeZFLYC8K5hjFIHqcdwpZCNIg== +-----END CERTIFICATE----- diff --git a/tests/conftest.py b/tests/conftest.py new file mode 100644 index 0000000..736b2ec --- /dev/null +++ b/tests/conftest.py @@ -0,0 +1,14 @@ +import pytest + + +def pytest_addoption(parser): + parser.addoption( + '--runslow', + action='store_true', + help='run slow tests' + ) + + +def pytest_runtest_setup(item): + if 'slow' in item.keywords and not item.config.getoption('--runslow'): + pytest.skip('need --runslow option to run') diff --git a/tests/integration_server.py b/tests/integration_server.py new file mode 100644 index 0000000..32fa082 --- /dev/null +++ b/tests/integration_server.py @@ -0,0 +1,116 @@ +import random +import time +import shutil +import subprocess +import tempfile +import os.path +import urllib3 + + +def with_all(head, *tail): + def decorator(fn, *args): + with head as arg: + args = args + (arg,) + if not tail: + return 
fn(*args) + return with_all(*tail)(fn, *args) + return decorator + + +class BaseIntegrationServer(object): + http = urllib3.PoolManager() + + def __init__(self, address=None, tcp_port=None, http_port=None): + if address is None: + address = '127.0.0.1' + + if tcp_port is None: + tcp_port = random.randint(10000, 65535) + + if http_port is None: + http_port = tcp_port + 1 + + self.address = address + self.tcp_port = tcp_port + self.http_port = http_port + self.data_path = tempfile.mkdtemp() + + @property + def cmd(self): + raise NotImplementedError('cmd not implemented') + + @property + def tcp_address(self): + return '%s:%d' % (self.address, self.tcp_port) + + @property + def http_address(self): + return '%s:%d' % (self.address, self.http_port) + + def is_running(self): + try: + url = 'http://%s/ping' % self.http_address + return self.http.request('GET', url).data == 'OK' + except urllib3.exceptions.HTTPError: + return False + + def wait(self): + for attempt in xrange(10): + if self.is_running(): + return + time.sleep(0.01 * pow(2, attempt)) + raise RuntimeError('unable to start: %r' % ' '.join(self.cmd)) + + def __enter__(self): + self.subp = subprocess.Popen(self.cmd) + self.wait() + return self + + def __exit__(self, exc_type, exc_value, traceback): + self.subp.terminate() + self.subp.wait() + shutil.rmtree(self.data_path) + + +class NsqdIntegrationServer(BaseIntegrationServer): + tls_cert = os.path.join(os.path.dirname(__file__), 'cert.pem') + tls_key = os.path.join(os.path.dirname(__file__), 'key.pem') + + def __init__(self, lookupd=None, **kwargs): + self.lookupd = lookupd + super(NsqdIntegrationServer, self).__init__(**kwargs) + + @property + def https_port(self): + return self.http_port + 1 + + @property + def https_address(self): + return '%s:%d' % (self.address, self.https_port) + + @property + def cmd(self): + cmd = [ + 'nsqd', + '--tcp-address', self.tcp_address, + '--http-address', self.http_address, + '--https-address', self.https_address, + 
'--data-path', self.data_path, + '--tls-cert', self.tls_cert, + '--tls-key', self.tls_key, + ] + + if self.lookupd: + cmd.extend(['--lookupd-tcp-address', self.lookupd]) + + return cmd + + +class LookupdIntegrationServer(BaseIntegrationServer): + @property + def cmd(self): + return [ + 'nsqlookupd', + '--tcp-address', self.tcp_address, + '--http-address', self.http_address, + ] diff --git a/tests/key.pem b/tests/key.pem new file mode 100644 index 0000000..9b4db2e --- /dev/null +++ b/tests/key.pem @@ -0,0 +1,27 @@ +-----BEGIN RSA PRIVATE KEY----- +MIIEogIBAAKCAQEAnX0KB+svwy+yHU2qggz/EaGgcraKShagKo+9M9y5HLM852ng +k5c+t+tJJbx3N954Wr1FXBuGIv1ltU05rU4zhvBS25tVP1UIEnT5pBt2TeetLkl1 +99Y7fxh1hKmnwJMG3fy3VZdNXEndBombXMmtXpQYshuEJHKeUNDbQKz5X+GjEdkT +PO/HY/VMHsxS23pbSimQozMg3hvLIdgv0aS3QECzydZBgTPThy3uDtHIuCpxCwXd +/vDF68ATlYgo3h3lh2vxNwM/pjklIUhzMh4XaKQF7m3/0KbtUcXfy0QHueeuMr11 +E9MAFNyRN4xf9Fk1yB97KJ3PJBTC5WD/m1nW+QIDAQABAoIBACvtfKbIywG+hAf4 +ad7skRjx5DcbA2e29+XnQfb9UgTXWd2SgrmoLi5OypBkCTzkKN3mfTo70yZfV8dC +Sxwz+9tfnTz0DssjhKThS+CiaFVCkeOfSfBfKSlCQUVHrSrh18CDhP+yvDlJwQTZ +zSQMfPcsh9bmJe2kqtQP7ZgUp1o+vaB8Sju8YYrO6FllxbdLRGm4pfvvrHIRRmXa +oVHn0ei0JpwoTY9kHYht4LNeJnbP/MCWdmcuv3Gnel7jAlhaKab5aNIGr0Xe7aIQ +iX6mpZ0/Rnt8o/XcTOg8l3ruIdVuySX6SYn08JMnfFkXdNYRVhoV1tC5ElWkaZLf +hPmj2yECgYEAyts0R0b8cZ6HTAyuLm3ilw0s0v0/MM9ZtaqMRilr2WEtAhF0GpHG +TzmGnii0WcTNXD7NTsNcECR/0ZpXPRleMczsL2Juwd4FkQ37h7hdKPseJNrfyHRg +VolOFBX9H14C3wMB9cwdsG4Egw7fE27WCoreEquHgwFxl1zBrXKH088CgYEAxr8w +BKZs0bF7LRrFT5pH8hpMLYHMYk8ZIOfgmEGVBKDQCOERPR9a9kqUss7wl/98LVNK +RnFlyWD6Z0/QcQsLL4LjBeZJ25qEMc6JXm9VGAzhXA1ZkUofVoYCnG+f6KUn8CuJ +/AcV2ZDFsEP10IiQG0hKsceXiwFEvEr8306tMrcCgYBLgnscSR0xAeyk71dq6vZc +ecgEpcX+2kAvclOSzlpZ6WVCjtKkDT0/Qk+M0eQIQkybGLl9pxS+4Yc+s2/jy2yX +pwsHvGE0AvwZeZX2eDcdSRR4bYy9ZixyKdwJeAHnyivRbaIuJ5Opl9pQGpoI9snv +1K9DTdw8dK4exKVHdgl/WwKBgDkmLsuXg4EEtPOyV/xc08VVNIR9Z2T5c7NXmeiO +KyiKiWeUOF3ID2L07S9BfENozq9F3PzGjMtMXJSqibiHwW6nB1rh7mj8VHjx9+Q0 
+xVZGFeNfX1r84mgB3uxW2LeQDhzsmB/lda37CC14TU3qhu2hawEV8IijE73FHlOk +Dv+fAoGAI4/XO5o5tNn5Djo8gHmGMCbinUE9+VySxl7wd7PK8w2VSofO88ofixDk +NX94yBYhg5WZcLdPm45RyUnq+WVQYz9IKUrdxLFTH+wxyzUqZCW7jgXCvWV+071q +vqm9C+kndq+18/1VKuCSGWnF7Ay4lbsgPXY2s4VKRxcb3QpZSPU= +-----END RSA PRIVATE KEY----- diff --git a/tests/mock_server.py b/tests/mock_server.py new file mode 100644 index 0000000..ec70ab8 --- /dev/null +++ b/tests/mock_server.py @@ -0,0 +1,26 @@ +from gevent.event import AsyncResult +from gevent.server import StreamServer + + +class mock_server(object): + def __init__(self, handler): + self.handler = handler + self.result = AsyncResult() + self.server = StreamServer(('127.0.0.1', 0), self) + + def __call__(self, socket, address): + try: + self.result.set(self.handler(socket, address)) + except Exception as error: + self.result.set_exception(error) + finally: + socket.close() + + def __enter__(self): + self.server.start() + return self.server + + def __exit__(self, exc_type, exc_value, traceback): + if exc_type is None: + self.result.get() + self.server.stop() diff --git a/tests/test_basic.py b/tests/test_basic.py new file mode 100644 index 0000000..2fcb0f9 --- /dev/null +++ b/tests/test_basic.py @@ -0,0 +1,95 @@ +import pytest + +from gnsq import BackoffTimer +from gnsq import protocol as nsq + + +@pytest.mark.parametrize('name,good', [ + ('valid_name', True), + ('invalid name with space', False), + ('invalid_name_due_to_length_this_is_really_really_really_long', False), + ('test-with_period.', True), + ('test#ephemeral', False), + ('test:ephemeral', False), +]) +def test_topic_names(name, good): + assert nsq.valid_topic_name(name) == good + + +@pytest.mark.parametrize('name,good', [ + ('test', True), + ('test-with_period.', True), + ('test#ephemeral', True), + ('invalid_name_due_to_length_this_is_really_really_really_long', False), + ('invalid name with space', False), +]) +def test_channel_names(name, good): + assert nsq.valid_channel_name(name) == good + + +def 
test_assert_topic(): + assert nsq.assert_valid_topic_name('topic') is None + + with pytest.raises(ValueError): + nsq.assert_valid_topic_name('invalid name with space') + + +def test_assert_channel(): + assert nsq.assert_valid_channel_name('channel') is None + + with pytest.raises(ValueError): + nsq.assert_valid_channel_name('invalid name with space') + + +def test_invalid_commands(): + with pytest.raises(TypeError): + nsq.requeue('1234', None) + + with pytest.raises(TypeError): + nsq.ready(None) + + with pytest.raises(ValueError): + nsq.ready(-1) + + +def test_backoff_timer(): + timer = BackoffTimer(max_interval=1000) + assert timer.get_interval() == 0 + assert timer.is_reset() + + timer.success() + assert timer.get_interval() == 0 + assert timer.is_reset() + + timer.failure() + assert timer.c == 1 + assert not timer.is_reset() + + for _ in xrange(100): + interval = timer.get_interval() + assert interval > 0 and interval < 2 + + timer.failure() + assert timer.c == 2 + + for _ in xrange(100): + interval = timer.get_interval() + assert interval > 0 and interval < 4 + + timer.success().success() + assert timer.get_interval() == 0 + assert timer.is_reset() + + for _ in xrange(100): + timer.failure() + + assert timer.c == 100 + assert timer.get_interval() <= 1000 + + timer.reset() + assert timer.c == 0 + assert timer.get_interval() == 0 + + timer = BackoffTimer(min_interval=1000) + assert timer.c == 0 + assert timer.get_interval() == 1000 diff --git a/tests/test_command.py b/tests/test_command.py new file mode 100644 index 0000000..ee31ae3 --- /dev/null +++ b/tests/test_command.py @@ -0,0 +1,69 @@ +import struct +import pytest + +try: + import simplejson as json +except ImportError: + import json # pyflakes.ignore + +from gnsq import protocol as nsq + + +def pytest_generate_tests(metafunc): + identify_dict_ascii = {'a': 1, 'b': 2} + identify_dict_unicode = {'c': u'w\xc3\xa5\xe2\x80\xa0'} + identify_body_ascii = json.dumps(identify_dict_ascii) + identify_body_unicode = 
json.dumps(identify_dict_unicode) + + msgs = ['asdf', 'ghjk', 'abcd'] + mpub_body = struct.pack('>l', len(msgs)) + ''.join(struct.pack('>l', len(m)) + m for m in msgs) + if metafunc.function == test_command: + for cmd_method, kwargs, result in [ + (nsq.identify, + {'data': identify_dict_ascii}, + 'IDENTIFY\n' + struct.pack('>l', len(identify_body_ascii)) + + identify_body_ascii), + (nsq.identify, + {'data': identify_dict_unicode}, + 'IDENTIFY\n' + struct.pack('>l', len(identify_body_unicode)) + + identify_body_unicode), + (nsq.subscribe, + {'topic_name': 'test_topic', 'channel_name': 'test_channel'}, + 'SUB test_topic test_channel\n'), + (nsq.finish, + {'message_id': 'test'}, + 'FIN test\n'), + (nsq.finish, + {'message_id': u'\u2020est \xfcn\xee\xe7\xf8\u2202\xe9'}, + 'FIN \xe2\x80\xa0est \xc3\xbcn\xc3\xae\xc3\xa7\xc3\xb8\xe2\x88\x82\xc3\xa9\n'), + (nsq.requeue, + {'message_id': 'test'}, + 'REQ test 0\n'), + (nsq.requeue, + {'message_id': 'test', 'timeout': 60}, + 'REQ test 60\n'), + (nsq.touch, + {'message_id': 'test'}, + 'TOUCH test\n'), + (nsq.ready, + {'count': 100}, + 'RDY 100\n'), + (nsq.nop, + {}, + 'NOP\n'), + (nsq.publish, + {'topic_name': 'test', 'data': msgs[0]}, + 'PUB test\n' + struct.pack('>l', len(msgs[0])) + msgs[0]), + (nsq.multipublish, + {'topic_name': 'test', 'messages': msgs}, + 'MPUB test\n' + struct.pack('>l', len(mpub_body)) + mpub_body) + ]: + metafunc.addcall(funcargs=dict(cmd_method=cmd_method, kwargs=kwargs, result=result)) + + +def test_command(cmd_method, kwargs, result): + assert cmd_method(**kwargs) == result + + +def test_unicode_body(): + pytest.raises(TypeError, nsq.publish, 'topic', u'unicode body') diff --git a/tests/test_lookupd.py b/tests/test_lookupd.py new file mode 100644 index 0000000..f4b3229 --- /dev/null +++ b/tests/test_lookupd.py @@ -0,0 +1,54 @@ +from __future__ import with_statement + +import pytest +import gnsq + +from integration_server import ( + with_all, + LookupdIntegrationServer, + NsqdIntegrationServer +) + 
+ +@pytest.mark.slow +def test_basic(): + with LookupdIntegrationServer() as server: + lookupd = gnsq.Lookupd(server.http_address) + assert lookupd.ping() == 'OK' + assert 'version' in lookupd.info() + + with pytest.raises(gnsq.errors.NSQHttpError): + lookupd.lookup('topic') + + assert len(lookupd.topics()['topics']) == 0 + assert len(lookupd.channels('topic')['channels']) == 0 + assert len(lookupd.nodes()['producers']) == 0 + + +@pytest.mark.slow +def test_lookup(): + lookupd_server = LookupdIntegrationServer() + nsqd_server = NsqdIntegrationServer(lookupd=lookupd_server.tcp_address) + + @with_all(lookupd_server, nsqd_server) + def _(lookupd_server, nsqd_server): + lookupd = gnsq.Lookupd(lookupd_server.http_address) + conn = gnsq.Nsqd(nsqd_server.address, http_port=nsqd_server.http_port) + + assert len(lookupd.topics()['topics']) == 0 + assert len(lookupd.channels('topic')['channels']) == 0 + assert len(lookupd.nodes()['producers']) == 1 + + conn.create_topic('topic') + info = lookupd.lookup('topic') + assert len(info['channels']) == 0 + assert len(info['producers']) == 1 + assert len(lookupd.topics()['topics']) == 1 + assert len(lookupd.channels('topic')['channels']) == 0 + + conn.create_channel('topic', 'channel') + info = lookupd.lookup('topic') + assert len(info['channels']) == 1 + assert len(info['producers']) == 1 + assert len(lookupd.topics()['topics']) == 1 + assert len(lookupd.channels('topic')['channels']) == 1 diff --git a/tests/test_message.py b/tests/test_message.py new file mode 100644 index 0000000..103ea39 --- /dev/null +++ b/tests/test_message.py @@ -0,0 +1,110 @@ +import pytest +import gnsq + + +class MockConnection(object): + def __init__(self, message, operations): + message.on_finish.connect(self.finish) + message.on_requeue.connect(self.requeue) + message.on_touch.connect(self.touch) + self.operations = iter(operations) + + def finish(self, message): + exp_name, exp_args = self.operations.next() + assert exp_name == 'finish' + assert exp_args 
== (message,) + + def requeue(self, message, timeout): + exp_name, exp_args = self.operations.next() + assert exp_name == 'requeue' + assert exp_args == (message, timeout) + + def touch(self, message): + exp_name, exp_args = self.operations.next() + assert exp_name == 'touch' + assert exp_args == (message,) + + def assert_finished(self): + with pytest.raises(StopIteration): + self.operations.next() + + +def test_basic(): + message = gnsq.Message(0, 42, '1234', 'sup') + assert message.timestamp == 0 + assert message.attempts == 42 + assert message.id == '1234' + assert message.body == 'sup' + assert message.has_responded() is False + + +def test_finish(): + message = gnsq.Message(0, 42, '1234', 'sup') + mock_conn = MockConnection(message, [ + ('finish', (message,)), + ]) + assert message.has_responded() is False + + message.finish() + assert message.has_responded() is True + + with pytest.raises(gnsq.errors.NSQException): + message.finish() + + mock_conn.assert_finished() + + +def test_requeue(): + message = gnsq.Message(0, 42, '1234', 'sup') + mock_conn = MockConnection(message, [ + ('requeue', (message, 0)), + ]) + assert message.has_responded() is False + + message.requeue() + assert message.has_responded() is True + + with pytest.raises(gnsq.errors.NSQException): + message.requeue() + + mock_conn.assert_finished() + + +def test_requeue_timeout(): + message = gnsq.Message(0, 42, '1234', 'sup') + mock_conn = MockConnection(message, [ + ('requeue', (message, 1000)), + ]) + assert message.has_responded() is False + + message.requeue(1000) + assert message.has_responded() is True + + with pytest.raises(gnsq.errors.NSQException): + message.requeue(1000) + + mock_conn.assert_finished() + + +def test_touch(): + message = gnsq.Message(0, 42, '1234', 'sup') + mock_conn = MockConnection(message, [ + ('touch', (message,)), + ('touch', (message,)), + ('touch', (message,)), + ('finish', (message,)), + ]) + assert message.has_responded() is False + + message.touch() + 
message.touch() + message.touch() + assert message.has_responded() is False + + message.finish() + assert message.has_responded() is True + + with pytest.raises(gnsq.errors.NSQException): + message.touch() + + mock_conn.assert_finished() diff --git a/tests/test_nsqd.py b/tests/test_nsqd.py new file mode 100644 index 0000000..b11cbbc --- /dev/null +++ b/tests/test_nsqd.py @@ -0,0 +1,390 @@ +from __future__ import with_statement + +import struct +import json +import pytest + +from gnsq import Nsqd, Message, states, errors +from gnsq import protocol as nsq +from gnsq.stream.stream import SSLSocket, DefalteSocket, SnappySocket + +from mock_server import mock_server +from integration_server import NsqdIntegrationServer + + +def mock_response(frame_type, data): + body_size = 4 + len(data) + body_size_packed = struct.pack('>l', body_size) + frame_type_packed = struct.pack('>l', frame_type) + return body_size_packed + frame_type_packed + data + + +def mock_response_message(timestamp, attempts, id, body): + timestamp_packed = struct.pack('>q', timestamp) + attempts_packed = struct.pack('>h', attempts) + id = "%016d" % id + data = timestamp_packed + attempts_packed + id + body + return mock_response(nsq.FRAME_TYPE_MESSAGE, data) + + +def test_connection(): + @mock_server + def handle(socket, address): + assert socket.recv(4) == ' V2' + assert socket.recv(1) == '' + + with handle as server: + conn = Nsqd(address='127.0.0.1', tcp_port=server.server_port) + assert conn.state == states.INIT + + conn.connect() + assert conn.state == states.CONNECTED + + conn.connect() + assert conn.state == states.CONNECTED + + conn.close_stream() + assert conn.state == states.DISCONNECTED + + +def test_disconnected(): + @mock_server + def handle(socket, address): + assert socket.recv(4) == ' V2' + assert socket.recv(1) == '' + + with handle as server: + conn = Nsqd(address='127.0.0.1', tcp_port=server.server_port) + conn.connect() + conn.close_stream() + assert conn.state == states.DISCONNECTED 
+ + with pytest.raises(errors.NSQSocketError): + conn.nop() + + with pytest.raises(errors.NSQSocketError): + conn.read_response() + + +@pytest.mark.parametrize('body', [ + 'hello world', + '', + '{"some": "json data"}', +]) +def test_read(body): + @mock_server + def handle(socket, address): + socket.send(struct.pack('>l', len(body))) + socket.send(body) + + with handle as server: + conn = Nsqd(address='127.0.0.1', tcp_port=server.server_port) + conn.connect() + + assert conn._read_response() == body + conn.close_stream() + + +def test_identify(): + @mock_server + def handle(socket, address): + assert socket.recv(4) == ' V2' + assert socket.recv(9) == 'IDENTIFY\n' + + size = nsq.unpack_size(socket.recv(4)) + data = json.loads(socket.recv(size)) + + assert 'gnsq' in data['user_agent'] + socket.send(mock_response(nsq.FRAME_TYPE_RESPONSE, 'OK')) + + with handle as server: + conn = Nsqd(address='127.0.0.1', tcp_port=server.server_port) + conn.connect() + + assert conn.identify() is None + + +def test_negotiation(): + @mock_server + def handle(socket, address): + assert socket.recv(4) == ' V2' + assert socket.recv(9) == 'IDENTIFY\n' + + size = nsq.unpack_size(socket.recv(4)) + data = json.loads(socket.recv(size)) + + assert 'gnsq' in data['user_agent'] + resp = json.dumps({'test': 42}) + socket.send(mock_response(nsq.FRAME_TYPE_RESPONSE, resp)) + + with handle as server: + conn = Nsqd(address='127.0.0.1', tcp_port=server.server_port) + conn.connect() + + assert conn.identify()['test'] == 42 + + +@pytest.mark.parametrize('command,args,resp', [ + ('subscribe', ('topic', 'channel'), 'SUB topic channel\n'), + ('subscribe', ('foo', 'bar'), 'SUB foo bar\n'), + ('ready', (0,), 'RDY 0\n'), + ('ready', (1,), 'RDY 1\n'), + ('ready', (42,), 'RDY 42\n'), + ('finish', ('0000000000000000',), 'FIN 0000000000000000\n'), + ('finish', ('deadbeafdeadbeaf',), 'FIN deadbeafdeadbeaf\n'), + ('requeue', ('0000000000000000',), 'REQ 0000000000000000 0\n'), + ('requeue', ('deadbeafdeadbeaf', 0), 
'REQ deadbeafdeadbeaf 0\n'), + ('requeue', ('deadbeafdeadbeaf', 42), 'REQ deadbeafdeadbeaf 42\n'), + ('touch', ('0000000000000000',), 'TOUCH 0000000000000000\n'), + ('touch', ('deadbeafdeadbeaf',), 'TOUCH deadbeafdeadbeaf\n'), + ('close', (), 'CLS\n'), + ('nop', (), 'NOP\n'), +]) +def test_command(command, args, resp): + @mock_server + def handle(socket, address): + assert socket.recv(4) == ' V2' + assert socket.recv(len(resp)) == resp + + with handle as server: + conn = Nsqd(address='127.0.0.1', tcp_port=server.server_port) + conn.connect() + getattr(conn, command)(*args) + + +def test_publish(): + @mock_server + def handle(socket, address): + assert socket.recv(4) == ' V2' + assert socket.recv(10) == 'PUB topic\n' + + assert nsq.unpack_size(socket.recv(4)) == 3 + assert socket.recv(3) == 'sup' + + with handle as server: + conn = Nsqd(address='127.0.0.1', tcp_port=server.server_port) + conn.connect() + conn.publish('topic', 'sup') + + +def test_multipublish(): + @mock_server + def handle(socket, address): + assert socket.recv(4) == ' V2' + assert socket.recv(11) == 'MPUB topic\n' + + size = nsq.unpack_size(socket.recv(4)) + data = socket.recv(size) + + head, data = data[:4], data[4:] + assert nsq.unpack_size(head) == 2 + + for _ in xrange(2): + head, data = data[:4], data[4:] + assert nsq.unpack_size(head) == 3 + + head, data = data[:3], data[3:] + assert head == 'sup' + + assert data == '' + + with handle as server: + conn = Nsqd(address='127.0.0.1', tcp_port=server.server_port) + conn.connect() + conn.multipublish('topic', ['sup', 'sup']) + + +@pytest.mark.parametrize('error,error_class,fatal', [ + ('E_INVALID', errors.NSQInvalid, True), + ('E_BAD_BODY', errors.NSQBadBody, True), + ('E_BAD_TOPIC', errors.NSQBadTopic, True), + ('E_BAD_CHANNEL', errors.NSQBadChannel, True), + ('E_BAD_MESSAGE', errors.NSQBadMessage, True), + ('E_PUT_FAILED', errors.NSQPutFailed, True), + ('E_PUB_FAILED', errors.NSQPubFailed, True), + ('E_MPUB_FAILED', errors.NSQMPubFailed, True), + 
('E_FIN_FAILED', errors.NSQFinishFailed, False), + ('E_REQ_FAILED', errors.NSQRequeueFailed, False), + ('E_TOUCH_FAILED', errors.NSQTouchFailed, False), + ('unknown error', errors.NSQException, True), +]) +def test_error(error, error_class, fatal): + @mock_server + def handle(socket, address): + assert socket.recv(4) == ' V2' + socket.send(mock_response(nsq.FRAME_TYPE_ERROR, error)) + + with handle as server: + conn = Nsqd(address='127.0.0.1', tcp_port=server.server_port) + conn.connect() + + frame, resp = conn.read_response() + assert frame == nsq.FRAME_TYPE_ERROR + assert isinstance(resp, error_class) + assert conn.is_connected != fatal + + +def test_hashing(): + conn1 = Nsqd('localhost', 1337) + conn2 = Nsqd('localhost', 1337) + assert conn1 == conn2 + assert not (conn1 < conn2) + assert not (conn2 < conn1) + + test = {conn1: True} + assert conn2 in test + + +def test_sync_receive_messages(): + @mock_server + def handle(socket, address): + assert socket.recv(4) == ' V2' + assert socket.recv(9) == 'IDENTIFY\n' + + size = nsq.unpack_size(socket.recv(4)) + data = json.loads(socket.recv(size)) + + assert isinstance(data, dict) + socket.send(mock_response(nsq.FRAME_TYPE_RESPONSE, 'OK')) + + msg = 'SUB topic channel\n' + assert socket.recv(len(msg)) == msg + socket.send(mock_response(nsq.FRAME_TYPE_RESPONSE, 'OK')) + + for i in xrange(10): + assert socket.recv(6) == 'RDY 1\n' + + body = json.dumps({'data': {'test_key': i}}) + ts = i * 1000 * 1000 + socket.send(mock_response_message(ts, i, i, body)) + + with handle as server: + conn = Nsqd(address='127.0.0.1', tcp_port=server.server_port) + conn.connect() + + assert conn.identify() is None + + conn.subscribe('topic', 'channel') + frame, data = conn.read_response() + + assert frame == nsq.FRAME_TYPE_RESPONSE + assert data == 'OK' + + for i in xrange(10): + conn.ready(1) + frame, msg = conn.read_response() + + assert frame == nsq.FRAME_TYPE_MESSAGE + assert isinstance(msg, Message) + assert msg.timestamp == i * 1000 * 
1000 + assert msg.id == '%016d' % i + assert msg.attempts == i + assert json.loads(msg.body)['data']['test_key'] == i + + +def test_sync_heartbeat(): + @mock_server + def handle(socket, address): + assert socket.recv(4) == ' V2' + socket.send(mock_response(nsq.FRAME_TYPE_RESPONSE, '_heartbeat_')) + assert socket.recv(4) == 'NOP\n' + + with handle as server: + conn = Nsqd(address='127.0.0.1', tcp_port=server.server_port) + conn.connect() + + frame, data = conn.read_response() + assert frame == nsq.FRAME_TYPE_RESPONSE + assert data == '_heartbeat_' + + +@pytest.mark.slow +def test_tls(): + with NsqdIntegrationServer() as server: + conn = Nsqd( + address=server.address, + tcp_port=server.tcp_port, + tls_v1=True, + tls_options={ + 'keyfile': server.tls_key, + 'certfile': server.tls_cert, + } + ) + conn.connect() + assert conn.state == states.CONNECTED + + resp = conn.identify() + assert isinstance(resp, dict) + assert resp['tls_v1'] is True + assert isinstance(conn.stream.socket, SSLSocket) + + frame, data = conn.read_response() + assert frame == nsq.FRAME_TYPE_RESPONSE + assert data == 'OK' + + conn.publish('topic', 'sup') + frame, data = conn.read_response() + assert frame == nsq.FRAME_TYPE_RESPONSE + assert data == 'OK' + + conn.close() + + +@pytest.mark.slow +def test_deflate(): + with NsqdIntegrationServer() as server: + conn = Nsqd( + address=server.address, + tcp_port=server.tcp_port, + deflate=True + ) + conn.connect() + assert conn.state == states.CONNECTED + + resp = conn.identify() + assert isinstance(resp, dict) + assert resp['deflate'] is True + assert isinstance(conn.stream.socket, DefalteSocket) + + frame, data = conn.read_response() + assert frame == nsq.FRAME_TYPE_RESPONSE + assert data == 'OK' + + conn.close() + + +@pytest.mark.slow +def test_snappy(): + with NsqdIntegrationServer() as server: + conn = Nsqd( + address=server.address, + tcp_port=server.tcp_port, + snappy=True + ) + conn.connect() + assert conn.state == states.CONNECTED + + resp = 
conn.identify() + assert isinstance(resp, dict) + assert resp['snappy'] is True + assert isinstance(conn.stream.socket, SnappySocket) + + frame, data = conn.read_response() + assert frame == nsq.FRAME_TYPE_RESPONSE + assert data == 'OK' + + conn.close() + + +@pytest.mark.slow +def test_cls_error(): + with NsqdIntegrationServer() as server: + conn = Nsqd(address=server.address, tcp_port=server.tcp_port) + + conn.connect() + assert conn.state == states.CONNECTED + + conn.close() + frame, error = conn.read_response() + assert frame == nsq.FRAME_TYPE_ERROR + assert isinstance(error, errors.NSQInvalid) diff --git a/tests/test_nsqd_http.py b/tests/test_nsqd_http.py new file mode 100644 index 0000000..69b3c3d --- /dev/null +++ b/tests/test_nsqd_http.py @@ -0,0 +1,74 @@ +from __future__ import with_statement + +import pytest +import gnsq +from integration_server import NsqdIntegrationServer + + +@pytest.mark.slow +def test_basic(): + with NsqdIntegrationServer() as server: + conn = gnsq.Nsqd(server.address, http_port=server.http_port) + assert conn.ping() == 'OK' + assert 'topics' in conn.stats() + assert 'version' in conn.info() + + +@pytest.mark.slow +def test_topics_channels(): + with NsqdIntegrationServer() as server: + conn = gnsq.Nsqd(server.address, http_port=server.http_port) + assert len(conn.stats()['topics']) == 0 + + with pytest.raises(gnsq.errors.NSQHttpError): + conn.delete_topic('topic') + + conn.create_topic('topic') + topics = conn.stats()['topics'] + assert len(topics) == 1 + assert topics[0]['topic_name'] == 'topic' + + conn.delete_topic('topic') + assert len(conn.stats()['topics']) == 0 + + with pytest.raises(gnsq.errors.NSQHttpError): + conn.create_channel('topic', 'channel') + + with pytest.raises(gnsq.errors.NSQHttpError): + conn.delete_channel('topic', 'channel') + + conn.create_topic('topic') + assert len(conn.stats()['topics'][0]['channels']) == 0 + + conn.create_channel('topic', 'channel') + channels = conn.stats()['topics'][0]['channels'] + 
assert len(channels) == 1 + assert channels[0]['channel_name'] == 'channel' + + conn.delete_channel('topic', 'channel') + assert len(conn.stats()['topics'][0]['channels']) == 0 + + +def test_publish(): + with NsqdIntegrationServer() as server: + conn = gnsq.Nsqd(server.address, http_port=server.http_port) + + conn.publish('topic', 'sup') + assert conn.stats()['topics'][0]['depth'] == 1 + + conn.multipublish('topic', ['sup', 'sup']) + assert conn.stats()['topics'][0]['depth'] == 3 + + conn.multipublish('topic', iter(['sup', 'sup', 'sup'])) + assert conn.stats()['topics'][0]['depth'] == 6 + + conn.empty_topic('topic') + assert conn.stats()['topics'][0]['depth'] == 0 + + conn.create_topic('topic') + conn.create_channel('topic', 'channel') + conn.publish('topic', 'sup') + assert conn.stats()['topics'][0]['channels'][0]['depth'] == 1 + + conn.empty_channel('topic', 'channel') + assert conn.stats()['topics'][0]['channels'][0]['depth'] == 0 diff --git a/tests/test_reader.py b/tests/test_reader.py new file mode 100644 index 0000000..ee4357e --- /dev/null +++ b/tests/test_reader.py @@ -0,0 +1,261 @@ +from __future__ import with_statement + +import multiprocessing +import pytest +import gevent + +from gnsq import Nsqd, Reader, states +from gnsq.errors import NSQSocketError + +from integration_server import ( + with_all, + LookupdIntegrationServer, + NsqdIntegrationServer +) + + +def test_basic(): + with pytest.raises(ValueError): + Reader('test', 'test') + + with pytest.raises(TypeError): + Reader( + topic='test', + channel='test', + nsqd_tcp_addresses=None, + lookupd_http_addresses='http://localhost:4161/', + ) + + with pytest.raises(TypeError): + Reader( + topic='test', + channel='test', + nsqd_tcp_addresses='localhost:4150', + lookupd_http_addresses=None, + ) + + def message_handler(reader, message): + pass + + reader = Reader( + topic='test', + channel='test', + name='test', + max_concurrency=-1, + nsqd_tcp_addresses='localhost:4150', + 
lookupd_http_addresses='http://localhost:4161/', + message_handler=message_handler + ) + + assert reader.name == 'test' + assert reader.max_concurrency == multiprocessing.cpu_count() + assert len(reader.on_message.receivers) == 1 + + assert isinstance(reader.nsqd_tcp_addresses, set) + assert len(reader.nsqd_tcp_addresses) == 1 + + assert isinstance(reader.lookupds, list) + assert len(reader.lookupds) == 1 + + +@pytest.mark.slow +def test_messages(): + with NsqdIntegrationServer() as server: + + class Accounting(object): + count = 0 + total = 500 + error = None + + conn = Nsqd( + address=server.address, + tcp_port=server.tcp_port, + http_port=server.http_port, + ) + + for _ in xrange(Accounting.total): + conn.publish_http('test', 'danger zone!') + + reader = Reader( + topic='test', + channel='test', + nsqd_tcp_addresses=[server.tcp_address], + max_in_flight=100, + ) + + @reader.on_exception.connect + def error_handler(reader, message, error): + if isinstance(error, NSQSocketError): + return + Accounting.error = error + reader.close() + + @reader.on_message.connect + def handler(reader, message): + assert message.body == 'danger zone!' 
+ + Accounting.count += 1 + if Accounting.count == Accounting.total: + assert not reader.is_starved + reader.close() + + try: + reader.start() + except NSQSocketError: + pass + + if Accounting.error: + raise Accounting.error + + assert Accounting.count == Accounting.total + + +@pytest.mark.slow +def test_max_concurrency(): + server1 = NsqdIntegrationServer() + server2 = NsqdIntegrationServer() + + @with_all(server1, server2) + def _(server1, server2): + class Accounting(object): + count = 0 + total = 100 + concurrency = 0 + error = None + + for server in (server1, server2): + conn = Nsqd( + address=server.address, + tcp_port=server.tcp_port, + http_port=server.http_port, + ) + + for _ in xrange(Accounting.total / 2): + conn.publish_http('test', 'danger zone!') + + reader = Reader( + topic='test', + channel='test', + nsqd_tcp_addresses=[ + server1.tcp_address, + server2.tcp_address, + ], + max_in_flight=5, + max_concurrency=1, + ) + + @reader.on_exception.connect + def error_handler(reader, message, error): + if isinstance(error, NSQSocketError): + return + Accounting.error = error + reader.close() + + @reader.on_message.connect + def handler(reader, message): + assert message.body == 'danger zone!' 
+ assert Accounting.concurrency == 0 + + Accounting.concurrency += 1 + gevent.sleep() + Accounting.concurrency -= 1 + + Accounting.count += 1 + if Accounting.count == Accounting.total: + reader.close() + + try: + reader.start() + except NSQSocketError: + pass + + if Accounting.error: + raise Accounting.error + + assert Accounting.count == Accounting.total + + +@pytest.mark.slow +def test_lookupd(): + lookupd_server = LookupdIntegrationServer() + server1 = NsqdIntegrationServer(lookupd=lookupd_server.tcp_address) + server2 = NsqdIntegrationServer(lookupd=lookupd_server.tcp_address) + + @with_all(lookupd_server, server1, server2) + def _(lookupd_server, server1, server2): + class Accounting(object): + count = 0 + total = 500 + concurrency = 0 + error = None + + for server in (server1, server2): + conn = Nsqd( + address=server.address, + tcp_port=server.tcp_port, + http_port=server.http_port, + ) + + for _ in xrange(Accounting.total / 2): + conn.publish_http('test', 'danger zone!') + + reader = Reader( + topic='test', + channel='test', + lookupd_http_addresses=lookupd_server.http_address, + max_in_flight=32, + ) + + @reader.on_exception.connect + def error_handler(reader, message, error): + if isinstance(error, NSQSocketError): + return + Accounting.error = error + reader.close() + + @reader.on_message.connect + def handler(reader, message): + assert message.body == 'danger zone!' 
+ + Accounting.count += 1 + if Accounting.count == Accounting.total: + reader.close() + + try: + reader.start() + except NSQSocketError: + pass + + if Accounting.error: + raise Accounting.error + + assert Accounting.count == Accounting.total + + +@pytest.mark.slow +def test_backoff(): + with NsqdIntegrationServer() as server: + conn = Nsqd( + address=server.address, + tcp_port=server.tcp_port, + http_port=server.http_port, + ) + + for _ in xrange(500): + conn.publish_http('test', 'danger zone!') + + reader = Reader( + topic='test', + channel='test', + nsqd_tcp_addresses=[server.tcp_address], + max_in_flight=100, + ) + + reader.start(block=False) + reader.start_backoff() + + assert reader.state == states.THROTTLED + assert reader.total_in_flight_or_ready <= 1 + + reader.complete_backoff() + assert reader.state == states.RUNNING diff --git a/tox.ini b/tox.ini new file mode 100644 index 0000000..1fbe6de --- /dev/null +++ b/tox.ini @@ -0,0 +1,14 @@ +[tox] +envlist = py26, py27 + +[testenv] +setenv = + PYTHONPATH = {toxinidir}:{toxinidir}/gnsq +commands = py.test --runslow +deps = + -r{toxinidir}/requirements.txt + +[flake8] +max-line-length = 80 +exclude = tests/* +max-complexity = 10 diff --git a/util.py b/util.py deleted file mode 100644 index d747044..0000000 --- a/util.py +++ /dev/null @@ -1,7 +0,0 @@ - -def assert_list(item): - if isinstance(item, basestring): - item = [item] - - assert isinstance(item, (list, set, tuple)) - return item