Source code for proteinshake.utils.io

"""
Helper functions for all input/output related things.
"""

import os
import tarfile
import pickle
import json
import gzip
import shutil
import requests
import re
import warnings
import pandas as pd
import numpy as np
from pathlib import Path
from tqdm import tqdm
from fastavro import writer as avro_writer, reader as avro_reader, parse_schema as parse_avro_schema

# Three-letter residue codes of the 20 standard amino acids mapped to their one-letter codes.
AA_THREE_TO_ONE = {
    'ALA': 'A',
    'CYS': 'C',
    'ASP': 'D',
    'GLU': 'E',
    'PHE': 'F',
    'GLY': 'G',
    'HIS': 'H',
    'ILE': 'I',
    'LYS': 'K',
    'LEU': 'L',
    'MET': 'M',
    'ASN': 'N',
    'PRO': 'P',
    'GLN': 'Q',
    'ARG': 'R',
    'SER': 'S',
    'THR': 'T',
    'VAL': 'V',
    'TRP': 'W',
    'TYR': 'Y',
}
# Inverse lookup: one-letter code -> three-letter code (same insertion order as above).
AA_ONE_TO_THREE = dict((one, three) for three, one in AA_THREE_TO_ONE.items())

def progressbar(iterable=None, desc='', total=None, verbosity=2, **kwargs):
    """ Wraps an iterable in a tqdm progress bar that respects the verbosity level.

    Parameters
    ----------
    iterable: iterable, optional
        The iterable to wrap. It does not need to define ``len()``.
    desc: str
        A description of the task.
    total: int, optional
        The expected number of iterations. If not given, ``len(iterable)`` is
        used when available; otherwise the total is left undetermined.
    verbosity: int
        Below 2 the bar is disabled. At exactly 1 the description is printed once instead.
    **kwargs
        Additional keyword arguments forwarded to ``tqdm``.

    Returns
    -------
    tqdm
        The (possibly disabled) progress bar wrapping ``iterable``.
    """
    if total is None:
        # Generators, other unsized iterators and `None` have no len();
        # leave the total undetermined instead of crashing.
        try:
            total = len(iterable)
        except TypeError:
            total = None
    disable = verbosity < 2
    if verbosity == 1: print(desc+'...')
    if verbosity == 2 and len(desc) > 20:
        # Long descriptions are printed on their own line so that the bar stays readable.
        print(desc)
        desc = None
    return tqdm(iterable, desc=desc, total=total, disable=disable, **kwargs)

def warning(message, verbosity=2):
    """ Issues a warning through the ``warnings`` module unless silenced.

    Parameters
    ----------
    message: str
        The warning message.
    verbosity: int
        The warning is only issued if the verbosity is greater than -1.
    """
    silenced = not (verbosity > -1)
    if silenced:
        return
    warnings.warn(message)

def error(message, verbosity=2):
    """ Raises an exception with the given message unless silenced.

    Parameters
    ----------
    message: str
        The error message.
    verbosity: int
        The exception is only raised if the verbosity is greater than -2.

    Raises
    ------
    Exception
        If the verbosity is greater than -2.
    """
    silenced = not (verbosity > -2)
    if silenced:
        return
    raise Exception(message)

class Generator(object):
    """ Pairs an iterator with an explicitly given length so that ``len()`` works on it.

    Parameters
    ----------
    generator: iterator
        The underlying iterator. It is returned as-is by ``iter()`` and advanced by ``next()``.
    length: int
        The value reported by ``len()``. It is stored, not computed or checked.
    """

    def __init__(self, generator, length):
        self.generator, self.length = generator, length

    def __next__(self):
        # Advancing the wrapper advances the wrapped iterator.
        return next(self.generator)

    def __iter__(self):
        # Iteration is delegated to the wrapped iterator itself.
        return self.generator

    def __len__(self):
        return self.length

