202 lines
6.8 KiB
Python
Executable file
202 lines
6.8 KiB
Python
Executable file
#!/usr/bin/env python3
|
||
"""Reproducible (DICOM) archive builder.
|
||
|
||
This utility generates a TAR archive from a directory containing DICOM files.
|
||
|
||
The input directory can have any number of files, with any organization or
|
||
naming. However, the DICOM files are assumed to come from a single "visit"
|
||
(i.e., the time between a person or sample entering and then leaving a
|
||
scanner). The input directory's content is copied into a TAR archive verbatim,
|
||
with no changes to filenames or organization.
|
||
|
||
In order to generate reproducible TAR archives, the file order, recorded
|
||
permissions and ownership, and modification times are standardized. All files
|
||
in the TAR archive are declared to be owned by root/root (uid/gid: 0/0) with
|
||
0644 permissions. The modification time of any DICOM file is determined
|
||
by its contained DICOM `StudyDate/StudyTime` timestamps. The modification time
|
||
for any non-DICOM file is set to the latest timestamp across all DICOM files.
|
||
"""
|
||
import os
import tarfile
from datetime import datetime
from hashlib import md5
from pathlib import Path
from typing import (
    Dict,
    Iterator,
    Optional,
    Tuple,
)

from tqdm import tqdm

# this implementation works with pydicom 2x
from pydicom import (
    dcmread,
)
from pydicom.valuerep import (
    DA,
    TM,
)
from pydicom.errors import InvalidDicomError
|
||
|
||
|
||
# fallback timestamp, used by main() as the default member timestamp when
# not a single scanned file yielded a DICOM-derived timestamp
default_date = datetime(1970, 1, 1)
|
||
|
||
|
||
def scan_dir(path: Path) -> Iterator[Tuple[Path, Optional[datetime]]]:
    """Walk `path` recursively and report a timestamp for every file.

    This is a generator. Directories are skipped; every other entry found
    by `path.rglob('*')` is reported exactly once.

    Parameters
    ----------
    path: Path
      Directory to scan.

    Yields
    ------
    tuple(Path, datetime or None)
      The path of a file, and the timestamp built from its DICOM
      `StudyDate` (0008, 0020) and `StudyTime` (0008, 0030) attributes.
      The timestamp is `None` for any file that `dcmread()` rejects with
      `InvalidDicomError`, i.e. a non-DICOM file.
    """
    for p in tqdm(path.rglob('*'), desc='Scanning', unit=' files'):
        if p.is_dir():
            continue
        try:
            # only two header attributes are needed, so do not parse the
            # (potentially large) pixel data of each file
            with dcmread(p, stop_before_pixels=True) as dcm:
                # NOTE(review): a DICOM file that lacks these attributes,
                # or has them empty, presumably makes this raise rather
                # than fall back to `None` -- confirm this is intended
                studydate = DA(dcm.StudyDate)
                studytime = TM(dcm.StudyTime)
        except InvalidDicomError:
            # this is not a DICOM file, report path without timestamp
            yield (p, None)
            continue
        # reproducible timestamp for this DICOM file
        yield (p, datetime.combine(studydate, studytime))
