️ Build request with stream specific fields #12

All LDP streams share standard fields.
This commit removes those from the stream mapping and builds a request asking only
for the stream-specific fields.
This commit is contained in:
Freezed 2022-10-11 01:11:31 +02:00
parent 6ed57de1aa
commit 949520d260
2 changed files with 59 additions and 49 deletions

41
ldpy.py
View File

@ -113,33 +113,38 @@ def parse_args(args=sys.argv[1:]):
return parser.parse_args(args) return parser.parse_args(args)
def get_last_entries(entries_nb): def request_last_entries(entries_nb, fields=[]):
"""Get the last n entries from a stream.""" """Get the last n entries from a stream."""
if entries_nb not in LAST_E_CHOICES: if entries_nb not in LAST_E_CHOICES:
logger.critical("'%s' is not in '%s'", entries_nb, LAST_E_CHOICES) logger.critical("'%s' is not in '%s'", entries_nb, LAST_E_CHOICES)
raise ValueError raise ValueError
logger.debug("Wait before getting '%s' entries!", entries_nb) logger.debug("Requesting last '%s' entries…", entries_nb)
query = {"size": entries_nb} last_entries = client.opnsrch_clt.search(
last_entries = client.opnsrch_clt.search(body=query) body={"size": entries_nb, "_source": fields}
)
logger.debug(pf(last_entries)) logger.debug(pf(last_entries))
return last_entries["hits"]["hits"] return last_entries["hits"]["hits"]
def get_map_props(): def request_map_props():
"""Get stream mapping. """Get stream mapping.
LDP indices are rolling and LDP do not implement the call LDP indices are rolling and LDP do not implement the call
to get indices related to an alias. to get indices related to an alias.
""" """
logger.debug("Requesting mapping for '%s' stream…", client.LDP_STREAM_NAME)
try: try:
indices = list(client.opnsrch_clt.indices.get_alias("*")) indices = list(client.opnsrch_clt.indices.get_alias("*"))
mapping = client.opnsrch_clt.indices.get_mapping(client.LDP_ALIAS)[indices[0]][ mapping = client.opnsrch_clt.indices.get_mapping(client.LDP_ALIAS)[indices[0]][
"mappings" "mappings"
] ]
map_props = sorted(list(mapping["properties"])) map_props = sorted(list(mapping["properties"]))
logger.debug("Mapping found for alias: '%s'", client.LDP_ALIAS) logger.debug("Mapping received for alias: '%s'", client.LDP_ALIAS)
logger.debug(map_props)
except AuthorizationException: except AuthorizationException:
map_props = None map_props = None
@ -148,33 +153,31 @@ def get_map_props():
return map_props return map_props
def strip_std_field(initial_entries): def strip_std_field(fields):
"""Remove standard LDP fields to ease human reading. """Remove standard LDP fields to ease human reading.
Returns a list populated with a dict for each entry. Returns a list populated with a list of stream specific fields.
""" """
stripped_entries = [] for field in STD_FIELDS:
for initial_hit in initial_entries: fields.remove(field)
for key in STD_FIELDS: return fields
del initial_hit["_source"][key]
stripped_entries.append(initial_hit["_source"])
return stripped_entries
def main(options): def main(options):
"""Execute as script. Functions related to the arguments passed.""" """Execute as script. Functions related to the arguments passed."""
if options.mapping: if options.mapping:
response = get_map_props() response = request_map_props()
logger.debug("Mapping for '%s' stream: %s", client.LDP_STREAM_NAME, response)
elif options.last: elif options.last:
response = strip_std_field(get_last_entries(options.last)) response = request_last_entries(
options.last, strip_std_field(request_map_props())
)
logger.debug( logger.debug(
"Last '%s' entries for '%s' stream:", options.last, client.LDP_STREAM_NAME "Last '%s' entries for '%s' stream:", options.last, client.LDP_STREAM_NAME
) )
logger.debug(response) logger.debug(response)
else: else:
raise NotImplementedError raise NotImplementedError

View File