def fx2str(fx):
    """ Builds a shortened string representation of a function.

    Inside the representation, everything from the first whitespace after an
    opening ``<`` up to the last ``>`` is dropped (e.g. the memory address).

    Parameters
    ----------
    fx: function
        A function.

    Returns
    -------
    str
        The stringified function.
    """
    representation = fx.__repr__()
    pattern = re.compile('(<.*?)\\s.*(>)')
    return pattern.sub(r'\1\2', representation)

def avro_schema_from_protein(protein):
    """ Guesses the avro schema from a dictionary.

    Dictionaries become (nested) avro records, lists become avro arrays and
    int, float, str and bool values become the corresponding avro primitives.
    The item type of a list is guessed from its first element only; an empty
    list is declared as an array of strings.

    Parameters
    ----------
    protein: dict
        A protein dictionary.

    Returns
    -------
    schema
        An avro schema.

    Raises
    ------
    TypeError
        If a value (or the first element of a list) is not an int, float, bool or string.
    """
    typedict = {'int':'int', 'float':'float', 'str':'string', 'bool':'boolean'}

    def item_type(k, v):
        # An empty list carries no type information; fall back to 'string'.
        if len(v) == 0:
            return 'string'
        element = type(v[0]).__name__
        if element not in typedict:
            # Fail with the same exception type as for unsupported scalars
            # instead of an opaque KeyError from the lookup below.
            raise TypeError(f"All elements of the list field '{k}' need to be either int, float, bool or string, not {element}")
        return typedict[element]

    def field_spec(k, v):
        if isinstance(v, dict):
            # Nested dictionaries are described recursively as records named after their key.
            return {'name':k, 'type':{'name':k, 'type':'record', 'fields': [field_spec(_k,_v) for _k,_v in v.items()]}}
        elif isinstance(v, list):
            return {'name':k, 'type':{'type': 'array', 'items': item_type(k, v)}}
        elif type(v).__name__ in typedict:
            # Looked up by exact type name so that bool is not treated as int.
            return {'name':k, 'type': typedict[type(v).__name__]}
        else:
            raise TypeError(f"All fields in a protein object need to be either int, float, bool or string, not {type(v).__name__}")

    schema = {
        'name': 'Protein',
        'namespace': 'Dataset',
        'type': 'record',
        'fields': [field_spec(k,v) for k,v in protein.items()],
    }
    return parse_avro_schema(schema)

def write_avro(proteins, path):
    """ Serializes a list of protein dictionaries into a single avro file.

    The schema is guessed from the first protein in the list, and the number
    of proteins is stored (as a string) in the file metadata.

    Parameters
    ----------
    proteins: list
        The list of proteins.
    path:
        The path to the output file.
    """
    destination = Path(path)
    # Only the first protein is inspected to derive the schema.
    schema = avro_schema_from_protein(proteins[0])
    with open(destination, 'wb') as handle:
        metadata = {'number_of_proteins': str(len(proteins))}
        avro_writer(handle, schema, proteins, metadata=metadata)
def save(obj, path):
    """ Saves an object to either pickle, json, json.gz, or npy (determined by the extension in the file name).

    Parameters
    ----------
    obj:
        The object to be saved.
    path:
        The path to save the object. May be a string or a ``pathlib.Path``.
        Any extension other than ``.json.gz``, ``.json`` or ``.npy`` results in a pickle file.
    """
    # The extension is checked on the string form so that Path objects are accepted as well.
    filename = str(path)
    if filename.endswith('.json.gz'):
        with gzip.open(Path(path), 'w') as file:
            file.write(json.dumps(obj).encode('utf-8'))
    elif filename.endswith('.json'):
        with open(Path(path),'w') as file:
            json.dump(obj, file)
    elif filename.endswith('.npy'):
        np.save(path, obj)
    else:
        with open(Path(path), 'wb') as file:
            pickle.dump(obj, file, protocol=pickle.HIGHEST_PROTOCOL)