"""
Set up a catalog for Axiom assets.
"""
from datetime import datetime
from typing import List, MutableMapping, Optional, Sequence, Tuple, Union
import pandas as pd
from cf_pandas import astype
from intake.catalog.base import Catalog
from intake.catalog.local import LocalCatalogEntry
from intake.source.csv import CSVSource
from intake_parquet.source import ParquetSource
from . import __version__
from .axds import AXDSSensorSource
from .utils import (
check_station,
load_metadata,
match_key_to_parameter,
match_std_names_to_parameter,
response_from_url,
)
[docs]class AXDSCatalog(Catalog):
"""
Makes data sources out of all datasets for a given AXDS data type.
Attributes
----------
pglabels : list[str]
If ``keys_to_match`` or ``standard_names`` is input to search on, they are converted to parameterGroupLabels and saved to the catalog metadata.
pgids : list[int]
If ``keys_to_match`` or ``standard_names`` is input to search on, they are converted to parameterGroupIds and saved to the catalog metadata. In the case that ``query_type=="intersection_constrained"`` and ``datatype=="platform2"``, the pgids are passed to the sensor source so that only data from variables corresponding to those pgids are returned.
Parameters
----------
datatype : str
Axiom data type. Currently "platform2" or "sensor_station" but eventually also "module". Platforms and sensors are returned as dataframe containers.
keys_to_match : str, list, optional
Name of keys to match with system-available variable parameterNames using criteria. To filter search by variables, either input keys_to_match and a vocabulary or input standard_names. Results from multiple values will be combined according to ``query_type``.
standard_names : str, list, optional
Standard names to select from Axiom search parameterNames. If more than one is input, the search is for a logical OR of datasets containing the standard_names. To filter search by variables, either input keys_to_match and a vocabulary or input standard_names. Results from multiple values will be combined according to ``query_type``.
bbox : tuple of 4 floats, optional
For explicit geographic search queries, pass a tuple of four floats in the `bbox` argument. The bounding box parameters are `(min_lon, min_lat, max_lon, max_lat)`.
start_time : str, optional
For explicit search queries for datasets that contain data after `start_time`. Must include end_time if include start_time.
end_time : str, optional
For explicit search queries for datasets that contain data before `end_time`. Must include start_time if include end_time.
search_for : str, list of strings, optional
For explicit search queries for datasets that any contain of the terms specified in this keyword argument. Results from multiple values will be combined according to ``query_type``.
kwargs_search : dict, optional
Keyword arguments to input to search on the server before making the catalog. Options are:
* to search by bounding box: include all of min_lon, max_lon, min_lat, max_lat: (int, float). Longitudes must be between -180 to +180.
* to search within a datetime range: include both of min_time, max_time: interpretable datetime string, e.g., "2021-1-1"
* to search using a textual keyword: include `search_for` as a string or list of strings. Results from multiple values will be combined according to ``query_type``.
query_type : str, default "union"
Specifies how the catalog should apply the query parameters. Choices are:
* ``"union"``: the results will be the union of each resulting dataset. This is equivalent to a logical OR.
* ``"intersection"``: the set of results will be the intersection of each individual query made to the server. This is equivalent to a logical AND of the results.
* ``"intersection_constrained"``: the set of results will be the intersection of queries but also only the variables requested (using either ``keys_to_match`` or ``standard_names``) will be returned in the DataFrame, instead of all available variables. This only applies to ``datatype=="sensor_station"``.
qartod : bool, int, list, optional
Whether to return QARTOD agg flags when available, which is only for sensor_stations. Can instead input an int or a list of ints representing the _qa_agg flags for which to return data values. More information about QARTOD testing and flags can be found here: https://cdn.ioos.noaa.gov/media/2020/07/QARTOD-Data-Flags-Manual_version1.2final.pdf. Only used by datatype "sensor_station". Is not available if ``binned==True``.
Examples of ways to use this input are:
* ``qartod=True``: Return aggregate QARTOD flags as a column for each data variable.
* ``qartod=False``: Do not return any QARTOD flag columns.
* ``qartod=1``: nan any data values for which the aggregated QARTOD flags are not equal to 1.
* ``qartod=[1,3]``: nan any data values for which the aggregated QARTOD flags are not equal to 1 or 3.
Flags are:
* 1: Pass
* 2: Not Evaluated
* 3: Suspect
* 4: Fail
* 9: Missing Data
use_units : bool, optional
If True include units in column names. Syntax is "standard_name [units]". If False, no units. Then syntax for column names is "standard_name". This is currently specific to sensor_station only. Only used by datatype "sensor_station".
binned : bool, optional
True for binned data, False for raw, by default False. Only used by datatype "sensor_station".
bin_interval : Optional[str], optional
If ``binned=True``, input the binning interval to return. Options are hourly, daily, weekly, monthly, yearly. If bin_interval is input, binned is set to True. Only used by datatype "sensor_station".
page_size : int, optional
Number of results. Fewer is faster. Note that default is 10. Note that if you want to make sure you get all available datasets, you should input a large number like 50000.
verbose : bool, optional
Set to True for helpful information.
ttl : int, optional
Time to live for catalog (in seconds). How long before force-reloading catalog. Set to None to not do this.
name : str, optional
Name for catalog.
description : str, optional
Description for catalog.
metadata : dict, optional
Metadata for catalog.
kwargs:
Other input arguments are passed to the intake Catalog class. They can includegetenv, getshell, persist_mode, storage_options, and user_parameters, in addition to some that are surfaced directly in this class.
Notes
-----
only datatype sensor_station uses the following parameters: qartod, use_units, binned, bin_interval
datatype of sensor_station skips webcam data.
"""
name = "axds_cat"
version = __version__
def __init__(
self,
datatype: str,
keys_to_match: Optional[Union[str, list]] = None,
standard_names: Optional[Union[str, list]] = None,
bbox: Optional[Tuple[float, float, float, float]] = None,
start_time: Optional[str] = None,
end_time: Optional[str] = None,
search_for: Optional[Union[str, List[str]]] = None,
kwargs_search: MutableMapping[
str, Union[str, int, float, Sequence[Union[str, None]]]
] = None,
query_type: str = "union",
qartod: Union[bool, int, List[int]] = False,
use_units: bool = True,
binned: bool = False,
bin_interval: Optional[str] = None,
page_size: int = 10,
verbose: bool = False,
name: str = "catalog",
description: str = "Catalog of Axiom assets.",
metadata: dict = None,
ttl: Optional[int] = None,
**kwargs,
):
self.datatype = datatype
self.kwargs_search = kwargs_search
self.page_size = page_size
self.verbose = verbose
self.qartod = qartod
self.use_units = use_units
self.kwargs_search = kwargs_search or {}
self.query_type = query_type
if bin_interval is not None:
binned = True
self.binned = binned
self.bin_interval = bin_interval
if self.binned:
if self.qartod:
raise ValueError(
"QARTOD is not available for binned output. Set QARTOD to False or use raw data."
)
allowed_datatypes = ("platform2", "sensor_station")
if datatype not in allowed_datatypes:
raise KeyError(
f"Datatype must be one of {allowed_datatypes} but is {datatype}"
)
allowed_query_types = ("union", "intersection", "intersection_constrained")
if query_type not in allowed_query_types:
raise KeyError(
f"`query_type` must be one of {allowed_query_types} but is {query_type}"
)
if (
query_type == "intersection" or query_type == "intersection_constrained"
) and page_size == 10:
if verbose:
print(
"With `query_type` of 'intersection' you probably want to use a larger `page_size` than the default of 10."
)
if query_type == "intersection_constrained" and datatype == "platform2":
raise ValueError(
"`query_type=='intersection_constrained'` does not apply to `datatype=='platform2'`."
)
# can instead input the kwargs_search outside of that dictionary
if bbox is not None:
if not isinstance(bbox, tuple):
raise TypeError(
f"Expecting a tuple of four floats for argument bbox: {type(bbox)}"
)
if len(bbox) != 4:
raise ValueError("bbox argument requires a tuple of four floats")
self.kwargs_search["min_lon"] = bbox[0]
self.kwargs_search["min_lat"] = bbox[1]
self.kwargs_search["max_lon"] = bbox[2]
self.kwargs_search["max_lat"] = bbox[3]
if start_time is not None:
if start_time in self.kwargs_search:
raise KeyError("start_time defined explicitly and in kwargs_search.")
if not isinstance(start_time, (str, datetime)):
raise TypeError(
f"Expecting a datetime for start_time argument: {repr(start_time)}"
)
# if isinstance(start_time, str):
# start_time = pd.Timestamp(start_time)#.strptime(start_time, "%Y-%m-%dT%H:%M:%SZ")
self.kwargs_search["min_time"] = start_time
if end_time is not None:
if end_time in self.kwargs_search:
raise KeyError("end_time defined explicitly and in kwargs_search.")
if not isinstance(end_time, (str, datetime)):
raise TypeError(
f"Expecting a datetime for end_time argument: {repr(end_time)}"
)
# if isinstance(end_time, str):
# end_time = pd.Timestamp(end_time)#.strptime(end_time, "%Y-%m-%dT%H:%M:%SZ")
self.kwargs_search["max_time"] = end_time
if search_for is not None:
if "search_for" in self.kwargs_search:
raise KeyError("search_for defined explicitly and in kwargs_search.")
if not isinstance(search_for, (str, list)):
raise TypeError(
f"Expecting string or list of strings for search_for argument: {repr(search_for)}"
)
self.kwargs_search["search_for"] = search_for
if "search_for" not in self.kwargs_search:
self.kwargs_search["search_for"] = [None]
else:
if isinstance(self.kwargs_search["search_for"], str):
self.kwargs_search["search_for"] = [self.kwargs_search["search_for"]]
checks = [
["min_lon", "max_lon", "min_lat", "max_lat"],
["min_time", "max_time"],
]
for check in checks:
if any(key in self.kwargs_search for key in check) and not all(
key in self.kwargs_search for key in check
):
raise ValueError(
f"If any of {check} are input, they all must be input."
)
if "min_lon" in self.kwargs_search and "max_lon" in self.kwargs_search:
min_lon, max_lon = (
self.kwargs_search["min_lon"],
self.kwargs_search["max_lon"],
)
if isinstance(min_lon, (int, float)) and isinstance(max_lon, (int, float)):
if abs(min_lon) > 180 or abs(max_lon) > 180:
raise ValueError(
"`min_lon` and `max_lon` must be in the range -180 to 180."
)
# input keys_to_match OR standard_names but not both
if keys_to_match is not None and standard_names is not None:
raise ValueError(
"Input either `keys_to_match` or `standard_names` but not both."
)
self.pglabels: List[Union[str, None]]
self.pgids: List[Union[int, None]]
if keys_to_match is not None:
self.pglabels, self.pgids = match_key_to_parameter(
astype(keys_to_match, list)
)
elif standard_names is not None:
self.pglabels, self.pgids = match_std_names_to_parameter(
astype(standard_names, list)
)
else:
self.pglabels, self.pgids = [None], [None]
# Put together catalog-level stuff
if metadata is None:
metadata = {}
metadata["kwargs_search"] = self.kwargs_search
metadata["pglabels"] = list(self.pglabels)
metadata["pgids"] = list(self.pgids)
metadata["query_type"] = query_type
# metadata["use_units"] = use_units
super(AXDSCatalog, self).__init__(
**kwargs, ttl=ttl, name=name, description=description, metadata=metadata
)
[docs] def search_url(
self, pglabel: Optional[str] = None, text_search: Optional[str] = None
) -> str:
"""Set up one url for searching.
Parameters
----------
pglabel : Optional[str], optional
Parameter Group Label (not ID), by default None
text_search : Optional[str], optional
free text search, by default None
Returns
-------
str
URL to use to search Axiom systems.
"""
self.url_search_base = f"https://search.axds.co/v2/search?portalId=-1&page=1&pageSize={self.page_size}&verbose=true"
url = f"{self.url_search_base}&type={self.datatype}"
assert isinstance(self.kwargs_search, dict)
if self.kwargs_search.keys() >= {
"max_lon",
"min_lon",
"min_lat",
"max_lat",
}:
url_add_box = (
f'&geom={{"type":"Polygon","coordinates":[[[{self.kwargs_search["min_lon"]},{self.kwargs_search["min_lat"]}],'
+ f'[{self.kwargs_search["max_lon"]},{self.kwargs_search["min_lat"]}],'
+ f'[{self.kwargs_search["max_lon"]},{self.kwargs_search["max_lat"]}],'
+ f'[{self.kwargs_search["min_lon"]},{self.kwargs_search["max_lat"]}],'
+ f'[{self.kwargs_search["min_lon"]},{self.kwargs_search["min_lat"]}]]]}}'
)
url += f"{url_add_box}"
if self.kwargs_search.keys() >= {"max_time", "min_time"}:
# convert input datetime to seconds since 1970
startDateTime = (
pd.Timestamp(self.kwargs_search["min_time"]).tz_localize("UTC")
- pd.Timestamp("1970-01-01 00:00").tz_localize("UTC")
) // pd.Timedelta("1s")
endDateTime = (
pd.Timestamp(self.kwargs_search["max_time"]).tz_localize("UTC")
- pd.Timestamp("1970-01-01 00:00").tz_localize("UTC")
) // pd.Timedelta("1s")
# search by time
url_add_time = f"&startDateTime={startDateTime}&endDateTime={endDateTime}"
url += f"{url_add_time}"
if text_search is not None:
url += f"&query={text_search}"
# search by variable
if pglabel is not None:
url += f"&tag=Parameter+Group:{pglabel}"
# if requests.get(url).status_code != 200:
# raise ValueError("")
if self.verbose:
print(f"search url: {url}")
return url
[docs] def get_search_urls(self) -> list:
"""Gather all search urls for catalog.
Inputs that can have more than one search_url are pglabels and search_for list.
Returns
-------
list
List of search urls.
"""
assert isinstance(self.kwargs_search, dict)
search_urls = [
self.search_url(pglabel, text_search)
for pglabel in self.pglabels
for text_search in self.kwargs_search["search_for"]
]
return search_urls
def _load_all_results(self) -> list:
"""Combine results from multiple search urls using query_type.
Returns
-------
list
list of results from potentially multiple searches.
Raises
------
ValueError
if no results found.
"""
combined_results = []
first_loop = True
for search_url in self.get_search_urls():
res = response_from_url(search_url)
if "results" not in res:
raise ValueError(
f"No results were returned for the search. Search url: {search_url}."
)
assert isinstance(res, dict)
if self.verbose:
print(
f"For search url {search_url}, number of results found: {len(res['results'])}. Page size: {self.page_size}."
)
if self.query_type == "union": # logical OR
combined_results.extend(res["results"])
elif (
self.query_type == "intersection"
or self.query_type == "intersection_constrained"
): # logical AND
if first_loop: # initialize
combined_results = res["results"]
first_loop = False
else:
# compare uuids from first search with uuids from next search
uuids_before = [dataset["uuid"] for dataset in combined_results]
uuids_now = [dataset["uuid"] for dataset in res["results"]]
overlapping_uuids = set(uuids_before).intersection(set(uuids_now))
combined_results = [
dataset
for dataset in combined_results
if dataset["uuid"] in overlapping_uuids
]
if self.verbose:
unique = set([res["uuid"] for res in combined_results])
print(
f"Total number of results found for page_size {self.page_size} over {len(self.get_search_urls())} different searches with query_type {self.query_type}: {len(combined_results)}, with unique results: {len(unique)}."
)
return combined_results
def _load(self):
"""Find all UUIDs and create catalog."""
results = self._load_all_results()
self._entries = {}
for result in results:
uuid = result["uuid"]
# don't repeat an entry (it won't actually allow you to, but probably saves time not to try)
if uuid in self._entries:
continue
if self.verbose:
print(f"Dataset ID: {uuid}")
# # quick check if OPENDAP is in the access methods for this uuid, otherwise move on
# if self.datatype == "module":
# # if opendap is not in the access methods at the module level, then we assume it
# # also isn't at the layer_group level, so we will not check each layer_group
# if "OPENDAP" not in results["data"]["access_methods"]:
# if self.verbose:
# print(
# f"Cannot access module {dataset_id} via opendap so no source is being made for it.",
# UserWarning,
# )
# # warnings.warn(f"Cannot access module {dataset_id} via opendap so no source is being made for it.", UserWarning)
# continue
# if "DEPRECATED" in results["data"]["label"]:
# if self.verbose:
# print(
# f"Skipping module {dataset_id} because label says it is deprecated.",
# UserWarning,
# )
# continue
description = f"AXDS dataset_id {uuid} of datatype {self.datatype}"
metadata = load_metadata(self.datatype, result)
keep_station = check_station(metadata, verbose=self.verbose)
if not keep_station:
continue
# Find urlpath
if self.datatype == "platform2":
# use parquet if available, otherwise csv
try:
key = [
key
for key in result["source"]["files"].keys()
if ".parquet" in key
][0]
urlpath = result["source"]["files"][key]["url"]
plugin = ParquetSource
except Exception:
urlpath = result["source"]["files"]["data.csv.gz"]["url"]
plugin = CSVSource
args = {
"urlpath": urlpath,
}
# this Source has different arg requirements
elif self.datatype == "sensor_station":
args = {
"internal_id": metadata["internal_id"],
"uuid": uuid,
"start_time": self.kwargs_search.get("min_time", None),
"end_time": self.kwargs_search.get("max_time", None),
# "kwargs_search": self.kwargs_search,
"qartod": self.qartod,
"use_units": self.use_units,
"binned": self.binned,
"bin_interval": self.bin_interval,
"only_pgids": list(self.pgids)
if self.query_type == "intersection_constrained"
else None,
}
plugin = AXDSSensorSource
# elif self.datatype == "module":
# plugin = NetCDFSource # 'netcdf'
# # modules are the umbrella and contain 1 or more layer_groups
# # pull out associated layer groups uuids to make sure to capture them
# layer_group_uuids = list(docs["data"]["layer_group_info"].keys())
# # pull up docs for each layer_group to get urlpath
# # can only get a urlpath if it is available on opendap
# urlpaths = [] # using this to see if there are ever multiple urlpaths
# for layer_group_uuid in layer_group_uuids:
# docs_lg = return_docs_response(layer_group_uuid)
# docs_log = response_from_url(make_search_docs_url(layer_group_uuid))
# if "OPENDAP" in docs_lg["data"]["access_methods"]:
# urlpath = docs_lg["source"]["layers"][0][
# "thredds_opendap_url"
# ].removesuffix(".html")
# urlpaths.append(urlpath)
# # only want unique urlpaths
# urlpaths = list(set(urlpaths))
# if len(urlpaths) > 1:
# if self.verbose:
# print(
# f"Several urlpaths were found for module {dataset_id} so no source is being made for it."
# )
# # warnings.warn(f"Several urlpaths were found for module {dataset_id} so no source is being made for it.", UserWarning)
# continue
# # raise ValueError(f"the layer_groups for module {dataset_id} have different urlpaths.")
# elif len(urlpaths) == 0:
# if self.verbose:
# print(
# f"No urlpath was found for module {dataset_id} so no source is being made for it."
# )
# # warnings.warn(f"No urlpath was found for module {dataset_id} so no source is being made for it.", UserWarning)
# continue
# else:
# urlpath = urlpaths[0]
entry = LocalCatalogEntry(
name=uuid,
description=description,
driver=plugin,
direct_access="allow",
args=args,
metadata=metadata,
# True,
# args,
# {},
# {},
# {},
# "",
# getenv=False,
# getshell=False,
)
entry._plugin = [plugin]
self._entries[uuid] = entry
# final tally
if self.verbose:
print(
f"Final number of stations found after removing some: {len(self._entries)}."
)