diff --git a/models/defillama/bronze/bronze__defillama_stablecoins.sql b/models/defillama/bronze/bronze__defillama_stablecoins.sql index bf3b0a6..d617fe7 100644 --- a/models/defillama/bronze/bronze__defillama_stablecoins.sql +++ b/models/defillama/bronze/bronze__defillama_stablecoins.sql @@ -6,35 +6,38 @@ WITH stablecoin_base AS ( -SELECT - live.udf_api( - 'GET','https://stablecoins.llama.fi/stablecoins?includePrices=false',{},{} - ) AS read, - SYSDATE() AS _inserted_timestamp -), - -FINAL AS ( -SELECT - VALUE:id::STRING AS stablecoin_id, - VALUE:name::STRING AS stablecoin, - VALUE:symbol::STRING AS symbol, - VALUE:pegType::STRING AS peg_type, - VALUE:pegMechanism::STRING AS peg_mechanism, - VALUE:priceSource::STRING AS price_source, - VALUE:chains AS chains, - _inserted_timestamp -FROM stablecoin_base, - LATERAL FLATTEN (input=> read:data:peggedAssets) - -{% if is_incremental() %} -WHERE stablecoin_id NOT IN ( SELECT - DISTINCT stablecoin_id + live.udf_api( + 'GET', + 'https://stablecoins.llama.fi/stablecoins?includePrices=false',{},{} + ) AS READ, + SYSDATE() AS _inserted_timestamp +), +FINAL AS ( + SELECT + VALUE :id :: STRING AS stablecoin_id, + VALUE :name :: STRING AS stablecoin, + VALUE :symbol :: STRING AS symbol, + VALUE :pegType :: STRING AS peg_type, + VALUE :pegMechanism :: STRING AS peg_mechanism, + VALUE :priceSource :: STRING AS price_source, + VALUE :chains AS chains, + _inserted_timestamp FROM - {{ this }} -) -) + stablecoin_base, + LATERAL FLATTEN ( + input => READ :data :peggedAssets + ) +{% if is_incremental() %} +WHERE + stablecoin_id NOT IN ( + SELECT + DISTINCT stablecoin_id + FROM + {{ this }} + ) +) SELECT stablecoin_id, stablecoin, @@ -43,19 +46,22 @@ SELECT peg_mechanism, price_source, chains, - m.row_num + ROW_NUMBER() OVER (ORDER BY stablecoin) AS row_num, + m.row_num + ROW_NUMBER() over ( + ORDER BY + stablecoin + ) AS row_num, _inserted_timestamp -FROM FINAL -JOIN ( - SELECT - MAX(row_num) AS row_num - FROM - {{ this }} -) m ON 1=1 - +FROM + FINAL + JOIN ( + SELECT + MAX(row_num) AS row_num + FROM + {{ this }} + ) m + ON 1 = 1 {% else %} ) - SELECT stablecoin_id, stablecoin, @@ -64,7 +70,11 @@ SELECT peg_mechanism, price_source, chains, - ROW_NUMBER() OVER (ORDER BY stablecoin) AS row_num, + ROW_NUMBER() over ( + ORDER BY + stablecoin + ) AS row_num, _inserted_timestamp -FROM FINAL +FROM + FINAL {% endif %} diff --git a/models/defillama/silver/silver__defillama_chains_tvl.sql b/models/defillama/silver/silver__defillama_chains_tvl.sql index f8f1f93..4c4c46a 100644 --- a/models/defillama/silver/silver__defillama_chains_tvl.sql +++ b/models/defillama/silver/silver__defillama_chains_tvl.sql @@ -1,64 +1,60 @@ -import pandas as pd -from datetime import datetime -import requests -from snowflake.snowpark import Session, Row -from snowflake.snowpark.functions import col, lit, to_timestamp, flatten +{{ config( + materialized = 'incremental', + unique_key = 'id', + full_refresh = false, + tags = ['defillama'] +) }} -def get_protocol_data(protocol_slug): - url = f'https://api.llama.fi/protocol/{protocol_slug}' - response = requests.get(url) - if response.status_code == 200: - return response.json() - else: - return None +WITH tvl_base AS ( -def model(dbt, session: Session): - dbt.config( - materialized="incremental", - unique_key="defillama_historical_protocol_tvl", - tags=["100"] +{% for item in range(5) %} +( +SELECT + chain_id, + chain, + live.udf_api( + 'GET',CONCAT('https://api.llama.fi/charts/',chain),{},{} + ) AS read, + SYSDATE() AS _inserted_timestamp +FROM ( + SELECT + DISTINCT chain, + chain_id, + row_num + FROM {{ ref('bronze__defillama_chains') }} + WHERE row_num BETWEEN {{ item * 60 + 1 }} AND {{ (item + 1) * 60 }} ) +{% if is_incremental() %} +WHERE chain NOT IN ( + SELECT + chain + FROM ( + SELECT + DISTINCT chain, + MAX(timestamp::DATE) AS max_timestamp + FROM {{ this }} + GROUP BY 1 + HAVING CURRENT_DATE = max_timestamp + ) +) +{% endif %} +) {% if not loop.last %} +UNION ALL +{% endif %} +{% endfor %} +) - # Load the bronze table - bronze_df = dbt.ref("bronze__defillama_protocols").to_pandas() - - # Prepare data in chunks - chunk_size = 10 - chunks = [bronze_df[i:i + chunk_size] for i in range(0, len(bronze_df), chunk_size)] - - results = [] - - for chunk in chunks: - for index, row in chunk.iterrows(): - protocol_data = get_protocol_data(row['protocol_slug']) - if protocol_data: - for tvl_record in protocol_data['tvl']: - results.append(Row( - protocol_id=row['protocol_id'], - protocol=row['protocol'], - protocol_slug=row['protocol_slug'], - date=tvl_record['date'], - tvl=float(tvl_record['totalLiquidityUSD']), - _inserted_timestamp=datetime.utcnow() - )) - - if results: - df = session.create_dataframe(results) - - if dbt.is_incremental(): - max_timestamp = df.select(col("protocol_slug"), col("_inserted_timestamp").cast("DATE").alias("max_timestamp")) \ - .group_by(col("protocol_slug")) \ - .agg({"max_timestamp": "max"}) \ - .filter(col("max_timestamp") == datetime.utcnow().date()) - - df = df.filter(~df["protocol_slug"].isin(max_timestamp["protocol_slug"])) - - df = df.with_column("defillama_historical_protocol_tvl", lit(dbt.utils.generate_surrogate_key(["protocol_id", "date"]))) - df = df.with_column("inserted_timestamp", lit(datetime.utcnow())) - df = df.with_column("modified_timestamp", lit(datetime.utcnow())) - df = df.with_column("_invocation_id", lit(dbt.invocation_id)) - - return df - else: - return session.create_dataframe([]) - +SELECT + chain_id, + chain, + TO_TIMESTAMP(VALUE:date::INTEGER) AS timestamp, + VALUE:totalLiquidityUSD::INTEGER AS tvl_usd, + _inserted_timestamp, + {{ dbt_utils.generate_surrogate_key( + ['chain_id', 'chain', 'timestamp'] + ) }} AS id +FROM tvl_base, + LATERAL FLATTEN (input=> read:data) +qualify (ROW_NUMBER () over (PARTITION BY chain_id, chain, TIMESTAMP +ORDER BY + _inserted_timestamp DESC)) = 1 \ No newline at end of file diff --git a/models/defillama/silver/silver__defillama_stablecoin_supply.sql b/models/defillama/silver/silver__defillama_stablecoin_supply.sql index c321ee0..63d4d1e 100644 --- a/models/defillama/silver/silver__defillama_stablecoin_supply.sql +++ b/models/defillama/silver/silver__defillama_stablecoin_supply.sql @@ -1,6 +1,7 @@ {{ config( materialized = 'incremental', unique_key = 'stablecoin_id', + full_refresh = false, tags = ['defillama'] ) }} diff --git a/models/defillama/silver/silver__defillama_yields.sql b/models/defillama/silver/silver__defillama_yields.sql index 0301f21..25493d8 100644 --- a/models/defillama/silver/silver__defillama_yields.sql +++ b/models/defillama/silver/silver__defillama_yields.sql @@ -1,5 +1,6 @@ {{ config( materialized = 'incremental', + full_refresh = false, unique_key = 'defillama_yield_id', tags = ['defillama'] ) }} @@ -70,7 +71,7 @@ SELECT *, ROW_NUMBER() over ( ORDER BY - pool_id desc + pool_id DESC ) AS row_num, {{ dbt_utils.generate_surrogate_key( ['pool_id','chain','timestamp']