Skip to content

Commit

Permalink
adding finish of function to derive groups (values, fields)
Browse files Browse the repository at this point in the history
Signed-off-by: vsoch <vsochat@stanford.edu>
  • Loading branch information
vsoch committed Mar 8, 2020
1 parent 6fc5e53 commit 5e69a3b
Show file tree
Hide file tree
Showing 8 changed files with 292 additions and 22 deletions.
2 changes: 1 addition & 1 deletion deid/config/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ def has_actions(self):

def listof(self, section):
"""return a list of keys for a section"""
listing = self._get_section(section)
listing = self._get_section(section) or {}
return list(listing.keys())

def ls_filters(self):
Expand Down
11 changes: 9 additions & 2 deletions deid/config/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
# pylint: skip-file

from deid.logger import bot
from deid.utils import read_file
from deid.utils import read_file, get_installdir
from deid.data import data_base
from deid.config.standards import (
formats,
Expand Down Expand Up @@ -208,6 +208,9 @@ def find_deid(path=None):
path: a path on the filesystem. If not provided, will assume PWD.
"""
# A default deid will be loaded if all else fails
default_deid = os.path.join(get_installdir(), "data", "deid.dicom")

if path is None:
path = os.getcwd()

Expand All @@ -218,7 +221,11 @@ def find_deid(path=None):
]

if len(contenders) == 0:
bot.exit("No deid settings files found in %s, exiting." % (path))
bot.warning(
"No deid settings files found in %s, will use default dicom.deid."
% path
)
contenders.append(default_deid)

elif len(contenders) > 1:
bot.warning("Multiple deid files found in %s, will use first." % (path))
Expand Down
1 change: 1 addition & 0 deletions deid/dicom/actions.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ def perform_action(dicom, action, item=None, fields=None, return_seen=False):
"action" (eg, REPLACE) what to do with the field
"value": if needed, the field from the response to replace with
"""
print(action)
field = action.get("field") # e.g: PatientID, endswith:ID, values:name, fields:name
value = action.get("value") # "suid" or "var:field"
action = action.get("action") # "REPLACE"
Expand Down
4 changes: 4 additions & 0 deletions deid/dicom/fields.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,10 @@ def find_by_values(values, dicom):
"""Given a list of values, find fields in the dicom that contain any
of those values, as determined by a regular expression search.
"""
# Values must be strings
values = [str(x) for x in values]
print(values)

fields = []
contenders = get_fields(dicom)

Expand Down
42 changes: 25 additions & 17 deletions deid/dicom/header.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,19 +28,17 @@
from deid.logger import bot
from deid.utils import read_json

from .tags import remove_sequences
from .groups import extract_values_list, extract_fields_list

from deid.dicom.tags import get_private
from deid.config import DeidRecipe

from pydicom import read_file

from .utils import save_dicom
from .actions import perform_action
from pydicom.dataset import Dataset
from deid.dicom.utils import save_dicom
from deid.dicom.actions import perform_action
from deid.dicom.tags import remove_sequences, get_private
from deid.dicom.groups import extract_values_list, extract_fields_list
from deid.dicom.fields import get_fields

from .fields import get_fields
from pydicom.dataset import Dataset

import os

Expand Down Expand Up @@ -137,8 +135,6 @@ def get_identifiers(
skip_fields: if not None, added fields to skip
"""
bot.debug("Extracting identifiers for %s dicom" % (len(dicom_files)))

if config is None:
config = "%s/config.json" % here

Expand All @@ -149,6 +145,7 @@ def get_identifiers(
if not isinstance(dicom_files, list):
dicom_files = [dicom_files]

bot.debug("Extracting identifiers for %s dicom" % len(dicom_files))
ids = dict() # identifiers

# We will skip PixelData
Expand All @@ -159,7 +156,13 @@ def get_identifiers(
skip = skip + skip_fields

for dicom_file in dicom_files:
dicom = read_file(dicom_file, force=True)

# TODO: this should have shared reader class to hold dicom, ids, etc.
if isinstance(dicom_file, Dataset):
dicom = dicom_file
dicom_file = dicom.filename
else:
dicom = read_file(dicom_file, force=force)

if dicom_file not in ids:
ids[dicom_file] = dict()
Expand Down Expand Up @@ -253,7 +256,12 @@ def replace_identifiers(
# Parse through dicom files, update headers, and save
updated_files = []
for _, dicom_file in enumerate(dicom_files):
dicom = read_file(dicom_file, force=force)

if isinstance(dicom_file, Dataset):
dicom = dicom_file
dicom_file = dicom.filename
else:
dicom = read_file(dicom_file, force=force)
dicom_name = os.path.basename(dicom_file)
fields = dicom.dir()

Expand All @@ -266,19 +274,19 @@ def replace_identifiers(
if dicom_file in ids:

# Prepare additional lists of values and fields (updates item)
if deid.has_values_lists():
for group, actions in deid.get_values_lists().items():
if recipe.has_values_lists():
for group, actions in recipe.get_values_lists().items():
ids[dicom_file][group] = extract_values_list(
dicom=dicom, actions=actions
)

if deid.has_fields_lists():
for group, actions in deid.get_fields_lists().items():
if recipe.has_fields_lists():
for group, actions in recipe.get_fields_lists().items():
ids[dicom_file][group] = extract_fields_list(
dicom=dicom, actions=actions
)

for action in deid.get_actions():
for action in recipe.get_actions():
dicom = perform_action(
dicom=dicom, item=ids[dicom_file], action=action
)
Expand Down
7 changes: 5 additions & 2 deletions deid/tests/test_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,11 +56,14 @@ def test_load_deid(self):
config = load_deid(os.path.dirname(self.deid))
self.assertTrue("format" in config)

print("Case 3: Testing error on non-existing load")
print("Case 3: Testing error on non-existing load of file")
with self.assertRaises(SystemExit) as cm:
config = load_deid(self.tmpdir)
config = load_deid(os.path.join(self.tmpdir, "deid.doesnt-exist"))
self.assertEqual(cm.exception.code, 1)

print("Case 4: Testing load of default deid.")
config = load_deid(self.tmpdir)

def test_find_deid(self):

print("Testing finding deid file, referencing directly.")
Expand Down
133 changes: 133 additions & 0 deletions deid/tests/test_deid_recipe.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
#!/usr/bin/env python

"""
Test DeidRecipe class
Copyright (c) 2020 Vanessa Sochat
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
"""

import unittest
import tempfile
import shutil
import json
import os

from deid.config import DeidRecipe
from deid.utils import get_installdir


class TestConfig(unittest.TestCase):
def setUp(self):
self.pwd = get_installdir()
self.deid = os.path.abspath("%s/../examples/deid/deid.dicom" % self.pwd)
self.tmpdir = tempfile.mkdtemp()
print("\n######################START######################")

def tearDown(self):
shutil.rmtree(self.tmpdir)
print("\n######################END########################")

def test_load_recipe(self):

print("Case 1: Test loading default DeidRecipe")

recipe = DeidRecipe()

self.assertTrue(isinstance(recipe.deid, dict))

print("Checking basic sections are loaded")
print(recipe.deid.keys())
for section in ["header", "format", "filter"]:
self.assertTrue(section in recipe.deid)

print("Case 2: Loading from file")
recipe = DeidRecipe(self.deid)

def test_get_functions(self):

recipe = DeidRecipe(self.deid)

# Format
self.assertEqual(recipe.get_format(), "dicom")

# Actions for header
print("Testing get_actions")
actions = recipe.get_actions()
self.assertTrue(isinstance(actions, list))
for key in ["action", "field", "value"]:
self.assertTrue(key in actions[0])
self.assertTrue(recipe.has_actions())

# Filters
print("Testing get_filters")
filters = recipe.get_filters()
self.assertTrue(isinstance(filters, dict))

# whitelist, blacklist, graylist
for key in recipe.ls_filters():
self.assertTrue(key in filters)

recipe = DeidRecipe()
filters = recipe.get_filters()
self.assertTrue(isinstance(filters["whitelist"], list))

# Test that each filter has a set of filters, coords, name
for key in ["filters", "coordinates", "name"]:
self.assertTrue(key in filters["whitelist"][0])

# Each filter is a list of actions, name is string, coords are list
self.assertTrue(isinstance(filters["whitelist"][0]["filters"], list))
self.assertTrue(isinstance(filters["whitelist"][0]["name"], str))
self.assertTrue(isinstance(filters["whitelist"][0]["coordinates"], list))

# Check content of the first filter
for key in ["action", "field", "operator", "InnerOperators", "value"]:
self.assertTrue(key in filters["whitelist"][0]["filters"][0])

# Fields and Values
print("Testing get_fields_lists and get_values_lists")
self.assertEqual(recipe.get_fields_lists(), None)
self.assertEqual(recipe.get_values_lists(), None)
self.assertEqual(recipe.ls_fieldlists(), [])
self.assertEqual(recipe.ls_valuelists(), [])
self.assertTrue(not recipe.has_fields_lists())
self.assertTrue(not recipe.has_values_lists())

# Load in recipe with values and fields
deid = os.path.abspath("%s/../examples/deid/deid.dicom-groups" % self.pwd)
recipe = DeidRecipe(deid)

assert "values" in recipe.deid
assert "fields" in recipe.deid
self.assertTrue(isinstance(recipe.deid["values"], dict))
self.assertTrue(isinstance(recipe.deid["fields"], dict))

self.assertTrue(recipe.get_fields_lists() is not None)
self.assertTrue(recipe.get_values_lists() is not None)
self.assertEqual(recipe.ls_fieldlists(), ["instance_fields"])
self.assertEqual(recipe.ls_valuelists(), ["cookie_names", "operator_names"])
self.assertTrue(recipe.has_fields_lists())
self.assertTrue(recipe.has_values_lists())


if __name__ == "__main__":
unittest.main()
Loading

0 comments on commit 5e69a3b

Please sign in to comment.