diff --git a/models/defillama/bronze/bronze__defillama_historical_protocol_tvl.py b/models/defillama/bronze/bronze__defillama_historical_protocol_tvl.py deleted file mode 100644 index f40ce95..0000000 --- a/models/defillama/bronze/bronze__defillama_historical_protocol_tvl.py +++ /dev/null @@ -1,78 +0,0 @@ -import pandas as pd -from datetime import datetime -import requests -import time -from snowflake.snowpark import Session, Row -from snowflake.snowpark.types import StructType, StructField, TimestampType, StringType -import logging - -logger = logging.getLogger("python_logger") - -def get_protocol_data(protocol_slug): - url = f'https://api.llama.fi/protocol/{protocol_slug}' - retries = 2 - backoff_factor = 0.5 - - for i in range(retries): - try: - response = requests.get(url) - if response.status_code == 200: - logger.info(f"Successfully fetched data for protocol_slug: {protocol_slug}") - return response.json() - else: - response.raise_for_status() - except (requests.exceptions.RequestException, requests.exceptions.ConnectionError) as e: - if i == retries - 1: - raise - else: - wait_time = backoff_factor * (2 ** i) - time.sleep(wait_time) - continue - -def model(dbt, session: Session): - dbt.config( - materialized="incremental", - unique_key="defillama_historical_protocol_tvl", - tags=["100"], - python_version="3.11", # Specify the Python runtime version - packages=["requests", "pandas", "snowflake-snowpark-python"] - ) - - # Load the bronze table and get distinct protocol_slug - bronze_df = dbt.ref("bronze__defillama_protocols").to_pandas() - bronze_df = bronze_df.head(50) - - # Ensure the DataFrame contains the expected columns - if 'PROTOCOL_SLUG' not in bronze_df.columns: - raise ValueError("Column 'protocol_slug' not found in the DataFrame") - - # Get distinct protocol_slug - protocol_slugs = bronze_df['PROTOCOL_SLUG'].drop_duplicates().tolist() - - results = [] - - # Loop over the protocol_slugs and fetch data - for protocol_slug in protocol_slugs: - try: - protocol_data = get_protocol_data(protocol_slug) - if protocol_data: - results.append(Row( - _inserted_timestamp=datetime.utcnow(), - json_data=str(protocol_data) # Convert JSON object to string for Snowpark - )) - except Exception as e: - # Continue to the next protocol_slug if an error occurs - continue - - # Create a DataFrame from the results - if results: - df = session.create_dataframe(results) - else: - # Return an empty DataFrame with the correct schema - schema = StructType([ - StructField("_inserted_timestamp", TimestampType(), True), - StructField("json_data", VariantType(), True) # Use VariantType for JSON - ]) - df = session.create_dataframe([], schema=schema) - - return df \ No newline at end of file diff --git a/models/defillama/bronze/bronze__defillama_historical_protocol_tvl_old.sql b/models/defillama/bronze/bronze__defillama_historical_protocol_tvl.sql similarity index 92% rename from models/defillama/bronze/bronze__defillama_historical_protocol_tvl_old.sql rename to models/defillama/bronze/bronze__defillama_historical_protocol_tvl.sql index 29e51df..4c0479b 100644 --- a/models/defillama/bronze/bronze__defillama_historical_protocol_tvl_old.sql +++ b/models/defillama/bronze/bronze__defillama_historical_protocol_tvl.sql @@ -1,12 +1,13 @@ {{ config( materialized = 'incremental', - unique_key = 'defillama_historical_protocol_tvl', - tags = ['100'] + unique_key = 'defillama_historical_protocol_tvl' ) }} -WITH protocol_tvl AS ( +WITH -{% for item in range(100) %} +protocol_tvl AS ( + +{% for item in range(14) %} ( SELECT protocol_id, diff --git a/models/defillama/bronze/bronze__defillama_historical_protocol_tvl_2.sql b/models/defillama/bronze/bronze__defillama_historical_protocol_tvl_2.sql new file mode 100644 index 0000000..bca14fb --- /dev/null +++ b/models/defillama/bronze/bronze__defillama_historical_protocol_tvl_2.sql @@ -0,0 +1,78 @@ +{{ config( + materialized = 'incremental', + unique_key = 'defillama_historical_protocol_tvl' +) }} + +WITH + +protocol_tvl AS ( + +{% for item in range(14) %} +( +SELECT + protocol_id, + protocol, + protocol_slug, + live.udf_api( + 'GET',concat('https://api.llama.fi/protocol/',protocol_slug),{},{} + ) AS read, + SYSDATE() AS _inserted_timestamp, +FROM ( + SELECT + protocol_id, + protocol, + protocol_slug, + row_num + FROM {{ ref('bronze__defillama_protocols') }} + WHERE row_num BETWEEN {{ (item+14) * 10 + 1 }} AND {{ (item + 15) * 10 }} + ) + {% if is_incremental() %} + WHERE protocol_slug NOT IN ( + SELECT + protocol_slug + FROM ( + SELECT + DISTINCT protocol_slug, + MAX(_inserted_timestamp::DATE) AS max_timestamp + FROM {{ this }} + GROUP BY 1 + HAVING CURRENT_DATE = max_timestamp + )) +{% endif %} +){% if not loop.last %} +UNION ALL +{% endif %} +{% endfor %} +), +flatten AS ( + select + protocol_id, + protocol, + protocol_slug, + to_timestamp(value:date) as date, + value:totalLiquidityUSD::float as tvl, + _inserted_timestamp + FROM protocol_tvl, + LATERAL FLATTEN (input=> read:data:tvl) +), +FINAL AS ( + select + protocol_id, + protocol, + protocol_slug, + date, + tvl, + _inserted_timestamp + FROM + flatten +) +SELECT + *, + {{ dbt_utils.generate_surrogate_key( + ['protocol_id','date'] + ) }} AS defillama_historical_protocol_tvl, + SYSDATE() AS inserted_timestamp, + SYSDATE() AS modified_timestamp, + '{{ invocation_id }}' AS _invocation_id +FROM + FINAL diff --git a/models/defillama/bronze/bronze__defillama_historical_protocol_tvl_3.sql b/models/defillama/bronze/bronze__defillama_historical_protocol_tvl_3.sql new file mode 100644 index 0000000..1b9a351 --- /dev/null +++ b/models/defillama/bronze/bronze__defillama_historical_protocol_tvl_3.sql @@ -0,0 +1,78 @@ +{{ config( + materialized = 'incremental', + unique_key = 'defillama_historical_protocol_tvl' +) }} + +WITH + +protocol_tvl AS ( + +{% for item in range(5) %} +( +SELECT + protocol_id, + protocol, + protocol_slug, + live.udf_api( + 'GET',concat('https://api.llama.fi/protocol/',protocol_slug),{},{} + ) AS read, + SYSDATE() AS _inserted_timestamp, +FROM ( + SELECT + protocol_id, + protocol, + protocol_slug, + row_num + FROM {{ ref('bronze__defillama_protocols') }} + WHERE row_num BETWEEN {{ (item+32) * 10 + 1 }} AND {{ (item + 33) * 10 }} + ) + {% if is_incremental() %} + WHERE protocol_slug NOT IN ( + SELECT + protocol_slug + FROM ( + SELECT + DISTINCT protocol_slug, + MAX(_inserted_timestamp::DATE) AS max_timestamp + FROM {{ this }} + GROUP BY 1 + HAVING CURRENT_DATE = max_timestamp + )) +{% endif %} +){% if not loop.last %} +UNION ALL +{% endif %} +{% endfor %} +), +flatten AS ( + select + protocol_id, + protocol, + protocol_slug, + to_timestamp(value:date) as date, + value:totalLiquidityUSD::float as tvl, + _inserted_timestamp + FROM protocol_tvl, + LATERAL FLATTEN (input=> read:data:tvl) +), +FINAL AS ( + select + protocol_id, + protocol, + protocol_slug, + date, + tvl, + _inserted_timestamp + FROM + flatten +) +SELECT + *, + {{ dbt_utils.generate_surrogate_key( + ['protocol_id','date'] + ) }} AS defillama_historical_protocol_tvl, + SYSDATE() AS inserted_timestamp, + SYSDATE() AS modified_timestamp, + '{{ invocation_id }}' AS _invocation_id +FROM + FINAL