🔀 Merge branch 12-fields

Reviewed-on: #16

Close #12
This commit is contained in:
freezed 2022-10-11 00:43:10 +00:00
commit 48e81048dc
3 changed files with 70 additions and 66 deletions

View File

@ -1,10 +1,9 @@
LDPy LDPy
==== ====
🚧 _work in progress_ 🚧 This project moved to: [`git.afpy.org`](https://git.afpy.org/fcode/ldpy)
-------------------------
_Take a look at other branches if you are curious…_ 🚧 _work in progress, take a look at other branches if you are curious…_
--- ---
@ -26,7 +25,7 @@ An [OVH Log Data Platform](https://docs.ovh.com/fr/logs-data-platform/) python c
Get the source & edit settings: Get the source & edit settings:
```bash ```bash
git clone git@gitlab.com:forga/tool/ovh/ldpy.git && cd ldpy git clone git@git.afpy.org:fcode/LDPy.git && cd ldpy
cp client.sample.py client.py cp client.sample.py client.py
${EDITOR} client.py ${EDITOR} client.py
``` ```
@ -40,13 +39,14 @@ pip install -r requirements.txt
``` ```
* Get stream mapping: `./ldpy.py --mapping` * Get stream mapping: `./ldpy.py --mapping`
* Get last `n` entries: `./ldpy.py --last`
🚧 Devel quick-start 🚧 Devel quick-start
-------------------- --------------------
- Built with `Python 3.8` - Built with `Python 3.8`
- Code linting with [`flake8`](https://pypi.org/project/flake8/), [`pylint`](https://pypi.org/project/pylint), [`black`](https://pypi.org/project/black) & [pydocstyle](https://pypi.org/project/pydocstyle/). - Code linting with [`flake8`](https://pypi.org/project/flake8/), [`pylint`](https://pypi.org/project/pylint), [`black`](https://pypi.org/project/black) & [`pydocstyle`](https://pypi.org/project/pydocstyle/).
- Install development tools: - Install development tools:
* `pip install -r requirements-dev.txt` * `pip install -r requirements-dev.txt`
- A `Makefile` with tools: run `make help` to have a look - A `Makefile` with tools: run `make help` to have a look

50
ldpy.py
View File

@ -113,34 +113,38 @@ def parse_args(args=sys.argv[1:]):
return parser.parse_args(args) return parser.parse_args(args)
def get_last_entries(entries_nb): def request_last_entries(entries_nb, fields=[]):
"""Get the last n entries from a stream.""" """Get the last n entries from a stream."""
if entries_nb in LAST_E_CHOICES: if entries_nb not in LAST_E_CHOICES:
logger.debug("Wait before getting '%s' entries!", entries_nb)
query = {"size": entries_nb}
last_entries = client.opnsrch_clt.search(body=query)
logger.debug(pf(last_entries))
else:
logger.critical("'%s' is not in '%s'", entries_nb, LAST_E_CHOICES) logger.critical("'%s' is not in '%s'", entries_nb, LAST_E_CHOICES)
raise ValueError raise ValueError
return last_entries logger.debug("Requesting last '%s' entries…", entries_nb)
last_entries = client.opnsrch_clt.search(
body={"size": entries_nb, "_source": fields}
)
logger.debug(pf(last_entries))
return last_entries["hits"]["hits"]
def get_map_props(): def request_map_props():
"""Get stream mapping. """Get stream mapping.
LDP indices are rolling and LDP does not implement the call LDP indices are rolling and LDP does not implement the call
to get indices related to an alias. to get indices related to an alias.
""" """
logger.debug("Requesting mapping for '%s' stream…", client.LDP_STREAM_NAME)
try: try:
indices = list(client.opnsrch_clt.indices.get_alias("*")) indices = list(client.opnsrch_clt.indices.get_alias("*"))
mapping = client.opnsrch_clt.indices.get_mapping(client.LDP_ALIAS)[indices[0]][ mapping = client.opnsrch_clt.indices.get_mapping(client.LDP_ALIAS)[indices[0]][
"mappings" "mappings"
] ]
map_props = sorted(list(mapping["properties"])) map_props = sorted(list(mapping["properties"]))
logger.debug("Mapping found for alias: '%s'", client.LDP_ALIAS) logger.debug("Mapping received for alias: '%s'", client.LDP_ALIAS)
logger.debug(map_props)
except AuthorizationException: except AuthorizationException:
map_props = None map_props = None
@ -149,33 +153,31 @@ def get_map_props():
return map_props return map_props
def strip_std_field(initial_entries): def strip_std_field(fields):
"""Remove standard LDP fields to ease human reading. """Remove standard LDP fields to ease human reading.
Returns a list populated with a dict for each entry. Returns the list of fields with the standard LDP fields removed.
""" """
stripped_entries = [] for field in STD_FIELDS:
for initial_hit in initial_entries["hits"]["hits"]: fields.remove(field)
for key in STD_FIELDS: return fields
del initial_hit["_source"][key]
stripped_entries.append(initial_hit["_source"])
return stripped_entries
def main(options): def main(options):
"""Execute as script. Functions related to the arguments passed.""" """Execute as script. Functions related to the arguments passed."""
if options.mapping: if options.mapping:
response = get_map_props() response = request_map_props()
logger.debug("Mapping for '%s' stream: %s", client.LDP_STREAM_NAME, response)
elif options.last: elif options.last:
response = strip_std_field(get_last_entries(options.last)) response = request_last_entries(
options.last, strip_std_field(request_map_props())
)
logger.debug( logger.debug(
"Last '%s' entries for '%s' stream:", options.last, client.LDP_STREAM_NAME "Last '%s' entries for '%s' stream:", options.last, client.LDP_STREAM_NAME
) )
logger.debug(response) logger.debug(response)
else: else:
raise NotImplementedError raise NotImplementedError

View File

@ -19,6 +19,7 @@ from ldpy import LAST_E_CHOICES, STD_FIELDS
LAST_E_MIN = LAST_E_CHOICES[0] - 1 LAST_E_MIN = LAST_E_CHOICES[0] - 1
LAST_E_MAX = len(LAST_E_CHOICES) + 1 LAST_E_MAX = len(LAST_E_CHOICES) + 1
META_FIELDS = ["_id", "_source", "_shards", "hits"] META_FIELDS = ["_id", "_source", "_shards", "hits"]
DEMO_STREAM_FIELDS = ["rating_num", "message", "title", "id", "category"]
# Faking options from argparse # Faking options from argparse
@ -89,56 +90,57 @@ def test_parse_args_last_const():
# ### # ###
@mark.parametrize("fields", [META_FIELDS, STD_FIELDS]) @mark.parametrize("fields", [META_FIELDS, STD_FIELDS])
def test_strip_std_field(fields): def test_strip_std_field(fields):
"""Remove keys/values present in all LDP streams.""" """Remove fields present in all LDP streams."""
payload = { payload = [
"_shards": {}, "category",
"hits": { "gl2_source_input",
"hits": [ "gl2_source_node",
{ "id",
"_id": "-", "message",
"_source": { "rating_num",
"category": "-", "source",
"gl2_source_input": "", "streams",
"gl2_source_node": "-", "timestamp",
"id": "-", "title",
"message": "-", "X-OVH-CONTENT-SIZE",
"rating_num": 42, "X-OVH-DELIVERY-DATE",
"source": "-", "X-OVH-INPUT",
"streams": [""], ]
"timestamp": "2022-10-04 20:59:22.364402", stripped_entry = ldpy.strip_std_field(payload)
"title": "-",
"X-OVH-CONTENT-SIZE": 42,
"X-OVH-DELIVERY-DATE": "2022-10-04T20:59:22.578894878Z",
"X-OVH-INPUT": "",
},
},
],
},
}
stripped_entry = ldpy.strip_std_field(payload)[0]
assert isinstance(stripped_entry, dict) assert isinstance(stripped_entry, list)
for field in fields: for field in fields:
assert field not in stripped_entry assert field not in stripped_entry
# ### # ###
# Testing get_last_entries() # Testing request_last_entries()
# ### # ###
@mark.parametrize("entry_np", [LAST_E_MIN, LAST_E_MAX]) @mark.parametrize("opt_last", [LAST_E_MIN, LAST_E_MAX])
def test_get_last_entries_out_of_range(entry_np): def test_request_last_entries_out_of_range(opt_last):
"""Value is out of range for the last entries.""" """Value is out of range for the last entries."""
with raises(ValueError): with raises(ValueError):
ldpy.get_last_entries(entry_np) ldpy.request_last_entries(opt_last)
@pytest.mark.vcr() @pytest.mark.vcr()
@mark.parametrize("entry_np", LAST_E_CHOICES) @mark.parametrize(
def test_get_last_entries_in_range(entry_np): "opt_last, fields",
[
(LAST_E_CHOICES[0], DEMO_STREAM_FIELDS),
(len(LAST_E_CHOICES), DEMO_STREAM_FIELDS),
],
)
def test_request_last_entries_in_range(opt_last, fields):
"""Value is in range for the last entries.""" """Value is in range for the last entries."""
response = ldpy.get_last_entries(entry_np) response_id = opt_last - 1
assert len(response["hits"]["hits"]) == entry_np response = ldpy.request_last_entries(opt_last, fields)
assert len(response) == opt_last
for field in STD_FIELDS:
assert field not in response[0]["_source"]
assert field not in response[response_id]["_source"]
# ### # ###
@ -156,7 +158,7 @@ def test_main_without_option():
def test_main_demo_with_mapping(): def test_main_demo_with_mapping():
"""Called with mapping option. """Called with mapping option.
`main()` just tranfers `get_map_props()` return: this test is not really useful `main()` just transfers `request_map_props()` return: this test is not really useful
""" """
options = FakeOptions(["mapping"]) options = FakeOptions(["mapping"])
response = ldpy.main(options) response = ldpy.main(options)