From 8d0b8ae290a5aed968f416ec8533701e92646d70 Mon Sep 17 00:00:00 2001
From: mattromano
Date: Mon, 13 May 2024 21:26:14 -0700
Subject: [PATCH] add python historical model

---
 ...onze__defillama_historical_protocol_tvl.py |  79 +++++++++++++
 ..._defillama_historical_protocol_tvl_old.sql |  77 ++++++++++++
 .../bronze/bronze__defillama_protocols.sql    |   4 +
 .../silver/silver__defillama_chains_tvl.sql   | 116 +++++++++---------
 4 files changed, 220 insertions(+), 56 deletions(-)
 create mode 100644 models/defillama/bronze/bronze__defillama_historical_protocol_tvl.py
 create mode 100644 models/defillama/bronze/bronze__defillama_historical_protocol_tvl_old.sql

diff --git a/models/defillama/bronze/bronze__defillama_historical_protocol_tvl.py b/models/defillama/bronze/bronze__defillama_historical_protocol_tvl.py
new file mode 100644
index 0000000..f40ce95
--- /dev/null
+++ b/models/defillama/bronze/bronze__defillama_historical_protocol_tvl.py
@@ -0,0 +1,79 @@
+import pandas as pd
+from datetime import datetime
+import requests
+import time
+import json
+from snowflake.snowpark import Session, Row
+from snowflake.snowpark.types import StructType, StructField, TimestampType, StringType
+import logging
+
+logger = logging.getLogger("python_logger")
+
+def get_protocol_data(protocol_slug):
+    url = f'https://api.llama.fi/protocol/{protocol_slug}'
+    retries = 2
+    backoff_factor = 0.5
+
+    for i in range(retries):
+        try:
+            response = requests.get(url)
+            if response.status_code == 200:
+                logger.info(f"Successfully fetched data for protocol_slug: {protocol_slug}")
+                return response.json()
+            else:
+                response.raise_for_status()
+        except (requests.exceptions.RequestException, requests.exceptions.ConnectionError) as e:
+            if i == retries - 1:
+                raise
+            else:
+                wait_time = backoff_factor * (2 ** i)  # Exponential backoff before retrying
+                time.sleep(wait_time)
+                continue
+
+def model(dbt, session: Session):
+    dbt.config(
+        materialized="incremental",
+        unique_key="defillama_historical_protocol_tvl",
+        tags=["100"],
+        python_version="3.11",  # Specify the Python runtime version
+        packages=["requests", "pandas", "snowflake-snowpark-python"]
+    )
+
+    # Load the bronze table and get distinct protocol_slug
+    bronze_df = dbt.ref("bronze__defillama_protocols").to_pandas()
+    bronze_df = bronze_df.head(50)  # Limit each run to the first 50 protocols
+
+    # Ensure the DataFrame contains the expected columns
+    if 'PROTOCOL_SLUG' not in bronze_df.columns:
+        raise ValueError("Column 'PROTOCOL_SLUG' not found in the DataFrame")
+
+    # Get distinct protocol_slug
+    protocol_slugs = bronze_df['PROTOCOL_SLUG'].drop_duplicates().tolist()
+
+    results = []
+
+    # Loop over the protocol_slugs and fetch data
+    for protocol_slug in protocol_slugs:
+        try:
+            protocol_data = get_protocol_data(protocol_slug)
+            if protocol_data:
+                results.append(Row(
+                    _inserted_timestamp=datetime.utcnow(),
+                    json_data=json.dumps(protocol_data)  # Serialize the JSON payload to a string for Snowpark
+                ))
+        except Exception as e:
+            # Continue to the next protocol_slug if an error occurs
+            continue
+
+    # Create a DataFrame from the results
+    if results:
+        df = session.create_dataframe(results)
+    else:
+        # Return an empty DataFrame with the correct schema
+        schema = StructType([
+            StructField("_inserted_timestamp", TimestampType(), True),
+            StructField("json_data", StringType(), True)  # JSON payload is stored as a string
+        ])
+        df = session.create_dataframe([], schema=schema)
+
+    return df
\ No newline at end of file
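The new bronze model stores one raw DeFiLlama payload per protocol as a JSON string. For a quick smoke test outside dbt, the same endpoint can be hit directly; a minimal sketch, assuming the response carries the tvl array of date/totalLiquidityUSD points that the legacy SQL below flattens (the "aave" slug is purely illustrative):

import json
import requests

# Same endpoint the model's get_protocol_data() helper calls
payload = requests.get("https://api.llama.fi/protocol/aave", timeout=30).json()

# The model persists the payload via json.dumps() in the json_data column
json_data = json.dumps(payload)

# The tvl array is what downstream models are expected to flatten, one row per day
points = payload.get("tvl", [])
print(len(points), points[-1] if points else None)  # each point has 'date' and 'totalLiquidityUSD'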
diff --git a/models/defillama/bronze/bronze__defillama_historical_protocol_tvl_old.sql b/models/defillama/bronze/bronze__defillama_historical_protocol_tvl_old.sql
new file mode 100644
index 0000000..29e51df
--- /dev/null
+++ b/models/defillama/bronze/bronze__defillama_historical_protocol_tvl_old.sql
@@ -0,0 +1,77 @@
+{{ config(
+    materialized = 'incremental',
+    unique_key = 'defillama_historical_protocol_tvl',
+    tags = ['100']
+) }}
+
+WITH protocol_tvl AS (
+
+{% for item in range(100) %}
+(
+SELECT
+    protocol_id,
+    protocol,
+    protocol_slug,
+    live.udf_api(
+        'GET',concat('https://api.llama.fi/protocol/',protocol_slug),{},{}
+    ) AS read,
+    SYSDATE() AS _inserted_timestamp,
+FROM (
+    SELECT
+        protocol_id,
+        protocol,
+        protocol_slug,
+        row_num
+    FROM {{ ref('bronze__defillama_protocols') }}
+    WHERE row_num BETWEEN {{ item * 10 + 1 }} AND {{ (item + 1) * 10 }}
+    )
+    {% if is_incremental() %}
+    WHERE protocol_slug NOT IN (
+        SELECT
+            protocol_slug
+        FROM (
+            SELECT
+                DISTINCT protocol_slug,
+                MAX(_inserted_timestamp::DATE) AS max_timestamp
+            FROM {{ this }}
+            GROUP BY 1
+            HAVING CURRENT_DATE = max_timestamp
+        ))
+{% endif %}
+){% if not loop.last %}
+UNION ALL
+{% endif %}
+{% endfor %}
+),
+flatten AS (
+    select
+        protocol_id,
+        protocol,
+        protocol_slug,
+        to_timestamp(value:date) as date,
+        value:totalLiquidityUSD::float as tvl,
+        _inserted_timestamp
+    FROM protocol_tvl,
+        LATERAL FLATTEN (input=> read:data:tvl)
+),
+FINAL AS (
+    select
+        protocol_id,
+        protocol,
+        protocol_slug,
+        date,
+        tvl,
+        _inserted_timestamp
+    FROM
+        flatten
+)
+SELECT
+    *,
+    {{ dbt_utils.generate_surrogate_key(
+        ['protocol_id','date']
+    ) }} AS defillama_historical_protocol_tvl,
+    SYSDATE() AS inserted_timestamp,
+    SYSDATE() AS modified_timestamp,
+    '{{ invocation_id }}' AS _invocation_id
+FROM
+    FINAL
diff --git a/models/defillama/bronze/bronze__defillama_protocols.sql b/models/defillama/bronze/bronze__defillama_protocols.sql
index aac286a..5b53412 100644
--- a/models/defillama/bronze/bronze__defillama_protocols.sql
+++ b/models/defillama/bronze/bronze__defillama_protocols.sql
@@ -31,6 +31,10 @@ SELECT
     VALUE:audit_note::STRING AS audit_note,
     VALUE:category::STRING AS category,
     VALUE:chains AS chains,
+    ROW_NUMBER() over (
+        ORDER BY
+            protocol_id
+    ) AS row_num,
     _inserted_timestamp
 FROM protocol_base,
     LATERAL FLATTEN (input=> read:data)
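The row_num column added above is what the batched API calls key on: the legacy Jinja loop requests ten protocols per iteration via WHERE row_num BETWEEN {{ item * 10 + 1 }} AND {{ (item + 1) * 10 }}. The same windowing could be reproduced against the pandas frame the new Python model reads; a minimal sketch (uppercase column names follow Snowflake's convention as in the Python model; the helper and batch size are illustrative, not part of this patch):

import pandas as pd

def slugs_for_batch(bronze_df: pd.DataFrame, item: int, batch_size: int = 10) -> list:
    # Mirrors WHERE row_num BETWEEN {{ item * 10 + 1 }} AND {{ (item + 1) * 10 }}
    low, high = item * batch_size + 1, (item + 1) * batch_size
    window = bronze_df[(bronze_df["ROW_NUM"] >= low) & (bronze_df["ROW_NUM"] <= high)]
    return window["PROTOCOL_SLUG"].drop_duplicates().tolist()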
diff --git a/models/defillama/silver/silver__defillama_chains_tvl.sql b/models/defillama/silver/silver__defillama_chains_tvl.sql
index 4c4c46a..f8f1f93 100644
--- a/models/defillama/silver/silver__defillama_chains_tvl.sql
+++ b/models/defillama/silver/silver__defillama_chains_tvl.sql
@@ -1,60 +1,64 @@
-{{ config(
-    materialized = 'incremental',
-    unique_key = 'id',
-    full_refresh = false,
-    tags = ['defillama']
-) }}
+import pandas as pd
+from datetime import datetime
+import requests
+from snowflake.snowpark import Session, Row
+from snowflake.snowpark.functions import col, lit, to_timestamp, flatten
 
-WITH tvl_base AS (
+def get_protocol_data(protocol_slug):
+    url = f'https://api.llama.fi/protocol/{protocol_slug}'
+    response = requests.get(url)
+    if response.status_code == 200:
+        return response.json()
+    else:
+        return None
 
-{% for item in range(5) %}
-(
-SELECT
-    chain_id,
-    chain,
-    live.udf_api(
-        'GET',CONCAT('https://api.llama.fi/charts/',chain),{},{}
-    ) AS read,
-    SYSDATE() AS _inserted_timestamp
-FROM (
-    SELECT
-        DISTINCT chain,
-        chain_id,
-        row_num
-    FROM {{ ref('bronze__defillama_chains') }}
-    WHERE row_num BETWEEN {{ item * 60 + 1 }} AND {{ (item + 1) * 60 }}
+
+def model(dbt, session: Session):
+    dbt.config(
+        materialized="incremental",
+        unique_key="defillama_historical_protocol_tvl",
+        tags=["100"]
     )
 
-{% if is_incremental() %}
-WHERE chain NOT IN (
-    SELECT
-        chain
-    FROM (
-        SELECT
-            DISTINCT chain,
-            MAX(timestamp::DATE) AS max_timestamp
-        FROM {{ this }}
-        GROUP BY 1
-        HAVING CURRENT_DATE = max_timestamp
-    )
-)
-{% endif %}
-) {% if not loop.last %}
-UNION ALL
-{% endif %}
-{% endfor %}
-)
-SELECT
-    chain_id,
-    chain,
-    TO_TIMESTAMP(VALUE:date::INTEGER) AS timestamp,
-    VALUE:totalLiquidityUSD::INTEGER AS tvl_usd,
-    _inserted_timestamp,
-    {{ dbt_utils.generate_surrogate_key(
-        ['chain_id', 'chain', 'timestamp']
-    ) }} AS id
-FROM tvl_base,
-    LATERAL FLATTEN (input=> read:data)
-qualify (ROW_NUMBER () over (PARTITION BY chain_id, chain, TIMESTAMP
-ORDER BY
-    _inserted_timestamp DESC)) = 1
\ No newline at end of file
+    # Load the bronze table
+    bronze_df = dbt.ref("bronze__defillama_protocols").to_pandas()
+
+    # Prepare data in chunks
+    chunk_size = 10
+    chunks = [bronze_df[i:i + chunk_size] for i in range(0, len(bronze_df), chunk_size)]
+
+    results = []
+
+    for chunk in chunks:
+        for index, row in chunk.iterrows():
+            protocol_data = get_protocol_data(row['protocol_slug'])
+            if protocol_data:
+                for tvl_record in protocol_data['tvl']:
+                    results.append(Row(
+                        protocol_id=row['protocol_id'],
+                        protocol=row['protocol'],
+                        protocol_slug=row['protocol_slug'],
+                        date=tvl_record['date'],
+                        tvl=float(tvl_record['totalLiquidityUSD']),
+                        _inserted_timestamp=datetime.utcnow()
+                    ))
+
+    if results:
+        df = session.create_dataframe(results)
+
+        if dbt.is_incremental():
+            max_timestamp = df.select(col("protocol_slug"), col("_inserted_timestamp").cast("DATE").alias("max_timestamp")) \
+                .group_by(col("protocol_slug")) \
+                .agg({"max_timestamp": "max"}) \
+                .filter(col("max_timestamp") == datetime.utcnow().date())
+
+            df = df.filter(~df["protocol_slug"].isin(max_timestamp["protocol_slug"]))
+
+        df = df.with_column("defillama_historical_protocol_tvl", lit(dbt.utils.generate_surrogate_key(["protocol_id", "date"])))
+        df = df.with_column("inserted_timestamp", lit(datetime.utcnow()))
+        df = df.with_column("modified_timestamp", lit(datetime.utcnow()))
+        df = df.with_column("_invocation_id", lit(dbt.invocation_id))
+
+        return df
+    else:
+        return session.create_dataframe([])
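Because the new bronze Python model lands one JSON string per protocol rather than flattened rows, a downstream model still has to unpack the tvl array the way the legacy SQL's flatten CTE did. A minimal sketch of that step as a Snowpark Python model; the silver model itself is hypothetical and the payload's name field is an assumption, while the date and totalLiquidityUSD keys match what the legacy SQL reads from read:data:tvl:

import json
from datetime import datetime

import pandas as pd
from snowflake.snowpark import Session


def model(dbt, session: Session):
    dbt.config(materialized="table", packages=["pandas"])

    # Raw payloads written by bronze__defillama_historical_protocol_tvl
    raw = dbt.ref("bronze__defillama_historical_protocol_tvl").to_pandas()

    rows = []
    for _, r in raw.iterrows():
        payload = json.loads(r["JSON_DATA"])  # stored upstream via json.dumps()
        for point in payload.get("tvl", []):
            rows.append({
                "protocol": payload.get("name"),  # assumed field in the API payload
                "date": datetime.utcfromtimestamp(int(point["date"])),
                "tvl": float(point["totalLiquidityUSD"]),
            })

    # One row per protocol per day, mirroring the flatten CTE in the legacy SQL
    return session.create_dataframe(pd.DataFrame(rows))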