import codecs
import zlib


def decompress_zlib(compressed_string):
    """Decompress a zlib/deflate payload supplied as a Python bytes-literal string.

    The input is the textual form of a ``bytes`` object, e.g.
    ``"b'x\\x9c...'"`` (single- or double-quoted), or the same escaped text
    without the ``b'...'`` wrapper.

    Args:
        compressed_string: Escaped text describing the compressed bytes.

    Returns:
        The decompressed payload decoded as UTF-8, ``None`` when the input is
        empty or ``None``, or a string of the form
        ``"Error decompressing: <reason>"`` when anything fails. Errors are
        reported in-band rather than raised.
    """
    try:
        if not compressed_string:
            return None

        # Drop the b'...' / b"..." wrapper if present; the opening and
        # closing quote characters must match.
        escaped = compressed_string
        for quote in ("'", '"'):
            if escaped.startswith("b" + quote) and escaped.endswith(quote):
                escaped = escaped[2:-1]
                break

        # unicode_escape resolves \xNN (and friends) into code points; latin-1
        # then maps code points 0-255 one-to-one back onto byte values.
        compressed_bytes = codecs.decode(escaped, 'unicode_escape').encode('latin-1')

        try:
            # Standard zlib-wrapped stream (2-byte header + Adler-32 trailer).
            decompressed = zlib.decompress(compressed_bytes)
        except zlib.error as zlib_error:
            try:
                # Headerless raw deflate stream: negative wbits disables the
                # zlib header/trailer handling.
                decompressed = zlib.decompress(compressed_bytes, wbits=-zlib.MAX_WBITS)
            except zlib.error:
                # Neither format matched: surface the original zlib-format
                # error so existing error text is unchanged.
                raise zlib_error from None

        return decompressed.decode('utf-8')
    except Exception as e:
        return f"Error decompressing: {str(e)}"