Source code for pangea_api.knex

import os
import requests
import logging
import json
from time import time
from glob import glob

from .file_system_cache import FileSystemCache

DEFAULT_ENDPOINT = 'https://pangea.gimmebio.com'


logger = logging.getLogger(__name__)  # Same name as calling module
logger.addHandler(logging.NullHandler())  # No output unless configured by calling program


def clean_url(url):
    if url[-1] == '/':
        url = url[:-1]
    return url


class TokenAuth(requests.auth.AuthBase):
    """Attaches MetaGenScope Token Authentication to the given Request object."""

    def __init__(self, token):
        self.token = token

    def __call__(self, request):
        """Add authentication header to request."""
        request.headers['Authorization'] = f'Token {self.token}'
        return request

    def __str__(self):
        """Return string representation of TokenAuth."""
        return self.token


[docs]class Knex: def __init__(self, endpoint_url=DEFAULT_ENDPOINT): self.endpoint_url = endpoint_url self.endpoint_url += '/api' self.auth = None self.headers = {'Accept': 'application/json'} self.cache = FileSystemCache() def _logging_info(self, **kwargs): base = {'endpoint_url': self.endpoint_url, 'headers': self.headers} base.update(kwargs) return base def _clean_url(self, url): url = clean_url(url) url = url.replace(self.endpoint_url, '') if url[0] == '/': url = url[1:] return url
[docs] def add_auth_token(self, token): self.auth = TokenAuth(token)
[docs] def login(self, username, password): d = self._logging_info(email=username, password='*' * len(password)) logger.info(f'Sending log in request. {d}') blob = self.cache.get_cached_blob(username) if not blob: response = requests.post( f'{self.endpoint_url}/auth/token/login', headers=self.headers, json={ 'email': username, 'password': password, } ) response.raise_for_status() logger.info(f'Received log in response. {response.json()}') blob = response.json() self.cache.cache_blob(username, blob) self.add_auth_token(blob['auth_token']) return self
def _handle_response(self, response, json_response=True): try: response.raise_for_status() except: logger.error(f'Request failed. {response}') raise if json_response: return response.json() return response
[docs] def get(self, url, **kwargs): url = self._clean_url(url) d = self._logging_info(url=url, auth_token=self.auth) logger.info(f'Sending GET request. {d}') response = requests.get( f'{self.endpoint_url}/{url}', headers=self.headers, auth=self.auth, ) return self._handle_response(response, **kwargs)
[docs] def post(self, url, json={}, **kwargs): url = self._clean_url(url) d = self._logging_info(url=url, auth_token=self.auth, json=json) logger.info(f'Sending POST request. {d}') response = requests.post( f'{self.endpoint_url}/{url}', headers=self.headers, auth=self.auth, json=json ) return self._handle_response(response, **kwargs)
[docs] def put(self, url, json={}, **kwargs): url = self._clean_url(url) d = self._logging_info(url=url, auth_token=self.auth, json=json) logger.info(f'Sending PUT request. {d}') response = requests.put( f'{self.endpoint_url}/{url}', headers=self.headers, auth=self.auth, json=json ) return self._handle_response(response, **kwargs)
[docs] def delete(self, url, **kwargs): url = self._clean_url(url) d = self._logging_info(url=url, auth_token=self.auth) logger.info(f'Sending DELETE request. {d}') response = requests.delete( f'{self.endpoint_url}/{url}', headers=self.headers, auth=self.auth, ) return self._handle_response(response, **kwargs)