AN-5706/defillama-historical-data -fixes (#106)

This commit is contained in:
Matt Romano 2025-04-30 09:48:05 -07:00 committed by GitHub
parent 6c069aac3f
commit fe091a36ab
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
19 changed files with 6264 additions and 12 deletions

View File

@ -0,0 +1,54 @@
# Workflow that installs the project's Python and dbt dependencies and then runs the two
# DeFiLlama protocol-history streamline models. A second job calls the reusable Slack
# notification workflow when the dbt job fails.
name: dbt_run_defillama_history
run-name: dbt_run_defillama_history
on:
# Allows the workflow to be started manually.
workflow_dispatch:
schedule:
# Runs every 30 minutes (see https://crontab.guru)
- cron: '*/30 * * * *'
# Connection settings exported as environment variables to every step.
# All values come from repository/environment variables except PASSWORD, which is a secret.
env:
DBT_PROFILES_DIR: ${{ vars.DBT_PROFILES_DIR }}
ACCOUNT: "${{ vars.ACCOUNT }}"
ROLE: "${{ vars.ROLE }}"
USER: "${{ vars.USER }}"
PASSWORD: "${{ secrets.PASSWORD }}"
REGION: "${{ vars.REGION }}"
DATABASE: "${{ vars.DATABASE }}"
WAREHOUSE: "${{ vars.WAREHOUSE }}"
SCHEMA: "${{ vars.SCHEMA }}"
# All runs of this workflow share one concurrency group, keyed by the workflow name.
concurrency:
group: ${{ github.workflow }}
jobs:
run_dbt_jobs:
runs-on: ubuntu-latest
# Variables and secrets referenced above are resolved from the workflow_prod environment.
environment:
name: workflow_prod
steps:
- uses: actions/checkout@v3
- uses: actions/setup-python@v4
with:
# Quoted so YAML keeps it as the string "3.10" instead of the float 3.1.
python-version: "3.10"
cache: "pip"
- name: install dependencies
run: |
pip install -r requirements.txt
dbt deps
# Runs the request view and its companion "complete" model with STREAMLINE_INVOKE_STREAMS set to True.
- name: Run DBT Jobs
run: |
dbt run --vars '{"STREAMLINE_INVOKE_STREAMS":True}' -m models/defillama/streamline/streamline__defillama_protocol_historical.sql models/defillama/streamline/streamline__defillama_protocol_historical_complete.sql
# Runs only when run_dbt_jobs fails; passes the Slack webhook secret to the reusable workflow.
notify-failure:
needs: [run_dbt_jobs]
if: failure()
uses: ./.github/workflows/slack_notify.yml
secrets:
SLACK_WEBHOOK_URL: ${{ secrets.SLACK_WEBHOOK_URL }}

View File

@ -46,6 +46,11 @@ jobs:
run: |
dbt run-operation run_sp_create_prod_clone
- name: Run Recreate UDFs
run: |
dbt run-operation fsc_utils.create_streamline_udfs --vars '{"UPDATE_UDFS_AND_SPS":True}' -t dev
dbt run -s livequery_models.deploy.core._live --vars '{"UPDATE_UDFS_AND_SPS":True}' -t dev
notify-failure:
needs: [run_dbt_jobs]
if: failure()

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,45 @@
-- depends_on: {{ ref('bronze__defillama_protocol_historical') }}
-- depends_on: {{ ref('bronze__defillama_protocol_historical_FR') }}
-- Incremental bronze model: one row per stored DeFiLlama protocol response, keeping the
-- protocol id, category and the raw chainTvls payload for downstream flattening.
-- Incremental runs read the regular external-table view; a full build reads the _FR view.
{{ config(
    materialized = 'incremental',
    unique_key = ['protocol_id', '_inserted_timestamp'],
    cluster_by = ['partition_key'],
    tags = ['defillama_history']
) }}

WITH protocol_base AS (

    SELECT
        VALUE:PROTOCOL_ID::INT AS protocol_id,
        partition_key,
        -- NOTE(review): category is kept as the raw VARIANT (no ::STRING cast) - confirm this is intended.
        VALUE:data:category AS category,
        _inserted_timestamp,
        VALUE:data:chainTvls AS response
    FROM
{% if is_incremental() %}
        {{ ref('bronze__defillama_protocol_historical') }}
    WHERE
        _inserted_timestamp >= (
            SELECT
                -- MAX() over an empty target table is NULL, which would make this predicate
                -- reject every row forever; fall back to the epoch so an empty table loads everything.
                COALESCE(
                    MAX(_inserted_timestamp),
                    '1970-01-01' :: TIMESTAMP_NTZ
                )
            FROM
                {{ this }}
        )
{% else %}
        {{ ref('bronze__defillama_protocol_historical_FR') }}
{% endif %}
)
SELECT
    protocol_id,
    category,
    partition_key,
    response,
    {{ dbt_utils.generate_surrogate_key(
        ['protocol_id','_inserted_timestamp']
    ) }} AS bronze_defillama_protocol_historical_id,
    SYSDATE() AS inserted_timestamp,
    SYSDATE() AS modified_timestamp,
    _inserted_timestamp,
    '{{ invocation_id }}' AS _invocation_id
FROM
    protocol_base

View File

@ -0,0 +1,19 @@
# dbt schema tests for the bronze protocol TVL history model.
version: 2
models:
- name: bronze__defillama_protocol_tvl_history
tests:
# Model-level test: each (PROTOCOL_ID, _INSERTED_TIMESTAMP) pair must appear at most once.
- dbt_utils.unique_combination_of_columns:
combination_of_columns:
- PROTOCOL_ID
- _INSERTED_TIMESTAMP
columns:
- name: PROTOCOL_ID
tests:
- not_null
- name: _INSERTED_TIMESTAMP
tests:
- not_null
# The column type must be one of the listed types (only TIMESTAMP_NTZ is accepted).
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- TIMESTAMP_NTZ

View File

@ -16,6 +16,7 @@ SELECT
SELECT
VALUE:id::STRING AS protocol_id,
VALUE:slug::STRING AS protocol_slug,
REGEXP_REPLACE(VALUE:parentProtocol::STRING, '^parent#', '') AS parent_protocol,
VALUE:name::STRING AS protocol,
CASE
WHEN VALUE:address::STRING = '-' THEN NULL
@ -33,6 +34,10 @@ SELECT
VALUE:chains AS chains,
VALUE:url::STRING AS url,
VALUE:logo::STRING AS logo,
ROW_NUMBER() over (
ORDER BY
protocol_id::int
) AS row_num,
_inserted_timestamp
FROM protocol_base,
LATERAL FLATTEN (input=> read:data)

View File

@ -2,7 +2,7 @@
materialized = 'table',
enabled = false,
unique_key = ['stablecoin_id','timestamp'],
tags = ['defillama']
tags = ['stale']
) }}
WITH stablecoin_base AS ({% for item in range(50) %}

View File

@ -13,7 +13,7 @@ models:
- name: CATEGORY
description: The category the protocol belongs to.
- name: MARKET_CAP
description: The total market cap of the protocol.
description: The total market cap of the protocol. This is not available for all dates as the historical API does not provide this information.
- name: SYMBOL
description: The symbol of the token associated with the protocol.
- name: CHAIN_TVL

View File

@ -1,14 +1,15 @@
-- depends_on: {{ ref('silver__defillama_protocol_tvl_history') }}
{{ config(
materialized = 'incremental',
unique_key = ['defillama_tvl_id'],
cluster_by = ['chain'],
cluster_by = ['timestamp', 'protocol_id'],
tags = ['defillama']
) }}
WITH FINAL AS (
SELECT
SYSDATE() :: DATE AS TIMESTAMP,
_inserted_timestamp :: DATE AS TIMESTAMP,
protocol_id,
category,
NAME AS protocol,
@ -39,6 +40,41 @@ WHERE
)
{% endif %}
)
{% if is_incremental() and var('HEAL', false) %}
,
historical_heal as(
SELECT
timestamp,
protocol_id,
category,
protocol,
NULL AS market_cap,
symbol,
NULL AS tvl,
NULL AS tvl_prev_day,
NULL AS tvl_prev_week,
NULL AS tvl_prev_month,
chain,
chain_tvl,
chain_tvl_prev_day,
chain_tvl_prev_week,
chain_tvl_prev_month,
_inserted_timestamp,
defillama_protocol_tvl_history_id as defillama_tvl_id,
inserted_timestamp,
modified_timestamp,
'{{ invocation_id }}' AS _invocation_id
FROM
{{ ref('silver__defillama_protocol_tvl_history') }}
WHERE
defillama_protocol_tvl_history_id not in (
select
distinct defillama_tvl_id
FROM
{{ this }}
)
)
{% endif %}
SELECT
*,
{{ dbt_utils.generate_surrogate_key(
@ -49,3 +85,10 @@ SELECT
'{{ invocation_id }}' AS _invocation_id
FROM
FINAL
{% if is_incremental() and var('HEAL', false) %}
UNION ALL
SELECT
*
FROM
historical_heal
{% endif %}

View File

@ -0,0 +1,105 @@
-- Silver model: expands each stored chainTvls response from the bronze TVL-history model
-- into one row per protocol, chain and day, adds the chain TVL found 1, 7 and 30 rows
-- earlier in that protocol/chain series, and joins protocol name and symbol.
{{ config(
    materialized = 'incremental',
    unique_key = ['protocol_id', 'chain', 'timestamp'],
    cluster_by = ['timestamp', 'protocol_id', 'chain'],
    tags = ['defillama_history']
) }}

WITH tvl_history AS (
    -- One row per key of the response object; the key is used as the chain name.
    SELECT
        protocol_id,
        category,
        r.key AS chain,
        r.value AS VALUE,
        _inserted_timestamp
    FROM
        {{ ref('bronze__defillama_protocol_tvl_history') }},
        LATERAL FLATTEN (response) AS r

{% if is_incremental() %}
    WHERE
        _inserted_timestamp >= (
            SELECT
                -- MAX() over an empty target table is NULL, which would filter out every row;
                -- fall back to the epoch so an empty table is back-filled.
                COALESCE(
                    MAX(_inserted_timestamp),
                    '1970-01-01' :: timestamp_ntz
                )
            FROM
                {{ this }}
        )
{% endif %}
),
daily_tvl_data AS (
    -- One row per element of the per-chain tvl array; the epoch-seconds date is converted to a DATE.
    SELECT
        protocol_id,
        category,
        chain,
        r.value :date AS unix_timestamp,
        DATE_TRUNC(
            'HOUR',
            TIMEADD(
                SECOND,
                r.value :date :: INT,
                '1970-01-01' :: timestamp_ntz
            )
        )::DATE AS timestamp,
        r.value :totalLiquidityUSD::INT AS chain_tvl,
        _inserted_timestamp
    FROM
        tvl_history,
        LATERAL FLATTEN (
            VALUE :tvl
        ) AS r
),
latest_daily_tvl AS (
    -- Keep a single row per protocol/chain/day BEFORE the lags are computed. QUALIFY is applied
    -- after the window functions of its own SELECT, so de-duplicating in the same SELECT as LAG()
    -- would let a duplicate row for the same day be returned as the "previous day" value.
    -- The newest load wins; within one load the latest point of the day wins, so the
    -- choice is deterministic.
    SELECT
        protocol_id,
        category,
        chain,
        timestamp,
        chain_tvl,
        _inserted_timestamp
    FROM
        daily_tvl_data
    QUALIFY ROW_NUMBER() OVER (
        PARTITION BY timestamp, protocol_id, chain
        ORDER BY _inserted_timestamp DESC, unix_timestamp :: INT DESC
    ) = 1
),
daily_tvl_with_lags AS (
    -- LAG offsets count rows, not calendar days: prev_week / prev_month are 7 / 30 rows back,
    -- which equals 7 / 30 days only when the series has no missing days.
    SELECT
        timestamp,
        protocol_id,
        category,
        chain,
        chain_tvl,
        LAG(chain_tvl, 1) OVER (
            PARTITION BY protocol_id, chain
            ORDER BY timestamp
        ) AS chain_tvl_prev_day,
        LAG(chain_tvl, 7) OVER (
            PARTITION BY protocol_id, chain
            ORDER BY timestamp
        ) AS chain_tvl_prev_week,
        LAG(chain_tvl, 30) OVER (
            PARTITION BY protocol_id, chain
            ORDER BY timestamp
        ) AS chain_tvl_prev_month,
        _inserted_timestamp
    FROM
        latest_daily_tvl
)
SELECT
    d.timestamp,
    d.protocol_id,
    d.category,
    p.protocol,
    p.symbol,
    d.chain,
    d.chain_tvl,
    d.chain_tvl_prev_day,
    d.chain_tvl_prev_week,
    d.chain_tvl_prev_month,
    d._inserted_timestamp,
    {{ dbt_utils.generate_surrogate_key(
        ['d.protocol_id','d.chain','d.timestamp']
    ) }} AS defillama_protocol_tvl_history_id,
    SYSDATE() AS inserted_timestamp,
    SYSDATE() AS modified_timestamp,
    '{{ invocation_id }}' AS _invocation_id
FROM
    daily_tvl_with_lags d
    LEFT JOIN {{ ref('bronze__defillama_protocols') }} p
    ON p.protocol_id = d.protocol_id
-- The join can return several protocol rows for one id; keep one row per unique key.
QUALIFY ROW_NUMBER() OVER (
    PARTITION BY d.timestamp, d.protocol_id, d.chain
    ORDER BY d._inserted_timestamp DESC
) = 1

View File

@ -0,0 +1,33 @@
# dbt schema tests for the silver protocol TVL history model.
# Columns listed without a tests key are declared only and carry no tests.
version: 2
models:
- name: silver__defillama_protocol_tvl_history
columns:
- name: timestamp
tests:
- not_null
- name: protocol_id
tests:
- not_null
- name: category
- name: protocol
- name: symbol
- name: chain
tests:
- not_null
- name: chain_tvl
tests:
- not_null
- name: chain_tvl_prev_day
- name: chain_tvl_prev_week
- name: chain_tvl_prev_month
# Surrogate key column: must be unique and non-null.
- name: defillama_protocol_tvl_history_id
tests:
- unique
- not_null
- name: _inserted_timestamp
tests:
- not_null
- name: inserted_timestamp
- name: modified_timestamp
- name: _invocation_id

View File

@ -2,7 +2,7 @@
materialized = 'table',
enabled = false,
unique_key = 'defillama_stablecoin_supply_id',
tags = ['defillama']
tags = ['stale']
) }}
WITH expand_flatten AS (

View File

@ -0,0 +1,9 @@
-- View whose query is generated by the streamline_external_table_query_v2 macro for the
-- defillama_protocol_historical model.
-- partition_key is the integer parsed from the text before the first underscore in the
-- third slash-separated segment of file_name; PROTOCOL_ID from the value payload is also
-- exposed as protocol_id.
-- NOTE(review): the macro is defined outside this file; which rows it returns is not visible here.
{{ config (
materialized = 'view'
) }}
{{ streamline_external_table_query_v2(
model = 'defillama_protocol_historical',
partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 3), '_', 1) AS INTEGER)",
partition_name = "partition_key",
other_cols = "value:PROTOCOL_ID::INTEGER AS protocol_id"
) }}

