inm-icf-utilities/bin/dataladify_studyvisit_from_meta
Michael Hanke eedba8902c Switch over to the archivist remote
This is coming with https://github.com/datalad/datalad-next/pull/380

I manually tested data access (with external redirection of archive
download to a local cache). Something like this:

```
git \
   -c 'remote.uncurl.uncurl-url=file:///tmp/icfstore/{storepath}' \
   -c 'remote.uncurl.uncurl-match=https://data.inm-icf.de/(?P<storepath>.*)$' \
   annex get \
   P000624_P000624/incoming/...2.48.14.966.65880823.dcm
```

works like a charm.
2023-05-31 13:30:27 +02:00

204 lines
6.7 KiB
Python
Executable file

#!/usr/bin/env python3
"""Create a DataLad dataset for a study visit and deposit it in the ICF store.

The visit's tarball metadata (`<visit-id>_metadata_tarball.json`) and DICOM
metadata (`<visit-id>_metadata_dicoms.json`) are read from the study directory
inside the store. A dataset is built from them in a temporary directory and
pushed (without any annexed file content) into the same study directory.
"""
import json
import os
from pathlib import Path
import sys
import tempfile
import datalad.api as dl
# Base URL pointing to the top of the ICF data store.
# The store-relative location of a visit tarball (the `storepath` field of
# the tarball metadata) is appended to it to form the tarball's download URL.
icfstore_baseurl = 'https://data.inm-icf.de'
# Names of the DICOM tags that are taken from each per-file metadata record
# and stored as git-annex metadata (e.g., to enable metadata-driven views
# of visit datasets). The git-annex field names are the lower-cased tag names.
dicom_metadata_keys = [
"SeriesDescription",
"SeriesNumber",
"Modality",
"MRAcquisitionType",
"ProtocolName",
"PulseSequenceName",
]
def main(store_dir: str,
         study_id: str,
         visit_id: str):
    """Build the DataLad dataset of one visit and deposit it in the store.

    Parameters
    ----------
    store_dir: str
      Root directory of the data store. The visit metadata files are read
      from `<store_dir>/<study_id>/`, and the dataset is deposited there.
    study_id: str
      Identifier of the study (name of the study directory).
    visit_id: str
      Identifier of the visit (prefix of all visit-related file names).

    Raises
    ------
    ValueError
      If anything matching `<visit_id>_XDLRA*` already exists in the study
      directory, i.e. a dataset was deposited for this visit before.
    """
    study_dir = Path(store_dir) / study_id
    # common path prefix of everything the final datalad dataset deposits
    deposit_base = study_dir / f'{visit_id}_'

    # be safe: an earlier deposit must never be overwritten
    deposit_conflicts = list(map(
        str,
        deposit_base.parent.glob(f'{deposit_base.name}XDLRA*'),
    ))
    if deposit_conflicts:
        raise ValueError(
            f'existing dataset deposit {deposit_conflicts}, '
            'refusing to overwrite')

    # the metadata files sit next to the visit data in the study directory
    tarball_meta = study_dir / f'{visit_id}_metadata_tarball.json'
    dicoms_meta = study_dir / f'{visit_id}_metadata_dicoms.json'

    # the dataset is assembled in a throwaway directory, only the push
    # performed by `runshit()` reaches the store
    with tempfile.TemporaryDirectory(prefix='dataladify_visit_') as wdir:
        runshit(
            wdir,
            tarball_meta.absolute(),
            dicoms_meta.absolute(),
            deposit_base.absolute(),
        )
def _init_external_remote(repo, name):
    """Initialize the external special remote `name` and return its UUID.

    The remote is set up with `type=external`, `externaltype=<name>`,
    no encryption, and auto-enabling.

    Raises
    ------
    RuntimeError
      If git-annex reports no UUID for the freshly initialized remote.
    """
    repo.call_annex([
        'initremote',
        name,
        'type=external',
        f'externaltype={name}',
        'encryption=none',
        # auto-enabling is cheap (makes no connection attempts), and convenient
        'autoenable=true',
    ])
    uuid = repo.call_annex_records(['info', name])[0]['uuid']
    if not uuid:
        raise RuntimeError(
            f'git-annex reported no UUID for special remote {name!r}')
    return uuid


