Source code for taxadb.parser

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import os
import sys
import gzip
import logging

from taxadb.schema import Taxa, Accession


class TaxaParser(object):
    """Common base class for the taxonomic file parsers.

    Bundles the helpers the concrete parsers rely on: a logger named after
    the concrete class, a loader for the taxids already present in the
    ``Taxa`` table and a sanity check for input files.
    """

    def __init__(self, verbose=False):
        """Keep the verbosity flag on the instance.

        Args:
            verbose (:obj:`bool`): Stored as ``self._verbose``.
        """
        self._verbose = verbose

    @property
    def logger(self):
        """:obj:`logging.Logger`: Logger named ``<module>.<class name>``."""
        cls = type(self)
        return logging.getLogger(".".join((cls.__module__, cls.__name__)))

    @staticmethod
    def cache_taxids():
        """Load the taxids of the taxa table into a dictionary.

        Returns:
            data (:obj:`dict`): One key per ``ncbi_taxid`` (converted to
                :obj:`str`), each mapped to ``True``.
        """
        rows = Taxa.select(Taxa.ncbi_taxid).dicts()
        return {str(row['ncbi_taxid']): True for row in rows}

    @staticmethod
    def check_file(element):
        """Make sure `element` points to an existing regular file.

        Args:
            element (:obj:`str`): Path to check

        Returns:
            True

        Raises:
            SystemExit: if `element` is None
            SystemExit: if `element` does not exist
            SystemExit: if `element` is not a file
        """
        # Static method: no instance at hand, hence the fixed logger name.
        log = logging.getLogger('parser')
        if element is None:
            problem = "Please provide an input file to check"
        elif not os.path.exists(element):
            problem = "File %s does not exist" % str(element)
        elif not os.path.isfile(element):
            problem = "%s is not a file" % str(element)
        else:
            return True
        log.error(problem)
        sys.exit(1)
class TaxaDumpParser(TaxaParser):
    """Main parser class for ncbi taxdump files

    This class is used to parse the NCBI taxonomy files (``nodes.dmp`` and
    ``names.dmp``) found in the taxdump archive.

    Args:
        nodes_file (:obj:`str`): Path to nodes.dmp file
        names_file (:obj:`str`): Path to names.dmp file
    """

    def __init__(self, nodes_file=None, names_file=None, **kwargs):
        """Store the default input files; `kwargs` go to the base class."""
        super().__init__(**kwargs)
        self.nodes_file = nodes_file
        self.names_file = names_file

    def taxdump(self, nodes_file=None, names_file=None):
        """Parse .dmp files

        Parse nodes.dmp and names.dmp files and build one record per taxon
        that is not already known to ``cache_taxids``. Nothing is written
        to the database here; the records are only returned.

        Args:
            nodes_file (:obj:`str`): Path to nodes.dmp file. Defaults to
                the file set on the object.
            names_file (:obj:`str`): Path to names.dmp file. Defaults to
                the file set on the object.

        Returns:
            list: One :obj:`dict` per new taxon, in nodes.dmp order, with
                keys ``ncbi_taxid``, ``parent_taxid``, ``tax_name`` and
                ``lineage_level``. ``tax_name`` stays ``''`` when names.dmp
                has no scientific name for the taxid.

        Raises:
            SystemExit: If one of the files is missing (`check_file`)
        """
        if nodes_file is None:
            nodes_file = self.nodes_file
        if names_file is None:
            names_file = self.names_file
        self.check_file(names_file)
        self.check_file(nodes_file)

        self.logger.debug("Loading taxa data ...")
        known_taxids = self.cache_taxids()

        # parse nodes.dmp
        self.logger.debug("Parsing %s", nodes_file)
        taxa_info_list = []
        with open(nodes_file, 'r') as f:
            for line in f:
                fields = line.split('|')
                ncbi_id = fields[0].strip('\t')
                if ncbi_id in known_taxids:
                    continue
                taxa_info_list.append({
                    'ncbi_taxid': ncbi_id,
                    'parent_taxid': fields[1].strip('\t'),
                    'tax_name': '',
                    'lineage_level': fields[2].strip('\t'),
                })
        self.logger.info('Parsed nodes.dmp')

        # parse names.dmp, keeping only the scientific name of each taxon
        self.logger.debug("Parsing %s", names_file)
        names_by_taxid = {}
        with open(names_file, 'r') as f:
            for line in f:
                if 'scientific name' not in line:
                    continue
                fields = line.split('|')
                ncbi_id = fields[0].strip('\t')
                if ncbi_id in known_taxids:
                    continue
                # First match wins if a taxid shows up more than once.
                names_by_taxid.setdefault(ncbi_id, fields[1].strip('\t'))
        self.logger.info('Parsed names.dmp')

        # Join on the taxid rather than on the position in the files: a
        # positional zip() silently shifts every following name (and
        # overwrites the taxid) as soon as one file has an extra or a
        # missing row.
        unnamed = 0
        for taxa_info in taxa_info_list:
            name = names_by_taxid.get(taxa_info['ncbi_taxid'])
            if name is None:
                unnamed += 1
            else:
                taxa_info['tax_name'] = name
        if unnamed:
            self.logger.warning(
                "%d taxa have no scientific name in %s", unnamed, names_file)
        self.logger.debug('merge successful')
        return taxa_info_list

    def set_nodes_file(self, nodes_file):
        """Set the nodes file to use

        Args:
            nodes_file (:obj:`str`): Nodes file to be set

        Returns:
            True

        Raises:
            SystemExit: If `nodes_file` is None or not a file (`check_file`)
        """
        if nodes_file is None:
            self.logger.error("Please provide an nodes file to set")
            sys.exit(1)
        self.check_file(nodes_file)
        self.nodes_file = nodes_file
        return True

    def set_names_file(self, names_file):
        """Set the names file to use

        Args:
            names_file (:obj:`str`): Names file to be set

        Returns:
            True

        Raises:
            SystemExit: If `names_file` is None or not a file (`check_file`)
        """
        if names_file is None:
            self.logger.error("Please provide an names file to set")
            sys.exit(1)
        self.check_file(names_file)
        self.names_file = names_file
        return True
