Source code for nmdc_client

"""
This module provides functions to interact with the NMDC API,
including retrieving metadata into dataframes and linking across schema objects.

The :class:`NmdcClient.find` and :class:`NmdcClient.lookup` functions return data as a DataFrame.
To retrieve data as a list of dictionaries, use :class:`NmdcClient.find_dict` and
:class:`NmdcClient.lookup_dict`.
To retrieve data as a list of ``nmdc_schema.nmdc`` objects, use :class:`NmdcClient.find_full`
and :class:`NmdcClient.lookup_full`.
See the `NMDC schema documentation <https://microbiomedata.github.io/nmdc-schema/>`_
for more information on NMDC schema classes.

When working with DataFrames, there is a :class:`NmdcClient.merge_related` function
to merge related objects into one DataFrame.
This uses the more low-level :class:`NmdcClient.related_ids` function to
find related objects of a given type from a single ID.

Use the following import to access Pythonic representations of the NMDC schema.

.. code-block:: python

    >>> from nmdc_schema import nmdc

Names of classes and slots can be discovered by autocompleting ``nmdc.`` in your IDE, or by
visiting the `NMDC schema documentation <https://microbiomedata.github.io/nmdc-schema/>`_.

To interact with the NMDC API, create an :class:`NmdcClient` instance.

.. code-block:: python

    >>> client = NmdcClient()

The following retrieves all studies with Wrighton as principal investigator as a DataFrame.
Queries are specified using
`MongoDB query syntax <https://www.mongodb.com/docs/manual/tutorial/query-documents/>`_.

.. code-block:: python

    >>> query = {'principal_investigator.has_raw_value': {'$regex': 'Wrighton'}}
    >>> studies = client.find(nmdc.Study, query=query)
    >>> len(studies)
    2

The following retrieves a specific biosample by its identifier and associates it
with study metadata. The first time :class:`NmdcClient.merge_related` is called,
it will fetch all linkages between objects in the NMDC schema and cache them in
a file local to the package. You can force an update of this cache at a later time with
by calling :class:`NmdcClient.fetch_links` with ``force`` set to ``True``.

.. code-block:: python

    >>> ids = ['nmdc:bsm-13-7qxjvr77']
    >>> biosample = client.lookup(nmdc.Biosample, ids, fields=['id', 'name'])
    >>> add_study = client.merge_related(biosample, 'id', nmdc.Study, fields=['id', 'title'])
    >>> for col in add_study.columns:
    ...    print(col)
    id
    name
    id_Study
    title

The following retrieves 10 samples then merges them with all associated WorkflowExecutions.

.. code-block:: python

    >>> samp = client.find(nmdc.Biosample, fields=["id", "type"], limit=10)
    >>> exec = client.merge_related(samp, "id", nmdc.WorkflowExecution, fields=["id", "type"])
    >>> for col in exec.columns:
    ...     print(col)
    id
    type
    id_WorkflowExecution
    type_WorkflowExecution
"""

from enum import Enum
import json
import time
import os
import pickle
from typing import Any, cast, Dict, get_type_hints, List, Optional, Type, Tuple, TypeVar

import requests
import pandas as pd
from nmdc_schema import nmdc
from nmdc_schema.nmdc_data import get_nmdc_jsonschema_dict, get_nmdc_schema_definition
from linkml_runtime.utils.schemaview import SchemaView

SchemaClass = TypeVar("SchemaClass", bound=nmdc.NamedThing)
schema_dict = get_nmdc_jsonschema_dict()
schema_view = SchemaView(get_nmdc_schema_definition())


