Compare commits

...

7 Commits

Author SHA1 Message Date
48e81048dc 🔀 Merge branch 12-fields
Reviewed-on: #16

Close #12
2022-10-11 00:43:10 +00:00
Freezed
00d3a3e915 📝 Update README #12 2022-10-11 02:37:09 +02:00
Freezed
949520d260 ️ Build request with stream specific fields #12
All LDP streams share standard fields.
This commit removes those from the stream mapping and builds a request asking only
for the stream-specific fields.
2022-10-11 02:14:17 +02:00
Freezed
6ed57de1aa 🎨 Normalize field stripping #12
`strip_std_field()` will be used to strip fields from the `get_map_props()` result in
the next iterations.
This modification aims to format the data passed to `strip_std_field()` so that
we can easily use it later for other options.
2022-10-10 09:58:44 +02:00
Freezed
cc16378233 🎨 Remove std case from conditional stmt #12 2022-10-10 09:35:09 +02:00
Freezed
4e01b8a5c7 🔀 Merge branch '13-std-fields'
Resolve "Strip LDP standards fields"

Closes #13

See merge request forga/tool/ovh/ldpy!4
2022-10-05 23:57:33 +00:00
Freezed
d1b19fa491 Normalize fields stripping #13
The previous code relied on the demo’s fields to remove the undesired ones.
As LDP populates hits with some standard fields, this code just removes them.
TODO #13 : build request to retrieve only desired fields
2022-10-06 01:56:31 +02:00
3 changed files with 99 additions and 87 deletions

View File