|
||
|
||
|
||
def write_archive(
        dest_path: Path,
        input_base_dir: Path,
        content: Dict,
        default_timestamp: datetime,
):
    """Write all files in `content` into an uncompressed TAR at `dest_path`.

    All members are placed underneath a single top-level directory. Its name
    is built from the last two components of `dest_path`, joined with `_`,
    with the trailing `_dicom.tar` removed (`<study-id>_<visit-id>` for a
    path produced by `get_archive_path()`). Below that directory, members
    keep their path relative to `input_base_dir`.

    Parameters
    ----------
    dest_path: Path
      Location of the archive to create. Must not exist, and its name must
      end with `_dicom.tar`. Missing parent directories are created.
    input_base_dir: Path
      Directory that all keys of `content` are located in. It must be given
      in the same form that was used to produce the keys of `content`.
    content: Dict
      Mapping of file path to timestamp (`datetime`), or to `None` when
      `default_timestamp` shall be used for that file.
    default_timestamp: datetime
      Timestamp for any file without a timestamp of its own.

    Raises
    ------
    ValueError
      If `dest_path` exists already, if its name does not end with
      `_dicom.tar`, or if a path in `content` is not located underneath
      `input_base_dir`.
    """
    archive_suffix = '_dicom.tar'

    # might have been done already, but the check is cheap, so do a
    # localized one here
    if dest_path.exists():
        # be safe
        raise ValueError(
            f'output path {dest_path} already exists, refusing to overwrite')

    archive_content_base_dir = '_'.join(dest_path.parts[-2:])
    # be safe: a real exception rather than `assert`, which is disabled
    # with `python -O`; and test before anything is created on disk
    if not archive_content_base_dir.endswith(archive_suffix):
        raise ValueError(
            f'output path {dest_path} does not end with {archive_suffix!r}')
    # strip _dicom.tar
    archive_content_base_dir = archive_content_base_dir[:-len(archive_suffix)]

    dest_path.parent.mkdir(parents=True, exist_ok=True)

    # write uncompressed TAR
    with tarfile.open(dest_path, "w") as tar:
        # order of member in archive is significant, sort by path
        for p in tqdm(sorted(content), 'Composing archive', unit=' files'):
            # normalize_tarinfo() replaces the first component of a member
            # name with the archive root directory. Hence declare a member
            # name that consists of one placeholder component, followed by
            # the path relative to the input directory. Relying on the
            # on-disk path instead would leak all but the first component
            # of an absolute or nested `input_base_dir` into the archive.
            member_name = Path('_', p.relative_to(input_base_dir))
            tinfo = tar.gettarinfo(name=p, arcname=str(member_name))
            # adjust properties to make archive builds reproducible
            tinfo = normalize_tarinfo(
                tinfo,
                archive_content_base_dir,
                # go with the reported timestamp from DICOM or with default
                content[p] or default_timestamp,
            )
            # ingest into archive
            with p.open('rb') as fp:
                tar.addfile(tinfo, fp)
|
||
|
||
|
||
def normalize_tarinfo(tinfo, archive_path, timestamp):
    """Standardize a TAR member record to make archive builds reproducible.

    Parameters
    ----------
    tinfo: tarfile.TarInfo
      Member record to adjust. It is modified in place.
    archive_path: str
      Name of the archive's top-level directory. It replaces the first
      path component of the member name.
    timestamp: datetime
      Modification time to record for the member. A naive timestamp is
      interpreted as UTC, so that the recorded value does not depend on
      the timezone of the machine that builds the archive.

    Returns
    -------
    tarfile.TarInfo
      The modified `tinfo`.
    """
    # strip first level and replace with generated archive root dir name;
    # member names in a TAR archive use forward slashes on any platform
    member_parts = Path(tinfo.name).parts[1:]
    tinfo.name = Path(archive_path, *member_parts).as_posix()
    # be safe
    tinfo.uid = 0
    tinfo.gid = 0
    tinfo.uname = 'root'
    tinfo.gname = 'root'
    if timestamp.utcoffset() is None:
        # `datetime.timestamp()` would interpret a naive timestamp in the
        # local timezone. Compute the offset to the epoch directly.
        tinfo.mtime = (timestamp - datetime(1970, 1, 1)).total_seconds()
    else:
        tinfo.mtime = timestamp.timestamp()
    if tinfo.isfile():
        # for any regular file normalize the permission
        # leave unexpected extra-ordinary content untouched
        tinfo.mode = 0o100644
    return tinfo
|
||
|
||
|
||
def get_archive_path(
        output_base_dir: Path, study_id: str, visit_id: str) -> Path:
    """Return the archive location for a visit.

    The result is `<output_base_dir>/<study_id>/<visit_id>_dicom.tar`.
    """
    archive_name = f'{visit_id}_dicom.tar'
    return output_base_dir.joinpath(study_id, archive_name)
