#!/usr/bin/env python3 # coding: utf8 """A basic opensearch.org client. All start here. Author: frederic zind 2022-08-02 Licenses: GNU AGPL v3: http://www.gnu.org/licenses/ """ import argparse import logging import os from pprint import pformat as pf from pprint import pprint as pp import sys from opensearchpy import AuthorizationException import client # pylint: disable=import-error # ### # Config # ### # Range for the request retrieving the last entries. LAST_E_CHOICES = range(1, 51) STD_FIELDS = [ "X-OVH-CONTENT-SIZE", "X-OVH-DELIVERY-DATE", "X-OVH-INPUT", "gl2_source_input", "gl2_source_node", "source", "streams", "timestamp", ] logger = logging.getLogger(os.path.splitext(os.path.basename(sys.argv[0]))[0]) def setup_logging(options): """Configure logging.""" root = logging.getLogger("") logger.setLevel(options.debug and logging.DEBUG or logging.INFO) if options.debug: # Logging with --debug option # Logging on terminal standard output handler_tty = logging.StreamHandler() handler_tty.setFormatter( logging.Formatter("%(name)s::%(levelname)s::%(message)s") ) root.addHandler(handler_tty) else: # Logging default setings handler_tty = logging.StreamHandler() handler_tty.setFormatter(logging.Formatter("%(levelname)s\t%(message)s")) root.addHandler(handler_tty) class CustomFormatter( argparse.RawDescriptionHelpFormatter, argparse.ArgumentDefaultsHelpFormatter ): """This convert file docstring into help text.""" pass def parse_args(args=sys.argv[1:]): """Argument parser. Put values & describe parameters here not in your code Ease usage with help & docs """ parser = argparse.ArgumentParser( description=sys.modules[__name__].__doc__, formatter_class=CustomFormatter ) group = parser.add_argument_group("log setting") group.add_argument( "-d", "--debug", action="store_true", default=False, help="Log activity in console", ) group = parser.add_mutually_exclusive_group() group.add_argument( "-m", "--mapping", action="store_true", help="Show the stream mapping", ) group.add_argument( "-l", "--last", choices=LAST_E_CHOICES, const="3", help="Last entries of the stream", metavar="INT", nargs="?", type=int, ) return parser.parse_args(args) def request_last_entries(entries_nb, fields=[]): """Get the last n entries from a stream.""" if entries_nb not in LAST_E_CHOICES: logger.critical("'%s' is not in '%s'", entries_nb, LAST_E_CHOICES) raise ValueError logger.debug("Requesting last '%s' entries…", entries_nb) last_entries = client.opnsrch_clt.search( body={"size": entries_nb, "_source": fields} ) logger.debug(pf(last_entries)) return last_entries["hits"]["hits"] def request_map_props(): """Get stream mapping. LDP indices are rolling and LDP do not implement the call to get indices related to an alias. """ logger.debug("Requesting mapping for '%s' stream…", client.LDP_STREAM_NAME) try: indices = list(client.opnsrch_clt.indices.get_alias("*")) mapping = client.opnsrch_clt.indices.get_mapping(client.LDP_ALIAS)[indices[0]][ "mappings" ] map_props = sorted(list(mapping["properties"])) logger.debug("Mapping received for alias: '%s'", client.LDP_ALIAS) logger.debug(map_props) except AuthorizationException: map_props = None logger.critical("AuthorizationException: check credentials") return map_props def strip_std_field(fields): """Remove standard LDP fields to ease human reading. Returns a list populated with a list of stream specific fields. """ for field in STD_FIELDS: fields.remove(field) return fields def main(options): """Execute as script. Functions related to the arguments passed.""" if options.mapping: response = request_map_props() elif options.last: response = request_last_entries( options.last, strip_std_field(request_map_props()) ) logger.debug( "Last '%s' entries for '%s' stream:", options.last, client.LDP_STREAM_NAME ) logger.debug(response) else: raise NotImplementedError return response if __name__ == "__main__": pargs = parse_args() setup_logging(pargs) result = main(pargs) if not pargs.debug and result: pp(result)