View File

@ -0,0 +1,9 @@
-- View whose query is generated by the streamline_external_table_FR_query_v2 macro for the
-- defillama_protocol_historical model, with the same partition expression and extra column
-- as the non-FR view.
-- partition_key is the integer parsed from the text before the first underscore in the
-- third slash-separated segment of file_name.
-- NOTE(review): presumably FR means "full refresh" (other models read this view when not
-- running incrementally) - confirm against the macro definition.
{{ config (
materialized = 'view'
) }}
{{ streamline_external_table_FR_query_v2(
model = 'defillama_protocol_historical',
partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 3), '_', 1) AS INTEGER)",
partition_name = "partition_key",
other_cols = "value:PROTOCOL_ID::INTEGER AS protocol_id"
) }}

View File

@ -0,0 +1,63 @@
-- Request view for the DeFiLlama protocol-history pipeline: each row is one protocol to
-- fetch plus the API call for it. The post_hook passes this view to
-- streamline.udf_bulk_rest_api_v2 with the defillama_protocol_historical external table
-- and batch sizes of 200.
-- NOTE(review): the conditions under which the post_hook actually fires are decided inside
-- fsc_utils.if_data_call_function_v2, which is not visible here.
{{ config (
materialized = "view",
post_hook = fsc_utils.if_data_call_function_v2(
func = 'streamline.udf_bulk_rest_api_v2',
target = "{{this.schema}}.{{this.identifier}}",
params ={ "external_table" :"defillama_protocol_historical",
"sql_limit" :"200",
"producer_batch_size" :"200",
"worker_batch_size" :"200",
"async_concurrent_requests" :"1",
"sql_source" :"{{this.identifier}}" }
),
tags = ['defillama_history']
) }}
-- base: up to 200 protocols, lowest row_num first, that are not yet recorded in the
-- complete model and whose recorded response has status_code 200 and size_mb below 15.
WITH base AS (
SELECT
protocol_slug,
protocol_id,
row_num
FROM
{{ ref('bronze__defillama_protocols') }}
WHERE
protocol_id NOT IN (
SELECT
protocol_id
FROM
{{ ref('streamline__defillama_protocol_historical_complete') }}
WHERE
-- NULLs are excluded so that NOT IN cannot evaluate to unknown for every row.
protocol_id IS NOT NULL
)
AND protocol_id IN (
SELECT
PROTOCOL_ID
FROM
{{ ref('bronze__defillama_protocol_tvl_historical_response_sizes') }}
WHERE
size_mb < 15
AND
status_code = 200
)
ORDER BY
row_num ASC
LIMIT 200
)
SELECT
protocol_id,
-- Buckets protocol ids in tens: the id rounded down to a multiple of 10.
FLOOR(protocol_id / 10) * 10 AS partition_key,
-- GET request for the protocol endpoint, addressed by slug.
-- NOTE(review): presumably the {api_key} placeholder is filled from the vault path passed
-- as the last argument - confirm against the udf_api implementation.
{{ target.database }}.live.udf_api(
'GET',
'https://pro-api.llama.fi/{api_key}/api/protocol/' || protocol_slug,
OBJECT_CONSTRUCT(
'Content-Type', 'text/plain',
'Accept', 'text/plain'
),
{},
'Vault/prod/external/defillama'
) AS request
FROM
base
ORDER BY
row_num ASC