class Accession2TaxidParser(TaxaParser):
    """Main parser class for nucl_xxx_accession2taxid files

    This class is used to parse accession2taxid files.

    Args:
        acc_file (:obj:`str`): File to parse
        chunk (:obj:`int`): Chunk insert size. Default 500
        fast (:obj:`bool`): Directly load accession into database, do not
            check existence.
    """

    def __init__(self, acc_file=None, chunk=500, fast=False, **kwargs):
        """Store the parser settings; `kwargs` go to the base class."""
        super().__init__(**kwargs)
        self.acc_file = acc_file
        self.chunk = chunk
        self.fast = fast

    def accession2taxid(self, acc2taxid=None, chunk=None):
        """Parses the accession2taxid files

        This method parses the accession2taxid file, builds one dictionary
        per kept line and yields them in lists of `chunk` entries for
        insertion in the database.
        ::

            {
                'accession': accession_id_from_file,
                'taxid': associated_taxonomic_id
            }

        A line is kept when its taxid (third column) is returned by
        ``cache_taxids`` and, unless ``fast`` is set, its accession (first
        column) was neither seen earlier in the file nor found in the
        ``Accession`` table.

        This is a generator: nothing is read or checked before the first
        chunk is requested.

        Args:
            acc2taxid (:obj:`str`): Path to acc2taxid input file (gzipped).
                Defaults to the file set on the object.
            chunk (:obj:`int`): Chunk size of entries to gather before
                yielding. Default 500 (set at object construction)

        Yields:
            list: Chunk size of read entries (the last one may be shorter)

        Raises:
            SystemExit: If the input file is missing (`check_file`)
        """
        if acc2taxid is None:
            acc2taxid = self.acc_file
        self.check_file(acc2taxid)
        if chunk is None:
            chunk = self.chunk

        # Some accessions (e.g.: AAA22826) have a taxid = 0
        taxids = self.cache_taxids()
        seen = set()  # accessions already queued during this run
        entries = []
        self.logger.debug("Parsing %s", acc2taxid)
        # The conditional must be evaluated on its own: written inline as
        # '"..." % "ON" if fast else "OFF"' it logs a bare "OFF".
        self.logger.debug("Fast mode %s", "ON" if self.fast else "OFF")
        with gzip.open(acc2taxid, 'rb') as f:
            f.readline()  # discard the header
            for line in f:
                fields = line.decode().rstrip('\n').split('\t')
                if len(fields) < 3:
                    # Blank or truncated line: there is no taxid column.
                    continue
                accession, taxid = fields[0], fields[2]
                # Only keep accessions attached to a known taxid
                if taxid not in taxids:
                    continue
                # In case of an update or parsing an already inserted list
                # of accessions
                if not self.fast:
                    if accession in seen:
                        continue
                    try:
                        Accession.get(Accession.accession == accession)
                    except Accession.DoesNotExist:
                        seen.add(accession)
                    else:
                        continue  # already stored in the database
                entries.append({'accession': accession, 'taxid': taxid})
                if len(entries) == chunk:
                    yield entries
                    entries = []
            if entries:
                yield entries

    def set_accession_file(self, acc_file):
        """Set the accession file to use

        Args:
            acc_file (:obj:`str`): File to be set

        Returns:
            True

        Raises:
            SystemExit: If `acc_file` is None or not a file (`check_file`)
        """
        if acc_file is None:
            self.logger.error("Please provide an accession file to set")
            sys.exit(1)
        self.check_file(acc_file)
        self.acc_file = acc_file
        return True