diff --git a/macros/streamline/configs.yaml.sql b/macros/streamline/configs.yaml.sql index 3bb8771..59a9878 100644 --- a/macros/streamline/configs.yaml.sql +++ b/macros/streamline/configs.yaml.sql @@ -281,5 +281,18 @@ sql: | {{ fsc_utils.create_udf_stablecoin_data_parse() | indent(4) }} +- name: {{ schema }}.udf_decompress_zlib + signature: + - [compressed_string, STRING] + return_type: STRING + options: | + LANGUAGE PYTHON + RUNTIME_VERSION = '3.10' + HANDLER = 'decompress_zlib' + IMMUTABLE + COMMENT = 'Decompresses zlib/deflate-compressed data from Python bytes literal string format returned by udf_api_v2' + sql: | + {{ fsc_utils.create_udf_decompress_zlib() | indent(4) }} + {% endmacro %} diff --git a/macros/streamline/functions.py.sql b/macros/streamline/functions.py.sql index a56f8c4..628a07b 100644 --- a/macros/streamline/functions.py.sql +++ b/macros/streamline/functions.py.sql @@ -542,6 +542,37 @@ class FlattenRows: return list(cleansed[np.roll(cleansed.columns.values, 1).tolist()].itertuples(index=False, name=None)) {% endmacro %} +{% macro create_udf_decompress_zlib() %} +import zlib +import codecs + +def decompress_zlib(compressed_string): + try: + if not compressed_string: + return None + + # Remove b prefix and suffix if present + if compressed_string.startswith("b'") and compressed_string.endswith("'"): + compressed_string = compressed_string[2:-1] + elif compressed_string.startswith('b"') and compressed_string.endswith('"'): + compressed_string = compressed_string[2:-1] + + # Decode the escaped string to bytes + compressed_bytes = codecs.decode(compressed_string, 'unicode_escape') + + # Convert to bytes if string + if isinstance(compressed_bytes, str): + compressed_bytes = compressed_bytes.encode('latin-1') + + # Decompress the zlib data + decompressed = zlib.decompress(compressed_bytes) + + # Return as UTF-8 string + return decompressed.decode('utf-8') + except Exception as e: + return f"Error decompressing: {str(e)}" +{% endmacro %} + {% macro create_udf_stablecoin_data_parse() %} import re