View File

@ -0,0 +1,42 @@
-- depends_on: {{ ref('bronze__defillama_protocol_historical_FR') }}
-- depends_on: {{ ref('bronze__defillama_protocol_historical') }}
-- Incremental model recording which protocol ids already have a stored historical response.
-- Each run keeps the newest row per protocol_id from the rows it reads; the request view
-- uses this model to skip protocols that have already been fetched.
{{ config (
    materialized = "incremental",
    unique_key = ['protocol_id','_inserted_timestamp'],
    merge_exclude_columns = ["inserted_timestamp"],
    tags = ['defillama_history']
) }}

WITH complete_data AS (

    SELECT
        VALUE:PROTOCOL_ID::INT AS protocol_id,
        _inserted_timestamp
    FROM
{% if is_incremental() %}
        {{ ref('bronze__defillama_protocol_historical') }}
    WHERE
        _inserted_timestamp > (
            SELECT
                -- When this model is first built before any response has landed, the target
                -- table is empty and MAX() is NULL; comparing against NULL would reject every
                -- later row, so fall back to the epoch.
                COALESCE(
                    MAX(_inserted_timestamp),
                    '1970-01-01' :: TIMESTAMP_NTZ
                )
            FROM
                {{ this }}
        )
{% else %}
        {{ ref('bronze__defillama_protocol_historical_FR') }}
{% endif %}
)
SELECT
    protocol_id,
    {{ dbt_utils.generate_surrogate_key(
        ['protocol_id','_inserted_timestamp']
    ) }} AS complete_defillama_protocol_historical_id,
    SYSDATE() AS inserted_timestamp,
    SYSDATE() AS modified_timestamp,
    _inserted_timestamp,
    '{{ invocation_id }}' AS _invocation_id
FROM
    complete_data
-- Newest row per protocol within this run's input.
QUALIFY ROW_NUMBER() OVER (
    PARTITION BY protocol_id
    ORDER BY _inserted_timestamp DESC
) = 1

View File

@ -8,6 +8,8 @@ sources:
- name: bitquery
- name: oklink
- name: artemis
- name: defillama_protocol_historical
- name: valuations_parquet
- name: tokenflow_eth
database: flipside_prod_db
schema: tokenflow_eth
@ -37,11 +39,6 @@ sources:
schema: silver
tables:
- name: nft_transfers
- name: bronze_streamline
database: streamline
schema: external
tables:
- name: valuations_parquet
- name: starknet_snapshot
database: tokenflow
schema: starknet

View File

@ -6,4 +6,4 @@ packages:
- package: dbt-labs/dbt_utils
version: 1.0.0
- git: https://github.com/FlipsideCrypto/fsc-utils.git
revision: v1.21.7
revision: v1.35.1