mirror of
https://github.com/FlipsideCrypto/external-models.git
synced 2026-02-06 09:26:44 +00:00
silver stuff
This commit is contained in:
parent
c356888a1f
commit
387604b126
@ -6,35 +6,38 @@
|
||||
|
||||
WITH stablecoin_base AS (
|
||||
|
||||
SELECT
|
||||
live.udf_api(
|
||||
'GET','https://stablecoins.llama.fi/stablecoins?includePrices=false',{},{}
|
||||
) AS read,
|
||||
SYSDATE() AS _inserted_timestamp
|
||||
),
|
||||
|
||||
FINAL AS (
|
||||
SELECT
|
||||
VALUE:id::STRING AS stablecoin_id,
|
||||
VALUE:name::STRING AS stablecoin,
|
||||
VALUE:symbol::STRING AS symbol,
|
||||
VALUE:pegType::STRING AS peg_type,
|
||||
VALUE:pegMechanism::STRING AS peg_mechanism,
|
||||
VALUE:priceSource::STRING AS price_source,
|
||||
VALUE:chains AS chains,
|
||||
_inserted_timestamp
|
||||
FROM stablecoin_base,
|
||||
LATERAL FLATTEN (input=> read:data:peggedAssets)
|
||||
|
||||
{% if is_incremental() %}
|
||||
WHERE stablecoin_id NOT IN (
|
||||
SELECT
|
||||
DISTINCT stablecoin_id
|
||||
live.udf_api(
|
||||
'GET',
|
||||
'https://stablecoins.llama.fi/stablecoins?includePrices=false',{},{}
|
||||
) AS READ,
|
||||
SYSDATE() AS _inserted_timestamp
|
||||
),
|
||||
FINAL AS (
|
||||
SELECT
|
||||
VALUE :id :: STRING AS stablecoin_id,
|
||||
VALUE :name :: STRING AS stablecoin,
|
||||
VALUE :symbol :: STRING AS symbol,
|
||||
VALUE :pegType :: STRING AS peg_type,
|
||||
VALUE :pegMechanism :: STRING AS peg_mechanism,
|
||||
VALUE :priceSource :: STRING AS price_source,
|
||||
VALUE :chains AS chains,
|
||||
_inserted_timestamp
|
||||
FROM
|
||||
{{ this }}
|
||||
)
|
||||
)
|
||||
stablecoin_base,
|
||||
LATERAL FLATTEN (
|
||||
input => READ :data :peggedAssets
|
||||
)
|
||||
|
||||
{% if is_incremental() %}
|
||||
WHERE
|
||||
stablecoin_id NOT IN (
|
||||
SELECT
|
||||
DISTINCT stablecoin_id
|
||||
FROM
|
||||
{{ this }}
|
||||
)
|
||||
)
|
||||
SELECT
|
||||
stablecoin_id,
|
||||
stablecoin,
|
||||
@ -43,19 +46,22 @@ SELECT
|
||||
peg_mechanism,
|
||||
price_source,
|
||||
chains,
|
||||
m.row_num + ROW_NUMBER() OVER (ORDER BY stablecoin) AS row_num,
|
||||
m.row_num + ROW_NUMBER() over (
|
||||
ORDER BY
|
||||
stablecoin
|
||||
) AS row_num,
|
||||
_inserted_timestamp
|
||||
FROM FINAL
|
||||
JOIN (
|
||||
SELECT
|
||||
MAX(row_num) AS row_num
|
||||
FROM
|
||||
{{ this }}
|
||||
) m ON 1=1
|
||||
|
||||
FROM
|
||||
FINAL
|
||||
JOIN (
|
||||
SELECT
|
||||
MAX(row_num) AS row_num
|
||||
FROM
|
||||
{{ this }}
|
||||
) m
|
||||
ON 1 = 1
|
||||
{% else %}
|
||||
)
|
||||
|
||||
SELECT
|
||||
stablecoin_id,
|
||||
stablecoin,
|
||||
@ -64,7 +70,11 @@ SELECT
|
||||
peg_mechanism,
|
||||
price_source,
|
||||
chains,
|
||||
ROW_NUMBER() OVER (ORDER BY stablecoin) AS row_num,
|
||||
ROW_NUMBER() over (
|
||||
ORDER BY
|
||||
stablecoin
|
||||
) AS row_num,
|
||||
_inserted_timestamp
|
||||
FROM FINAL
|
||||
FROM
|
||||
FINAL
|
||||
{% endif %}
|
||||
|
||||
@ -1,64 +1,60 @@
|
||||
import pandas as pd
|
||||
from datetime import datetime
|
||||
import requests
|
||||
from snowflake.snowpark import Session, Row
|
||||
from snowflake.snowpark.functions import col, lit, to_timestamp, flatten
|
||||
{{ config(
|
||||
materialized = 'incremental',
|
||||
unique_key = 'id',
|
||||
full_refresh = false,
|
||||
tags = ['defillama']
|
||||
) }}
|
||||
|
||||
def get_protocol_data(protocol_slug):
|
||||
url = f'https://api.llama.fi/protocol/{protocol_slug}'
|
||||
response = requests.get(url)
|
||||
if response.status_code == 200:
|
||||
return response.json()
|
||||
else:
|
||||
return None
|
||||
WITH tvl_base AS (
|
||||
|
||||
def model(dbt, session: Session):
|
||||
dbt.config(
|
||||
materialized="incremental",
|
||||
unique_key="defillama_historical_protocol_tvl",
|
||||
tags=["100"]
|
||||
{% for item in range(5) %}
|
||||
(
|
||||
SELECT
|
||||
chain_id,
|
||||
chain,
|
||||
live.udf_api(
|
||||
'GET',CONCAT('https://api.llama.fi/charts/',chain),{},{}
|
||||
) AS read,
|
||||
SYSDATE() AS _inserted_timestamp
|
||||
FROM (
|
||||
SELECT
|
||||
DISTINCT chain,
|
||||
chain_id,
|
||||
row_num
|
||||
FROM {{ ref('bronze__defillama_chains') }}
|
||||
WHERE row_num BETWEEN {{ item * 60 + 1 }} AND {{ (item + 1) * 60 }}
|
||||
)
|
||||
{% if is_incremental() %}
|
||||
WHERE chain NOT IN (
|
||||
SELECT
|
||||
chain
|
||||
FROM (
|
||||
SELECT
|
||||
DISTINCT chain,
|
||||
MAX(timestamp::DATE) AS max_timestamp
|
||||
FROM {{ this }}
|
||||
GROUP BY 1
|
||||
HAVING CURRENT_DATE = max_timestamp
|
||||
)
|
||||
)
|
||||
{% endif %}
|
||||
) {% if not loop.last %}
|
||||
UNION ALL
|
||||
{% endif %}
|
||||
{% endfor %}
|
||||
)
|
||||
|
||||
# Load the bronze table
|
||||
bronze_df = dbt.ref("bronze__defillama_protocols").to_pandas()
|
||||
|
||||
# Prepare data in chunks
|
||||
chunk_size = 10
|
||||
chunks = [bronze_df[i:i + chunk_size] for i in range(0, len(bronze_df), chunk_size)]
|
||||
|
||||
results = []
|
||||
|
||||
for chunk in chunks:
|
||||
for index, row in chunk.iterrows():
|
||||
protocol_data = get_protocol_data(row['protocol_slug'])
|
||||
if protocol_data:
|
||||
for tvl_record in protocol_data['tvl']:
|
||||
results.append(Row(
|
||||
protocol_id=row['protocol_id'],
|
||||
protocol=row['protocol'],
|
||||
protocol_slug=row['protocol_slug'],
|
||||
date=tvl_record['date'],
|
||||
tvl=float(tvl_record['totalLiquidityUSD']),
|
||||
_inserted_timestamp=datetime.utcnow()
|
||||
))
|
||||
|
||||
if results:
|
||||
df = session.create_dataframe(results)
|
||||
|
||||
if dbt.is_incremental():
|
||||
max_timestamp = df.select(col("protocol_slug"), col("_inserted_timestamp").cast("DATE").alias("max_timestamp")) \
|
||||
.group_by(col("protocol_slug")) \
|
||||
.agg({"max_timestamp": "max"}) \
|
||||
.filter(col("max_timestamp") == datetime.utcnow().date())
|
||||
|
||||
df = df.filter(~df["protocol_slug"].isin(max_timestamp["protocol_slug"]))
|
||||
|
||||
df = df.with_column("defillama_historical_protocol_tvl", lit(dbt.utils.generate_surrogate_key(["protocol_id", "date"])))
|
||||
df = df.with_column("inserted_timestamp", lit(datetime.utcnow()))
|
||||
df = df.with_column("modified_timestamp", lit(datetime.utcnow()))
|
||||
df = df.with_column("_invocation_id", lit(dbt.invocation_id))
|
||||
|
||||
return df
|
||||
else:
|
||||
return session.create_dataframe([])
|
||||
|
||||
SELECT
|
||||
chain_id,
|
||||
chain,
|
||||
TO_TIMESTAMP(VALUE:date::INTEGER) AS timestamp,
|
||||
VALUE:totalLiquidityUSD::INTEGER AS tvl_usd,
|
||||
_inserted_timestamp,
|
||||
{{ dbt_utils.generate_surrogate_key(
|
||||
['chain_id', 'chain', 'timestamp']
|
||||
) }} AS id
|
||||
FROM tvl_base,
|
||||
LATERAL FLATTEN (input=> read:data)
|
||||
qualify (ROW_NUMBER () over (PARTITION BY chain_id, chain, TIMESTAMP
|
||||
ORDER BY
|
||||
_inserted_timestamp DESC)) = 1
|
||||
@ -1,6 +1,7 @@
|
||||
{{ config(
|
||||
materialized = 'incremental',
|
||||
unique_key = 'stablecoin_id',
|
||||
full_refresh = false,
|
||||
tags = ['defillama']
|
||||
) }}
|
||||
|
||||
|
||||
@ -1,5 +1,6 @@
|
||||
{{ config(
|
||||
materialized = 'incremental',
|
||||
full_refresh = false,
|
||||
unique_key = 'defillama_yield_id',
|
||||
tags = ['defillama']
|
||||
) }}
|
||||
@ -70,7 +71,7 @@ SELECT
|
||||
*,
|
||||
ROW_NUMBER() over (
|
||||
ORDER BY
|
||||
pool_id desc
|
||||
pool_id DESC
|
||||
) AS row_num,
|
||||
{{ dbt_utils.generate_surrogate_key(
|
||||
['pool_id','chain','timestamp']
|
||||
|
||||
Loading…
Reference in New Issue
Block a user