diff --git a/macros/create_udfs.sql b/macros/create_udfs.sql index c618b6aa..0f31f0d9 100644 --- a/macros/create_udfs.sql +++ b/macros/create_udfs.sql @@ -2,6 +2,9 @@ {% if var("UPDATE_UDFS_AND_SPS") %} {% set sql %} CREATE schema if NOT EXISTS silver; + {{ create_udf_transform_logs( + schema = 'silver' + ) }} {{ create_udtf_get_base_table( schema = "streamline" ) }} diff --git a/macros/python/transform_logs.sql b/macros/python/transform_logs.sql new file mode 100644 index 00000000..cf56d24e --- /dev/null +++ b/macros/python/transform_logs.sql @@ -0,0 +1,34 @@ +{% macro create_udf_transform_logs(schema) %} +create or replace function {{ schema }}.udf_transform_logs(decoded variant) +returns variant +language python +runtime_version = '3.8' +handler = 'transform' as $$ +from copy import deepcopy + +def transform_event(event: dict): + new_event = deepcopy(event) + if new_event.get("components"): + components = new_event.get("components") + for iy, y in enumerate(new_event["value"]): + for i, c in enumerate(components): + y[i] = {"value": y[i], **c} + new_event["value"][iy] = {z["name"]: z["value"] for z in y} + return new_event + else: + return event + + +def transform(events: list): + try: + results = [ + transform_event(event) if event["decoded"] else event + for event in events["data"] + ] + events["data"] = results + return events + except: + return events +$$; + +{% endmacro %} \ No newline at end of file