[docs] class SearchDirection(Enum): """ Enumeration to specify the direction to follow linkages among objects in the schema. See :func:`NmdcClient.fetch_links` for details. """ BACK = 0 FORWARD = 1
[docs] class NmdcClient: """ Class to manage calls to the NMDC API service. :param base_url: The base URL of the NMDC API. :param max_page_size: The maximum number of objects to retrieve per request. :param sleep_seconds: The number of seconds to wait between API requests when paginating. :param timeout: The number of seconds to wait for an API request to complete before throwing an exception. :param verbose: Whether to print URLs of API requests. :param links: A dictionary of links between objects in the NMDC schema. Defaults to None which fetches and caches the links on the first call to :func:`related_ids` or :func:`merge_related`. Example: .. code-block:: python >>> client = NmdcClient(max_page_size=100) """ def __init__( self, base_url: str = "https://api.microbiomedata.org", max_page_size: int = 1000, sleep_seconds: float = 0.5, timeout: float = 30, verbose: bool = False, links: Optional[Dict[str, Tuple[List[str], List[str]]]] = None, ): self.base_url = base_url self.max_page_size = max_page_size self.sleep_seconds = sleep_seconds self.timeout = timeout self.verbose = verbose self.links = links self.codes = None
[docs] def database_collections_for_type(self, cls: Type[SchemaClass]) -> List[str]: """ Find the database collection names for a given schema class, if any. :param cls: The schema class to find the collection name for. Must be a subclass of ``nmdc_schema.nmdc.NamedThing``. :return: The list of collection names containing items of that class or subclass, or an empty list if none are found. """ cls = cast(Type[nmdc.NamedThing], cls) descendants = [ f"#/$defs/{name}" for name in schema_view.class_descendants(cls.class_name) ] collection = [] for key, value in schema_dict["$defs"]["Database"]["properties"].items(): db_items = [value["items"]] if "anyOf" in value["items"]: db_items = value["items"]["anyOf"] for db_item in db_items: if db_item["$ref"] in descendants: collection.append(key) break return collection
[docs] def find_dict( self, cls: Type[SchemaClass] | str, query: Optional[Dict[str, Any]] = None, fields: Optional[List[str]] = None, limit: Optional[int] = None, ) -> List[Dict[str, Any]]: """ Retrieve a list of dictionaries of objects of the specified class from the NMDC schema. :param cls: The class type or collection of the objects to retrieve. Must be a subclass of ``nmdc_schema.nmdc.NamedThing`` or the name of a NMDC database collection. :param query: A dictionary representing the query to filter the results. Defaults to None which returns all objects. :param fields: A list of fields to include in the result. Defaults to None which returns all fields. :param limit: The maximum number of objects to retrieve. Defaults to None which retrieves all matching objects. :return: A list of dictionaries representing the retrieved objects. """ if query is None: query = {} if fields is None: fields = [] if isinstance(cls, str): collection = cls else: cls = cast(Type[nmdc.NamedThing], cls) collections = self.database_collections_for_type(cls) if len(collections) == 0: print(f"Note: No collection found for {cls}") return [] if len(collections) > 1: print( f"Note: Multiple collections found for {cls}: {collections}." " Using the first one." ) collection = collections[0] # Add the class to the filter (for heterogeneous collections) query["type"] = { "$in": [ schema_view.get_class(name).class_uri for name in schema_view.class_descendants(cls.class_name) ] } max_page_size = self.max_page_size if limit is not None: max_page_size = min(limit, self.max_page_size) url = ( f"{self.base_url}/nmdcschema/{collection}?" f"&filter={json.dumps(query)}" f"&projection={','.join(fields)}" f"&max_page_size={max_page_size}" ) if self.verbose: print(url) response = requests.get(url, timeout=self.timeout) data = response.json() results = data["resources"] while "next_page_token" in data and (limit is None or len(results) < limit): page_token = data["next_page_token"] time.sleep(self.sleep_seconds) page_url = f"{url}&page_token={page_token}" if self.verbose: print(page_url) response = requests.get(page_url, timeout=self.timeout) data = response.json() results.extend(data["resources"]) return results[:limit]
[docs] def find_full( self, cls: Type[SchemaClass], query: Optional[Dict[str, Any]] = None, limit: Optional[int] = None, ) -> List[SchemaClass]: """ Retrieve a list of full objects of the specified class from the NMDC schema. This function queries the NMDC API to retrieve objects of the specified class, converts the resulting dictionaries to instances of the specified schema class, and returns them as a list. :param cls: The class type of the objects to retrieve. Must be a subclass of ``nmdc_schema.nmdc.NamedThing``. :param query: A dictionary representing the query to filter the results. Defaults to None which returns all objects. :param limit: The maximum number of objects to retrieve. Defaults to None which retrieves all matching objects. :return: A list of instances of the specified class type. """ return [ cls(d) for d in self.find_dict(cls=cls, query=query, fields=None, limit=limit) ]
[docs] def find( self, cls: Type[SchemaClass] | str, query: Optional[Dict[str, Any]] = None, fields: Optional[List[str]] = None, limit: Optional[int] = None, ) -> pd.DataFrame: """ Retrieve a DataFrame of objects of the specified class from the NMDC schema. :param cls: The class type or collection of the objects to retrieve. Must be a subclass of ``nmdc_schema.nmdc.NamedThing`` or the name of a NMDC database collection. :param query: A dictionary representing the query to filter the objects. Defaults to None which returns all objects. :param fields: A list of fields to include in the result. Defaults to None which returns all fields. :param limit: The maximum number of objects to retrieve. Defaults to None which retrieves all matching objects. :return: A DataFrame containing the retrieved objects. """ data = self.find_dict(cls=cls, query=query, fields=fields, limit=limit) df = pd.json_normalize(data) return df
def _split_list(self, input_list: List[int], chunk_size=100): result = [] for i in range(0, len(input_list), chunk_size): result.append(input_list[i : i + chunk_size]) return result
[docs] def lookup_dict( self, cls: Type[SchemaClass] | str, ids: List[str], fields: Optional[List[str]] = None, ) -> List[Dict[str, Any]]: """ Retrieve a list of dictionaries of objects by their identifiers. :param cls: The class type or collection of the objects to retrieve. Must be a subclass of ``nmdc_schema.nmdc.NamedThing`` or the name of a NMDC database collection. :param ids: A list of object identifiers. :param fields: A list of fields to include in the result. Defaults to None which includes all fields. :return: A list of dictionaries representing the retrieved objects. """ chunked_ids = self._split_list(ids) results = [] for chunk in chunked_ids: data = self.find_dict(cls=cls, query={"id": {"$in": chunk}}, fields=fields) results.extend(data) return data
[docs] def lookup( self, cls: Type[SchemaClass] | str, ids: List[str], fields: Optional[List[str]] = None, ) -> pd.DataFrame: """ Retrieve a DataFrame of objects by their identifiers. :param cls: The class type or collection of the objects to retrieve. Must be a subclass of ``nmdc_schema.nmdc.NamedThing`` or the name of an NMDC database collection. :param ids: A list of object identifiers. :param fields: A list of fields to include in the result. Defaults to None which includes all fields. :return: A DataFrame representing the retrieved objects. """ data = self.lookup_dict(cls=cls, ids=ids, fields=fields) df = pd.json_normalize(data) return df
[docs] def lookup_full( self, cls: Type[SchemaClass] | str, ids: List[str], ) -> pd.DataFrame: """ Retrieve a list of schema objects by their identifiers. :param cls: The class type or collection of the objects to retrieve. Must be a subclass of ``nmdc_schema.nmdc.NamedThing`` or the name of an NMDC database collection. :param ids: A list of object identifiers. :return: A list of instances of the specified class type. """ data = self.lookup_dict(cls=cls, ids=ids, fields=None) return [cls(d) for d in data]
[docs] def fetch_codes(self): """ Fetch and cache typecodes from the NMDC typecode API. """ url = f"{self.base_url}/nmdcschema/typecodes" if self.verbose: print(url) self.codes = { d["schema_class"].split(":")[1]: d["name"] for d in requests.get(url, timeout=self.timeout).json() }
[docs] def related_ids( self, ids: List[str], cls: Type[SchemaClass] ) -> List[Tuple[str, str]]: """ Find related object identifiers of the specified class for the specified IDs. This function finds related objects of the specified class for the specified IDs, and returns a dictionary mapping each input object ID to a list of related object IDs matching the specified class. :param ids: A list of object IDs. :param cls: The type of objects to find related objects for. :return: A list of tuples mapping each source ID to a related target ID. Example: .. code-block:: python >>> client = NmdcClient() >>> related = client.related_ids(['nmdc:bsm-13-7qxjvr77'], nmdc.WorkflowExecution) >>> related.sort() >>> for source_id, target_id in related: ... print(source_id, target_id) nmdc:bsm-13-7qxjvr77 nmdc:wfmb-13-w61ppf20.1 nmdc:bsm-13-7qxjvr77 nmdc:wfmgan-11-szz9bq42.1 nmdc:bsm-13-7qxjvr77 nmdc:wfmgas-13-a7e90z13.1 nmdc:bsm-13-7qxjvr77 nmdc:wfmp-11-hpexdy53.1 nmdc:bsm-13-7qxjvr77 nmdc:wfrbt-13-8z2h4m87.1 nmdc:bsm-13-7qxjvr77 nmdc:wfrqc-13-zntcxa44.1 """ cls = cast(Type[nmdc.NamedThing], cls) descendants = schema_view.class_descendants(cls.class_name) if self.codes is None: self.fetch_codes() descendant_codes = [ self.codes.get(c) for c in descendants if self.codes.get(c) is not None ] if self.links is None: self.fetch_links() result = [] for source_id in ids: linked = set(self.follow_links(source_id, SearchDirection.BACK)) linked.update(self.follow_links(source_id, SearchDirection.FORWARD)) # We're checking if the typecode matches any of the descendant codes result.extend( [ (source_id, d) for d in linked if any(d.startswith(f"nmdc:{code}") for code in descendant_codes) ] ) return result
if __name__ == "__main__": import doctest doctest.testmod()