This commit is contained in:
drethereum 2024-01-02 15:22:20 -07:00
commit b708802acd
2 changed files with 24 additions and 13 deletions

View File

@ -222,7 +222,7 @@
return_type: ARRAY
options: |
LANGUAGE PYTHON
RUNTIME_VERSION = '3.8'
RUNTIME_VERSION = '3.11'
COMMENT = 'Detect overflowed responses larger than 16MB'
PACKAGES = ('snowflake-snowpark-python', 'pandas')
HANDLER = 'main'
@ -235,16 +235,19 @@
- [index_cols, ARRAY]
- [index_vals, ARRAY]
return_type: |
table(block_number NUMBER,
table(
index_vals ARRAY,
block_number NUMBER,
metadata OBJECT,
seq NUMBER,
key STRING,
path STRING,
index NUMBER,
value_ VARIANT)
value_ VARIANT
)
options: |
LANGUAGE PYTHON
RUNTIME_VERSION = '3.8'
RUNTIME_VERSION = '3.11'
COMMENT = 'Flatten rows from a JSON file with overflowed responses larger than 16MB'
PACKAGES = ('snowflake-snowpark-python', 'pandas', 'simplejson', 'numpy')
HANDLER = 'FlattenRows'

View File

@ -385,6 +385,7 @@ def main(file_url, index_cols):
{% macro create_udtf_flatten_overflowed_responses() %}
import logging
import simplejson as json
import numpy as np
@ -393,6 +394,8 @@ from snowflake.snowpark.files import SnowflakeFile
VARCHAR_MAX = 16_777_216
logger = logging.getLogger("udtf_flatten_overflowed_responses")
class Flatten:
"""
Recursive function to flatten a nested JSON file
@ -499,27 +502,32 @@ class FlattenRows:
with SnowflakeFile.open(file_url, 'rb') as f:
df = pd.read_json(f, lines=True, compression='gzip')
df.set_index(index_cols, inplace=True)
df.set_index(index_cols, inplace=True, drop=False)
df = df.loc[index_vals]
df.reset_index(inplace=True)
flattener = Flatten(mode="both", exploded_key=[])
flattened = pd.concat(
df.apply(
df["value_"] = df.apply(
lambda x: flattener._flatten_response(
block_number=x["block_number"], metadata=x["metadata"], responses=x["data"], response_key=None
),
axis="columns",
)
.apply(pd.DataFrame.from_records)
.values.tolist()
)
df["value_"] = df["value_"].apply(pd.DataFrame.from_records)
df["index_cols"] = df.index
df = df[["index_cols", "value_"]]
flattened = pd.concat(
df["value_"].values.tolist(), keys=df["index_cols"].values.tolist()
).droplevel(-1)
cleansed = flattened.replace({np.nan: None})
overflow = cleansed["value_"].astype(str).apply(len) > VARCHAR_MAX
cleansed.loc[overflow, ["value_"]] = None
return list(cleansed.itertuples(index=False, name=None))
temp_index_cols = list(range(len(index_cols)))
cleansed = cleansed.reset_index(names=temp_index_cols, drop=False)
cleansed["index_cols"] = cleansed[temp_index_cols].apply(list, axis=1)
cleansed.drop(columns=temp_index_cols, inplace=True, errors="ignore")
return list(cleansed[np.roll(cleansed.columns.values, 1).tolist()].itertuples(index=False, name=None))
{% endmacro %}