LDPy/ldpy.py

194 lines
4.6 KiB
Python
Executable File

#!/usr/bin/env python3
# coding: utf8
"""A basic opensearch.org client.
All start here.
Author: frederic zind 2022-08-02
Licenses: GNU AGPL v3: http://www.gnu.org/licenses/
"""
import argparse
import logging
import os
from pprint import pformat as pf
from pprint import pprint as pp
import sys
from opensearchpy import AuthorizationException
import client # pylint: disable=import-error
# ###
# Config
# ###
# Allowed values for the number of last entries to request: 1 to 50
# (the end of the range is exclusive). Used both as the argparse choices of
# --last and as the validity check in request_last_entries().
LAST_E_CHOICES = range(1, 51)
# Field names that strip_std_field() removes from a list of fields.
STD_FIELDS = [
    "X-OVH-CONTENT-SIZE",
    "X-OVH-DELIVERY-DATE",
    "X-OVH-INPUT",
    "gl2_source_input",
    "gl2_source_node",
    "source",
    "streams",
    "timestamp",
]
# Module logger, named after the running script (sys.argv[0]) without its
# directory and without its file extension.
logger = logging.getLogger(os.path.splitext(os.path.basename(sys.argv[0]))[0])
def setup_logging(options):
    """Set the module logger level and attach a stream handler to the root logger.

    Args:
        options: Parsed command-line namespace; only ``options.debug`` is read.

    With ``options.debug`` truthy the module logger is set to DEBUG and
    records are rendered as ``name::level::message``; otherwise it is set to
    INFO and records are rendered as ``level<TAB>message``.
    """
    if options.debug:
        level = logging.DEBUG
        layout = "%(name)s::%(levelname)s::%(message)s"
    else:
        level = logging.INFO
        layout = "%(levelname)s\t%(message)s"
    logger.setLevel(level)

    # Both modes use the same kind of handler; only the record layout differs.
    console = logging.StreamHandler()
    console.setFormatter(logging.Formatter(layout))
    logging.getLogger("").addHandler(console)
class CustomFormatter(
    argparse.RawDescriptionHelpFormatter, argparse.ArgumentDefaultsHelpFormatter
):
    """Help formatter combining two argparse formatters.

    The description is printed as written (raw) and the default value of
    each argument is appended to its help text.
    """
def parse_args(args=None):
    """Argument parser.

    Put values & describe parameters here, not in your code.
    Eases usage with help & docs.

    Args:
        args: List of argument strings to parse. When None (the default),
            argparse reads ``sys.argv[1:]`` at call time rather than the
            value it had when this module was imported.

    Returns:
        argparse.Namespace with the attributes ``debug`` (bool),
        ``mapping`` (bool) and ``last`` (int, or None when not requested).
    """
    parser = argparse.ArgumentParser(
        description=sys.modules[__name__].__doc__, formatter_class=CustomFormatter
    )

    log_group = parser.add_argument_group("log setting")
    log_group.add_argument(
        "-d",
        "--debug",
        action="store_true",
        default=False,
        help="Log activity in console",
    )

    # --mapping and --last are alternative actions: only one may be given.
    action_group = parser.add_mutually_exclusive_group()
    action_group.add_argument(
        "-m",
        "--mapping",
        action="store_true",
        help="Show the stream mapping",
    )
    action_group.add_argument(
        "-l",
        "--last",
        choices=LAST_E_CHOICES,
        # Value used when --last is given without a number; an int to match
        # ``type=int`` and the integer choices.
        const=3,
        help="Last entries of the stream",
        metavar="INT",
        nargs="?",
        type=int,
    )
    return parser.parse_args(args)
def request_last_entries(entries_nb, fields=None):
    """Get the last n entries from a stream.

    Args:
        entries_nb: Number of entries to request; must be in LAST_E_CHOICES.
        fields: Field names sent as the ``_source`` filter of the search.
            None (the default) is sent as an empty list.

    Returns:
        The ``["hits"]["hits"]`` value of the search response.

    Raises:
        ValueError: If ``entries_nb`` is not in LAST_E_CHOICES.
    """
    if entries_nb not in LAST_E_CHOICES:
        logger.critical("'%s' is not in '%s'", entries_nb, LAST_E_CHOICES)
        raise ValueError("'%s' is not in '%s'" % (entries_nb, LAST_E_CHOICES))

    # Build the default per call instead of sharing one mutable default list.
    source_fields = [] if fields is None else fields

    logger.debug("Requesting last '%s' entries…", entries_nb)
    # NOTE(review): the request body sets no sort order, so which entries come
    # back depends on the server default - confirm these are really the
    # most recent ones.
    last_entries = client.opnsrch_clt.search(
        body={"size": entries_nb, "_source": source_fields}
    )
    # Pretty-formatting the whole response is only worth doing in debug mode.
    if logger.isEnabledFor(logging.DEBUG):
        logger.debug("%s", pf(last_entries))
    return last_entries["hits"]["hits"]
def request_map_props():
    """Get the sorted property names of the stream mapping.

    LDP indices are rolling and LDP does not implement the call
    to get indices related to an alias.

    Returns:
        Sorted list of the property names found in the mapping, or None
        when the server raises AuthorizationException.
    """
    logger.debug("Requesting mapping for '%s' stream…", client.LDP_STREAM_NAME)
    try:
        # Keyword arguments: work whether or not the installed client still
        # accepts the index as a positional argument.
        indices = list(client.opnsrch_clt.indices.get_alias(index="*"))
        mappings = client.opnsrch_clt.indices.get_mapping(index=client.LDP_ALIAS)
    except AuthorizationException:
        logger.critical("AuthorizationException: check credentials")
        return None

    # NOTE(review): assumes the first index listed by get_alias is a key of
    # the get_mapping response - confirm against the LDP API.
    mapping = mappings[indices[0]]["mappings"]
    map_props = sorted(mapping["properties"])
    logger.debug("Mapping received for alias: '%s'", client.LDP_ALIAS)
    logger.debug(map_props)
    return map_props
def strip_std_field(fields):
    """Remove standard LDP fields to ease human reading.

    Args:
        fields: Iterable of field names. It is left unmodified.

    Returns:
        A new list with the names from ``fields`` that are not in
        STD_FIELDS, in their original order. Standard fields missing from
        ``fields`` are simply ignored.
    """
    return [field for field in fields if field not in STD_FIELDS]
def main(options):
    """Execute as script. Run the function matching the arguments passed.

    Args:
        options: Parsed command-line namespace; ``options.mapping`` and
            ``options.last`` are read.

    Returns:
        With ``options.mapping``: the result of request_map_props().
        With ``options.last``: the result of request_last_entries(), or
        None when the mapping could not be retrieved.

    Raises:
        NotImplementedError: If neither option is set.
    """
    if options.mapping:
        return request_map_props()

    if options.last:
        stream_fields = request_map_props()
        if stream_fields is None:
            # request_map_props() returns None after logging an authorization
            # failure; there is no field list to build the request from.
            return None
        response = request_last_entries(
            options.last, strip_std_field(stream_fields)
        )
        logger.debug(
            "Last '%s' entries for '%s' stream:", options.last, client.LDP_STREAM_NAME
        )
        logger.debug(response)
        return response

    raise NotImplementedError("No action requested: use --mapping or --last")
if __name__ == "__main__":
    pargs = parse_args()
    setup_logging(pargs)
    result = main(pargs)
    if pargs.debug:
        # In debug mode the result has already been written by the logger.
        pass
    elif result:
        pp(result)