Source code for pangea_api.analysis_result


import json
import logging
import os
import time
from os.path import basename, getsize, join
from tempfile import NamedTemporaryFile
from urllib.request import urlretrieve

import requests

from .constants import FIVE_MB
from .remote_object import RemoteObject, RemoteObjectError

logger = logging.getLogger(__name__)  # Named for this module
logger.addHandler(logging.NullHandler())  # No output unless configured by calling program
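
# A minimal sketch of enabling this module's log output from a calling
# program (standard-library logging only; INFO is an illustrative level):
#
#   import logging
#   logging.basicConfig(level=logging.INFO)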


class AnalysisResult(RemoteObject):
    remote_fields = [
        'uuid',
        'created_at',
        'updated_at',
        'module_name',
        'replicate',
        'metadata',
        'description',
    ]

    def _get(self):
        """Fetch the result from the server."""
        self.parent.idem()
        logger.info('Getting AnalysisResult.')
        blob = self.get_cached_blob()
        if not blob:
            blob = self.knex.get(self.nested_url())
            self.load_blob(blob)
            self.cache_blob(blob)
        else:
            self.load_blob(blob)

    def pre_hash(self):
        key = self.module_name + self.parent.pre_hash()
        key += self.replicate if self.replicate else ''
        return key
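
# Illustrative sketch of the cache key pre_hash builds, assuming a parent
# whose pre_hash() returns 'SAMPLEKEY' (a hypothetical value):
#
#   module_name='taxa', replicate='rep1'  ->  'taxa' + 'SAMPLEKEY' + 'rep1'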


class SampleAnalysisResult(AnalysisResult):
    parent_field = 'sample'

    def __init__(self, knex, sample, module_name, replicate=None, metadata=None):
        super().__init__(self)
        self.knex = knex
        self.sample = sample
        self.parent = self.sample
        self.module_name = module_name
        self.replicate = replicate
        self._get_field_cache = []
        self.metadata = metadata if metadata is not None else {}

    def nested_url(self):
        return self.sample.nested_url() + f'/analysis_results/{self.module_name}'

    def _save(self):
        data = {
            field: getattr(self, field)
            for field in self.remote_fields
            if hasattr(self, field)
        }
        data['sample'] = self.sample.uuid
        url = f'sample_ars/{self.uuid}'
        d = {'data': data, 'url': url, 'sample_ar': self}
        logger.info(f'Saving SampleAnalysisResult. {d}')
        self.knex.put(url, json=data)

    def _create(self):
        self.sample.idem()
        data = {
            'sample': self.sample.uuid,
            'module_name': self.module_name,
        }
        if self.replicate:
            data['replicate'] = self.replicate
        d = {'data': data, 'sample_ar': self}
        logger.info(f'Creating SampleAnalysisResult. {d}')
        blob = self.knex.post('sample_ars?format=json', json=data)
        self.load_blob(blob)

    def field(self, field_name, data=None):
        data = data if data is not None else {}
        d = {'data': data, 'field_name': field_name, 'sample_ar': self}
        logger.info(f'Creating SampleAnalysisResultField for SampleAnalysisResult. {d}')
        return SampleAnalysisResultField(self.knex, self, field_name, data=data)

    def get_fields(self, cache=True):
        """Yield this result's fields, fetched from the server."""
        if cache and self._get_field_cache:
            for field in self._get_field_cache:
                yield field
            return
        url = f'sample_ar_fields?analysis_result_id={self.uuid}'
        logger.info(f'Fetching SampleAnalysisResultFields. {self}')
        response = self.knex.get(url)
        for result_blob in response['results']:
            result = self.field(result_blob['name'])
            result.load_blob(result_blob)
            # We just fetched from the server so we change the RemoteObject
            # meta properties to reflect that
            result._already_fetched = True
            result._modified = False
            if cache:
                self._get_field_cache.append(result)
            else:
                yield result
        if cache:
            for field in self._get_field_cache:
                yield field
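
# Usage sketch with hypothetical names: `knex` is an authenticated client
# and `sample` an existing Sample. idem() is assumed to create-or-fetch the
# result server-side, matching its internal use in _save/_create above:
#
#   ar = SampleAnalysisResult(knex, sample, 'my_module')
#   ar.idem()
#   for field in ar.get_fields():
#       print(field.name, field.stored_data)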


class SampleGroupAnalysisResult(AnalysisResult):
    parent_field = 'grp'

    def __init__(self, knex, grp, module_name, replicate=None, metadata=None):
        super().__init__(self)
        self.knex = knex
        self.grp = grp
        self.parent = self.grp
        self.module_name = module_name
        self.replicate = replicate
        self.metadata = metadata if metadata is not None else {}

    def nested_url(self):
        return self.grp.nested_url() + f'/analysis_results/{self.module_name}'

    def _save(self):
        data = {
            field: getattr(self, field)
            for field in self.remote_fields
            if hasattr(self, field)
        }
        data['sample_group'] = self.grp.uuid
        url = f'sample_group_ars/{self.uuid}'
        self.knex.put(url, json=data)

    def _create(self):
        self.grp.idem()
        data = {
            'sample_group': self.grp.uuid,
            'module_name': self.module_name,
        }
        if self.replicate:
            data['replicate'] = self.replicate
        blob = self.knex.post('sample_group_ars?format=json', json=data)
        self.load_blob(blob)

    def field(self, field_name, data=None):
        data = data if data is not None else {}
        return SampleGroupAnalysisResultField(self.knex, self, field_name, data=data)

    def get_fields(self):
        """Yield this group result's fields, fetched from the server."""
        url = f'sample_group_ar_fields?analysis_result_id={self.uuid}'
        response = self.knex.get(url)
        for result_blob in response['results']:
            result = self.field(result_blob['name'])
            result.load_blob(result_blob)
            # We just fetched from the server so we change the RemoteObject
            # meta properties to reflect that
            result._already_fetched = True
            result._modified = False
            yield result


class AnalysisResultField(RemoteObject):
    remote_fields = [
        'uuid',
        'created_at',
        'updated_at',
        'name',
        'stored_data',
    ]
    parent_field = 'parent'

    def __init__(self, knex, parent, field_name, data=None):
        super().__init__(self)
        self.knex = knex
        self.parent = parent
        self.name = field_name
        self.stored_data = data if data is not None else {}
        self._cached_filename = None  # Used if the field points to S3, FTP, etc
        self._temp_filename = False

    def nested_url(self):
        return self.parent.nested_url() + f'/fields/{self.name}'

    def get_blob_filename(self):
        sname = self.parent.parent.name.replace('.', '-')
        mname = self.parent.module_name.replace('.', '-')
        fname = self.name.replace('.', '-')
        filename = join(
            self.parent.parent.name,
            f'{sname}.{mname}.{fname}.json'
        ).replace('::', '__')
        return filename

    def get_referenced_filename(self):
        key = None
        for possible_key in ('filename', 'uri', 'url'):
            if possible_key in self.stored_data:
                key = possible_key
                break
        if key is None:
            raise TypeError('Cannot make a reference filename for a BLOB type result field.')
        ext = self.stored_data[key].split('.')[-1]
        if ext in ['gz']:
            ext = self.stored_data[key].split('.')[-2] + '.' + ext
        sname = self.parent.parent.name.replace('.', '-')
        mname = self.parent.module_name.replace('.', '-')
        fname = self.name.replace('.', '-')
        filename = join(
            self.parent.parent.name,
            f'{sname}.{mname}.{fname}.{ext}'
        ).replace('::', '__')
        return filename
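
    # Worked example of get_referenced_filename with hypothetical inputs: a
    # sample named 'proj::s1', module 'reads.qc', field 'r1', and
    # stored_data={'uri': 's3://bucket/reads.fastq.gz'} yield
    # 'proj__s1/proj__s1.reads-qc.r1.fastq.gz' (the '.gz' branch keeps the
    # compound 'fastq.gz' extension).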

    def _save(self):
        data = {
            field: getattr(self, field)
            for field in self.remote_fields
            if hasattr(self, field)
        }
        data['analysis_result'] = self.parent.uuid
        url = f'{self.canon_url()}/{self.uuid}'
        self.knex.put(url, json=data)

    def _get(self):
        """Fetch the field from the server."""
        self.parent.idem()
        blob = self.knex.get(self.nested_url())
        self.load_blob(blob)

    def _create(self):
        if json.loads(json.dumps(self.stored_data)) != self.stored_data:
            raise RemoteObjectError('JSON Serialization modifies object')
        self.parent.idem()
        data = {
            'analysis_result': self.parent.uuid,
            'name': self.name,
            'stored_data': self.stored_data,
        }
        blob = self.knex.post(f'{self.canon_url()}?format=json', json=data)
        self.load_blob(blob)

    def get_download_url(self):
        """Return a URL that can be used to download the file for this result."""
        blob_type = self.stored_data.get('__type__', '').lower()
        if blob_type not in ['s3', 'sra']:
            raise TypeError('Cannot fetch a file for a BLOB type result field.')
        if blob_type == 's3':
            try:
                url = self.stored_data['presigned_url']
            except KeyError:
                url = self.stored_data['uri']
            if url.startswith('s3://'):
                url = self.stored_data['endpoint_url'] + '/' + url[5:]
            return url
        elif blob_type == 'sra':
            url = self.stored_data['url']
            return url

    def download_file(self, filename=None, cache=True):
        """Return a local filepath to the file this result points to."""
        blob_type = self.stored_data.get('__type__', '').lower()
        if blob_type not in ['s3', 'sra']:
            raise TypeError('Cannot fetch a file for a BLOB type result field.')
        if cache and self._cached_filename:
            return self._cached_filename
        if blob_type == 's3':
            return self._download_s3(filename, cache)
        elif blob_type == 'sra':
            return self._download_sra(filename, cache)

    def _download_s3(self, filename, cache):
        try:
            url = self.stored_data['presigned_url']
        except KeyError:
            url = self.stored_data['uri']
        if url.startswith('s3://'):
            url = self.stored_data['endpoint_url'] + '/' + url[5:]
        if not filename:
            self._temp_filename = True
            myfile = NamedTemporaryFile(delete=False)
            myfile.close()
            filename = myfile.name
        urlretrieve(url, filename)
        if cache:
            self._cached_filename = filename
        return filename

    def _download_sra(self, filename, cache):
        url = self.stored_data['url']
        if not filename:
            self._temp_filename = True
            myfile = NamedTemporaryFile(delete=False)
            myfile.close()
            filename = myfile.name
        urlretrieve(url, filename)
        if cache:
            self._cached_filename = filename
        return filename
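
    # Usage sketch (hypothetical field object): pass a destination path, or
    # omit it to download into a cached NamedTemporaryFile:
    #
    #   path = ar_field.download_file(filename='reads.fastq.gz')
    #   path = ar_field.download_file()  # temp file, cached on the object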

    def upload_small_file(self, filepath):
        url = f'/{self.canon_url()}/{self.uuid}/upload_s3'
        filename = basename(filepath)
        response = self.knex.post(url, json={'filename': filename})
        with open(filepath, 'rb') as f:
            files = {'file': (filename, f)}
            requests.post(  # Not a call to pangea so we do not use knex
                response['url'],
                data=response['fields'],
                files=files
            )
        return self

    def upload_large_file(self, filepath, file_size, chunk_size=FIVE_MB,
                          max_retries=3, logger=lambda x: x):
        n_parts = int(file_size / chunk_size) + 1
        response = self.knex.post(
            f'/{self.canon_url()}/{self.uuid}/upload_s3',
            json={
                'filename': basename(filepath),
                'n_parts': n_parts,
                'stance': 'upload-multipart',
            }
        )
        parts, urls, upload_id = [], response['urls'], response['upload_id']
        logger(f'[INFO] Starting upload for "{filepath}"')
        with open(filepath, 'rb') as f:
            for num, url in enumerate(urls):
                file_data = f.read(chunk_size)
                attempts = 0
                while attempts < max_retries:
                    try:
                        http_response = requests.put(url, data=file_data)
                        http_response.raise_for_status()
                        break
                    except requests.exceptions.HTTPError:
                        logger(f'[WARN] Upload for part {num + 1} failed. Attempt {attempts + 1}')
                        attempts += 1
                        if attempts == max_retries:
                            raise
                        time.sleep(10 ** attempts)  # exponential backoff, (10 ** 2)s default max
                parts.append({
                    'ETag': http_response.headers['ETag'],
                    'PartNumber': num + 1
                })
                logger(f'[INFO] Uploaded part {num + 1} of {len(urls)} for "{filepath}"')
        response = self.knex.post(
            f'/{self.canon_url()}/{self.uuid}/complete_upload_s3',
            json={
                'parts': parts,
                'upload_id': upload_id,
            },
            json_response=False,
        )
        logger(f'[INFO] Finished Upload for "{filepath}"')
        return self
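
    # Worked example of the multipart arithmetic above: a 12 MB file with
    # the default 5 MB chunk_size is sent as int(12 / 5) + 1 = 3 parts, and
    # a part that keeps failing sleeps 10s, then 100s, before the third
    # failed attempt re-raises (max_retries=3).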

    def upload_file(self, filepath, multipart_thresh=FIVE_MB, **kwargs):
        file_size = getsize(filepath)
        if file_size >= multipart_thresh:
            return self.upload_large_file(filepath, file_size, **kwargs)
        return self.upload_small_file(filepath)
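
    # Usage sketch (hypothetical path): upload_file is the single entry
    # point; files of at least multipart_thresh bytes take the multipart
    # route, smaller files the presigned-POST route:
    #
    #   ar_field.upload_file('/path/to/reads.fastq.gz')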

    def __del__(self):
        if self._temp_filename and self._cached_filename:
            os.remove(self._cached_filename)

    def pre_hash(self):
        return self.name + self.parent.pre_hash()


class SampleAnalysisResultField(AnalysisResultField):

    def canon_url(self):
        return 'sample_ar_fields'


class SampleGroupAnalysisResultField(AnalysisResultField):

    def canon_url(self):
        return 'sample_group_ar_fields'