@ -19,6 +19,7 @@ from ldpy import LAST_E_CHOICES, STD_FIELDS
LAST_E_MIN = LAST_E_CHOICES[0] - 1 LAST_E_MIN = LAST_E_CHOICES[0] - 1
LAST_E_MAX = len(LAST_E_CHOICES) + 1 LAST_E_MAX = len(LAST_E_CHOICES) + 1
META_FIELDS = ["_id", "_source", "_shards", "hits"] META_FIELDS = ["_id", "_source", "_shards", "hits"]
DEMO_STREAM_FIELDS = ["rating_num", "message", "title", "id", "category"]
# Faking options from argparse # Faking options from argparse
@ -89,51 +90,57 @@ def test_parse_args_last_const():
# ### # ###
@mark.parametrize("fields", [META_FIELDS, STD_FIELDS]) @mark.parametrize("fields", [META_FIELDS, STD_FIELDS])
def test_strip_std_field(fields): def test_strip_std_field(fields):
"""Remove keys/values present in all LDP stream.""" """Remove fields present in all LDP stream."""
payload = [ payload = [
{ "category",
"_id": "-", "gl2_source_input",
"_source": { "gl2_source_node",
"category": "-", "id",
"gl2_source_input": "", "message",
"gl2_source_node": "-", "rating_num",
"id": "-", "source",
"message": "-", "streams",
"rating_num": 42, "timestamp",
"source": "-", "title",
"streams": [""], "X-OVH-CONTENT-SIZE",
"timestamp": "2022-10-04 20:59:22.364402", "X-OVH-DELIVERY-DATE",
"title": "-", "X-OVH-INPUT",
"X-OVH-CONTENT-SIZE": 42,
"X-OVH-DELIVERY-DATE": "2022-10-04T20:59:22.578894878Z",
"X-OVH-INPUT": "",
},
},
] ]
stripped_entry = ldpy.strip_std_field(payload)[0] stripped_entry = ldpy.strip_std_field(payload)
assert isinstance(stripped_entry, dict) assert isinstance(stripped_entry, list)
for field in fields: for field in fields:
assert field not in stripped_entry assert field not in stripped_entry
# ### # ###
# Testing get_last_entries() # Testing request_last_entries()
# ### # ###
@mark.parametrize("entry_np", [LAST_E_MIN, LAST_E_MAX]) @mark.parametrize("opt_last", [LAST_E_MIN, LAST_E_MAX])
def test_get_last_entries_out_of_range(entry_np): def test_request_last_entries_out_of_range(opt_last):
"""Value is out of range for the last entries.""" """Value is out of range for the last entries."""
with raises(ValueError): with raises(ValueError):
ldpy.get_last_entries(entry_np) ldpy.request_last_entries(opt_last)
@pytest.mark.vcr() @pytest.mark.vcr()
@mark.parametrize("entry_np", LAST_E_CHOICES) @mark.parametrize(
def test_get_last_entries_in_range(entry_np): "opt_last, fields",
[
(LAST_E_CHOICES[0], DEMO_STREAM_FIELDS),
(len(LAST_E_CHOICES), DEMO_STREAM_FIELDS),
],
)
def test_request_last_entries_in_range(opt_last, fields):
"""Value is in range for the last entries.""" """Value is in range for the last entries."""
response = ldpy.get_last_entries(entry_np) response_id = opt_last - 1
assert len(response) == entry_np response = ldpy.request_last_entries(opt_last, fields)
assert len(response) == opt_last
for field in STD_FIELDS:
assert field not in response[0]["_source"]
assert field not in response[response_id]["_source"]
# ### # ###
@ -151,7 +158,7 @@ def test_main_without_option():
def test_main_demo_with_mapping(): def test_main_demo_with_mapping():
"""Called with mapping option. """Called with mapping option.
`main()` just tranfers `get_map_props()` return: this test is not really useful `main()` just transfers `request_map_props()` return: this test is not really useful
""" """
options = FakeOptions(["mapping"]) options = FakeOptions(["mapping"])
response = ldpy.main(options) response = ldpy.main(options)