mirror of
https://github.com/FlipsideCrypto/fsc-utils.git
synced 2026-02-06 10:56:49 +00:00
transform logs udf
This commit is contained in:
parent
f764c6af44
commit
d4e3dae062
@ -146,5 +146,16 @@
|
||||
HANDLER = 'TimestampGenerator'
|
||||
sql: |
|
||||
{{ fsc_utils.create_udf_cron_to_prior_timestamps() | indent(4) }}
|
||||
|
||||
- name: {{ schema }}.udf_transform_logs
|
||||
signature:
|
||||
- [decoded, VARIANT]
|
||||
return_type: VARIANT
|
||||
options: |
|
||||
LANGUAGE PYTHON
|
||||
RUNTIME_VERSION = '3.8'
|
||||
HANDLER = 'transform'
|
||||
sql: |
|
||||
{{ fsc_utils.create_udf_transform_logs() | indent(4) }}
|
||||
{% endmacro %}
|
||||
|
||||
|
||||
@ -122,4 +122,59 @@ class TimestampGenerator:
|
||||
timestamps.append(prev_run)
|
||||
|
||||
return timestamps
|
||||
{% endmacro %}
|
||||
|
||||
{% macro create_udf_transform_logs() %}
|
||||
|
||||
from copy import deepcopy
|
||||
|
||||
def transform_tuple(components: list, values: list):
|
||||
transformed_values = []
|
||||
for i, component in enumerate(components):
|
||||
if i < len(values):
|
||||
if component["type"] == "tuple":
|
||||
transformed_values.append({"value": transform_tuple(component["components"], values[i]), **component})
|
||||
elif component["type"] == "tuple[]":
|
||||
if not values[i]:
|
||||
transformed_values.append({"value": [], **component})
|
||||
continue
|
||||
sub_values = [transform_tuple(component["components"], v) for v in values[i]]
|
||||
transformed_values.append({"value": sub_values, **component})
|
||||
else:
|
||||
transformed_values.append({"value": values[i], **component})
|
||||
return {item["name"]: item["value"] for item in transformed_values}
|
||||
|
||||
def transform_event(event: dict):
|
||||
new_event = deepcopy(event)
|
||||
if new_event.get("components"):
|
||||
components = new_event.get("components")
|
||||
|
||||
if not new_event["value"]:
|
||||
return new_event
|
||||
|
||||
if isinstance(new_event["value"][0], list):
|
||||
result_list = []
|
||||
for value_set in new_event["value"]:
|
||||
result_list.append(transform_tuple(components, value_set))
|
||||
new_event["value"] = result_list
|
||||
|
||||
else:
|
||||
new_event["value"] = transform_tuple(components, new_event["value"])
|
||||
|
||||
return new_event
|
||||
|
||||
else:
|
||||
return event
|
||||
|
||||
def transform(events: dict):
|
||||
try:
|
||||
results = [
|
||||
transform_event(event) if event.get("decoded") else event
|
||||
for event in events["data"]
|
||||
]
|
||||
events["data"] = results
|
||||
return events
|
||||
except:
|
||||
return events
|
||||
|
||||
{% endmacro %}
|
||||
Loading…
Reference in New Issue
Block a user