Source code for pangea_api.sample
from .remote_object import RemoteObject
from .analysis_result import SampleAnalysisResult
[docs]class Sample(RemoteObject):
remote_fields = [
'uuid',
'created_at',
'updated_at',
'name',
'metadata',
'library',
'description',
]
parent_field = 'lib'
def __init__(self, knex, lib, name, metadata={}):
super().__init__(self)
self.knex = knex
self.lib = lib
self.name = name
self.metadata = metadata
self._get_result_cache = []
[docs] def nested_url(self):
return self.lib.nested_url() + f'/samples/{self.name}'
def _save(self):
data = {
field: getattr(self, field)
for field in self.remote_fields if hasattr(self, field)
}
data['library'] = self.lib.uuid
url = f'samples/{self.uuid}'
self.knex.put(url, json=data)
def _get(self):
"""Fetch the result from the server."""
self.lib.get()
blob = self.get_cached_blob()
if not blob:
blob = self.knex.get(self.nested_url())
self.load_blob(blob)
self.cache_blob(blob)
else:
self.load_blob(blob)
def _create(self):
assert self.lib.is_library
self.lib.idem()
data = {
'library': self.lib.uuid,
'name': self.name,
}
url = 'samples?format=json'
blob = self.knex.post(url, json=data)
self.load_blob(blob)
[docs] def analysis_result(self, module_name, replicate=None, metadata=None):
return SampleAnalysisResult(self.knex, self, module_name, replicate=replicate, metadata=metadata)
[docs] def get_analysis_results(self, cache=True):
"""Yield sample analysis results fetched from the server."""
if cache and self._get_result_cache:
for ar in self._get_result_cache:
yield ar
return
url = f'sample_ars?sample_id={self.uuid}'
result = self.knex.get(url)
for result_blob in result['results']:
result = self.analysis_result(result_blob['module_name'])
result.load_blob(result_blob)
# We just fetched from the server so we change the RemoteObject
# meta properties to reflect that
result._already_fetched = True
result._modified = False
if cache:
self._get_result_cache.append(result)
else:
yield result
if cache:
for ar in self._get_result_cache:
yield ar
[docs] def get_manifest(self):
"""Return a manifest for this sample."""
url = f'samples/{self.uuid}/manifest'
return self.knex.get(url)
def __str__(self):
return f'<Pangea::Sample {self.name} {self.uuid} />'
def __repr__(self):
return f'<Pangea::Sample {self.name} {self.uuid} />'
[docs] def pre_hash(self):
return 'SAMPLE' + self.name + self.lib.pre_hash()