AN-4280/heal-logic (#148)

* vars, readme, tx_success

* defi heal logic

* lookbacks on lps

* var format

* heal logic

* dex swaps heal

* pool name

* column names
This commit is contained in:
drethereum 2024-05-29 11:02:18 -06:00 committed by GitHub
parent c9fabd832d
commit d145a43b0e
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
25 changed files with 2214 additions and 674 deletions

View File

@ -35,35 +35,43 @@ gnosis:
query_tag: <TAG>
```
### Variables
## Variables
To control the creation of UDF or SP macros with dbt run:
* UPDATE_UDFS_AND_SPS
When True, executes all macros included in the on-run-start hooks within dbt_project.yml on model run as normal
When False, none of the on-run-start macros are executed on model run
* Default values are False
* When True, executes all macros included in the on-run-start hooks within dbt_project.yml on model run as normal
* When False, none of the on-run-start macros are executed on model run
Default values are False
* Usage: `dbt run --vars '{"UPDATE_UDFS_AND_SPS":True}' -m ...`
* Usage:
dbt run --var '{"UPDATE_UDFS_AND_SPS":True}' -m ...
Use a variable to heal a model incrementally:
* HEAL_MODEL
* Default is FALSE (Boolean)
* When FALSE, logic will be negated
* When TRUE, heal logic will apply
* Include `heal` in model tags within the config block for inclusion in the `dbt_run_heal_models` workflow, e.g. `tags = 'heal'`
To reload records in a curated complete table without a full-refresh, such as `silver_bridge.complete_bridge_activity`:
* HEAL_CURATED_MODEL
Default is an empty array []
When item is included in var array [], incremental logic will be skipped for that CTE / code block
When item is not included in var array [] or does not match specified item in model, incremental logic will apply
Example set up: `{% if is_incremental() and 'axelar' not in var('HEAL_CURATED_MODEL') %}`
* Usage: `dbt run --vars '{"HEAL_MODEL":True}' -m ...`
* Usage:
Single CTE: dbt run --var '{"HEAL_CURATED_MODEL":"axelar"}' -m ...
Multiple CTEs: dbt run --var '{"HEAL_CURATED_MODEL":["axelar","across","celer_cbridge"]}' -m ...
Use a variable to negate incremental logic:
* Example use case: reload records in a curated complete table without a full-refresh, such as `silver_bridge.complete_bridge_activity`:
* HEAL_MODELS
* Default is an empty array []
* When item is included in var array [], incremental logic will be skipped for that CTE / code block
* When item is not included in var array [] or does not match specified item in model, incremental logic will apply
* Example set up: `{% if is_incremental() and 'axelar' not in var('HEAL_MODELS') %}`
### Resources:
- Learn more about dbt [in the docs](https://docs.getdbt.com/docs/introduction)
- Check out [Discourse](https://discourse.getdbt.com/) for commonly asked questions and answers
- Join the [chat](https://community.getdbt.com/) on Slack for live discussions and support
- Find [dbt events](https://events.getdbt.com) near you
- Check out [the blog](https://blog.getdbt.com/) for the latest news on dbt's development and best practices
* Usage:
* Single CTE: `dbt run --vars '{"HEAL_MODELS":"axelar"}' -m ...`
* Multiple CTEs: `dbt run --vars '{"HEAL_MODELS":["axelar","across","celer_cbridge"]}' -m ...`
Use a variable to extend the incremental lookback period:
* LOOKBACK
* Default is a string representing the specified time interval e.g. '12 hours', '7 days' etc.
* Example set up: `SELECT MAX(_inserted_timestamp) - INTERVAL '{{ var("LOOKBACK", "4 hours") }}'`
* Usage: `dbt run --vars '{"LOOKBACK":"36 hours"}' -m ...`
## Applying Model Tags
@ -97,7 +105,7 @@ To add/update a model's snowflake tags, add/modify the `meta` model property und
By default, model tags are pushed to Snowflake on each load. You can disable this by setting the `UPDATE_SNOWFLAKE_TAGS` project variable to `False` during a run.
```
dbt run --var '{"UPDATE_SNOWFLAKE_TAGS":False}' -s models/core/core__fact_blocks.sql
dbt run --vars '{"UPDATE_SNOWFLAKE_TAGS":False}' -s models/core/core__fact_blocks.sql
```
### Querying for existing tags on a model in snowflake
@ -105,4 +113,11 @@ dbt run --var '{"UPDATE_SNOWFLAKE_TAGS":False}' -s models/core/core__fact_blocks
```
select *
from table(gnosis.information_schema.tag_references('gnosis.core.fact_blocks', 'table'));
```
```
### Resources:
- Learn more about dbt [in the docs](https://docs.getdbt.com/docs/introduction)
- Check out [Discourse](https://discourse.getdbt.com/) for commonly asked questions and answers
- Join the [chat](https://community.getdbt.com/) on Slack for live discussions and support
- Find [dbt events](https://events.getdbt.com) near you
- Check out [the blog](https://blog.getdbt.com/) for the latest news on dbt's development and best practices

View File

@ -66,7 +66,7 @@ vars:
WAIT: 0
OBSERV_FULL_TEST: False
HEAL_MODEL: False
HEAL_CURATED_MODEL: []
HEAL_MODELS: []
START_GHA_TASKS: False
API_INTEGRATION: '{{ var("config")[target.name]["API_INTEGRATION"] if var("config")[target.name] else var("config")["dev"]["API_INTEGRATION"] }}'
EXTERNAL_FUNCTION_URI: '{{ var("config")[target.name]["EXTERNAL_FUNCTION_URI"] if var("config")[target.name] else var("config")["dev"]["EXTERNAL_FUNCTION_URI"] }}'

View File

@ -33,7 +33,13 @@ SELECT
token_symbol,
amount_unadj,
amount,
amount_usd,
ROUND(
CASE
WHEN amount_usd < 1e+15 THEN amount_usd
ELSE NULL
END,
2
) AS amount_usd,
COALESCE (
complete_bridge_activity_id,
{{ dbt_utils.generate_surrogate_key(

View File

@ -24,10 +24,26 @@ SELECT
event_name,
amount_in_unadj,
amount_in,
amount_in_usd,
ROUND(
CASE
WHEN amount_out_usd IS NULL
OR ABS((amount_in_usd - amount_out_usd) / NULLIF(amount_out_usd, 0)) > 0.75
OR ABS((amount_in_usd - amount_out_usd) / NULLIF(amount_in_usd, 0)) > 0.75 THEN NULL
ELSE amount_in_usd
END,
2
) AS amount_in_usd,
amount_out_unadj,
amount_out,
amount_out_usd,
ROUND(
CASE
WHEN amount_in_usd IS NULL
OR ABS((amount_out_usd - amount_in_usd) / NULLIF(amount_in_usd, 0)) > 0.75
OR ABS((amount_out_usd - amount_in_usd) / NULLIF(amount_out_usd, 0)) > 0.75 THEN NULL
ELSE amount_out_usd
END,
2
) AS amount_out_usd,
sender,
tx_to,
event_index,

View File

@ -35,7 +35,7 @@ AND _inserted_timestamp >= (
SELECT
MAX(
_inserted_timestamp
) - INTERVAL '36 hours'
) - INTERVAL '{{ var('LOOKBACK', '4 hours') }}'
FROM
{{ this }}
)
@ -169,7 +169,7 @@ heal_model AS (
SELECT
MAX(
_inserted_timestamp
) - INTERVAL '36 hours'
) - INTERVAL '{{ var('LOOKBACK', '4 hours') }}'
FROM
{{ this }}
)
@ -195,7 +195,7 @@ heal_model AS (
SELECT
MAX(
_inserted_timestamp
) - INTERVAL '36 hours'
) - INTERVAL '{{ var('LOOKBACK', '4 hours') }}'
FROM
{{ this }}
)

View File

@ -46,6 +46,7 @@ WITH base_evt AS (
WHERE
topics [0] :: STRING = '0x89d8051e597ab4178a863a5190407b98abfeff406aa8db90c59af76612e58f01'
AND contract_address = '0x3795c36e7d12a8c252a20c5a7b455f7c57b60283'
AND tx_status = 'SUCCESS'
{% if is_incremental() %}
AND _inserted_timestamp >= (

View File

@ -14,6 +14,7 @@ WITH base_contracts AS (
{{ ref('silver__logs') }}
WHERE
topics [0] :: STRING = '0xe35dddd4ea75d7e9b3fe93af4f4e40e778c3da4074c9d93e7c6536f1e803c1eb'
AND tx_status = 'SUCCESS'
{% if is_incremental() %}
AND _inserted_timestamp >= (

View File

@ -51,6 +51,7 @@ WITH base_evt AS (
WHERE
topics [0] :: STRING = '0xe35dddd4ea75d7e9b3fe93af4f4e40e778c3da4074c9d93e7c6536f1e803c1eb'
AND origin_to_address IS NOT NULL
AND tx_status = 'SUCCESS'
{% if is_incremental() %}
AND _inserted_timestamp >= (

View File

@ -126,6 +126,7 @@ dst_info AS (
WHERE
contract_address = '0x25ab3efd52e6470681ce037cd546dc60726948d3'
AND topics [0] :: STRING = '0x5ce4019f772fda6cb703b26bce3ec3006eb36b73f1d3a0eb441213317d9f5e9d'
AND tx_status = 'SUCCESS'
{% if is_incremental() %}
AND _inserted_timestamp >= (

View File

@ -1,12 +1,14 @@
-- depends_on: {{ ref('silver__complete_token_prices') }}
{{ config(
materialized = 'incremental',
incremental_strategy = 'delete+insert',
unique_key = ['block_number','platform','version'],
cluster_by = ['block_timestamp::DATE'],
tags = ['curated','reorg']
tags = ['curated','reorg','heal']
) }}
WITH celer_cbridge AS (
SELECT
block_number,
block_timestamp,
@ -31,11 +33,11 @@ WITH celer_cbridge AS (
FROM
{{ ref('silver_bridge__celer_cbridge_send') }}
{% if is_incremental() and 'celer_cbridge' not in var('HEAL_CURATED_MODEL') %}
{% if is_incremental() and 'celer_cbridge' not in var('HEAL_MODELS') %}
WHERE
_inserted_timestamp >= (
SELECT
MAX(_inserted_timestamp) - INTERVAL '36 hours'
MAX(_inserted_timestamp) - INTERVAL '{{ var("LOOKBACK", "4 hours") }}'
FROM
{{ this }}
)
@ -66,11 +68,11 @@ hop AS (
FROM
{{ ref('silver_bridge__hop_transfersent') }}
{% if is_incremental() and 'hop' not in var('HEAL_CURATED_MODEL') %}
{% if is_incremental() and 'hop' not in var('HEAL_MODELS') %}
WHERE
_inserted_timestamp >= (
SELECT
MAX(_inserted_timestamp) - INTERVAL '36 hours'
MAX(_inserted_timestamp) - INTERVAL '{{ var("LOOKBACK", "4 hours") }}'
FROM
{{ this }}
)
@ -101,11 +103,11 @@ meson AS (
FROM
{{ ref('silver_bridge__meson_transfers') }}
{% if is_incremental() and 'meson' not in var('HEAL_CURATED_MODEL') %}
{% if is_incremental() and 'meson' not in var('HEAL_MODELS') %}
WHERE
_inserted_timestamp >= (
SELECT
MAX(_inserted_timestamp) - INTERVAL '36 hours'
MAX(_inserted_timestamp) - INTERVAL '{{ var("LOOKBACK", "4 hours") }}'
FROM
{{ this }}
)
@ -127,7 +129,7 @@ all_protocols AS (
FROM
meson
),
FINAL AS (
complete_bridge_activity AS (
SELECT
block_number,
block_timestamp,
@ -144,14 +146,12 @@ FINAL AS (
receiver,
destination_chain_receiver,
CASE
WHEN platform = 'meson'
THEN destination_chain_id :: STRING
WHEN platform = 'meson' THEN destination_chain_id :: STRING
WHEN d.chain_id IS NULL THEN destination_chain_id :: STRING
ELSE d.chain_id :: STRING
END AS destination_chain_id,
CASE
WHEN platform = 'meson'
THEN LOWER(destination_chain)
WHEN platform = 'meson' THEN LOWER(destination_chain)
WHEN d.chain IS NULL THEN LOWER(destination_chain)
ELSE LOWER(
d.chain
@ -171,7 +171,7 @@ FINAL AS (
2
)
ELSE NULL
END AS amount_usd_unadj,
END AS amount_usd,
_id,
b._inserted_timestamp
FROM
@ -196,6 +196,183 @@ FINAL AS (
) = LOWER(
b.destination_chain
)
),
{% if is_incremental() and var(
'HEAL_MODEL'
) %}
heal_model AS (
SELECT
block_number,
block_timestamp,
origin_from_address,
origin_to_address,
origin_function_signature,
tx_hash,
event_index,
bridge_address,
event_name,
platform,
version,
sender,
receiver,
destination_chain_receiver,
destination_chain_id,
destination_chain,
t0.token_address,
C.token_symbol AS token_symbol,
C.token_decimals AS token_decimals,
amount_unadj,
CASE
WHEN C.token_decimals IS NOT NULL THEN (amount_unadj / pow(10, C.token_decimals))
ELSE amount_unadj
END AS amount_heal,
CASE
WHEN C.token_decimals IS NOT NULL THEN amount_heal * p.price
ELSE NULL
END AS amount_usd_heal,
_id,
t0._inserted_timestamp
FROM
{{ this }}
t0
LEFT JOIN {{ ref('silver__contracts') }} C
ON t0.token_address = C.contract_address
LEFT JOIN {{ ref('price__ez_prices_hourly') }}
p
ON t0.token_address = p.token_address
AND DATE_TRUNC(
'hour',
block_timestamp
) = p.hour
WHERE
CONCAT(
t0.block_number,
'-',
t0.platform,
'-',
t0.version
) IN (
SELECT
CONCAT(
t1.block_number,
'-',
t1.platform,
'-',
t1.version
)
FROM
{{ this }}
t1
WHERE
t1.token_decimals IS NULL
AND t1._inserted_timestamp < (
SELECT
MAX(
_inserted_timestamp
) - INTERVAL '{{ var("LOOKBACK", "4 hours") }}'
FROM
{{ this }}
)
AND EXISTS (
SELECT
1
FROM
{{ ref('silver__contracts') }} C
WHERE
C._inserted_timestamp > DATEADD('DAY', -14, SYSDATE())
AND C.token_decimals IS NOT NULL
AND C.contract_address = t1.token_address)
GROUP BY
1
)
OR CONCAT(
t0.block_number,
'-',
t0.platform,
'-',
t0.version
) IN (
SELECT
CONCAT(
t2.block_number,
'-',
t2.platform,
'-',
t2.version
)
FROM
{{ this }}
t2
WHERE
t2.amount_usd IS NULL
AND t2._inserted_timestamp < (
SELECT
MAX(
_inserted_timestamp
) - INTERVAL '{{ var("LOOKBACK", "4 hours") }}'
FROM
{{ this }}
)
AND EXISTS (
SELECT
1
FROM
{{ ref('silver__complete_token_prices') }}
p
WHERE
p._inserted_timestamp > DATEADD('DAY', -14, SYSDATE())
AND p.price IS NOT NULL
AND p.token_address = t2.token_address
AND p.hour = DATE_TRUNC(
'hour',
t2.block_timestamp
)
)
GROUP BY
1
)
),
{% endif %}
FINAL AS (
SELECT
*
FROM
complete_bridge_activity
{% if is_incremental() and var(
'HEAL_MODEL'
) %}
UNION ALL
SELECT
block_number,
block_timestamp,
origin_from_address,
origin_to_address,
origin_function_signature,
tx_hash,
event_index,
bridge_address,
event_name,
platform,
version,
sender,
receiver,
destination_chain_receiver,
destination_chain_id,
destination_chain,
token_address,
token_symbol,
token_decimals,
amount_unadj,
amount_heal AS amount,
amount_usd_heal AS amount_usd,
_id,
_inserted_timestamp
FROM
heal_model
{% endif %}
)
SELECT
block_number,
@ -219,21 +396,18 @@ SELECT
token_decimals,
amount_unadj,
amount,
CASE
WHEN amount_usd_unadj < 1e+15 THEN amount_usd_unadj
ELSE NULL
END AS amount_usd,
amount_usd,
_id,
_inserted_timestamp,
{{ dbt_utils.generate_surrogate_key(
['_id']
['_id']
) }} AS complete_bridge_activity_id,
SYSDATE() AS inserted_timestamp,
SYSDATE() AS modified_timestamp,
'{{ invocation_id }}' AS _invocation_id
FROM
FINAL
WHERE destination_chain <> 'gnosis'
qualify (ROW_NUMBER() over (PARTITION BY _id
FINAL
WHERE
destination_chain <> 'gnosis' qualify (ROW_NUMBER() over (PARTITION BY _id
ORDER BY
_inserted_timestamp DESC)) = 1

View File

@ -31,6 +31,7 @@ WITH pools_registered AS (
WHERE
topics [0] :: STRING = '0x3c13bc30b8e878c53fd2a36b679409c073afd75950be43d8858768e956fbc20e' --PoolRegistered
AND contract_address = '0xba12222222228d8ba445958a75a0704d566bf2c8'
AND tx_status = 'SUCCESS'
{% if is_incremental() %}
AND _inserted_timestamp >= (
@ -72,6 +73,7 @@ tokens_registered AS (
FROM
pools_registered
)
AND tx_status = 'SUCCESS'
),
function_sigs AS (
SELECT

View File

@ -54,6 +54,7 @@ swaps_base AS (
WHERE
topics [0] :: STRING = '0x2170c741c41531aec20e7c107c24eecfdd15e69c9bb0a8dd37b1840b9e0b207b'
AND contract_address = '0xba12222222228d8ba445958a75a0704d566bf2c8'
AND tx_status = 'SUCCESS'
{% if is_incremental() %}
AND _inserted_timestamp >= (

View File

@ -33,7 +33,8 @@ WITH contract_deployments AS (
'0xd19baeadc667cf2015e395f2b08668ef120f41f5'
)
AND TYPE ILIKE 'create%'
AND tx_status ILIKE 'success'
AND tx_status = 'SUCCESS'
AND trace_status = 'SUCCESS'
{% if is_incremental() %}
AND _inserted_timestamp >= (

View File

@ -80,6 +80,7 @@ curve_base AS (
--TokenExchange
'0xd013ca23e77a65003c2c659c5442c00c805371b7fc1ebd4c206c41d1536bd90b' --TokenExchangeUnderlying
)
AND tx_status = 'SUCCESS'
{% if is_incremental() %}
AND _inserted_timestamp >= (
@ -148,6 +149,7 @@ token_transfers AS (
curve_base
)
AND CONCAT('0x', SUBSTR(topics [2] :: STRING, 27, 40)) <> '0x0000000000000000000000000000000000000000'
AND tx_status = 'SUCCESS'
{% if is_incremental() %}
AND _inserted_timestamp >= (

View File

@ -27,6 +27,7 @@ WITH pool_creation AS (
WHERE
contract_address = LOWER('0xA818b4F111Ccac7AA31D0BCc0806d64F2E0737D7')
AND topics [0] :: STRING = '0x0d3648bd0f6ba80134a33ba9275ac585d9d315f0ad8355cddefde31afa28d0e9' --PairCreated
AND tx_status = 'SUCCESS'
{% if is_incremental() %}
AND _inserted_timestamp >= (

View File

@ -3,15 +3,15 @@
incremental_strategy = 'delete+insert',
unique_key = ['block_number','platform','version'],
cluster_by = ['block_timestamp::DATE'],
tags = ['curated','reorg']
tags = ['curated','reorg','heal']
) }}
WITH contracts AS (
SELECT
contract_address AS address,
token_symbol AS symbol,
token_decimals AS decimals,
contract_address,
token_symbol,
token_decimals,
_inserted_timestamp
FROM
{{ ref('silver__contracts') }}
@ -24,10 +24,8 @@ balancer AS (
contract_address,
pool_address,
pool_name,
'balancer' AS platform,
'v1' AS version,
_log_id AS _id,
_inserted_timestamp,
NULL AS fee,
NULL AS tick_spacing,
token0,
token1,
token2,
@ -35,15 +33,19 @@ balancer AS (
token4,
token5,
token6,
token7
token7,
'balancer' AS platform,
'v1' AS version,
_log_id AS _id,
_inserted_timestamp
FROM
{{ ref('silver_dex__balancer_pools') }}
{% if is_incremental() %}
{% if is_incremental() and 'balancer' not in var('HEAL_MODELS') %}
WHERE
_inserted_timestamp >= (
SELECT
MAX(_inserted_timestamp) - INTERVAL '12 hours'
MAX(_inserted_timestamp) - INTERVAL '{{ var("LOOKBACK", "4 hours") }}'
FROM
{{ this }}
)
@ -57,10 +59,8 @@ curve AS (
deployer_address AS contract_address,
pool_address,
pool_name,
'curve' AS platform,
'v1' AS version,
_call_id AS _id,
_inserted_timestamp,
NULL AS fee,
NULL AS tick_spacing,
MAX(
CASE
WHEN token_num = 1 THEN token_address
@ -100,15 +100,19 @@ curve AS (
CASE
WHEN token_num = 8 THEN token_address
END
) AS token7
) AS token7,
'curve' AS platform,
'v1' AS version,
_call_id AS _id,
_inserted_timestamp
FROM
{{ ref('silver_dex__curve_pools') }}
{% if is_incremental() %}
{% if is_incremental() and 'curve' not in var('HEAL_MODELS') %}
WHERE
_inserted_timestamp >= (
SELECT
MAX(_inserted_timestamp) - INTERVAL '12 hours'
MAX(_inserted_timestamp) - INTERVAL '{{ var("LOOKBACK", "4 hours") }}'
FROM
{{ this }}
)
@ -124,8 +128,16 @@ honeyswap AS (
contract_address,
pool_address,
NULL AS pool_name,
NULL AS fee,
NULL AS tick_spacing,
token0,
token1,
NULL AS token2,
NULL AS token3,
NULL AS token4,
NULL AS token5,
NULL AS token6,
NULL AS token7,
'honeyswap' AS platform,
'v1' AS version,
_log_id AS _id,
@ -133,11 +145,11 @@ honeyswap AS (
FROM
{{ ref('silver_dex__honeyswap_pools') }}
{% if is_incremental() %}
{% if is_incremental() and 'honeyswap' not in var('HEAL_MODELS') %}
WHERE
_inserted_timestamp >= (
SELECT
MAX(_inserted_timestamp) - INTERVAL '12 hours'
MAX(_inserted_timestamp) - INTERVAL '{{ var("LOOKBACK", "4 hours") }}'
FROM
{{ this }}
)
@ -151,8 +163,16 @@ swapr AS (
contract_address,
pool_address,
NULL AS pool_name,
NULL AS fee,
NULL AS tick_spacing,
token0,
token1,
NULL AS token2,
NULL AS token3,
NULL AS token4,
NULL AS token5,
NULL AS token6,
NULL AS token7,
'swapr' AS platform,
'v1' AS version,
_log_id AS _id,
@ -160,11 +180,11 @@ swapr AS (
FROM
{{ ref('silver_dex__swapr_pools') }}
{% if is_incremental() %}
{% if is_incremental() and 'swapr' not in var('HEAL_MODELS') %}
WHERE
_inserted_timestamp >= (
SELECT
MAX(_inserted_timestamp) - INTERVAL '12 hours'
MAX(_inserted_timestamp) - INTERVAL '{{ var("LOOKBACK", "4 hours") }}'
FROM
{{ this }}
)
@ -178,8 +198,16 @@ sushi AS (
contract_address,
pool_address,
NULL AS pool_name,
NULL AS fee,
NULL AS tick_spacing,
token0,
token1,
NULL AS token2,
NULL AS token3,
NULL AS token4,
NULL AS token5,
NULL AS token6,
NULL AS token7,
'sushiswap' AS platform,
'v1' AS version,
_log_id AS _id,
@ -187,17 +215,17 @@ sushi AS (
FROM
{{ ref('silver_dex__sushi_pools') }}
{% if is_incremental() %}
{% if is_incremental() and 'sushi' not in var('HEAL_MODELS') %}
WHERE
_inserted_timestamp >= (
SELECT
MAX(_inserted_timestamp) - INTERVAL '12 hours'
MAX(_inserted_timestamp) - INTERVAL '{{ var("LOOKBACK", "4 hours") }}'
FROM
{{ this }}
)
{% endif %}
),
all_pools_standard AS (
all_pools AS (
SELECT
*
FROM
@ -212,8 +240,7 @@ all_pools_standard AS (
*
FROM
sushi
),
all_pools_other AS (
UNION ALL
SELECT
*
FROM
@ -224,96 +251,73 @@ all_pools_other AS (
FROM
curve
),
FINAL AS (
--when pool_name is null and balancer,curve
complete_lps AS (
SELECT
block_number,
block_timestamp,
tx_hash,
contract_address,
p.contract_address,
pool_address,
CASE
WHEN pool_name IS NULL THEN CONCAT(
WHEN pool_name IS NOT NULL THEN pool_name
WHEN pool_name IS NULL
AND platform IN (
'balancer',
'curve'
) THEN CONCAT(
COALESCE(c0.token_symbol, SUBSTRING(token0, 1, 5) || '...' || SUBSTRING(token0, 39, 42)),
CASE
WHEN token1 IS NOT NULL THEN '-' || COALESCE(c1.token_symbol, SUBSTRING(token1, 1, 5) || '...' || SUBSTRING(token1, 39, 42))
ELSE ''
END,
CASE
WHEN token2 IS NOT NULL THEN '-' || COALESCE(c2.token_symbol, SUBSTRING(token2, 1, 5) || '...' || SUBSTRING(token2, 39, 42))
ELSE ''
END,
CASE
WHEN token3 IS NOT NULL THEN '-' || COALESCE(c3.token_symbol, SUBSTRING(token3, 1, 5) || '...' || SUBSTRING(token3, 39, 42))
ELSE ''
END,
CASE
WHEN token4 IS NOT NULL THEN '-' || COALESCE(c4.token_symbol, SUBSTRING(token4, 1, 5) || '...' || SUBSTRING(token4, 39, 42))
ELSE ''
END,
CASE
WHEN token5 IS NOT NULL THEN '-' || COALESCE(c5.token_symbol, SUBSTRING(token5, 1, 5) || '...' || SUBSTRING(token5, 39, 42))
ELSE ''
END,
CASE
WHEN token6 IS NOT NULL THEN '-' || COALESCE(c6.token_symbol, SUBSTRING(token6, 1, 5) || '...' || SUBSTRING(token6, 39, 42))
ELSE ''
END,
CASE
WHEN token7 IS NOT NULL THEN '-' || COALESCE(c7.token_symbol, SUBSTRING(token7, 1, 5) || '...' || SUBSTRING(token7, 39, 42))
ELSE ''
END
)
ELSE CONCAT(
COALESCE(
c0.symbol,
c0.token_symbol,
CONCAT(SUBSTRING(token0, 1, 5), '...', SUBSTRING(token0, 39, 42))
),
'-',
COALESCE(
c1.symbol,
c1.token_symbol,
CONCAT(SUBSTRING(token1, 1, 5), '...', SUBSTRING(token1, 39, 42))
)
)
ELSE pool_name
END AS pool_name,
OBJECT_CONSTRUCT(
'token0',
token0,
'token1',
token1
) AS tokens,
OBJECT_CONSTRUCT(
'token0',
c0.symbol,
'token1',
c1.symbol
) AS symbols,
OBJECT_CONSTRUCT(
'token0',
c0.decimals,
'token1',
c1.decimals
) AS decimals,
platform,
version,
_id,
p._inserted_timestamp
FROM
all_pools_standard p
LEFT JOIN contracts c0
ON c0.address = p.token0
LEFT JOIN contracts c1
ON c1.address = p.token1
UNION ALL
SELECT
block_number,
block_timestamp,
tx_hash,
contract_address,
pool_address,
CASE
WHEN pool_name IS NULL THEN CONCAT(
COALESCE(c0.symbol, SUBSTRING(token0, 1, 5) || '...' || SUBSTRING(token0, 39, 42)),
CASE
WHEN token1 IS NOT NULL THEN '-' || COALESCE(c1.symbol, SUBSTRING(token1, 1, 5) || '...' || SUBSTRING(token1, 39, 42))
ELSE ''
END,
CASE
WHEN token2 IS NOT NULL THEN '-' || COALESCE(c2.symbol, SUBSTRING(token2, 1, 5) || '...' || SUBSTRING(token2, 39, 42))
ELSE ''
END,
CASE
WHEN token3 IS NOT NULL THEN '-' || COALESCE(c3.symbol, SUBSTRING(token3, 1, 5) || '...' || SUBSTRING(token3, 39, 42))
ELSE ''
END,
CASE
WHEN token4 IS NOT NULL THEN '-' || COALESCE(c4.symbol, SUBSTRING(token4, 1, 5) || '...' || SUBSTRING(token4, 39, 42))
ELSE ''
END,
CASE
WHEN token5 IS NOT NULL THEN '-' || COALESCE(c5.symbol, SUBSTRING(token5, 1, 5) || '...' || SUBSTRING(token5, 39, 42))
ELSE ''
END,
CASE
WHEN token6 IS NOT NULL THEN '-' || COALESCE(c6.symbol, SUBSTRING(token6, 1, 5) || '...' || SUBSTRING(token6, 39, 42))
ELSE ''
END,
CASE
WHEN token7 IS NOT NULL THEN '-' || COALESCE(c7.symbol, SUBSTRING(token7, 1, 5) || '...' || SUBSTRING(token7, 39, 42))
ELSE ''
END
)
ELSE pool_name
END AS pool_name,
fee,
tick_spacing,
token0,
token1,
token2,
token3,
token4,
token5,
token6,
token7,
OBJECT_CONSTRUCT(
'token0',
token0,
@ -334,62 +338,554 @@ FINAL AS (
) AS tokens,
OBJECT_CONSTRUCT(
'token0',
c0.symbol,
c0.token_symbol,
'token1',
c1.symbol,
c1.token_symbol,
'token2',
c2.symbol,
c2.token_symbol,
'token3',
c3.symbol,
c3.token_symbol,
'token4',
c4.symbol,
c4.token_symbol,
'token5',
c5.symbol,
c5.token_symbol,
'token6',
c6.symbol,
c6.token_symbol,
'token7',
c7.symbol
c7.token_symbol
) AS symbols,
OBJECT_CONSTRUCT(
'token0',
c0.decimals,
c0.token_decimals,
'token1',
c1.decimals,
c1.token_decimals,
'token2',
c2.decimals,
c2.token_decimals,
'token3',
c3.decimals,
c3.token_decimals,
'token4',
c4.decimals,
c4.token_decimals,
'token5',
c5.decimals,
c5.token_decimals,
'token6',
c6.decimals,
c6.token_decimals,
'token7',
c7.decimals
c7.token_decimals
) AS decimals,
platform,
version,
_id,
p._inserted_timestamp
FROM
all_pools_other p
all_pools p
LEFT JOIN contracts c0
ON c0.address = p.token0
ON c0.contract_address = p.token0
LEFT JOIN contracts c1
ON c1.address = p.token1
ON c1.contract_address = p.token1
LEFT JOIN contracts c2
ON c2.address = p.token2
ON c2.contract_address = p.token2
LEFT JOIN contracts c3
ON c3.address = p.token3
ON c3.contract_address = p.token3
LEFT JOIN contracts c4
ON c4.address = p.token4
ON c4.contract_address = p.token4
LEFT JOIN contracts c5
ON c5.address = p.token5
ON c5.contract_address = p.token5
LEFT JOIN contracts c6
ON c6.address = p.token6
ON c6.contract_address = p.token6
LEFT JOIN contracts c7
ON c7.address = p.token7
ON c7.contract_address = p.token7
),
{% if is_incremental() and var(
'HEAL_MODEL'
) %}
heal_model AS (
SELECT
block_number,
block_timestamp,
tx_hash,
t0.contract_address,
pool_address,
CASE
WHEN pool_name IS NOT NULL THEN pool_name
WHEN pool_name IS NULL
AND platform IN (
'balancer',
'curve'
) THEN CONCAT(
COALESCE(c0.token_symbol, SUBSTRING(token0, 1, 5) || '...' || SUBSTRING(token0, 39, 42)),
CASE
WHEN token1 IS NOT NULL THEN '-' || COALESCE(c1.token_symbol, SUBSTRING(token1, 1, 5) || '...' || SUBSTRING(token1, 39, 42))
ELSE ''
END,
CASE
WHEN token2 IS NOT NULL THEN '-' || COALESCE(c2.token_symbol, SUBSTRING(token2, 1, 5) || '...' || SUBSTRING(token2, 39, 42))
ELSE ''
END,
CASE
WHEN token3 IS NOT NULL THEN '-' || COALESCE(c3.token_symbol, SUBSTRING(token3, 1, 5) || '...' || SUBSTRING(token3, 39, 42))
ELSE ''
END,
CASE
WHEN token4 IS NOT NULL THEN '-' || COALESCE(c4.token_symbol, SUBSTRING(token4, 1, 5) || '...' || SUBSTRING(token4, 39, 42))
ELSE ''
END,
CASE
WHEN token5 IS NOT NULL THEN '-' || COALESCE(c5.token_symbol, SUBSTRING(token5, 1, 5) || '...' || SUBSTRING(token5, 39, 42))
ELSE ''
END,
CASE
WHEN token6 IS NOT NULL THEN '-' || COALESCE(c6.token_symbol, SUBSTRING(token6, 1, 5) || '...' || SUBSTRING(token6, 39, 42))
ELSE ''
END,
CASE
WHEN token7 IS NOT NULL THEN '-' || COALESCE(c7.token_symbol, SUBSTRING(token7, 1, 5) || '...' || SUBSTRING(token7, 39, 42))
ELSE ''
END
)
ELSE CONCAT(
COALESCE(
c0.token_symbol,
CONCAT(SUBSTRING(token0, 1, 5), '...', SUBSTRING(token0, 39, 42))
),
'-',
COALESCE(
c1.token_symbol,
CONCAT(SUBSTRING(token1, 1, 5), '...', SUBSTRING(token1, 39, 42))
)
)
END AS pool_name_heal,
fee,
tick_spacing,
token0,
token1,
token2,
token3,
token4,
token5,
token6,
token7,
tokens,
OBJECT_CONSTRUCT(
'token0',
c0.token_symbol,
'token1',
c1.token_symbol,
'token2',
c2.token_symbol,
'token3',
c3.token_symbol,
'token4',
c4.token_symbol,
'token5',
c5.token_symbol,
'token6',
c6.token_symbol,
'token7',
c7.token_symbol
) AS symbols_heal,
OBJECT_CONSTRUCT(
'token0',
c0.token_decimals,
'token1',
c1.token_decimals,
'token2',
c2.token_decimals,
'token3',
c3.token_decimals,
'token4',
c4.token_decimals,
'token5',
c5.token_decimals,
'token6',
c6.token_decimals,
'token7',
c7.token_decimals
) AS decimals_heal,
platform,
version,
_id,
t0._inserted_timestamp
FROM
{{ this }}
t0
LEFT JOIN contracts c0
ON c0.contract_address = t0.token0
LEFT JOIN contracts c1
ON c1.contract_address = t0.token1
LEFT JOIN contracts c2
ON c2.contract_address = t0.token2
LEFT JOIN contracts c3
ON c3.contract_address = t0.token3
LEFT JOIN contracts c4
ON c4.contract_address = t0.token4
LEFT JOIN contracts c5
ON c5.contract_address = t0.token5
LEFT JOIN contracts c6
ON c6.contract_address = t0.token6
LEFT JOIN contracts c7
ON c7.contract_address = t0.token7
WHERE
CONCAT(
t0.block_number,
'-',
t0.platform,
'-',
t0.version
) IN (
SELECT
CONCAT(
t1.block_number,
'-',
t1.platform,
'-',
t1.version
)
FROM
{{ this }}
t1
WHERE
t1.decimals :token0 :: INT IS NULL
AND t1._inserted_timestamp < (
SELECT
MAX(
_inserted_timestamp
) - INTERVAL '{{ var("LOOKBACK", "4 hours") }}'
FROM
{{ this }}
)
AND EXISTS (
SELECT
1
FROM
{{ ref('silver__contracts') }} C
WHERE
C._inserted_timestamp > DATEADD('DAY', -14, SYSDATE())
AND C.token_decimals IS NOT NULL
AND C.contract_address = t1.tokens :token0 :: STRING)
GROUP BY
1
)
OR CONCAT(
t0.block_number,
'-',
t0.platform,
'-',
t0.version
) IN (
SELECT
CONCAT(
t2.block_number,
'-',
t2.platform,
'-',
t2.version
)
FROM
{{ this }}
t2
WHERE
t2.decimals :token1 :: INT IS NULL
AND t2._inserted_timestamp < (
SELECT
MAX(
_inserted_timestamp
) - INTERVAL '{{ var("LOOKBACK", "4 hours") }}'
FROM
{{ this }}
)
AND EXISTS (
SELECT
1
FROM
{{ ref('silver__contracts') }} C
WHERE
C._inserted_timestamp > DATEADD('DAY', -14, SYSDATE())
AND C.token_decimals IS NOT NULL
AND C.contract_address = t2.tokens :token1 :: STRING)
GROUP BY
1
)
OR CONCAT(
t0.block_number,
'-',
t0.platform,
'-',
t0.version
) IN (
SELECT
CONCAT(
t3.block_number,
'-',
t3.platform,
'-',
t3.version
)
FROM
{{ this }}
t3
WHERE
t3.decimals :token2 :: INT IS NULL
AND t3._inserted_timestamp < (
SELECT
MAX(
_inserted_timestamp
) - INTERVAL '{{ var("LOOKBACK", "4 hours") }}'
FROM
{{ this }}
)
AND EXISTS (
SELECT
1
FROM
{{ ref('silver__contracts') }} C
WHERE
C._inserted_timestamp > DATEADD('DAY', -14, SYSDATE())
AND C.token_decimals IS NOT NULL
AND C.contract_address = t3.tokens :token2 :: STRING)
GROUP BY
1
)
OR CONCAT(
t0.block_number,
'-',
t0.platform,
'-',
t0.version
) IN (
SELECT
CONCAT(
t4.block_number,
'-',
t4.platform,
'-',
t4.version
)
FROM
{{ this }}
t4
WHERE
t4.decimals :token3 :: INT IS NULL
AND t4._inserted_timestamp < (
SELECT
MAX(
_inserted_timestamp
) - INTERVAL '{{ var("LOOKBACK", "4 hours") }}'
FROM
{{ this }}
)
AND EXISTS (
SELECT
1
FROM
{{ ref('silver__contracts') }} C
WHERE
C._inserted_timestamp > DATEADD('DAY', -14, SYSDATE())
AND C.token_decimals IS NOT NULL
AND C.contract_address = t4.tokens :token3 :: STRING)
GROUP BY
1
)
OR CONCAT(
t0.block_number,
'-',
t0.platform,
'-',
t0.version
) IN (
SELECT
CONCAT(
t5.block_number,
'-',
t5.platform,
'-',
t5.version
)
FROM
{{ this }}
t5
WHERE
t5.decimals :token4 :: INT IS NULL
AND t5._inserted_timestamp < (
SELECT
MAX(
_inserted_timestamp
) - INTERVAL '{{ var("LOOKBACK", "4 hours") }}'
FROM
{{ this }}
)
AND EXISTS (
SELECT
1
FROM
{{ ref('silver__contracts') }} C
WHERE
C._inserted_timestamp > DATEADD('DAY', -14, SYSDATE())
AND C.token_decimals IS NOT NULL
AND C.contract_address = t5.tokens :token4 :: STRING)
GROUP BY
1
)
OR CONCAT(
t0.block_number,
'-',
t0.platform,
'-',
t0.version
) IN (
SELECT
CONCAT(
t6.block_number,
'-',
t6.platform,
'-',
t6.version
)
FROM
{{ this }}
t6
WHERE
t6.decimals :token5 :: INT IS NULL
AND t6._inserted_timestamp < (
SELECT
MAX(
_inserted_timestamp
) - INTERVAL '{{ var("LOOKBACK", "4 hours") }}'
FROM
{{ this }}
)
AND EXISTS (
SELECT
1
FROM
{{ ref('silver__contracts') }} C
WHERE
C._inserted_timestamp > DATEADD('DAY', -14, SYSDATE())
AND C.token_decimals IS NOT NULL
AND C.contract_address = t6.tokens :token5 :: STRING)
GROUP BY
1
)
OR CONCAT(
t0.block_number,
'-',
t0.platform,
'-',
t0.version
) IN (
SELECT
CONCAT(
t7.block_number,
'-',
t7.platform,
'-',
t7.version
)
FROM
{{ this }}
t7
WHERE
t7.decimals :token6 :: INT IS NULL
AND t7._inserted_timestamp < (
SELECT
MAX(
_inserted_timestamp
) - INTERVAL '{{ var("LOOKBACK", "4 hours") }}'
FROM
{{ this }}
)
AND EXISTS (
SELECT
1
FROM
{{ ref('silver__contracts') }} C
WHERE
C._inserted_timestamp > DATEADD('DAY', -14, SYSDATE())
AND C.token_decimals IS NOT NULL
AND C.contract_address = t7.tokens :token6 :: STRING)
GROUP BY
1
)
OR CONCAT(
t0.block_number,
'-',
t0.platform,
'-',
t0.version
) IN (
SELECT
CONCAT(
t8.block_number,
'-',
t8.platform,
'-',
t8.version
)
FROM
{{ this }}
t8
WHERE
t8.decimals :token7 :: INT IS NULL
AND t8._inserted_timestamp < (
SELECT
MAX(
_inserted_timestamp
) - INTERVAL '{{ var("LOOKBACK", "4 hours") }}'
FROM
{{ this }}
)
AND EXISTS (
SELECT
1
FROM
{{ ref('silver__contracts') }} C
WHERE
C._inserted_timestamp > DATEADD('DAY', -14, SYSDATE())
AND C.token_decimals IS NOT NULL
AND C.contract_address = t8.tokens :token7 :: STRING)
GROUP BY
1
)
),
{% endif %}
FINAL AS (
SELECT
*
FROM
complete_lps
{% if is_incremental() and var(
'HEAL_MODEL'
) %}
UNION ALL
SELECT
block_number,
block_timestamp,
tx_hash,
contract_address,
pool_address,
pool_name_heal AS pool_name,
fee,
tick_spacing,
token0,
token1,
token2,
token3,
token4,
token5,
token6,
token7,
tokens,
symbols_heal AS symbols,
decimals_heal AS decimals,
platform,
version,
_id,
_inserted_timestamp
FROM
heal_model
{% endif %}
)
SELECT
block_number,
@ -403,6 +899,16 @@ SELECT
tokens,
symbols,
decimals,
fee,
tick_spacing,
token0,
token1,
token2,
token3,
token4,
token5,
token6,
token7,
_id,
_inserted_timestamp,
{{ dbt_utils.generate_surrogate_key(

File diff suppressed because it is too large Load Diff

View File

@ -27,6 +27,7 @@ WITH pool_creation AS (
WHERE
contract_address = LOWER('0xc35DADB65012eC5796536bD9864eD8773aBc74C4')
AND topics [0] :: STRING = '0x0d3648bd0f6ba80134a33ba9275ac585d9d315f0ad8355cddefde31afa28d0e9' --PairCreated
AND tx_status = 'SUCCESS'
{% if is_incremental() %}
AND _inserted_timestamp >= (

View File

@ -27,6 +27,7 @@ WITH pool_creation AS (
WHERE
contract_address = LOWER('0x5D48C95AdfFD4B40c1AAADc4e08fc44117E02179')
AND topics [0] :: STRING = '0x0d3648bd0f6ba80134a33ba9275ac585d9d315f0ad8355cddefde31afa28d0e9' --PairCreated
AND tx_status = 'SUCCESS'
{% if is_incremental() %}
AND _inserted_timestamp >= (

View File

@ -1,9 +1,10 @@
-- depends_on: {{ ref('silver__complete_token_prices') }}
{{ config(
materialized = 'incremental',
incremental_strategy = 'delete+insert',
unique_key = ['block_number','platform'],
cluster_by = ['block_timestamp::DATE'],
tags = ['reorg','curated']
tags = ['reorg','curated','heal']
) }}
WITH aave AS (
@ -30,13 +31,13 @@ WITH aave AS (
FROM
{{ ref('silver__aave_borrows') }} A
{% if is_incremental() and 'aave' not in var('HEAL_CURATED_MODEL') %}
{% if is_incremental() and 'aave' not in var('HEAL_MODELS') %}
WHERE
A._inserted_timestamp >= (
SELECT
MAX(
_inserted_timestamp
) - INTERVAL '36 hours'
) - INTERVAL '{{ var("LOOKBACK", "4 hours") }}'
FROM
{{ this }}
)
@ -65,13 +66,13 @@ agave AS (
FROM
{{ ref('silver__agave_borrows') }} A
{% if is_incremental() and 'agave' not in var('HEAL_CURATED_MODEL') %}
{% if is_incremental() and 'agave' not in var('HEAL_MODELS') %}
WHERE
A._inserted_timestamp >= (
SELECT
MAX(
_inserted_timestamp
) - INTERVAL '36 hours'
) - INTERVAL '{{ var("LOOKBACK", "4 hours") }}'
FROM
{{ this }}
)
@ -100,13 +101,13 @@ spark AS (
FROM
{{ ref('silver__spark_borrows') }} A
{% if is_incremental() and 'spark' not in var('HEAL_CURATED_MODEL') %}
{% if is_incremental() and 'spark' not in var('HEAL_MODELS') %}
WHERE
A._inserted_timestamp >= (
SELECT
MAX(
_inserted_timestamp
) - INTERVAL '36 hours'
) - INTERVAL '{{ var("LOOKBACK", "4 hours") }}'
FROM
{{ this }}
)
@ -135,13 +136,13 @@ realt AS (
FROM
{{ ref('silver__realt_borrows') }} A
{% if is_incremental() and 'realt' not in var('HEAL_CURATED_MODEL') %}
{% if is_incremental() and 'realt' not in var('HEAL_MODELS') %}
WHERE
A._inserted_timestamp >= (
SELECT
MAX(
_inserted_timestamp
) - INTERVAL '36 hours'
) - INTERVAL '{{ var("LOOKBACK", "4 hours") }}'
FROM
{{ this }}
)
@ -168,7 +169,7 @@ borrow_union AS (
FROM
realt
),
FINAL AS (
complete_lending_borrows AS (
SELECT
tx_hash,
block_number,
@ -178,10 +179,7 @@ FINAL AS (
origin_to_address,
origin_function_signature,
b.contract_address,
CASE
WHEN platform = 'Compound V3' THEN 'Withdraw'
ELSE 'Borrow'
END AS event_name,
'Borrow' AS event_name,
borrower,
protocol_market,
b.token_address,
@ -205,8 +203,126 @@ FINAL AS (
'hour',
block_timestamp
) = p.hour
LEFT JOIN {{ ref('silver__contracts') }} C
ON b.token_address = C.contract_address
),
{% if is_incremental() and var(
'HEAL_MODEL'
) %}
heal_model AS (
SELECT
tx_hash,
block_number,
block_timestamp,
event_index,
origin_from_address,
origin_to_address,
origin_function_signature,
t0.contract_address,
event_name,
borrower,
protocol_market,
t0.token_address,
t0.token_symbol,
amount_unadj,
amount,
ROUND(
amount * p.price,
2
) AS amount_usd_heal,
platform,
t0.blockchain,
t0._LOG_ID,
t0._INSERTED_TIMESTAMP
FROM
{{ this }}
t0
LEFT JOIN {{ ref('price__ez_prices_hourly') }}
p
ON t0.token_address = p.token_address
AND DATE_TRUNC(
'hour',
block_timestamp
) = p.hour
WHERE
CONCAT(
t0.block_number,
'-',
t0.platform
) IN (
SELECT
CONCAT(
t1.block_number,
'-',
t1.platform
)
FROM
{{ this }}
t1
WHERE
t1.amount_usd IS NULL
AND t1._inserted_timestamp < (
SELECT
MAX(
_inserted_timestamp
) - INTERVAL '{{ var("LOOKBACK", "4 hours") }}'
FROM
{{ this }}
)
AND EXISTS (
SELECT
1
FROM
{{ ref('silver__complete_token_prices') }}
p
WHERE
p._inserted_timestamp > DATEADD('DAY', -14, SYSDATE())
AND p.price IS NOT NULL
AND p.token_address = t1.token_address
AND p.hour = DATE_TRUNC(
'hour',
t1.block_timestamp
)
)
GROUP BY
1
)
),
{% endif %}
FINAL AS (
SELECT
*
FROM
complete_lending_borrows
{% if is_incremental() and var(
'HEAL_MODEL'
) %}
UNION ALL
SELECT
tx_hash,
block_number,
block_timestamp,
event_index,
origin_from_address,
origin_to_address,
origin_function_signature,
contract_address,
event_name,
borrower,
protocol_market,
token_address,
token_symbol,
amount_unadj,
amount,
amount_usd_heal AS amount_usd,
platform,
blockchain,
_LOG_ID,
_INSERTED_TIMESTAMP
FROM
heal_model
{% endif %}
)
SELECT
*,

View File

@ -1,9 +1,10 @@
-- depends_on: {{ ref('silver__complete_token_prices') }}
{{ config(
materialized = 'incremental',
incremental_strategy = 'delete+insert',
unique_key = ['block_number','platform'],
cluster_by = ['block_timestamp::DATE'],
tags = ['reorg','curated']
tags = ['reorg','curated','heal']
) }}
WITH aave AS (
@ -30,11 +31,11 @@ WITH aave AS (
FROM
{{ ref('silver__aave_deposits') }}
{% if is_incremental() and 'aave' not in var('HEAL_CURATED_MODEL') %}
{% if is_incremental() and 'aave' not in var('HEAL_MODELS') %}
WHERE
_inserted_timestamp >= (
SELECT
MAX(_inserted_timestamp) - INTERVAL '36 hours'
MAX(_inserted_timestamp) - INTERVAL '{{ var("LOOKBACK", "4 hours") }}'
FROM
{{ this }}
)
@ -63,11 +64,11 @@ agave AS (
FROM
{{ ref('silver__agave_deposits') }}
{% if is_incremental() and 'agave' not in var('HEAL_CURATED_MODEL') %}
{% if is_incremental() and 'agave' not in var('HEAL_MODELS') %}
WHERE
_inserted_timestamp >= (
SELECT
MAX(_inserted_timestamp) - INTERVAL '36 hours'
MAX(_inserted_timestamp) - INTERVAL '{{ var("LOOKBACK", "4 hours") }}'
FROM
{{ this }}
)
@ -96,11 +97,11 @@ spark AS (
FROM
{{ ref('silver__spark_deposits') }}
{% if is_incremental() and 'spark' not in var('HEAL_CURATED_MODEL') %}
{% if is_incremental() and 'spark' not in var('HEAL_MODELS') %}
WHERE
_inserted_timestamp >= (
SELECT
MAX(_inserted_timestamp) - INTERVAL '36 hours'
MAX(_inserted_timestamp) - INTERVAL '{{ var("LOOKBACK", "4 hours") }}'
FROM
{{ this }}
)
@ -129,11 +130,11 @@ realt AS (
FROM
{{ ref('silver__realt_deposits') }}
{% if is_incremental() and 'realt' not in var('HEAL_CURATED_MODEL') %}
{% if is_incremental() and 'realt' not in var('HEAL_MODELS') %}
WHERE
_inserted_timestamp >= (
SELECT
MAX(_inserted_timestamp) - INTERVAL '36 hours'
MAX(_inserted_timestamp) - INTERVAL '{{ var("LOOKBACK", "4 hours") }}'
FROM
{{ this }}
)
@ -160,7 +161,7 @@ deposit_union AS (
FROM
realt
),
FINAL AS (
complete_lending_deposits AS (
SELECT
tx_hash,
block_number,
@ -171,7 +172,6 @@ FINAL AS (
origin_function_signature,
A.contract_address,
CASE
WHEN platform = 'Compound V3' THEN 'SupplyCollateral'
WHEN platform = 'Aave V3' THEN 'Supply'
ELSE 'Deposit'
END AS event_name,
@ -198,8 +198,126 @@ FINAL AS (
'hour',
block_timestamp
) = p.hour
LEFT JOIN {{ ref('silver__contracts') }} C
ON A.token_address = C.contract_address
),
{% if is_incremental() and var(
'HEAL_MODEL'
) %}
heal_model AS (
SELECT
tx_hash,
block_number,
block_timestamp,
event_index,
origin_from_address,
origin_to_address,
origin_function_signature,
t0.contract_address,
event_name,
protocol_market,
depositor,
t0.token_address,
t0.token_symbol,
amount_unadj,
amount,
ROUND(
amount * p.price,
2
) AS amount_usd_heal,
platform,
t0.blockchain,
t0._LOG_ID,
t0._INSERTED_TIMESTAMP
FROM
{{ this }}
t0
LEFT JOIN {{ ref('price__ez_prices_hourly') }}
p
ON t0.token_address = p.token_address
AND DATE_TRUNC(
'hour',
block_timestamp
) = p.hour
WHERE
CONCAT(
t0.block_number,
'-',
t0.platform
) IN (
SELECT
CONCAT(
t1.block_number,
'-',
t1.platform
)
FROM
{{ this }}
t1
WHERE
t1.amount_usd IS NULL
AND t1._inserted_timestamp < (
SELECT
MAX(
_inserted_timestamp
) - INTERVAL '{{ var("LOOKBACK", "4 hours") }}'
FROM
{{ this }}
)
AND EXISTS (
SELECT
1
FROM
{{ ref('silver__complete_token_prices') }}
p
WHERE
p._inserted_timestamp > DATEADD('DAY', -14, SYSDATE())
AND p.price IS NOT NULL
AND p.token_address = t1.token_address
AND p.hour = DATE_TRUNC(
'hour',
t1.block_timestamp
)
)
GROUP BY
1
)
),
{% endif %}
FINAL AS (
SELECT
*
FROM
complete_lending_deposits
{% if is_incremental() and var(
'HEAL_MODEL'
) %}
UNION ALL
SELECT
tx_hash,
block_number,
block_timestamp,
event_index,
origin_from_address,
origin_to_address,
origin_function_signature,
contract_address,
event_name,
protocol_market,
depositor,
token_address,
token_symbol,
amount_unadj,
amount,
amount_usd_heal AS amount_usd,
platform,
blockchain,
_LOG_ID,
_INSERTED_TIMESTAMP
FROM
heal_model
{% endif %}
)
SELECT
*,

View File

@ -1,9 +1,10 @@
-- depends_on: {{ ref('silver__complete_token_prices') }}
{{ config(
materialized = 'incremental',
incremental_strategy = 'delete+insert',
unique_key = ['block_number','platform'],
cluster_by = ['block_timestamp::DATE'],
tags = ['reorg','curated']
tags = ['reorg','curated','heal']
) }}
WITH aave AS (
@ -33,11 +34,11 @@ WITH aave AS (
FROM
{{ ref('silver__aave_flashloans') }}
{% if is_incremental() and 'aave' not in var('HEAL_CURATED_MODEL') %}
{% if is_incremental() and 'aave' not in var('HEAL_MODELS') %}
WHERE
_inserted_timestamp >= (
SELECT
MAX(_inserted_timestamp) - INTERVAL '36 hours'
MAX(_inserted_timestamp) - INTERVAL '{{ var("LOOKBACK", "4 hours") }}'
FROM
{{ this }}
)
@ -69,11 +70,11 @@ spark AS (
FROM
{{ ref('silver__spark_flashloans') }}
{% if is_incremental() and 'spark' not in var('HEAL_CURATED_MODEL') %}
{% if is_incremental() and 'spark' not in var('HEAL_MODELS') %}
WHERE
_inserted_timestamp >= (
SELECT
MAX(_inserted_timestamp) - INTERVAL '36 hours'
MAX(_inserted_timestamp) - INTERVAL '{{ var("LOOKBACK", "4 hours") }}'
FROM
{{ this }}
)
@ -105,11 +106,11 @@ agave AS (
FROM
{{ ref('silver__agave_flashloans') }}
{% if is_incremental() and 'agave' not in var('HEAL_CURATED_MODEL') %}
{% if is_incremental() and 'agave' not in var('HEAL_MODELS') %}
WHERE
_inserted_timestamp >= (
SELECT
MAX(_inserted_timestamp) - INTERVAL '36 hours'
MAX(_inserted_timestamp) - INTERVAL '{{ var("LOOKBACK", "4 hours") }}'
FROM
{{ this }}
)
@ -141,11 +142,11 @@ realt AS (
FROM
{{ ref('silver__realt_flashloans') }}
{% if is_incremental() and 'realt' not in var('HEAL_CURATED_MODEL') %}
{% if is_incremental() and 'realt' not in var('HEAL_MODELS') %}
WHERE
_inserted_timestamp >= (
SELECT
MAX(_inserted_timestamp) - INTERVAL '36 hours'
MAX(_inserted_timestamp) - INTERVAL '{{ var("LOOKBACK", "4 hours") }}'
FROM
{{ this }}
)
@ -172,7 +173,7 @@ flashloan_union AS (
FROM
realt
),
FINAL AS (
complete_lending_flashloans AS (
SELECT
tx_hash,
block_number,
@ -187,7 +188,7 @@ FINAL AS (
initiator_address AS initiator,
target_address AS target,
f.token_address AS flashloan_token,
token_symbol AS flashloan_token_symbol,
f.symbol AS flashloan_token_symbol,
flashloan_amount_unadj,
flashloan_amount,
ROUND(
@ -213,8 +214,179 @@ FINAL AS (
'hour',
block_timestamp
) = p.hour
LEFT JOIN {{ ref('silver__contracts') }} C
ON f.token_address = C.contract_address
),
{% if is_incremental() and var(
'HEAL_MODEL'
) %}
heal_model AS (
SELECT
tx_hash,
block_number,
block_timestamp,
event_index,
origin_from_address,
origin_to_address,
origin_function_signature,
t0.contract_address,
event_name,
protocol_market,
initiator,
target,
flashloan_token,
flashloan_token_symbol,
flashloan_amount_unadj,
flashloan_amount,
ROUND(
flashloan_amount * p.price,
2
) AS flashloan_amount_usd_heal,
premium_amount_unadj,
premium_amount,
ROUND(
premium_amount * p.price,
2
) AS premium_amount_usd_heal,
platform,
t0.blockchain,
t0._LOG_ID,
t0._INSERTED_TIMESTAMP
FROM
{{ this }}
t0
LEFT JOIN {{ ref('price__ez_prices_hourly') }}
p
ON t0.flashloan_token = p.token_address
AND DATE_TRUNC(
'hour',
block_timestamp
) = p.hour
WHERE
CONCAT(
t0.block_number,
'-',
t0.platform
) IN (
SELECT
CONCAT(
t1.block_number,
'-',
t1.platform
)
FROM
{{ this }}
t1
WHERE
t1.flashloan_amount_usd IS NULL
AND t1._inserted_timestamp < (
SELECT
MAX(
_inserted_timestamp
) - INTERVAL '{{ var("LOOKBACK", "4 hours") }}'
FROM
{{ this }}
)
AND EXISTS (
SELECT
1
FROM
{{ ref('silver__complete_token_prices') }}
p
WHERE
p._inserted_timestamp > DATEADD('DAY', -14, SYSDATE())
AND p.price IS NOT NULL
AND p.token_address = t1.flashloan_token
AND p.hour = DATE_TRUNC(
'hour',
t1.block_timestamp
)
)
GROUP BY
1
)
OR CONCAT(
t0.block_number,
'-',
t0.platform
) IN (
SELECT
CONCAT(
t2.block_number,
'-',
t2.platform
)
FROM
{{ this }}
t2
WHERE
t2.premium_amount_usd IS NULL
AND t2._inserted_timestamp < (
SELECT
MAX(
_inserted_timestamp
) - INTERVAL '{{ var("LOOKBACK", "4 hours") }}'
FROM
{{ this }}
)
AND EXISTS (
SELECT
1
FROM
{{ ref('silver__complete_token_prices') }}
p
WHERE
p._inserted_timestamp > DATEADD('DAY', -14, SYSDATE())
AND p.price IS NOT NULL
AND p.token_address = t2.flashloan_token
AND p.hour = DATE_TRUNC(
'hour',
t2.block_timestamp
)
)
GROUP BY
1
)
),
{% endif %}
FINAL AS (
SELECT
*
FROM
complete_lending_flashloans
{% if is_incremental() and var(
'HEAL_MODEL'
) %}
UNION ALL
SELECT
tx_hash,
block_number,
block_timestamp,
event_index,
origin_from_address,
origin_to_address,
origin_function_signature,
contract_address,
event_name,
protocol_market,
initiator,
target,
flashloan_token,
flashloan_token_symbol,
flashloan_amount_unadj,
flashloan_amount,
flashloan_amount_usd_heal AS flashloan_amount_usd,
premium_amount_unadj,
premium_amount,
premium_amount_usd_heal AS premium_amount_usd,
platform,
blockchain,
_LOG_ID,
_INSERTED_TIMESTAMP
FROM
heal_model
{% endif %}
)
SELECT
*,

View File

@ -1,9 +1,10 @@
-- depends_on: {{ ref('silver__complete_token_prices') }}
{{ config(
materialized = 'incremental',
incremental_strategy = 'delete+insert',
unique_key = ['block_number','platform'],
cluster_by = ['block_timestamp::DATE'],
tags = ['reorg','curated']
tags = ['reorg','curated','heal']
) }}
WITH aave AS (
@ -34,11 +35,11 @@ WITH aave AS (
FROM
{{ ref('silver__aave_liquidations') }}
{% if is_incremental() and 'aave' not in var('HEAL_CURATED_MODEL') %}
{% if is_incremental() and 'aave' not in var('HEAL_MODELS') %}
WHERE
_inserted_timestamp >= (
SELECT
MAX(_inserted_timestamp) - INTERVAL '36 hours'
MAX(_inserted_timestamp) - INTERVAL '{{ var("LOOKBACK", "4 hours") }}'
FROM
{{ this }}
)
@ -71,11 +72,11 @@ spark AS (
FROM
{{ ref('silver__spark_liquidations') }}
{% if is_incremental() and 'spark' not in var('HEAL_CURATED_MODEL') %}
{% if is_incremental() and 'spark' not in var('HEAL_MODELS') %}
WHERE
_inserted_timestamp >= (
SELECT
MAX(_inserted_timestamp) - INTERVAL '36 hours'
MAX(_inserted_timestamp) - INTERVAL '{{ var("LOOKBACK", "4 hours") }}'
FROM
{{ this }}
)
@ -108,11 +109,11 @@ agave AS (
FROM
{{ ref('silver__agave_liquidations') }}
{% if is_incremental() and 'agave' not in var('HEAL_CURATED_MODEL') %}
{% if is_incremental() and 'agave' not in var('HEAL_MODELS') %}
WHERE
_inserted_timestamp >= (
SELECT
MAX(_inserted_timestamp) - INTERVAL '36 hours'
MAX(_inserted_timestamp) - INTERVAL '{{ var("LOOKBACK", "4 hours") }}'
FROM
{{ this }}
)
@ -145,11 +146,11 @@ realt AS (
FROM
{{ ref('silver__realt_liquidations') }}
{% if is_incremental() and 'realt' not in var('HEAL_CURATED_MODEL') %}
{% if is_incremental() and 'realt' not in var('HEAL_MODELS') %}
WHERE
_inserted_timestamp >= (
SELECT
MAX(_inserted_timestamp) - INTERVAL '36 hours'
MAX(_inserted_timestamp) - INTERVAL '{{ var("LOOKBACK", "4 hours") }}'
FROM
{{ this }}
)
@ -176,40 +177,7 @@ liquidation_union AS (
FROM
realt
),
contracts AS (
SELECT
*
FROM
{{ ref('silver__contracts') }} C
WHERE
C.contract_address IN (
SELECT
DISTINCT(collateral_asset) AS asset
FROM
liquidation_union
)
),
prices AS (
SELECT
*
FROM
{{ ref('price__ez_prices_hourly') }}
p
WHERE
token_address IN (
SELECT
DISTINCT(collateral_asset) AS asset
FROM
liquidation_union
)
AND HOUR > (
SELECT
MIN(block_timestamp)
FROM
liquidation_union
)
),
FINAL AS (
complete_lending_liquidations AS (
SELECT
tx_hash,
block_number,
@ -219,10 +187,7 @@ FINAL AS (
origin_to_address,
origin_function_signature,
A.contract_address,
CASE
WHEN platform = 'Compound V3' THEN 'AbsorbCollateral'
ELSE 'LiquidationCall'
END AS event_name,
'LiquidationCall' AS event_name,
liquidator,
borrower,
protocol_collateral_asset AS protocol_market,
@ -230,16 +195,10 @@ FINAL AS (
collateral_asset_symbol AS collateral_token_symbol,
amount_unadj,
liquidated_amount AS amount,
CASE
WHEN platform <> 'Compound V3' THEN ROUND(
liquidated_amount * p.price,
2
)
ELSE ROUND(
liquidated_amount_usd,
2
)
END AS amount_usd,
ROUND(
liquidated_amount * p.price,
2
) AS amount_usd,
debt_asset AS debt_token,
debt_asset_symbol AS debt_token_symbol,
platform,
@ -248,14 +207,139 @@ FINAL AS (
A._INSERTED_TIMESTAMP
FROM
liquidation_union A
LEFT JOIN prices p
LEFT JOIN {{ ref('price__ez_prices_hourly') }}
p
ON collateral_asset = p.token_address
AND DATE_TRUNC(
'hour',
block_timestamp
) = p.hour
LEFT JOIN contracts C
ON collateral_asset = C.contract_address
),
{% if is_incremental() and var(
'HEAL_MODEL'
) %}
heal_model AS (
SELECT
tx_hash,
block_number,
block_timestamp,
event_index,
origin_from_address,
origin_to_address,
origin_function_signature,
t0.contract_address,
event_name,
liquidator,
borrower,
protocol_market,
collateral_token,
collateral_token_symbol,
amount_unadj,
amount,
ROUND(
amount * p.price,
2
) AS amount_usd_heal,
debt_token,
debt_token_symbol,
platform,
t0.blockchain,
t0._LOG_ID,
t0._INSERTED_TIMESTAMP
FROM
{{ this }}
t0
LEFT JOIN {{ ref('price__ez_prices_hourly') }}
p
ON t0.collateral_token = p.token_address
AND DATE_TRUNC(
'hour',
block_timestamp
) = p.hour
WHERE
CONCAT(
t0.block_number,
'-',
t0.platform
) IN (
SELECT
CONCAT(
t1.block_number,
'-',
t1.platform
)
FROM
{{ this }}
t1
WHERE
t1.amount_usd IS NULL
AND t1._inserted_timestamp < (
SELECT
MAX(
_inserted_timestamp
) - INTERVAL '{{ var("LOOKBACK", "4 hours") }}'
FROM
{{ this }}
)
AND EXISTS (
SELECT
1
FROM
{{ ref('silver__complete_token_prices') }}
p
WHERE
p._inserted_timestamp > DATEADD('DAY', -14, SYSDATE())
AND p.price IS NOT NULL
AND p.token_address = t1.collateral_token
AND p.hour = DATE_TRUNC(
'hour',
t1.block_timestamp
)
)
GROUP BY
1
)
),
{% endif %}
FINAL AS (
SELECT
*
FROM
complete_lending_liquidations
{% if is_incremental() and var(
'HEAL_MODEL'
) %}
UNION ALL
SELECT
tx_hash,
block_number,
block_timestamp,
event_index,
origin_from_address,
origin_to_address,
origin_function_signature,
contract_address,
event_name,
liquidator,
borrower,
protocol_market,
collateral_token,
collateral_token_symbol,
amount_unadj,
amount,
amount_usd_heal AS amount_usd,
debt_token,
debt_token_symbol,
platform,
blockchain,
_LOG_ID,
_INSERTED_TIMESTAMP
FROM
heal_model
{% endif %}
)
SELECT
*,

View File

@ -1,9 +1,10 @@
-- depends_on: {{ ref('silver__complete_token_prices') }}
{{ config(
materialized = 'incremental',
incremental_strategy = 'delete+insert',
unique_key = ['block_number','platform'],
cluster_by = ['block_timestamp::DATE'],
tags = ['reorg','curated']
tags = ['reorg','curated','heal']
) }}
WITH aave AS (
@ -31,11 +32,11 @@ WITH aave AS (
FROM
{{ ref('silver__aave_repayments') }}
{% if is_incremental() and 'aave' not in var('HEAL_CURATED_MODEL') %}
{% if is_incremental() and 'aave' not in var('HEAL_MODELS') %}
WHERE
_inserted_timestamp >= (
SELECT
MAX(_inserted_timestamp) - INTERVAL '36 hours'
MAX(_inserted_timestamp) - INTERVAL '{{ var("LOOKBACK", "4 hours") }}'
FROM
{{ this }}
)
@ -65,11 +66,11 @@ spark AS (
FROM
{{ ref('silver__spark_repayments') }}
{% if is_incremental() and 'spark' not in var('HEAL_CURATED_MODEL') %}
{% if is_incremental() and 'spark' not in var('HEAL_MODELS') %}
WHERE
_inserted_timestamp >= (
SELECT
MAX(_inserted_timestamp) - INTERVAL '36 hours'
MAX(_inserted_timestamp) - INTERVAL '{{ var("LOOKBACK", "4 hours") }}'
FROM
{{ this }}
)
@ -99,11 +100,11 @@ agave AS (
FROM
{{ ref('silver__agave_repayments') }}
{% if is_incremental() and 'agave' not in var('HEAL_CURATED_MODEL') %}
{% if is_incremental() and 'agave' not in var('HEAL_MODELS') %}
WHERE
_inserted_timestamp >= (
SELECT
MAX(_inserted_timestamp) - INTERVAL '36 hours'
MAX(_inserted_timestamp) - INTERVAL '{{ var("LOOKBACK", "4 hours") }}'
FROM
{{ this }}
)
@ -133,11 +134,11 @@ realt AS (
FROM
{{ ref('silver__realt_repayments') }}
{% if is_incremental() and 'realt' not in var('HEAL_CURATED_MODEL') %}
{% if is_incremental() and 'realt' not in var('HEAL_MODELS') %}
WHERE
_inserted_timestamp >= (
SELECT
MAX(_inserted_timestamp) - INTERVAL '36 hours'
MAX(_inserted_timestamp) - INTERVAL '{{ var("LOOKBACK", "4 hours") }}'
FROM
{{ this }}
)
@ -164,7 +165,7 @@ repayments_union AS (
FROM
realt
),
FINAL AS (
complete_lending_repayments AS (
SELECT
tx_hash,
block_number,
@ -174,10 +175,7 @@ FINAL AS (
origin_to_address,
origin_function_signature,
A.contract_address,
CASE
WHEN platform = 'Compound V3' THEN 'Supply'
ELSE 'Repay'
END AS event_name,
'Repay' AS event_name,
protocol_market,
payer_address AS payer,
borrower,
@ -202,8 +200,128 @@ FINAL AS (
'hour',
block_timestamp
) = p.hour
LEFT JOIN {{ ref('silver__contracts') }} C
ON A.token_address = C.contract_address
),
{% if is_incremental() and var(
'HEAL_MODEL'
) %}
heal_model AS (
SELECT
tx_hash,
block_number,
block_timestamp,
event_index,
origin_from_address,
origin_to_address,
origin_function_signature,
t0.contract_address,
event_name,
protocol_market,
payer,
borrower,
t0.token_address,
t0.token_symbol,
amount_unadj,
amount,
ROUND(
amount * p.price,
2
) AS amount_usd_heal,
platform,
t0.blockchain,
t0._LOG_ID,
t0._INSERTED_TIMESTAMP
FROM
{{ this }}
t0
LEFT JOIN {{ ref('price__ez_prices_hourly') }}
p
ON t0.token_address = p.token_address
AND DATE_TRUNC(
'hour',
block_timestamp
) = p.hour
WHERE
CONCAT(
t0.block_number,
'-',
t0.platform
) IN (
SELECT
CONCAT(
t1.block_number,
'-',
t1.platform
)
FROM
{{ this }}
t1
WHERE
t1.amount_usd IS NULL
AND t1._inserted_timestamp < (
SELECT
MAX(
_inserted_timestamp
) - INTERVAL '{{ var("LOOKBACK", "4 hours") }}'
FROM
{{ this }}
)
AND EXISTS (
SELECT
1
FROM
{{ ref('silver__complete_token_prices') }}
p
WHERE
p._inserted_timestamp > DATEADD('DAY', -14, SYSDATE())
AND p.price IS NOT NULL
AND p.token_address = t1.token_address
AND p.hour = DATE_TRUNC(
'hour',
t1.block_timestamp
)
)
GROUP BY
1
)
),
{% endif %}
FINAL AS (
SELECT
*
FROM
complete_lending_repayments
{% if is_incremental() and var(
'HEAL_MODEL'
) %}
UNION ALL
SELECT
tx_hash,
block_number,
block_timestamp,
event_index,
origin_from_address,
origin_to_address,
origin_function_signature,
contract_address,
event_name,
protocol_market,
payer,
borrower,
token_address,
token_symbol,
amount_unadj,
amount,
amount_usd_heal AS amount_usd,
platform,
blockchain,
_LOG_ID,
_INSERTED_TIMESTAMP
FROM
heal_model
{% endif %}
)
SELECT
*,

View File

@ -1,9 +1,10 @@
-- depends_on: {{ ref('silver__complete_token_prices') }}
{{ config(
materialized = 'incremental',
incremental_strategy = 'delete+insert',
unique_key = ['block_number','platform'],
cluster_by = ['block_timestamp::DATE'],
tags = ['reorg','curated']
tags = ['reorg','curated','heal']
) }}
WITH aave AS (
@ -30,13 +31,13 @@ WITH aave AS (
FROM
{{ ref('silver__aave_withdraws') }}
{% if is_incremental() and 'aave' not in var('HEAL_CURATED_MODEL') %}
{% if is_incremental() and 'aave' not in var('HEAL_MODELS') %}
WHERE
_inserted_timestamp >= (
SELECT
MAX(
_inserted_timestamp
) - INTERVAL '36 hours'
) - INTERVAL '{{ var("LOOKBACK", "4 hours") }}'
FROM
{{ this }}
)
@ -65,13 +66,13 @@ spark AS (
FROM
{{ ref('silver__spark_withdraws') }}
{% if is_incremental() and 'spark' not in var('HEAL_CURATED_MODEL') %}
{% if is_incremental() and 'spark' not in var('HEAL_MODELS') %}
WHERE
_inserted_timestamp >= (
SELECT
MAX(
_inserted_timestamp
) - INTERVAL '36 hours'
) - INTERVAL '{{ var("LOOKBACK", "4 hours") }}'
FROM
{{ this }}
)
@ -100,13 +101,13 @@ agave AS (
FROM
{{ ref('silver__agave_withdraws') }}
{% if is_incremental() and 'agave' not in var('HEAL_CURATED_MODEL') %}
{% if is_incremental() and 'agave' not in var('HEAL_MODELS') %}
WHERE
_inserted_timestamp >= (
SELECT
MAX(
_inserted_timestamp
) - INTERVAL '36 hours'
) - INTERVAL '{{ var("LOOKBACK", "4 hours") }}'
FROM
{{ this }}
)
@ -135,13 +136,13 @@ realt AS (
FROM
{{ ref('silver__realt_withdraws') }}
{% if is_incremental() and 'realt' not in var('HEAL_CURATED_MODEL') %}
{% if is_incremental() and 'realt' not in var('HEAL_MODELS') %}
WHERE
_inserted_timestamp >= (
SELECT
MAX(
_inserted_timestamp
) - INTERVAL '36 hours'
) - INTERVAL '{{ var("LOOKBACK", "4 hours") }}'
FROM
{{ this }}
)
@ -168,7 +169,7 @@ withdraws_union AS (
FROM
realt
),
FINAL AS (
complete_lending_withdraws AS (
SELECT
tx_hash,
block_number,
@ -178,10 +179,7 @@ FINAL AS (
origin_to_address,
origin_function_signature,
A.contract_address,
CASE
WHEN platform = 'Compound V3' THEN 'WithdrawCollateral'
ELSE 'Withdraw'
END AS event_name,
'Withdraw' AS event_name,
protocol_market,
depositor_address AS depositor,
A.token_address,
@ -205,8 +203,126 @@ FINAL AS (
'hour',
block_timestamp
) = p.hour
LEFT JOIN {{ ref('silver__contracts') }} C
ON A.token_address = C.contract_address
),
{% if is_incremental() and var(
'HEAL_MODEL'
) %}
heal_model AS (
SELECT
tx_hash,
block_number,
block_timestamp,
event_index,
origin_from_address,
origin_to_address,
origin_function_signature,
t0.contract_address,
event_name,
protocol_market,
depositor,
t0.token_address,
t0.token_symbol,
amount_unadj,
amount,
ROUND(
amount * p.price,
2
) AS amount_usd_heal,
platform,
t0.blockchain,
t0._log_id,
t0._inserted_timestamp
FROM
{{ this }}
t0
LEFT JOIN {{ ref('price__ez_prices_hourly') }}
p
ON t0.token_address = p.token_address
AND DATE_TRUNC(
'hour',
block_timestamp
) = p.hour
WHERE
CONCAT(
t0.block_number,
'-',
t0.platform
) IN (
SELECT
CONCAT(
t1.block_number,
'-',
t1.platform
)
FROM
{{ this }}
t1
WHERE
t1.amount_usd IS NULL
AND t1._inserted_timestamp < (
SELECT
MAX(
_inserted_timestamp
) - INTERVAL '{{ var("LOOKBACK", "4 hours") }}'
FROM
{{ this }}
)
AND EXISTS (
SELECT
1
FROM
{{ ref('silver__complete_token_prices') }}
p
WHERE
p._inserted_timestamp > DATEADD('DAY', -14, SYSDATE())
AND p.price IS NOT NULL
AND p.token_address = t1.token_address
AND p.hour = DATE_TRUNC(
'hour',
t1.block_timestamp
)
)
GROUP BY
1
)
),
{% endif %}
FINAL AS (
SELECT
*
FROM
complete_lending_withdraws
{% if is_incremental() and var(
'HEAL_MODEL'
) %}
UNION ALL
SELECT
tx_hash,
block_number,
block_timestamp,
event_index,
origin_from_address,
origin_to_address,
origin_function_signature,
contract_address,
event_name,
protocol_market,
depositor,
token_address,
token_symbol,
amount_unadj,
amount,
amount_usd_heal AS amount_usd,
platform,
blockchain,
_log_id,
_inserted_timestamp
FROM
heal_model
{% endif %}
)
SELECT
*,