diff --git a/connector/CHANGES.rst b/connector/CHANGES.rst
index 0f3b3a4a9..3732f4f82 100644
--- a/connector/CHANGES.rst
+++ b/connector/CHANGES.rst
@@ -7,6 +7,14 @@ Changelog
.. *
+9.0.1.0.1 (2016-03-03)
+~~~~~~~~~~~~~~~~~~~~~~
+
+* Enabled the JobRunner by default, with a default channels configuration of root:1
+* Removed the old workers
+* Removed the broken dbfilter support (https://github.com/OCA/connector/issues/58)
+* Cleaned the methods that have been deprecated in version 3.x
+
8.0.3.3.0 (2016-02-29)
~~~~~~~~~~~~~~~~~~~~~~
diff --git a/connector/__openerp__.py b/connector/__openerp__.py
index 79ae69f22..2558e77d3 100644
--- a/connector/__openerp__.py
+++ b/connector/__openerp__.py
@@ -20,7 +20,7 @@
##############################################################################
{'name': 'Connector',
- 'version': '9.0.1.0.0',
+ 'version': '9.0.1.0.1',
'author': 'Camptocamp,Openerp Connector Core Editors,'
'Odoo Community Association (OCA)',
'website': 'http://odoo-connector.com',
@@ -40,6 +40,6 @@
'setting_view.xml',
'res_partner_view.xml',
],
- 'installable': False,
+ 'installable': True,
'application': True,
}
diff --git a/connector/connector.py b/connector/connector.py
index 3537a28b8..ede1c3866 100644
--- a/connector/connector.py
+++ b/connector/connector.py
@@ -23,7 +23,6 @@
import logging
import struct
-from contextlib import contextmanager
from openerp import models, fields
from .exception import RetryableJobError
diff --git a/connector/jobrunner/__init__.py b/connector/jobrunner/__init__.py
index 1be109974..e8ad0c40b 100644
--- a/connector/jobrunner/__init__.py
+++ b/connector/jobrunner/__init__.py
@@ -46,9 +46,6 @@
# to configure the runner (channels mostly).
-enable = os.environ.get('ODOO_CONNECTOR_CHANNELS')
-
-
class ConnectorRunnerThread(Thread):
def __init__(self):
@@ -78,7 +75,7 @@ def stop(self):
def prefork_start(server, *args, **kwargs):
global runner_thread
res = orig_prefork_start(server, *args, **kwargs)
- if enable and not config['stop_after_init']:
+ if not config['stop_after_init']:
_logger.info("starting jobrunner thread (in prefork server)")
runner_thread = ConnectorRunnerThread()
runner_thread.start()
@@ -99,7 +96,7 @@ def prefork_stop(server, graceful=True):
def threaded_start(server, *args, **kwargs):
global runner_thread
res = orig_threaded_start(server, *args, **kwargs)
- if enable and not config['stop_after_init']:
+ if not config['stop_after_init']:
_logger.info("starting jobrunner thread (in threaded server)")
runner_thread = ConnectorRunnerThread()
runner_thread.start()
diff --git a/connector/queue/queue.py b/connector/queue/queue.py
deleted file mode 100644
index 83a8a1f85..000000000
--- a/connector/queue/queue.py
+++ /dev/null
@@ -1,41 +0,0 @@
-# -*- coding: utf-8 -*-
-##############################################################################
-#
-# Author: Guewen Baconnier
-# Copyright 2013 Camptocamp SA
-#
-# This program is free software: you can redistribute it and/or modify
-# it under the terms of the GNU Affero General Public License as
-# published by the Free Software Foundation, either version 3 of the
-# License, or (at your option) any later version.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU Affero General Public License for more details.
-#
-# You should have received a copy of the GNU Affero General Public License
-# along with this program. If not, see .
-#
-##############################################################################
-from __future__ import absolute_import
-from Queue import PriorityQueue
-
-
-class JobsQueue(object):
- """ Holds the jobs planned for execution in memory.
-
- The Jobs are sorted, the higher the priority is,
- the earlier the jobs are dequeued.
- """
-
- def __init__(self):
- self._queue = PriorityQueue()
-
- def enqueue(self, job):
- self._queue.put_nowait(job)
-
- def dequeue(self):
- """ Take the first job according to its priority
- and return it"""
- return self._queue.get()
diff --git a/connector/queue/worker.py b/connector/queue/worker.py
deleted file mode 100644
index e9a5873f2..000000000
--- a/connector/queue/worker.py
+++ /dev/null
@@ -1,352 +0,0 @@
-# -*- coding: utf-8 -*-
-##############################################################################
-#
-# Author: Guewen Baconnier
-# Copyright 2013 Camptocamp SA
-#
-# This program is free software: you can redistribute it and/or modify
-# it under the terms of the GNU Affero General Public License as
-# published by the Free Software Foundation, either version 3 of the
-# License, or (at your option) any later version.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU Affero General Public License for more details.
-#
-# You should have received a copy of the GNU Affero General Public License
-# along with this program. If not, see .
-#
-##############################################################################
-
-import logging
-import os
-import threading
-import time
-import traceback
-import uuid
-from datetime import datetime
-from StringIO import StringIO
-
-from psycopg2 import OperationalError, ProgrammingError
-
-import openerp
-from openerp.service.model import PG_CONCURRENCY_ERRORS_TO_RETRY
-from openerp.service import db
-from openerp.tools import config
-from .queue import JobsQueue
-from ..session import ConnectorSessionHandler
-from .job import (OpenERPJobStorage,
- PENDING,
- DONE)
-from ..exception import (NoSuchJobError,
- NotReadableJobError,
- RetryableJobError,
- FailedJobError,
- NothingToDoJob)
-
-_logger = logging.getLogger(__name__)
-
-WAIT_CHECK_WORKER_ALIVE = 30 # seconds
-WAIT_WHEN_ONLY_AFTER_JOBS = 10 # seconds
-WORKER_TIMEOUT = 5 * 60 # seconds
-PG_RETRY = 5 # seconds
-
-
-class Worker(threading.Thread):
- """ Post and retrieve jobs from the queue, execute them"""
-
- queue_class = JobsQueue
- job_storage_class = OpenERPJobStorage
-
- def __init__(self, db_name, watcher_):
- super(Worker, self).__init__()
- self.queue = self.queue_class()
- self.db_name = db_name
- threading.current_thread().dbname = db_name
- self.uuid = unicode(uuid.uuid4())
- self.watcher = watcher_
-
- def run_job(self, job):
- """ Execute a job """
- def retry_postpone(job, message, seconds=None):
- with session_hdl.session() as session:
- job.postpone(result=message, seconds=seconds)
- job.set_enqueued(self)
- self.job_storage_class(session).store(job)
- self.queue.enqueue(job)
-
- session_hdl = ConnectorSessionHandler(self.db_name,
- openerp.SUPERUSER_ID)
- try:
- with session_hdl.session() as session:
- job = self._load_job(session, job.uuid)
- if job is None:
- return
-
- # if the job has been manually set to DONE or PENDING
- # before its execution, stop
- if job.state in (DONE, PENDING):
- return
-
- # the job has been enqueued in this worker but has likely be
- # modified in the database since its enqueue
- if job.worker_uuid != self.uuid:
- # put the job in pending so it can be requeued
- _logger.error('Job %s was enqueued in worker %s but '
- 'was linked to worker %s. Reset to pending.',
- job.uuid, self.uuid, job.worker_uuid)
- with session_hdl.session() as session:
- job.set_pending()
- self.job_storage_class(session).store(job)
- return
-
- if job.eta and job.eta > datetime.now():
- # The queue is sorted by 'eta' date first
- # so if we dequeued a job expected to be run in
- # the future, we have no jobs to do right now!
- self.queue.enqueue(job)
- # Wait some time just to avoid to loop over
- # the same 'future' jobs
- _logger.debug('Wait %s seconds because the delayed '
- 'jobs have been reached',
- WAIT_WHEN_ONLY_AFTER_JOBS)
- time.sleep(WAIT_WHEN_ONLY_AFTER_JOBS)
- return
-
- with session_hdl.session() as session:
- job.set_started()
- self.job_storage_class(session).store(job)
-
- _logger.debug('%s started', job)
- with session_hdl.session() as session:
- job.perform(session)
- job.set_done()
- self.job_storage_class(session).store(job)
- _logger.debug('%s done', job)
-
- except NothingToDoJob as err:
- if unicode(err):
- msg = unicode(err)
- else:
- msg = None
- job.cancel(msg)
- with session_hdl.session() as session:
- self.job_storage_class(session).store(job)
-
- except RetryableJobError as err:
- # delay the job later, requeue
- retry_postpone(job, unicode(err), seconds=err.seconds)
- _logger.debug('%s postponed', job)
-
- except OperationalError as err:
- # Automatically retry the typical transaction serialization errors
- if err.pgcode not in PG_CONCURRENCY_ERRORS_TO_RETRY:
- raise
- retry_postpone(job, unicode(err), seconds=PG_RETRY)
- _logger.debug('%s OperationalError, postponed', job)
-
- except (FailedJobError, Exception):
- buff = StringIO()
- traceback.print_exc(file=buff)
- _logger.error(buff.getvalue())
-
- job.set_failed(exc_info=buff.getvalue())
- with session_hdl.session() as session:
- self.job_storage_class(session).store(job)
- raise
-
- def _load_job(self, session, job_uuid):
- """ Reload a job from the backend """
- try:
- job = self.job_storage_class(session).load(job_uuid)
- except NoSuchJobError:
- # just skip it
- job = None
- except NotReadableJobError:
- _logger.exception('Could not read job: %s', job_uuid)
- raise
- return job
-
- def run(self):
- """ Worker's main loop
-
- Check if it still exists in the ``watcher``. When it does no
- longer exist, it break the loop so the thread stops properly.
-
- Wait for jobs and execute them sequentially.
- """
- while 1:
- # check if the worker has to exit (db destroyed, connector
- # uninstalled)
- if self.watcher.worker_lost(self):
- break
- job = self.queue.dequeue()
- try:
- self.run_job(job)
- except:
- continue
-
- def enqueue_job_uuid(self, job_uuid):
- """ Enqueue a job:
-
- It will be executed by the worker as soon as possible (according
- to the job's priority
- """
- session_hdl = ConnectorSessionHandler(self.db_name,
- openerp.SUPERUSER_ID)
- with session_hdl.session() as session:
- job = self._load_job(session, job_uuid)
- if job is None:
- # skip a deleted job
- return
- job.set_enqueued(self)
- self.job_storage_class(session).store(job)
- # the change of state should be commited before
- # the enqueue otherwise we may have concurrent updates
- # if the job is started directly
- self.queue.enqueue(job)
- _logger.debug('%s enqueued in %s', job, self)
-
-
-class WorkerWatcher(threading.Thread):
- """ Keep a sight on the workers and signal their aliveness.
-
- A `WorkerWatcher` is shared between databases, so only 1 instance is
- necessary to check the aliveness of the workers for every database.
- """
-
- def __init__(self):
- super(WorkerWatcher, self).__init__()
- self._workers = {}
-
- def _new(self, db_name):
- """ Create a new worker for the database """
- if db_name in self._workers:
- raise Exception('Database %s already has a worker (%s)' %
- (db_name, self._workers[db_name].uuid))
- worker = Worker(db_name, self)
- self._workers[db_name] = worker
- worker.daemon = True
- worker.start()
-
- def _delete(self, db_name):
- """ Delete a worker associated with a database """
- if db_name in self._workers:
- # the worker will exit (it checks ``worker_lost()``)
- del self._workers[db_name]
-
- def worker_for_db(self, db_name):
- return self._workers.get(db_name)
-
- def worker_lost(self, worker):
- """ Indicate if a worker is no longer referenced by the watcher.
-
- Used by the worker threads to know if they have to exit.
- """
- return worker not in self._workers.itervalues()
-
- @staticmethod
- def available_db_names():
- """ Returns the databases for the server having
- the connector module installed.
-
- Available means that they can be used by a `Worker`.
-
- :return: database names
- :rtype: list
- """
- if config['db_name']:
- db_names = config['db_name'].split(',')
- else:
- db_names = db.exp_list(True)
- available_db_names = []
- for db_name in db_names:
- session_hdl = ConnectorSessionHandler(db_name,
- openerp.SUPERUSER_ID)
- with session_hdl.session() as session:
- cr = session.cr
- try:
- cr.execute("SELECT 1 FROM ir_module_module "
- "WHERE name = %s "
- "AND state = %s", ('connector', 'installed'),
- log_exceptions=False)
- except ProgrammingError as err:
- no_db_error = 'relation "ir_module_module" does not exist'
- if unicode(err).startswith(no_db_error):
- _logger.debug('Database %s is not an OpenERP database,'
- ' connector worker not started', db_name)
- else:
- raise
- else:
- if cr.fetchone():
- available_db_names.append(db_name)
- return available_db_names
-
- def _update_workers(self):
- """ Refresh the list of workers according to the available
- databases and registries.
-
- A new database can be available, so we need to create a new
- `Worker` or a database could have been dropped, so we have to
- discard the Worker.
- """
- db_names = self.available_db_names()
- # deleted db or connector uninstalled: remove the workers
- for db_name in set(self._workers) - set(db_names):
- self._delete(db_name)
-
- for db_name in db_names:
- if db_name not in self._workers:
- self._new(db_name)
-
- def run(self):
- """ `WorkerWatcher`'s main loop """
- while 1:
- self._update_workers()
- for db_name, worker in self._workers.items():
- self.check_alive(db_name, worker)
- time.sleep(WAIT_CHECK_WORKER_ALIVE)
-
- def check_alive(self, db_name, worker):
- """ Check if the the worker is still alive and notify
- its aliveness.
- Check if the other workers are still alive, if they are
- dead, remove them from the worker's pool.
- """
- session_hdl = ConnectorSessionHandler(db_name,
- openerp.SUPERUSER_ID)
- with session_hdl.session() as session:
- if worker.is_alive():
- self._notify_alive(session, worker)
- session.commit()
- self._purge_dead_workers(session)
- session.commit()
-
- def _notify_alive(self, session, worker):
- _logger.debug('Worker %s is alive on process %s',
- worker.uuid, os.getpid())
- session.env['queue.worker']._notify_alive(worker)
-
- def _purge_dead_workers(self, session):
- session.env['queue.worker']._purge_dead_workers()
-
-
-watcher = WorkerWatcher()
-
-
-def start_service():
- """ Start the watcher """
- watcher.daemon = True
- watcher.start()
-
-# We have to launch the Jobs Workers only if:
-# 0. The alternative connector runner is not enabled
-# 1. OpenERP is used in standalone mode (monoprocess)
-# 2. Or it is used in multiprocess (with option ``--workers``)
-# but the current process is a Connector Worker
-# (launched with the ``openerp-connector-worker`` script).
-if not os.environ.get('ODOO_CONNECTOR_CHANNELS'):
- if (not getattr(openerp, 'multi_process', False) or
- getattr(openerp, 'worker_connector', False)):
- start_service()
diff --git a/connector/tests/__init__.py b/connector/tests/__init__.py
index a2736a025..eed661a13 100644
--- a/connector/tests/__init__.py
+++ b/connector/tests/__init__.py
@@ -22,8 +22,6 @@
from . import test_session
from . import test_event
from . import test_job
-from . import test_queue
-from . import test_worker
from . import test_backend
from . import test_producer
from . import test_connector
diff --git a/connector/tests/test_queue.py b/connector/tests/test_queue.py
deleted file mode 100644
index c533bc9ed..000000000
--- a/connector/tests/test_queue.py
+++ /dev/null
@@ -1,41 +0,0 @@
-# -*- coding: utf-8 -*-
-
-import unittest
-from datetime import timedelta
-
-from openerp.addons.connector.queue.queue import JobsQueue
-from openerp.addons.connector.queue.job import Job
-
-
-def dummy_task(session):
- pass
-
-
-class test_queue(unittest.TestCase):
- """ Test Queue """
-
- def setUp(self):
- self.queue = JobsQueue()
-
- def test_sort(self):
- """ Sort: the lowest priority number has the highest priority.
- A job with a `eta` datetime is less priority in any case.
- """
- job1 = Job(dummy_task, priority=10)
- job2 = Job(dummy_task, priority=5)
- job3 = Job(dummy_task, priority=15,
- eta=timedelta(hours=2))
- job4 = Job(dummy_task, priority=15,
- eta=timedelta(hours=1))
- job5 = Job(dummy_task, priority=1,
- eta=timedelta(hours=2))
- self.queue.enqueue(job1)
- self.queue.enqueue(job2)
- self.queue.enqueue(job3)
- self.queue.enqueue(job4)
- self.queue.enqueue(job5)
- self.assertEqual(self.queue.dequeue(), job2)
- self.assertEqual(self.queue.dequeue(), job1)
- self.assertEqual(self.queue.dequeue(), job4)
- self.assertEqual(self.queue.dequeue(), job3)
- self.assertEqual(self.queue.dequeue(), job5)
diff --git a/connector/tests/test_worker.py b/connector/tests/test_worker.py
deleted file mode 100644
index 499ee33ee..000000000
--- a/connector/tests/test_worker.py
+++ /dev/null
@@ -1,12 +0,0 @@
-# -*- coding: utf-8 -*-
-
-import unittest
-
-from openerp.addons.connector.queue.queue import JobsQueue
-
-
-class test_worker(unittest.TestCase):
- """ Test Worker """
-
- def setUp(self):
- self.queue = JobsQueue()