Skip to content

Latest commit

 

History

History
386 lines (269 loc) · 14.7 KB

UPDATES_CELERY.adoc

File metadata and controls

386 lines (269 loc) · 14.7 KB

BillMap Celery Tasks: What they do.

Celery, a Python task runner, runs scheduled tasks to scrape and process data. To run Celery, install Redis (as a taskrunner and db for Celery). The other dependencies are installed with pip when initially installing the requirements for the app with pip install -r requirements.txt.

The main celery app is in server_py/flatgov/flatgov/celery.py and the schedule of tasks is defined in app.conf.beat_schedule there. Logs for celery tasks are stored in /var/log/celery, e.g. /var/log/celery/link_celery.log`, /var/log/celery/link_celery-2.log, /var/log/celery/link_celery-3.log etc. This repository includes four main tasks with some subtasks of the uscongress task:

uscongress

update uscongress bill and metadata

statementAdminPolicy

updates statements of administration policy from the current White House (OMB) website

committeereport

update committee reports associated with bills

cbo

update cbo reports associated with bills

crs

update crs reports associated with bills

US Congress Task

Within the US Congress task, there are six celery tasks that run sequentially: update_bill_task, bill_data_task, process_bill_meta_task, related_bill_task, elastic_load_task, bill_similarity_task

The tasks after update_bill_task are triggered upon save of the status of update_bill_task. After each task is run, its status is saved in the database in the UscongressUpdateJob table (see below). The status of the tasks is displayed in the /admin/ page. The six tasks are:

update_bill_task

Download uscongress bill and metadata using sitemaps to efficiently determine what needs to be updated.

With the open source scraper itself, we run ./run govinfo --collections=BILLS --congress=117 --extract=mods,xml,premis --bulkdata=BILLSTATUS. The status of this process is stored as fdyss_status.

Then ./run bills

It will create data.json out of data.xml and text_versions. The status of this is stored as data_status.

bill_data_task

This, and the following task are now run from a Go executable.

Creates billList.json and billsMeta.json. Once the celery task update_bill_task is finished (complete the download bill text and metadata), the bill_data_task runs. This is triggered in the save function of the uscongress.models.py:

 def save(self, *args, **kwargs):
        super().save(*args, **kwargs)
        if self.pk and self.data_status == self.SUCCESS and self.bill_status == self.PENDING:
            current_app.send_task(
                'uscongress.tasks.bill_data_task',
                args=(self.pk, ),
                queue='bill'
            )
        if self.pk and self.bill_status == self.SUCCESS and self.meta_status == self.PENDING:
            current_app.send_task(
                'uscongress.tasks.process_bill_meta_task',
                args=(self.pk, ),
                queue='bill'
            )
			...

The task bill_data_task creates billList.json and billsMeta.json file with the list in saved field and dump related bill json files.

data.json files at the top level (not the data.json in the text versions) are used to create metadata.

process_bill_meta_task

After completing bill_data_task, process_bill_meta_task runs. This processes and organizes the metadata for bills.

related_bill_task

This task creates bill instances in the Bill table in the database.

elastic_load_task

Parses xml into sections and loads the bill and sections into Elasticsearch. The xml for bill similarity is in the text_versions field.

bill_similarity_task

Runs the similarity task on new bills, which are found in the new bill list of the saved field in the UscongressUpdateJob table record.

UscongressUpdateJob table data (status)

When celery tasks run, a UscongressUpdateJob table record is created in the database to track the task status.

The fields in the UscongressUpdateJob:

  • job_id : celery task id

  • fdsys_status : choice field (pending, success, failed) : It represents the status of uscongress bill download (fdsys and text versions) what the celery task update_bill_task does. Once it’s finished, the field value turns to success or failed

  • saved : the list of bill congress numbers downloaded by running the celery task update_bill_task.

  • skips : the list of bill congress numbers skipped by running the celery task update_bill_task.

  • data_status : choice field (pending, success, failed) : Once download is finished, the celery task update_bill_task creates data.json out of data.xml and text_versions that is exactly same as the ./run bills does.

  • bill_status : choice field (pending, success, failed) : After creating data.json, we creates billList.json and billsMeta.json by running another celery task named bill_data_task. the field represents the status of the celery task to see if it’s finished (succeed or failed)

  • meta_status : choice field (pending, success, failed) : After creating billList.json and billsMeta.json, we process the metadata by running the celery task named process_bill_meta_task. It represents the status of the task to see if it’s finished or in pending.

  • related_status : choice field (pending, success, failed) : Once the celery task process_bill_meta_task is finished, the other celery task named related_bill_task to get related bills. This field represents the status of the celery task.

  • elastic_status : choice field (pending, success, failed) : Once the task related_bill_task is finished, the other celery task named elastic_load_task runs in order to update loading of the new bills into Elasticsearch. This field represents the status of the task.

  • similarity : choice field (pending, success, failed) : After finishing elastic_load_task, we update es_similarity field of each bill in the database. This is the field of the task status.

  • created : the date time field represents the time when the record is created.

Flat structure

├── 110 │ ├── dtd │ └── pdf ├── 111 │ ├── dtd │ └── pdf ├── 112 │ ├── dtd │ └── pdf ├── 113 │ ├── dtd └── pdf ├── 114 │ ├── dtd │ ├── pdf ├── 115 │ ├── dtd │ ├── pdf ├── 115-bk │ ├── dtd │ ├── pdf ├── 116 │ ├── dtd │ ├── pdf

Statements of Administration Policy

Found in server_py/flatgov/common/management, the Statements of Administration Policy task (currently 'biden_statements.py') scrapes the links of SAP from the White House website and stores to the database using the original_pdf_link as a unique field to avoid duplicates.

Committee Reports

TODO: describe the celery task for committee reports

CBO Scores

The CBO Scores task (in common/tasks.py, referring to common/cbo.py) processes metadata from bill status XML, to retrieve the 'cboCostEstimates' field. Once all of these are collected, it checks the database for each entry and stores any new entries.

CRS Reports

The CRS Reports task runs the CRS scraper, described in CRS_REPORTS.

Celery installation and set-up

Celery Schedule

The Celery tasks are run on a schedule by celery-beat. The schedule is defined in https://github.com/aih/FlatGov/blob/main/server_py/flatgov/flatgov/celery.py

For example, a CSV of the CRS documents is created every night at midnight here:

'crs_scraper_daily': {
'task': 'bills.tasks.crs_task',
'schedule': crontab(minute=0, hour=1),
'options': {'queue': 'bill'}
},

How to run a celery task

Development

For a celery worker, open one terminal, go to the Django project root directory (in our case …​/Flatgov/server_py/flatgov), then activate the virtual environment.

Run the command below (Run the celery worker).

celery worker -Q bill -A flatgov.celery:app -n flatgov.%%h --loglevel=info

For celery scheduler, open another terminal, go to the Django project root directory (in our case …​/Flatgov/server_py/flatgov), then activate the virtual environment.

Run the command below (Run the celery redbeat)

celery beat -S redbeat.RedBeatScheduler -A flatgov.celery:app --loglevel=info

Then the background tasks (celery tasks ) run daily at midnight.

Production: Run Celeryd as a daemon on the Ubuntu server

  1. Init-script: celeryd

Before configuring it, go to the deployment_scripts/conf_celeryd and update all the paths with the absolute paths

Copy deployment_scripts/bill_celeryd file to /etc/init.d/celeryd.

Make celeryd executable (Run following commands from the terminal.)

sudo chmod 755 /etc/init.d/celeryd

sudo chown root:root /etc/init.d/celeryd

For configuration, copy deployment_scripts/conf_celeryd file to /etc/default/celeryd.

You can check if the worker is active by:

(flatgov) ubuntu:/opt/flatgov/FlatGov/server_py/flatgov/flatgov$  sudo chown -R ubuntu:ubuntu /var/run/celery/
(flatgov) ubuntu/opt/flatgov/FlatGov/server_py/flatgov/flatgov$ sudo chown -R ubuntu:ubuntu /var/log/celery/
(flatgov) ubuntu:/opt/flatgov/FlatGov/server_py/flatgov/flatgov$ sudo /etc/init.d/celeryd start
celery init v10.1.
Using config script: /etc/default/celeryd
celery multi v4.4.2 (cliffs)
> Starting nodes...
	> celery@ip-172-31-58-205: OK
Note
On Ubuntu, using the default ubuntu user, the settings are as follows.
CELERY_BIN="/home/ubuntu/.pyenv/versions/flatgov/bin/celery"
CELERY_APP="flatgov.celery:app"

CELERYD_CHDIR="/opt/flatgov/FlatGov/server_py/flatgov/"
CELERYD_OPTS="--time-limit=300 --concurrency=3 -Q bill -l INFO"
CELERYD_LOG_FILE="/var/log/celery/link_%n%I.log"
CELERYD_PID_FILE="/var/run/celery/link_%n.pid"
CELERYD_USER="ubuntu"
CELERYD_GROUP="ubuntu"
CELERY_CREATE_DIRS=1

To test:

(flatgov) ubuntu:/opt/flatgov/FlatGov/server_py/flatgov/flatgov$ sudo /etc/init.d/celeryd status
celery init v10.1.
Using config script: /etc/default/celeryd
celeryd (node link_celery) (pid 26679) is up...

+ 2. Init-script: celerybeat

Before configuring it, go to the deployment_scripts/conf_celerybeat and update all the paths with the absolute paths

Copy deployment_scripts/celerybeat file to /etc/init.d/celerybeat.

Make celerybeat executable (Run following commands from the terminal.)

sudo chmod 755 /etc/init.d/celerybeat

sudo chown root:root /etc/init.d/celerybeat

For configuration, copy deployment_scripts/conf_celerybeat file to /etc/default/celerybeat.

Then sudo chown root:root '/etc/default/celerybeat' sudo chmod 640 '/etc/default/celerybeat'

You can check if the beat is active by:

sudo /etc/init.d/celerybeat start

sudo /etc/init.d/celerybeat status

On ubuntu, with a 'flatgov' virtualenv, the settings are as follows:

"/etc/default/celerybeat"

CELERY_BIN="/home/ubuntu/.pyenv/versions/3.8.3/envs/flatgov/bin/celery"
CELERY_APP="flatgov.celery:app"
CELERYBEAT_CHDIR="/opt/flatgov/FlatGov/server_py/flatgov"
CELERYBEAT_USER="ubuntu"
CELERYBEAT_GROUP="ubuntu"
CELERYBEAT_OPTS="--schedule=/var/run/celery/celerybeat-schedule"

+ 3. Maintenance

As was shown, the following commands control worker and beat:

/etc/init.d/celeryd {start|stop|restart}

/etc/init.d/celerybeat {start|stop|restart}

The celerybeat user may also need to be set to ubuntu

+ 4. Run a task manually

If you need to run a task manually (e.g. to test, or to get data off schedule), run a separate Celery worker:

(flatgov) ubuntu:/opt/flatgov/FlatGov/server_py/flatgov$ celery worker -Q bill -A flatgov.celery:app -n flatgov.%%h --loglevel=info

 -------------- celery@flatgov.%ip-... v4.4.2 (cliffs)
--- ***** -----
-- ******* ---- Linux-5.4.0-1041-aws-x86_64-with-glibc2.27 2021-04-01 18:18:04
- *** --- * ---
- ** ---------- [config]

Then in a separate terminal run pyenv activate flatgov. Then:

(flatgov) ubuntu:/opt/flatgov/FlatGov/server_py/flatgov$ python manage.py shell
Python 3.8.3 (default, Sep 24 2020, 22:52:34)
[GCC 7.5.0] on linux
Type "help", "copyright", "credits" or "license" for more information.
(InteractiveConsole)
>>> from bills.tasks import sap_biden_task
>>> from celery import current_app
>>> current_app.send_task('bills.tasks.sap_biden_task', queue='bill')
<AsyncResult: a5d7d336-0125-4bdf-8819-5628b2341081>

OR for the uscongress update task:

>>> from uscongress.tasks import update_bill_task
>>> from celery import current_app
>>> current_app.send_task('uscongress.tasks.update_bill_task', queue='bill')
<AsyncResult: f05d3449-d473-498f-b6f0-87f663cd20e3>

Then you can track the task by looking in the celery logs, or on the original celery terminal, e.g.:

2021-04-01 18:27:47,069: WARNING/ForkPoolWorker-1] 2021-04-01 18:27:47 [scrapy.statscollectors] INFO: Dumping Scrapy stats:
{'downloader/request_bytes': 486,
 'downloader/request_count': 2,
 'downloader/request_method_count/GET': 2,
 'downloader/response_bytes': 27570,
 'downloader/response_count': 2,
 'downloader/response_status_count/200': 2,
 'elapsed_time_seconds': 0.393221,
 'finish_reason': 'finished',
 'finish_time': datetime.datetime(2021, 4, 1, 18, 27, 47, 68878),
 'item_scraped_count': 10,
 'log_count/DEBUG': 12,
 'log_count/INFO': 10,
 'log_count/WARNING': 22,
 'memusage/max': 83107840,
 'memusage/startup': 83107840,
 'response_received_count': 2,
 'robotstxt/request_count': 1,
 'robotstxt/response_count': 1,
 'robotstxt/response_status_count/200': 1,
 'scheduler/dequeued': 1,
 'scheduler/dequeued/memory': 1,
 'scheduler/enqueued': 1,
 'scheduler/enqueued/memory': 1,
 'start_time': datetime.datetime(2021, 4, 1, 18, 27, 46, 675657)}
[2021-04-01 18:27:47,069: INFO/ForkPoolWorker-1] Spider closed (finished)

For a different task, e.g. CommitteeDocument, the commands are: celery worker -Q bill -A flatgov.celery:app -n flatgov.%%h --loglevel=info

  • Open another shell and run django shell →

python manage.py shell
from celery import current_app
current_app.send_task("bills.tasks.committee_report_scrapy_task", queue="bill")

See image::media/celery-task-manual.png[Manual Celery Task,300,200]

Then you can keep track of the task status on the terminal that celery is running on or you can see the CommitteeDocument records in the django admin dashboard. The initial data loading will take a long time; there are about 17,000 records.

Redis: install and start

Running Celery requires Redis. To set up and get Redis working see below. Also see the instructions on the [Redis website](https://redis.io/).:

  • On Ubuntu:

Install

$ sudo apt update
$ sudo apt install redis-server
Reading package lists... Done
Fetched 634 kB in 0s (24.3 MB/s)...

Start sudo systemctl restart redis.service

To confirm that it is running: sudo systemctl status redis

If necessary, edit /etc/redis/redis.conf. Our set-up should not require any special settings