STREAM-724/add-udfs-for-overflowed-responses (#27)

This commit is contained in:
Julius Remigio 2023-12-20 15:13:03 -08:00 committed by GitHub
parent 7bc119a94e
commit 382737cff0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 202 additions and 11 deletions

View File

@ -36,7 +36,7 @@
return_type: TEXT
options: |
NULL
LANGUAGE SQL
LANGUAGE SQL
STRICT IMMUTABLE
sql: |
SELECT
@ -120,8 +120,8 @@
PACKAGES = ('pycryptodome==3.15.0')
HANDLER = 'udf_encode'
sql: |
{{ fsc_utils.create_udf_keccak256() | indent(4) }}
{{ fsc_utils.create_udf_keccak256() | indent(4) }}
- name: {{ schema }}.udf_decimal_adjust
signature:
- [input, string]
@ -134,7 +134,7 @@
sql: |
{{ fsc_utils.create_udf_decimal_adjust() | indent(4) }}
- name: {{ schema }}.udf_cron_to_prior_timestamps
- name: {{ schema }}.udf_cron_to_prior_timestamps
signature:
- [workflow_name, STRING]
- [workflow_schedule, STRING]
@ -204,5 +204,41 @@
sql: |
{{ fsc_utils.create_udf_hex_to_tezos() | indent(4) }}
- name: {{ schema }}.udf_detect_overflowed_responses
signature:
- [file_url, STRING]
- [index_cols, ARRAY]
return_type: ARRAY
options: |
LANGUAGE PYTHON
RUNTIME_VERSION = '3.8'
COMMENT = 'Detect overflowed responses larger than 16MB'
PACKAGES = ('snowflake-snowpark-python', 'pandas')
HANDLER = 'main'
sql: |
{{ fsc_utils.create_udf_detect_overflowed_responses() | indent(4) }}
- name: {{ schema }}.udtf_flatten_overflowed_responses
signature:
- [file_url, STRING]
- [index_cols, ARRAY]
- [index_vals, ARRAY]
return_type: |
table(block_number NUMBER,
metadata OBJECT,
seq NUMBER,
key STRING,
path STRING,
index NUMBER,
value_ VARIANT)
options: |
LANGUAGE PYTHON
RUNTIME_VERSION = '3.8'
COMMENT = 'Flatten rows from a JSON file with overflowed responses larger than 16MB'
PACKAGES = ('snowflake-snowpark-python', 'pandas', 'simplejson', 'numpy')
HANDLER = 'FlattenRows'
sql: |
{{ fsc_utils.create_udtf_flatten_overflowed_responses() | indent(4) }}
{% endmacro %}

View File

@ -103,14 +103,14 @@ import croniter
import datetime
class TimestampGenerator:
def __init__(self):
pass
def process(self, workflow_name, workflow_schedule):
for timestamp in self.generate_timestamps(workflow_name, workflow_schedule):
yield (workflow_name, workflow_schedule, timestamp)
def generate_timestamps(self, workflow_name, workflow_schedule):
# Create a cron iterator object
cron = croniter.croniter(workflow_schedule)
@ -148,10 +148,10 @@ def transform_event(event: dict):
new_event = deepcopy(event)
if new_event.get("components"):
components = new_event.get("components")
if not new_event["value"]:
return new_event
if isinstance(new_event["value"][0], list):
result_list = []
for value_set in new_event["value"]:
@ -257,7 +257,7 @@ def transform_hex_to_bech32(input, hrp=''):
return 'Data conversion failed'
checksum = bech32_create_checksum(hrp, data5bit)
return hrp + '1' + ''.join([CHARSET[d] for d in data5bit + checksum])
{% endmacro %}
@ -341,4 +341,159 @@ def transform_hex_to_base58(input):
return encoded
{% endmacro %}
{% endmacro %}
{% macro create_udf_detect_overflowed_responses() %}
import pandas as pd
from snowflake.snowpark.files import SnowflakeFile
VARCHAR_MAX = 16_777_216
def main(file_url, index_cols):
with SnowflakeFile.open(file_url, 'rb') as f:
df = pd.read_json(f, lines=True, compression='gzip')
data_length = df["data"].astype(str).apply(len)
return df[data_length > VARCHAR_MAX][index_cols].values.tolist()
{% endmacro %}
{% macro create_udtf_flatten_overflowed_responses() %}
import simplejson as json
import numpy as np
import pandas as pd
from snowflake.snowpark.files import SnowflakeFile
VARCHAR_MAX = 16_777_216
class Flatten:
"""
Recursive function to flatten a nested JSON file
"""
def __init__(self, mode: str, exploded_key: list) -> None:
self.mode = mode
self.exploded_key = exploded_key
def _flatten_response(
self,
response_key: str,
responses: str,
block_number: int,
metadata: dict,
seq_index: int = 0,
path: str = "",
):
"""
Example:
input: {"a":1, "b":[77,88], "c": {"d":"X"}}
output:
- SEQ: A unique sequence number associated with the input record; the sequence is not guaranteed to be gap-free or ordered in any particular way.
- KEY: For maps or objects, this column contains the key to the exploded value.
- PATH: The path to the element within a data structure which needs to be flattened.
- INDEX: The index of the element, if it is an array; otherwise NULL.
- VALUE_: The value of the element of the flattened array/object.
"""
exploded_data = []
if self.mode == "array":
check_mode = isinstance(responses, list)
elif self.mode == "dict":
check_mode = isinstance(responses, dict)
elif self.mode == "both":
check_mode = isinstance(responses, list) or isinstance(responses, dict)
if check_mode:
if isinstance(responses, dict):
looped_keys = responses.keys()
for key in looped_keys:
next_path = f"{path}.{key}" if path else key
index = None
exploded_data.append(
{
"block_number": block_number,
"metadata": metadata,
"seq": seq_index,
"key": key,
"path": next_path,
"index": index,
"value_": responses[key],
}
)
exploded_data.extend(
self._flatten_response(
key,
responses[key],
block_number,
metadata,
seq_index,
next_path,
)
)
elif isinstance(responses, list):
looped_keys = range(len(responses))
if response_key in self.exploded_key or len(self.exploded_key) == 0:
for item_i, item in enumerate(responses):
if response_key == "result":
seq_index += 1
index = item_i
exploded_data.append(
{
"block_number": block_number,
"metadata": metadata,
"seq": seq_index,
"key": None,
"path": f"{path}[{item_i}]",
"index": index,
"value_": item,
}
)
exploded_data.extend(
self._flatten_response(
item_i,
item,
block_number,
metadata,
seq_index,
f"{path}[{item_i}]",
)
)
return exploded_data
class FlattenRows:
"""
Recursive function to flatten a given JSON file from Snowflake stage
"""
def process(self, file_url: str, index_cols: list, index_vals: list):
with SnowflakeFile.open(file_url, 'rb') as f:
df = pd.read_json(f, lines=True, compression='gzip')
df.set_index(index_cols, inplace=True)
df = df.loc[index_vals]
df.reset_index(inplace=True)
flattener = Flatten(mode="both", exploded_key=[])
flattened = pd.concat(
df.apply(
lambda x: flattener._flatten_response(
block_number=x["block_number"], metadata=x["metadata"], responses=x["data"], response_key=None
),
axis="columns",
)
.apply(pd.DataFrame.from_records)
.values.tolist()
)
cleansed = flattened.replace({np.nan: None})
overflow = cleansed["value_"].astype(str).apply(len) > VARCHAR_MAX
cleansed.loc[overflow, ["value_"]] = None
return list(cleansed.itertuples(index=False, name=None))
{% endmacro %}