|
||
|
||
|
||
def generate_md5sum_file(path):
    """Write an MD5 checksum file next to the file at `path`.

    The checksum file is named like `path`, with `.md5sum` appended. It
    contains a single line in the format of the `md5sum` tool: the hex
    digest, two spaces, and the file name (without any directory).

    Parameters
    ----------
    path: Path
      File to compute the checksum for.
    """
    md5sum = md5()
    with path.open('rb') as fp:
        # in-line def works from PY3.8+
        # read in chunks to keep memory usage flat for large archives;
        # MD5 processes input in 64-byte blocks (8192 is 64×128)
        while chunk := fp.read(8192):
            md5sum.update(chunk)
    Path(f'{path}.md5sum').write_text(
        # yes, two spaces! `md5sum -c` expects the second character after
        # the digest to be the (text/binary) mode indicator
        f'{md5sum.hexdigest()}  {path.name}\n'
    )
|
||
|
||
|
||
def main(input_base_dir: str,
         output_base_dir: str,
         study_id: str,
         visit_id: str):
    """Build the archive for one visit, plus its MD5 checksum file.

    Parameters
    ----------
    input_base_dir: str
      Directory with the files to put into the archive.
    output_base_dir: str
      Base directory of the archive structure. The archive location
      underneath is determined by `get_archive_path()`.
    study_id: str
      Study identifier.
    visit_id: str
      Visit identifier.

    Raises
    ------
    ValueError
      If the archive exists already.
    """
    source_dir = Path(input_base_dir)

    # determine the archive location and refuse to touch an existing one
    archive_path = get_archive_path(
        Path(output_base_dir), study_id, visit_id)
    if archive_path.exists():
        # be safe
        raise ValueError(
            f'{archive_path} already exists, refusing to overwrite')

    # mapping of path->timestamp for everything in the input directory
    content = dict(scan_dir(source_dir))

    # non-DICOM files come without a timestamp (`None`). They get the
    # "youngest" timestamp of any DICOM file. Without any DICOM timestamp,
    # fall back on `default_date`.
    dicom_timestamps = [ts for ts in content.values() if ts]
    default_timestamp = \
        max(dicom_timestamps) if dicom_timestamps else default_date

    write_archive(archive_path, source_dir, content, default_timestamp)

    # be nice (?) and give the generated archive the mtime of the DICOM set,
    # the access time is set to the present
    access_time = datetime.now().timestamp()
    modification_time = default_timestamp.timestamp()
    os.utime(archive_path, times=(access_time, modification_time))

    generate_md5sum_file(archive_path)
|
||
|
||
|
||
if __name__ == '__main__':
    # command line entry point: parse the arguments and hand over to main()
    import argparse

    parser = argparse.ArgumentParser(description=__doc__)
    # where to put the archive, defaults to the current directory
    parser.add_argument(
        "-o", "--output-dir",
        metavar='PATH',
        default=os.getcwd(),
        help=(
            "Base directory to place the archive structure in. "
            "The corresponding '<study-id>/' subdirectory for the "
            "study is created automatically, if needed"
        ),
    )
    # mandatory pair of identifiers: study first, visit second
    parser.add_argument(
        '--id',
        nargs=2,
        metavar=('STUDY-ID', 'VISIT-ID'),
        required=True,
        help=(
            "The study and visit identifiers, used to name and "
            "locate the generated archive in the storage organization. "
            "The study identifier must be globally unique in the storage "
            "system, and the visit identifier must be unique within the "
            "collection of visits in the given study"
        ),
    )
    # what to put into the archive
    parser.add_argument(
        'input_dir',
        metavar='<input-dir>',
        help=(
            "Directory with the files to place into the visit archive. "
            "The input base directory itself is not put into the archive "
            "(i.e., its own name is irrelevant). Instead, a top-level "
            "directory with the name '<study-id>_<visit_id>' is used "
            "to place all archive content in."
        ),
    )
    cli_args = parser.parse_args()
    study_id, visit_id = cli_args.id
    main(
        input_base_dir=cli_args.input_dir,
        output_base_dir=cli_args.output_dir,
        study_id=study_id,
        visit_id=visit_id,
    )
|