@ -1,10 +1,9 @@
LDPy
====
🚧 _work in progress_ 🚧
-------------------------
This project moved to: [`git.afpy.org`](https://git.afpy.org/fcode/ldpy)
_Take a look on other branches if you are curious…_
🚧 _work in progress, take a look on other branches if you are curious…_
---
@ -26,7 +25,7 @@ An [OVH Log Data Platform](https://docs.ovh.com/fr/logs-data-platform/) python c
Get the source & edit settings:
```bash
git clone git@gitlab.com:forga/tool/ovh/ldpy.git && cd ldpy
git clone git@git.afpy.org:fcode/LDPy.git && cd ldpy
cp client.sample.py client.py
${EDITOR} client.py
```
@ -40,13 +39,14 @@ pip install -r requirements.txt
```
* Get stream mapping: `./ldpy.py --mapping`
* Get last `n` entries: `./ldpy.py --last`
🚧 Devel quick-start
--------------------
- Built with `Python 3.8`
- Code linting with [`flake8`](https://pypi.org/project/flake8/), [`pylint`](https://pypi.org/project/pylint), [`black`](https://pypi.org/project/black) & [pydocstyle](https://pypi.org/project/pydocstyle/).
- Code linting with [`flake8`](https://pypi.org/project/flake8/), [`pylint`](https://pypi.org/project/pylint), [`black`](https://pypi.org/project/black) & [`pydocstyle`](https://pypi.org/project/pydocstyle/).
- Install development tools:
* `pip install -r requirements-dev.txt`
- A `Makefile` with tools: run `make help` to have a look

87
ldpy.py
View File

@ -14,15 +14,31 @@ import argparse
import logging
import os
from pprint import pformat as pf
from pprint import pprint as pp
import sys
from opensearchpy import AuthorizationException
import client # pylint: disable=import-error
# Ranqe for the request retrieving the last entries.
# ###
# Config
# ###
# Range for the request retrieving the last entries.
LAST_E_CHOICES = range(1, 51)
STD_FIELDS = [
"X-OVH-CONTENT-SIZE",
"X-OVH-DELIVERY-DATE",
"X-OVH-INPUT",
"gl2_source_input",
"gl2_source_node",
"source",
"streams",
"timestamp",
]
logger = logging.getLogger(os.path.splitext(os.path.basename(sys.argv[0]))[0])
@ -97,34 +113,38 @@ def parse_args(args=sys.argv[1:]):
return parser.parse_args(args)
def get_last_entries(entries_nb):
def request_last_entries(entries_nb, fields=[]):
"""Get the last n entries from a stream."""
if entries_nb in LAST_E_CHOICES:
logger.debug("Wait before getting '%s' entries!", entries_nb)
query = {"size": entries_nb}
last_entries = client.opnsrch_clt.search(body=query)
logger.debug(pf(last_entries))
else:
if entries_nb not in LAST_E_CHOICES:
logger.critical("'%s' is not in '%s'", entries_nb, LAST_E_CHOICES)
raise ValueError
return last_entries
logger.debug("Requesting last '%s' entries…", entries_nb)
last_entries = client.opnsrch_clt.search(
body={"size": entries_nb, "_source": fields}
)
logger.debug(pf(last_entries))
return last_entries["hits"]["hits"]
def get_map_props():
def request_map_props():
"""Get stream mapping.
LDP indices are rolling and LDP do not implement the call
to get indices related to an alias.
"""
logger.debug("Requesting mapping for '%s' stream…", client.LDP_STREAM_NAME)
try:
indices = list(client.opnsrch_clt.indices.get_alias("*"))
mapping = client.opnsrch_clt.indices.get_mapping(client.LDP_ALIAS)[indices[0]][
"mappings"
]
map_props = sorted(list(mapping["properties"]))
logger.debug("Mapping found for alias: '%s'", client.LDP_ALIAS)
logger.debug("Mapping received for alias: '%s'", client.LDP_ALIAS)
logger.debug(map_props)
except AuthorizationException:
map_props = None
@ -133,46 +153,31 @@ def get_map_props():
return map_props
def strip_demo_entries(raw_data):
"""Remove keys in entries to ease human reading.
def strip_std_field(fields):
"""Remove standard LDP fields to ease human reading.
Returns a list populated with a dict for each entry.
This is a specific function for demo stream /!\
Returns a list populated with a list of stream specific fields.
"""
stripped_data = []
for field in STD_FIELDS:
fields.remove(field)
for raw_hit in raw_data["hits"]["hits"]:
stripped_hit = {}
for key in [
"source",
"category",
"title",
"message",
"rating_num",
"timestamp",
]:
stripped_hit[key] = raw_hit["_source"][key]
stripped_data.append(stripped_hit)
return stripped_data
return fields
def main(options):
"""Execute as script. Functions related to the arguments passed.
Data stripper use a function build only for demo stream: `strip_demo_entries()`
"""
"""Execute as script. Functions related to the arguments passed."""
if options.mapping:
response = get_map_props()
logger.debug("Mapping for '%s' stream: %s", client.LDP_STREAM_NAME, response)
response = request_map_props()
elif options.last:
response = pf(strip_demo_entries(get_last_entries(options.last)))
response = request_last_entries(
options.last, strip_std_field(request_map_props())
)
logger.debug(
"Last '%s' entries for '%s' stream:", options.last, client.LDP_STREAM_NAME
)
logger.debug(response)
else:
raise NotImplementedError
@ -185,4 +190,4 @@ if __name__ == "__main__":
result = main(pargs)
if not pargs.debug and result:
print(result)
pp(result)

View File

@ -13,11 +13,13 @@ import pytest
from pytest import mark, raises
import ldpy
from ldpy import LAST_E_CHOICES
from ldpy import LAST_E_CHOICES, STD_FIELDS
# Min and Max values for the request retrieving the last entries.
LAST_E_MIN = LAST_E_CHOICES[0] - 1
LAST_E_MAX = len(LAST_E_CHOICES) + 1
META_FIELDS = ["_id", "_source", "_shards", "hits"]
DEMO_STREAM_FIELDS = ["rating_num", "message", "title", "id", "category"]
# Faking options from argparse
@ -84,57 +86,61 @@ def test_parse_args_last_const():
# ###
# Testing strip_demo_entries()
# Testing strip_std_field()
# ###
def test_strip_demo_entries():
"""Remove keys/values to ease human readinq in demo stream."""
payload = {
"hits": {
"hits": [
{
"_id": "-",
"_source": {
"X-OVH-": "-",
"X-OVH-INPUT": "-",
"gl2_source": "-",
"gl2_source_node": "-",
"id": "-",
"source": "-",
"category": "-",
"title": "-",
"message": "-",
"rating_num": 42,
"streams": "-",
"timestamp": "-",
},
},
],
},
}
stripped_entry = ldpy.strip_demo_entries(payload)[0]
@mark.parametrize("fields", [META_FIELDS, STD_FIELDS])
def test_strip_std_field(fields):
"""Remove fields present in all LDP stream."""
payload = [
"category",
"gl2_source_input",
"gl2_source_node",
"id",
"message",
"rating_num",
"source",
"streams",
"timestamp",
"title",
"X-OVH-CONTENT-SIZE",
"X-OVH-DELIVERY-DATE",
"X-OVH-INPUT",
]
stripped_entry = ldpy.strip_std_field(payload)
for key in ["source", "category", "title", "message", "rating_num", "timestamp"]:
stripped_entry.pop(key)
assert isinstance(stripped_entry, list)
assert len(stripped_entry) == 0
for field in fields:
assert field not in stripped_entry
# ###
# Testing get_last_entries()
# Testing request_last_entries()
# ###
@mark.parametrize("entry_np", [LAST_E_MIN, LAST_E_MAX])
def test_get_last_entries_out_of_range(entry_np):
@mark.parametrize("opt_last", [LAST_E_MIN, LAST_E_MAX])
def test_request_last_entries_out_of_range(opt_last):
"""Value is out of range for the last entries."""
with raises(ValueError):
ldpy.get_last_entries(entry_np)
ldpy.request_last_entries(opt_last)
@pytest.mark.vcr()
@mark.parametrize("entry_np", LAST_E_CHOICES)
def test_get_last_entries_in_range(entry_np):
@mark.parametrize(
"opt_last, fields",
[
(LAST_E_CHOICES[0], DEMO_STREAM_FIELDS),
(len(LAST_E_CHOICES), DEMO_STREAM_FIELDS),
],
)
def test_request_last_entries_in_range(opt_last, fields):
"""Value is in range for the last entries."""
response = ldpy.get_last_entries(entry_np)
assert len(response["hits"]["hits"]) == entry_np
response_id = opt_last - 1
response = ldpy.request_last_entries(opt_last, fields)
assert len(response) == opt_last
for field in STD_FIELDS:
assert field not in response[0]["_source"]
assert field not in response[response_id]["_source"]
# ###
@ -152,7 +158,7 @@ def test_main_without_option():
def test_main_demo_with_mapping():
"""Called with mapping option.
`main()` just tranfers `get_map_props()` return: this test is not really useful
`main()` just transfers `request_map_props()` return: this test is not really useful
"""
options = FakeOptions(["mapping"])
response = ldpy.main(options)
@ -166,4 +172,5 @@ def test_main_demo_with_last_const():
options = FakeOptions(["last"])
response = ldpy.main(options)
assert isinstance(response, str)
assert isinstance(response, list)
assert isinstance(response[0], dict)