starting work on tasks for watcher, and more work on docs
vsoch committed Jun 6, 2017
1 parent 7e96ae6 commit 4612e13
Showing 6 changed files with 208 additions and 22 deletions.
2 changes: 2 additions & 0 deletions .gitignore
@@ -4,6 +4,8 @@
.*
!/.gitignore
sendit/settings/secrets.py
sendit/logs
sendit/watcher.pid

# Static #
##########
67 changes: 55 additions & 12 deletions docs/watcher.md
@@ -1,5 +1,19 @@
# Watcher
The watcher is implemented as a [pyinotify daemon](https://github.com/seb-m/pyinotify/wiki) that is controlled via [manage.py](../manage.py). If you are interested in how this module works, it uses [inotify](https://pypi.python.org/pypi/inotify), which comes from the Linux kernel. Specifically, this means that you can start and stop the daemon with the following commands (from inside the Docker image):
The watcher is implemented as a [pyinotify daemon](https://github.com/seb-m/pyinotify/wiki) that is controlled via [manage.py](../manage.py). If you are interested in how this module works, it uses [inotify](https://pypi.python.org/pypi/inotify), which comes from the Linux kernel. Its basic function is to act when the filesystem changes in the location(s) we specify, in the way that we specify.

## Basic Workflow
The basic workflow of the watcher is the following:

1. The user starts the daemon.
2. The process id is stored at `watcher.pid` in the application base folder at `/code/sendit`. Generally, we always check for this file first to stop a running process, stop an old process, or write a new pid. A clean exit will remove an old pid file.
3. Logs for error (watcher.err) and output (watcher.out) are written under [logs](../sendit/logs). We will likely want to clear or rotate these periodically, in case they grow large. The logging itself is done with the [sendit/logger](../sendit/logger.py).
4. The daemon responds to all events within the `/data` folder of the application. When this happens, we trigger a function that:
- checks for a complete series folder
- if complete, adds to database and starts processing via a celery task
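
The pid file handling in step 2 can be sketched as follows. This is a minimal illustration using only the standard library, not the code that pyinotify or the management commands actually run; the helper names (`pid_is_running`, `read_pid`, `claim_pid_file`, `release_pid_file`) are hypothetical.

```python
import os


def pid_is_running(pid):
    """Return True if a process with this pid exists (POSIX only)."""
    try:
        os.kill(pid, 0)  # signal 0 only checks for existence, it kills nothing
    except ProcessLookupError:
        return False
    except PermissionError:
        return True  # the process exists but belongs to another user
    return True


def read_pid(pid_file):
    """Return the pid stored in pid_file, or None if missing or unreadable."""
    try:
        with open(pid_file) as handle:
            return int(handle.read().strip())
    except (FileNotFoundError, ValueError):
        return None


def claim_pid_file(pid_file):
    """Write our pid to pid_file unless a live process already owns it.

    Returns True if the file was written, False if a watcher is running.
    """
    old_pid = read_pid(pid_file)
    if old_pid is not None and pid_is_running(old_pid):
        return False
    # No file, or a stale one left by an unclean exit: overwrite it
    with open(pid_file, "w") as handle:
        handle.write(str(os.getpid()))
    return True


def release_pid_file(pid_file):
    """Remove the pid file, as a clean exit would."""
    if os.path.exists(pid_file):
        os.remove(pid_file)
```

A start command would call `claim_pid_file` first and refuse to start when it returns `False`; a stop command would read the pid, signal the process, and then call `release_pid_file`.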


## Usage
You can start and stop the daemon with the following commands (from inside the Docker image):

```
python manage.py watcher_start
@@ -10,7 +24,7 @@ When you start, you will see something like the following:

```
python manage.py watcher_start
DEBUG Adding watch on /data, processed by sendit.apps.watcher.event_processors.AllEventsSignaler
DEBUG Adding watch on /data, processed by sendit.apps.watcher.event_processors.DicomCelery
```

and you will notice a `pid` file for the watcher generated:
@@ -48,7 +62,7 @@ INOTIFIER_DAEMON_STDOUT = os.path.join(LOG_DIR,'watcher.out')
INOTIFIER_DAEMON_STDERR = os.path.join(LOG_DIR,'watcher.err')
```

`BASE_DIR` refers to the `/code/sendit` directory, meaning that our logs will be found in `/code/sendit/logs`, and the daemon's pid file will by default be generated as `watcher.pid` under `/code/sendit`. If there is, for any reason, an error with these paths, the same files will be generated in `/tmp`.
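
The fallback to `/tmp` described above could look roughly like this. It is a sketch under assumptions: the `resolve_dir` and `daemon_paths` helpers and the dictionary keys are hypothetical and are not taken from the sendit settings, which may detect path errors differently.

```python
import os


def resolve_dir(preferred, fallback="/tmp"):
    """Return preferred if it can be created and written to, else fallback."""
    try:
        os.makedirs(preferred, exist_ok=True)
    except OSError:
        return fallback
    return preferred if os.access(preferred, os.W_OK) else fallback


def daemon_paths(base_dir):
    """Build the pid and log paths for the watcher under base_dir.

    Each location falls back to /tmp independently if it is unusable.
    """
    log_dir = resolve_dir(os.path.join(base_dir, "logs"))
    return {
        "pid": os.path.join(resolve_dir(base_dir), "watcher.pid"),
        "stdout": os.path.join(log_dir, "watcher.out"),
        "stderr": os.path.join(log_dir, "watcher.err"),
    }
```

Calling `daemon_paths("/code/sendit")` inside the container would be expected to give the default locations named above, provided the directories are writable.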


### Watch Paths
@@ -132,14 +146,43 @@ For our purposes, since we already have an asyncronous celery queue, what we rea

For better understanding, you can look at the code itself, for each of [watcher_start.py](../sendit/apps/watcher/management/commands/watcher_start.py) and [watcher_stop.py](../sendit/apps/watcher/management/commands/watcher_stop.py).


## Basic Workflow
The basic workflow of the watcher is the following:

1. The user starts the daemon.
2. The process id is stored at `watcher.pid` in the application base folder at `/code`. Generally, we always check for this file first to stop a running process, stop an old process, or write a new pid.
3. Logs for error (watcher.err) and output (watcher.out) are written under [logs](../logs). We likely (might?) want to clear / update these every so often, in case they get really big.
4. The daemon responds to all events within the `/data` folder of the application. When this happens, we trigger a function that:
- checks for a complete series folder
- if complete, adds to database and starts processing
## Testing
I made a testing mode so it's easy to shell into the Docker image and get a feel for starting and using the watcher, specifically for a DICOM directory. It works by starting the watcher and then creating a directory in the `/data` folder whose name begins with any variation of `test`. First we will start the watcher and create a test directory that is "finished" because its name doesn't end with a `.tmp*` extension:

```
root@0b0b9c4f2a6e:/code# python manage.py watcher_start
DEBUG Adding watch on /data, processed by sendit.apps.watcher.event_processors.DicomCelery
root@0b0b9c4f2a6e:/code# mkdir data/test_finished
```

We can now look in the watcher's output log to see that it identified the finished directory:

```
root@0b0b9c4f2a6e:/code# cat sendit/logs/watcher.out
LOG FINISHED: /data/test_finished
```

Now we can create an unfinished directory, indicated by having an extension `.tmp*`:

```
root@0b0b9c4f2a6e:/code# mkdir data/test_notfinished.tmp1234
```

We can again look at the output file to see that a line has been added to indicate the creation of the directory:

```
root@0b0b9c4f2a6e:/code# cat sendit/logs/watcher.out
LOG FINISHED: /data/test_finished
LOG CREATED: /data/test_notfinished.tmp1234
```

Finally, we should stop the watcher:

```
root@0b0b9c4f2a6e:/code# python manage.py watcher_stop
DEBUG Dicom watching has been stopped.
root@0b0b9c4f2a6e:/code#
```
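
The finished / unfinished distinction in the walkthrough above can be sketched in a few lines. This is an illustration only: the real `is_finished` lives in `event_processors.py` and is not shown here, so the pattern below (a final extension starting with `.tmp`) and the `is_test` and `describe` helpers are assumptions.

```python
import os
import re

# Assumption: an in-progress folder name ends with ".tmp" plus any suffix
TMP_EXTENSION = re.compile(r"\.tmp[^./]*$")


def is_finished(path):
    """Return True if the folder name does not end with a .tmp* extension."""
    name = os.path.basename(path.rstrip("/"))
    return TMP_EXTENSION.search(name) is None


def is_test(path):
    """Return True if the folder name (not the full path) begins with 'test'."""
    return os.path.basename(path.rstrip("/")).lower().startswith("test")


def describe(path):
    """Return the message the watcher would log for a new folder."""
    state = "FINISHED" if is_finished(path) else "CREATED"
    return "%s: %s" % (state, path)


print(describe("/data/test_finished"))
print(describe("/data/test_notfinished.tmp1234"))
```

Note that the test check looks at the folder name rather than the full path, because the event path is absolute (for example `/data/test_finished`).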

More to come soon, and of course actually writing the function to process the dicoms :) Stay tuned!
127 changes: 127 additions & 0 deletions sendit/apps/main/tasks.py
@@ -0,0 +1,127 @@
'''
Copyright (c) 2017 Vanessa Sochat
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
'''

from celery.decorators import periodic_task
from celery import (
    shared_task,
    Celery
)

from celery.schedules import crontab
from sendit.logger import bot
from sendit.apps.main.models import (
    Series,
    SeriesIdentifiers,
    Image
)

from sendit.settings import (
    DEIDENTIFY_RESTFUL,
    SEND_TO_ORTHANC,
    ORTHANC_IPADDRESS,
    ORTHANC_PORT,
    SEND_TO_GOOGLE,
    GOOGLE_CLOUD_STORAGE
)

from django.conf import settings
from django.contrib.auth.models import User
from django.utils import timezone
from datetime import datetime
import os

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'sendit.settings')
app = Celery('sendit')
app.config_from_object('django.conf:settings')
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)


@shared_task
def import_dicomdir(dicom_dir):
    bot.debug("Importing %s" %(dicom_dir))
    bot.warning('Vanessa write me!!')

@shared_task
def get_identifiers(sid):
    '''get identifiers is the celery task to get identifiers for
    all images in a series, done by way of sending one restful call
    to the DASHER endpoint. If DEIDENTIFY_RESTFUL is False
    under settings, this function doesn't run
    '''
    if DEIDENTIFY_RESTFUL is True:
        try:
            series = Series.objects.get(id=sid)
        except Series.DoesNotExist:
            bot.error("In get_identifiers: Series %s does not exist." %(sid))
            return None

        bot.debug("Getting identifiers for %s" %(series))
        bot.warning('Vanessa write me!!')
        # Send off task here to replace identifiers, which will send to storage
    else:
        bot.debug("Vanessa write me!")
        # Otherwise, just fire off function to send to storage as is.


@shared_task
def replace_identifiers(sid):
    '''replace identifiers is called from get_identifiers, given that the user
    has asked to deidentify_restful. This function will do the replacement,
    and then trigger the function to send to storage
    '''
    try:
        series = Series.objects.get(id=sid)
        # Model lookups go through the default manager (.objects)
        series_identifiers = SeriesIdentifiers.objects.get(series=series)
    except (Series.DoesNotExist, SeriesIdentifiers.DoesNotExist):
        bot.error("In replace_identifiers: Series %s or identifiers does not exist." %(sid))
        return None

    # dicom_dir is not defined in this task, so report the series instead
    bot.debug("Replacing identifiers for %s" %(series))
    bot.warning('Vanessa write me!!')

    # Do replacement of identifiers
    # trigger storage function


@shared_task
def upload_storage(sid):
    '''upload storage will send data to Orthanc and/or Google Storage, depending on the
    user preference.
    '''
    try:
        series = Series.objects.get(id=sid)
    except Series.DoesNotExist:
        bot.error("In upload_storage: Series %s does not exist." %(sid))
        return None

    if SEND_TO_ORTHANC is True:
        bot.log("Sending %s to %s:%s" %(series,ORTHANC_IPADDRESS,ORTHANC_PORT))
        # do the send here!

    if SEND_TO_GOOGLE is True and GOOGLE_CLOUD_STORAGE not in [None,""]:
        bot.log("Uploading to Google Storage %s" %(GOOGLE_CLOUD_STORAGE))
        # GOOGLE_CLOUD_STORAGE
        bot.warning('Vanessa write me!!')


25 changes: 19 additions & 6 deletions sendit/apps/watcher/event_processors.py
@@ -43,19 +43,32 @@ def is_finished(self,path):
else:
return False

def process_IN_CREATE(self, event):
'''Create should be called when the path is created (or modified)
NOTE: if this isn't the case, use modify instead.
def check_dicomdir(self,event):
'''check_dicomdir is the main function to call on a folder
creation or modification, which both could signal new dicom directories
'''
if self.is_finished(event.pathname):
bot.log("FINISHED: %s" %(event.pathname))

# Here is the celery task to use
import_dicomdir.apply_async(kwargs={"dicom_dir":event.pathname})
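if os.path.basename(event.pathname).lower().startswith("test"):  # pathname is absolute (e.g. /data/test_finished), so check the folder name; assumes os is imported in this module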
bot.log("Here would be call to import_dicomdir for %s" %(event.pathname))
else:
# Here is the celery task to use
import_dicomdir.apply_async(kwargs={"dicom_dir":event.pathname})
else:
bot.log("CREATED: %s" %(event.pathname))


def process_IN_CREATE(self, event):
'''Create should be called when the path is created (or modified)
NOTE: if this isn't the case, use modify instead.
'''
return self.check_dicomdir(event)

def process_IN_MODIFY(self, event):
'''Modify should do the equivalent of create
'''
return self.check_dicomdir(event)


class AllEventsPrinter(pyinotify.ProcessEvent):
'''A class to print every event.
7 changes: 4 additions & 3 deletions sendit/logger.py
@@ -2,7 +2,8 @@

'''
logger.py: Simple logger for sendit.
logger.py: Simple logger for sendit. Note that levels info and log are the
only two considered stdout, the rest are sent to stderr.
Copyright (c) 2017 Vanessa Sochat
@@ -92,7 +93,6 @@ def emitError(self,level):
if level in [ABRT,
ERROR,
WARNING,
LOG,
VERBOSE,
VERBOSE1,
VERBOSE2,
@@ -105,7 +105,8 @@ def emitError(self,level):
def emitOutput(self,level):
'''determine if a level should print to stdout
only includes LOG and INFO'''
if level in [INFO]:
if level in [LOG,
INFO]:
return True
return False

2 changes: 1 addition & 1 deletion sendit/settings/config.py
@@ -9,7 +9,7 @@
# Orthanc Storage
SEND_TO_ORTHANC=True
ORTHANC_IPADDRESS="127.0.0.1"
ORTHAC_PORT=4747
ORTHANC_PORT=4747

# Google Storage
# Should we send to Google at all?