Retrieve last entries from stream #4

This commit is contained in:
Freezed 2022-09-26 10:24:40 +02:00
parent d04d4e235e
commit c656f4f1bd
3 changed files with 83 additions and 14 deletions

View File

@ -24,3 +24,4 @@ lint:
- pip install -r requirements.txt
- pip install -r requirements-dev.txt
- make --no-print-directory --quiet lint
- make --no-print-directory --quiet test

42
ldpy.py
View File

@ -17,6 +17,8 @@ import sys
import client # pylint: disable=import-error
LAST_E_CHOICES = range(1, 21)
logger = logging.getLogger(os.path.splitext(os.path.basename(sys.argv[0]))[0])
@ -61,7 +63,8 @@ def parse_args(args=sys.argv[1:]):
description=sys.modules[__name__].__doc__, formatter_class=CustomFormatter
)
parser.add_argument(
group = parser.add_argument_group("log setting")
group.add_argument(
"-d",
"--debug",
action="store_true",
@ -69,19 +72,46 @@ def parse_args(args=sys.argv[1:]):
help="Log activity in console",
)
parser.add_argument(
group = parser.add_mutually_exclusive_group()
group.add_argument(
"-m",
"--mapping",
action="store_true",
help="Show the stream mapping",
)
group.add_argument(
"-l",
"--last",
choices=LAST_E_CHOICES,
const="3",
help="Last entries of the stream",
metavar="INT",
nargs="?",
type=int,
)
return parser.parse_args(args)
def get_last_entries(entries_nb):
"""Get stream last n entries."""
if entries_nb in LAST_E_CHOICES:
logger.debug("Wait before getting '%s' entries!", entries_nb)
query = {"size": entries_nb}
last_entries = client.opnsrch_clt.search(body=query)
else:
logger.critical("'%s' is not in '%s'", entries_nb, LAST_E_CHOICES)
raise ValueError
return last_entries
def get_map_props():
"""Get stream mapping.
'demo' stream did not have an LDP_ALIAS
LDP indices are rolling and LDP do not implement the call
to get indices related to an alias.
"""
@ -108,8 +138,14 @@ def main(options):
if options.mapping:
response = get_map_props()
logger.debug("Mapping for '%s' stream: %s", client.LDP_STREAM_NAME, response)
elif options.last:
response = get_last_entries(options.last)
logger.debug(
"Last '%s' entries for '%s' stream:", options.last, client.LDP_STREAM_NAME
)
logger.debug(response)
else:
raise RuntimeError
raise NotImplementedError
return response

View File

@ -9,7 +9,7 @@ Author: frederic zind 2022-09-24
Licenses: GNU AGPL v3: http://www.gnu.org/licenses/
"""
import logging
from pytest import raises
from pytest import mark, raises
import ldpy
@ -19,12 +19,15 @@ class FakeOptions: # pylint: disable=too-few-public-methods
"""Options object mock."""
debug = False
last = None
mapping = False
def __init__(self, options):
"""Built loggers options."""
if "debug" in options:
self.debug = True
if "last" in options:
self.last = 3
if "mapping" in options:
self.mapping = True
@ -54,21 +57,41 @@ def test_logging_default():
# Testing parse_args()
# ###
def test_parse_args():
"""Defaults arguments are here."""
"""Mapping argument is set."""
cli_options = ldpy.parse_args(["--debug", "--mapping"])
assert cli_options.debug
assert cli_options.mapping
# ###
# Testing get_map_props()
# ###
def test_get_map_props():
"""This is an integration test.
def test_parse_args_last_set():
"""Last argument is set to a non-default value."""
cli_options = ldpy.parse_args(["--debug", "--last", "2"])
assert cli_options.debug
assert cli_options.last == 2
It need credentials to guery LDP stream.
"""
pass
def test_parse_args_last_const():
"""Last argument is set to a non-default value."""
cli_options = ldpy.parse_args(["--debug", "--last"])
assert cli_options.debug
assert cli_options.last == 3
# ###
# Testing get_last_entries()
# ###
@mark.parametrize("entry_np", [0, 21])
def test_get_last_entries_out_of_range(entry_np):
"""Value is out of range for the last entries."""
with raises(ValueError):
ldpy.get_last_entries(entry_np)
@mark.parametrize("entry_np", range(1, 21))
def test_get_last_entries_in_range(entry_np):
"""Value is in range for the last entries."""
response = ldpy.get_last_entries(entry_np)
assert len(response["hits"]["hits"]) == entry_np
# ###
@ -78,7 +101,7 @@ def test_main_without_option():
"""Called without option."""
options = FakeOptions([])
with raises(RuntimeError):
with raises(NotImplementedError):
ldpy.main(options)
@ -91,3 +114,12 @@ def test_main_demo_with_mapping():
response = ldpy.main(options)
assert isinstance(response, list)
def test_main_demo_with_last_const():
"""Called with last option."""
options = FakeOptions(["last"])
response = ldpy.main(options)
assert isinstance(response, dict)
assert isinstance(response["hits"]["hits"], list)