def runshit(wdir, metapath_dataset, metapath_file, repobasepath):
    """Assemble the visit dataset in `wdir` and push it to its deposit location.

    Parameters
    ----------
    wdir:
      Directory in which the dataset is created.
    metapath_dataset:
      Path of the JSON file with the tarball metadata. It must provide the
      keys `size`, `md5`, `dspath`, and `storepath`.
    metapath_file:
      Path of the JSON file with the per-DICOM metadata records. The fields
      `path`, `size`, `md5`, and the tags listed in `dicom_metadata_keys`
      are referenced when registering the files.
    repobasepath:
      Path prefix of the deposit. It is put into the URL of the `icfstore`
      remote, directly followed by an `{annex_key}` placeholder.

    Raises
    ------
    ValueError
      If the tarball metadata is incomplete, or if no DICOM file could be
      registered from the DICOM metadata.
    RuntimeError
      If a special remote has no UUID, or if the annex key of the tarball
      cannot be determined unambiguously.
    """
    # read tar metadata dict
    tar_metadata = read_json_file(metapath_dataset)
    expected_keys = ('size', 'md5', 'dspath', 'storepath')
    if not all(k in tar_metadata for k in expected_keys):
        raise ValueError(f'incomplete tarball metadata at {metapath_dataset}')

    # create visit dataset
    ds = dl.create(wdir)
    # alias for speed, `.repo` is really expensive
    repo = ds.repo

    # enable uncurl remote to have the tarball URL be claimed by it
    # and future-proof access (via its reconfiguration possibilities)
    # without having to touch the annex record.
    # we need its UUID later
    uncurl_uuid = _init_external_remote(repo, 'uncurl')

    # register the URL of the tarball
    tar_metadata['url'] = f"{icfstore_baseurl}/{tar_metadata['storepath']}"
    res = ds.addurls(
        [tar_metadata],
        '{url}',
        '{dspath}',
        key='et:MD5-s{size}--{md5}',
    )
    # fish out annex key of tarball.
    # we could also construct that, but let's not duplicate the setup above
    tarpath = Path(tar_metadata['dspath'])
    tarkeys = [
        r.get('annexkey') for r in res
        if r.get('action') == 'fromkey'
        and r.get('path', '').endswith(tarpath.name)
    ]
    # explicit checks instead of `assert`, which is disabled under `-O`
    if len(tarkeys) != 1 or not tarkeys[0]:
        raise RuntimeError(
            f'cannot determine the annex key of tarball {tarpath.name}, '
            f'candidates: {tarkeys}')
    tarkey = tarkeys[0]
    # assure tar key availability
    repo.call_annex(['setpresentkey', tarkey, uncurl_uuid, '1'])

    # here we register the archivist special remote, to claim
    # the dl+archives URLs registered below.
    archivist_uuid = _init_external_remote(repo, 'archivist')

    # load dicom metadata
    dicoms = read_json_file(metapath_file)
    # add to dataset
    dicom_recs = ds.addurls(
        dicoms,
        f'dl+archive:{tarkey}#path={{path}}&size={{size}}',
        '{path}',
        key='et:MD5-s{size}--{md5}',
        # field names are limited to alphanumerics (and [_-.]),
        # and are case insensitive
        meta=[
            f'{dmk.lower()}={{{dmk}}}'
            for dmk in dicom_metadata_keys
        ],
    )
    dicomkeys = [
        r['annexkey']
        for r in dicom_recs if r.get('action') == 'fromkey'
    ]
    if not dicomkeys:
        # fail with a clear message rather than with an IndexError at the
        # availability probe below; nothing has been pushed at this point
        raise ValueError(
            f'no DICOM file could be registered from {metapath_file}')
    # assure availability for each DICOM
    for dicomkey in dicomkeys:
        repo.call_annex(['setpresentkey', dicomkey, archivist_uuid, '1'])

    repo.call_git([
        'remote', 'add', 'icfstore',
        # this is a little twisted:
        # the first line is an f-string, because we need to get the base URL
        # pointing to the deposit location into the remote URL
        f'datalad-annex::?type=external&externaltype=uncurl&url=file://{repobasepath}'
        # this second line is NOT an f-string, and braces are quoted!!
        # this is because datalad-annex:: will pass this URL to uncurl
        # (removing the quoting; it can do placeholders too!), and uncurl
        # will then fill in the annex key of the deposit in order to get
        # the final upload URL
        '{{annex_key}}&encryption=none'
    ])
    # probe the availability metadata. This seems to be necessary at times to
    # get git-annex to commit the metadata operations performed above
    # to be able to actually push everything
    repo.call_annex(['whereis', '--key', dicomkeys[0]])
    ds.push(
        to='icfstore',
        # under no circumstances do we want to push annexed content.
        # and there also should be none
        data='nothing',
    )
def read_json_file(file_path):
    """Load and return the content of the JSON file at `file_path`.

    Parameters
    ----------
    file_path:
      Path of the JSON file to read.

    Returns
    -------
    The deserialized JSON document.

    Raises
    ------
    OSError
      If the file cannot be opened or read. The original exception is
      passed on unchanged, it already names the file and the cause.
    ValueError
      If the file content is not valid JSON.
    """
    with open(file_path) as f:
        try:
            return json.load(f)
        except ValueError as err:
            # `json.JSONDecodeError` is a `ValueError`; add the file name,
            # which the decoder's own message does not include
            raise ValueError(f'invalid JSON in {file_path}: {err}') from err
if __name__ == '__main__':
    # command line entry point: collect the store location and the
    # study/visit identifiers, then hand over to `main()`
    import argparse

    parser = argparse.ArgumentParser(description=__doc__)
    parser.add_argument(
        "-o", "--store-dir",
        default=os.getcwd(),
        metavar='PATH',
        help="Root directory of the ICF data store. "
        "Visit data will be read from it, and the DataLad dataset will be "
        "deposited into it."
    )
    parser.add_argument(
        '--id',
        required=True,
        nargs=2,
        metavar=('STUDY-ID', 'VISIT-ID'),
        help="The study and visit identifiers, used to "
        "locate the visit archive in the storage organization. "
    )
    cli_args = parser.parse_args()
    # `--id` always carries exactly two values (nargs=2)
    study, visit = cli_args.id
    main(
        store_dir=cli_args.store_dir,
        study_id=study,
        visit_id=visit,
    )