add stable coin parse udf configs and logic

This commit is contained in:
mattromano 2025-07-30 11:10:13 -07:00
parent 957f7252ab
commit cfc2c69de8
2 changed files with 240 additions and 0 deletions

View File

@ -254,5 +254,32 @@
sql: |
{{ fsc_utils.create_udtf_flatten_overflowed_responses() | indent(4) }}
- name: {{ schema }}.udf_stablecoin_data_parse
signature:
- [peggeddata_content, STRING]
return_type: |
TABLE (
id STRING,
name STRING,
address STRING,
symbol STRING,
onCoinGecko BOOLEAN,
gecko_id STRING,
cmcId STRING,
pegType STRING,
pegMechanism STRING,
priceSource STRING,
deadFrom STRING,
delisted BOOLEAN,
deprecated BOOLEAN,
doublecounted BOOLEAN
)
options: |
LANGUAGE PYTHON
RUNTIME_VERSION = '3.10'
HANDLER = 'udf_stablecoin_data_parse'
sql: |
{{ fsc_utils.create_udf_stablecoin_data_parse() | indent(4) }}
{% endmacro %}

View File

@ -540,4 +540,217 @@ class FlattenRows:
cleansed["index_cols"] = cleansed[temp_index_cols].apply(list, axis=1)
cleansed.drop(columns=temp_index_cols, inplace=True, errors="ignore")
return list(cleansed[np.roll(cleansed.columns.values, 1).tolist()].itertuples(index=False, name=None))
{% endmacro %}
{% macro create_udf_stablecoin_data_parse() %}
import re
class udf_stablecoin_data_parse:
def process(self, peggeddata_content):
"""Main parsing function"""
def extract_field_value(obj_text, field_name):
"""Extract field value from object text using regex patterns"""
# Handle different field patterns
patterns = [
rf'{field_name}\s*:\s*"([^"]*)"',
rf"{field_name}\s*:\s*'([^']*)'",
rf'{field_name}\s*:\s*`([^`]*)`',
rf'{field_name}\s*:\s*(true|false|null|undefined)',
rf'{field_name}\s*:\s*([^,}}\n]+)'
]
for pattern in patterns:
match = re.search(pattern, obj_text, re.IGNORECASE | re.DOTALL)
if match:
value = match.group(1).strip()
# Clean up the value
value = re.sub(r'[,}}\n]', '', value).strip()
if value.lower() in ('null', 'undefined', ''):
return None
# Handle boolean values
if value.lower() == 'true':
return True
if value.lower() == 'false':
return False
return value
return None
def convert_value(value, expected_type):
"""Convert value to appropriate type"""
if value is None:
return None
if expected_type == 'BOOLEAN':
if isinstance(value, bool):
return value
if isinstance(value, str):
lower = value.lower()
if lower == 'true':
return True
if lower == 'false':
return False
return None
return str(value) if value is not None else None
try:
# Find the main array content - make the regex non-greedy but capture everything
array_match = re.search(r'export\s+default\s*\[(.*)\];?\s*$', peggeddata_content, re.DOTALL)
if not array_match:
raise Exception('Could not find exported array in peggedData content')
array_content = array_match.group(1).strip()
# Use a simpler regex-based approach to split objects
# Remove comments and clean up the array content first
# Instead of removing line comments entirely, just remove the // markers but keep the content
clean_content = re.sub(r'^\s*//\s*', '', array_content, flags=re.MULTILINE) # Remove // at start of lines
clean_content = re.sub(r'\n\s*//\s*', '\n', clean_content) # Remove // from middle of lines
# Instead of removing block comments entirely, just remove the comment markers but keep the content
clean_content = re.sub(r'/\*', '', clean_content) # Remove opening block comment markers
clean_content = re.sub(r'\*/', '', clean_content) # Remove closing block comment markers
# Find all objects using regex - look for {...} patterns
# This is more reliable than manual parsing
object_pattern = r'\{[^{}]*(?:\{[^{}]*\}[^{}]*)*\}'
matches = re.finditer(object_pattern, clean_content, re.DOTALL)
objects = []
for match in matches:
obj_text = match.group(0).strip()
if obj_text and len(obj_text) > 10: # Filter out small matches
objects.append(obj_text)
# If the simple regex didn't work, try a more complex nested approach
if not objects:
# More complex regex for nested objects
nested_pattern = r'\{(?:[^{}]|(?:\{[^{}]*\}))*\}'
nested_matches = re.findall(nested_pattern, clean_content, re.DOTALL)
objects = [obj.strip() for obj in nested_matches if len(obj.strip()) > 20]
# Still no objects? Try manual parsing with better logic
if not objects:
objects = []
current_object = ''
brace_count = 0
in_string = False
string_char = ''
i = 0
while i < len(clean_content):
char = clean_content[i]
# Handle string literals
if not in_string and char in ('"', "'", '`'):
in_string = True
string_char = char
elif in_string and char == string_char:
# Check if it's escaped
if i > 0 and clean_content[i-1] != '\\':
in_string = False
string_char = ''
# Handle braces only when not in string
if not in_string:
if char == '{':
if brace_count == 0:
current_object = '{' # Start new object
else:
current_object += char
brace_count += 1
elif char == '}':
current_object += char
brace_count -= 1
if brace_count == 0 and current_object.strip():
# Complete object found
objects.append(current_object.strip())
current_object = ''
elif brace_count > 0:
current_object += char
else:
if brace_count > 0:
current_object += char
i += 1
if not objects:
# Last resort: try splitting on id: pattern
id_splits = re.split(r'\n\s*id:\s*["\']', clean_content)
if len(id_splits) > 1:
objects = []
for i, part in enumerate(id_splits[1:], 1): # Skip first empty part
# Try to reconstruct the object
obj_start = clean_content.find(f'id:', clean_content.find(part))
if obj_start > 0:
# Look backwards for opening brace
brace_start = clean_content.rfind('{', 0, obj_start)
if brace_start >= 0:
# Look forward for matching closing brace
brace_count = 0
for j in range(brace_start, len(clean_content)):
if clean_content[j] == '{':
brace_count += 1
elif clean_content[j] == '}':
brace_count -= 1
if brace_count == 0:
obj_text = clean_content[brace_start:j+1].strip()
if len(obj_text) > 20:
objects.append(obj_text)
break
if not objects:
raise Exception(f'No objects found after all parsing attempts. Sample content: {clean_content[:500]}...')
# Process each object and extract the required fields
for i, obj_text in enumerate(objects):
try:
data = {
'id': extract_field_value(obj_text, 'id'),
'name': extract_field_value(obj_text, 'name'),
'address': extract_field_value(obj_text, 'address'),
'symbol': extract_field_value(obj_text, 'symbol'),
'onCoinGecko': extract_field_value(obj_text, 'onCoinGecko'),
'gecko_id': extract_field_value(obj_text, 'gecko_id'),
'cmcId': extract_field_value(obj_text, 'cmcId'),
'pegType': extract_field_value(obj_text, 'pegType'),
'pegMechanism': extract_field_value(obj_text, 'pegMechanism'),
'priceSource': extract_field_value(obj_text, 'priceSource'),
'deadFrom': extract_field_value(obj_text, 'deadFrom'),
'delisted': extract_field_value(obj_text, 'delisted'),
'deprecated': extract_field_value(obj_text, 'deprecated'),
'doublecounted': extract_field_value(obj_text, 'doublecounted')
}
# Only include objects that have at least id and name
if data['id'] and data['name']:
yield (
convert_value(data['id'], 'STRING'),
convert_value(data['name'], 'STRING'),
convert_value(data['address'], 'STRING'),
convert_value(data['symbol'], 'STRING'),
convert_value(data['onCoinGecko'], 'BOOLEAN'),
convert_value(data['gecko_id'], 'STRING'),
convert_value(data['cmcId'], 'STRING'),
convert_value(data['pegType'], 'STRING'),
convert_value(data['pegMechanism'], 'STRING'),
convert_value(data['priceSource'], 'STRING'),
convert_value(data['deadFrom'], 'STRING'),
convert_value(data['delisted'], 'BOOLEAN'),
convert_value(data['deprecated'], 'BOOLEAN'),
convert_value(data['doublecounted'], 'BOOLEAN')
)
except Exception as obj_error:
# Skip malformed objects but continue processing
continue
except Exception as error:
raise Exception(f'Error parsing peggedData content: {str(error)}')
{% endmacro %}