breakdown

This commit is contained in:
WHYTEWYLL 2024-09-13 20:35:38 +03:00
parent 657a9c7f21
commit ecaacc008e
12 changed files with 371 additions and 205 deletions

View File

@ -60,7 +60,7 @@ SELECT
SYSDATE() AS modified_timestamp,
'{{ invocation_id }}' AS _invocation_id
FROM
{{ ref('silver__token_transfers') }}
{{ ref('silver__token_transfers_complete') }}
t
LEFT JOIN hourly_prices p
ON t.contract_address = p.token_address

View File

@ -1,171 +0,0 @@
{{ config(
materialized = 'incremental',
incremental_predicates = ["COALESCE(DBT_INTERNAL_DEST.block_timestamp::DATE,'2099-12-31') >= (select min(block_timestamp::DATE) from " ~ generate_tmp_view_name(this) ~ ")"],
merge_exclude_columns = ["inserted_timestamp"],
cluster_by = ['block_timestamp::DATE','modified_timestamp::Date'],
unique_key = 'transfers_id',
incremental_strategy = 'merge',
post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION ON EQUALITY(tx_hash,action_id,contract_address,from_address,to_address);",
tags = ['curated','scheduled_non_core']
) }}
{# Note - multisource model #}
-- Curation Challenge - 'https://flipsidecrypto.xyz/Hossein/transfer-sector-of-near-curation-challenge-zgM44F'
------------------------------ NEAR Tokens (NEP 141) --------------------------------
WITH orders_final AS (
SELECT
*
FROM
{{ref('silver__token_transfer_orders')}}
),
add_liquidity AS (
SELECT
*
FROM
{{ref('silver__token_transfer_liquidity')}}
),
ft_transfers_method AS (
SELECT
*
FROM
{{ref('silver__token_transfer_ft_transfers_method')}}
),
ft_transfers_event AS (
SELECT
*
FROM
{{ref('silver__token_transfer_ft_transfers_event')}}
),
ft_mints_final AS (
SELECT
*
FROM
{{ref('silver__token_transfer_mints')}}
),
nep_transfers AS (
SELECT
*
FROM
ft_transfers_method
UNION ALL
SELECT
*
FROM
ft_transfers_event
UNION ALL
SELECT
block_id,
block_timestamp,
tx_hash,
action_id,
contract_address,
from_address,
to_address,
amount_unadjusted,
memo,
rn,
_inserted_timestamp,
_modified_timestamp,
_partition_by_block_number
FROM
ft_mints_final
UNION ALL
SELECT
*
FROM
orders_final
UNION ALL
SELECT
*
FROM
add_liquidity
),
native_transfers AS (
SELECT
*
FROM
{{ref('silver__token_transfer_base_native')}}
),
------------------------------ MODELS --------------------------------
native_final AS (
SELECT
block_id,
block_timestamp,
tx_hash,
action_id,
'wrap.near' AS contract_address,
from_address :: STRING,
to_address :: STRING,
NULL AS memo,
'0' AS rn,
'native' AS transfer_type,
amount_unadjusted :: STRING AS amount_raw,
amount_unadjusted :: FLOAT AS amount_raw_precise,
_inserted_timestamp,
_modified_timestamp,
_partition_by_block_number
FROM
native_transfers
),
nep_final AS (
SELECT
block_id,
block_timestamp,
tx_hash,
action_id,
contract_address,
from_address,
to_address,
memo,
rn :: STRING AS rn,
'nep141' AS transfer_type,
amount_unadjusted :: STRING AS amount_raw,
amount_unadjusted :: FLOAT AS amount_raw_precise,
_inserted_timestamp,
_modified_timestamp,
_partition_by_block_number
FROM
nep_transfers
),
------------------------------ FINAL --------------------------------
transfer_union AS (
SELECT
*
FROM
nep_final
UNION ALL
SELECT
*
FROM
native_final
),
FINAL AS (
SELECT
block_id,
block_timestamp,
tx_hash,
action_id,
rn,
contract_address,
from_address,
to_address,
memo,
amount_raw,
amount_raw_precise,
transfer_type,
_inserted_timestamp,
_modified_timestamp,
_partition_by_block_number
FROM
transfer_union
)
SELECT
*,
{{ dbt_utils.generate_surrogate_key(
['tx_hash', 'action_id','contract_address','amount_raw','from_address','to_address','memo','rn']
) }} AS transfers_id,
SYSDATE() AS inserted_timestamp,
SYSDATE() AS modified_timestamp,
'{{ invocation_id }}' AS _invocation_id
FROM
FINAL

View File

@ -3,7 +3,7 @@
incremental_predicates = ["COALESCE(DBT_INTERNAL_DEST.block_timestamp::DATE,'2099-12-31') >= (select min(block_timestamp::DATE) from " ~ generate_tmp_view_name(this) ~ ")"],
merge_exclude_columns = ["inserted_timestamp"],
cluster_by = ['block_timestamp::DATE','_modified_timestamp::Date'],
unique_key = 'action_id',
unique_key = 'transfers_native_id',
incremental_strategy = 'merge',
tags = ['curated','scheduled_non_core']
) }}
@ -37,7 +37,34 @@ WITH native_transfers AS (
{{ this }}
)
{% endif %}
)
),
FINAL AS (
SELECT
*
FROM native_transfers
block_id,
block_timestamp,
tx_hash,
action_id,
'wrap.near' AS contract_address,
from_address :: STRING AS from_address,
to_address :: STRING AS to_address,
NULL AS memo,
'0' AS rn,
'native' AS transfer_type,
amount_unadjusted :: STRING AS amount_raw,
amount_unadjusted :: FLOAT AS amount_raw_precise,
_inserted_timestamp,
_modified_timestamp,
_partition_by_block_number
FROM
native_transfers
)
SELECT
*,
{{ dbt_utils.generate_surrogate_key(
['action_id']
) }} AS transfers_native_id,
SYSDATE() AS inserted_timestamp,
SYSDATE() AS modified_timestamp,
'{{ invocation_id }}' AS _invocation_id
FROM
FINAL

View File

@ -3,7 +3,7 @@
incremental_predicates = ["COALESCE(DBT_INTERNAL_DEST.block_timestamp::DATE,'2099-12-31') >= (select min(block_timestamp::DATE) from " ~ generate_tmp_view_name(this) ~ ")"],
merge_exclude_columns = ["inserted_timestamp"],
cluster_by = ['block_timestamp::DATE','_modified_timestamp::Date'],
unique_key = 'action_id',
unique_key = 'transfers_base_id',
incremental_strategy = 'merge',
tags = ['curated','scheduled_non_core']
) }}
@ -46,6 +46,12 @@ WITH nep141 AS (
{% endif %}
)
SELECT
*
*,
{{ dbt_utils.generate_surrogate_key(
['action_id']
) }} AS transfers_base_id,
SYSDATE() AS inserted_timestamp,
SYSDATE() AS modified_timestamp,
'{{ invocation_id }}' AS _invocation_id
FROM
nep141

View File

@ -3,7 +3,7 @@
incremental_predicates = ["COALESCE(DBT_INTERNAL_DEST.block_timestamp::DATE,'2099-12-31') >= (select min(block_timestamp::DATE) from " ~ generate_tmp_view_name(this) ~ ")"],
merge_exclude_columns = ["inserted_timestamp"],
cluster_by = ['block_timestamp::DATE','_modified_timestamp::Date'],
unique_key = 'action_id',
unique_key = 'transfers_event_id',
incremental_strategy = 'merge',
tags = ['curated','scheduled_non_core']
) }}
@ -24,17 +24,19 @@ WITH actions_events AS (
logs,
receipt_succeeded,
_inserted_timestamp,
_modified_timestamp,
modified_timestamp as _modified_timestamp,
_partition_by_block_number
FROM
{{ ref('silver__token_transfer_base') }}
{% if is_incremental() %}
WHERE _modified_timestamp >= (
SELECT
MAX(_modified_timestamp)
FROM
{{ this }}
)
{% if var("MANUAL_FIX") %}
WHERE {{ partition_load_manual('no_buffer') }}
{% elif is_incremental() %}
WHERE _modified_timestamp >= (
SELECT
MAX(_modified_timestamp)
FROM
{{ this }}
)
{% endif %}
),
ft_transfers_event AS (
@ -89,6 +91,12 @@ ft_transfers_final AS (
amount_unadjusted > 0
)
SELECT
*
*,
{{ dbt_utils.generate_surrogate_key(
['action_id']
) }} AS transfers_event_id,
SYSDATE() AS inserted_timestamp,
SYSDATE() AS modified_timestamp,
'{{ invocation_id }}' AS _invocation_id
FROM
ft_transfers_final

View File

@ -25,11 +25,13 @@ WITH actions_events AS (
logs,
receipt_succeeded,
_inserted_timestamp,
_modified_timestamp,
modified_timestamp as _modified_timestamp,
_partition_by_block_number
FROM
{{ ref('silver__token_transfer_base') }}
{% if is_incremental() %}
{% if var("MANUAL_FIX") %}
WHERE {{ partition_load_manual('no_buffer') }}
{% elif is_incremental() %}
WHERE _modified_timestamp >= (
SELECT
MAX(_modified_timestamp)
@ -82,6 +84,12 @@ ft_transfers_method AS (
AND amount_unadjusted IS NOT NULL
)
SELECT
*
*,
{{ dbt_utils.generate_surrogate_key(
['action_id']
) }} AS transfers_id,
SYSDATE() AS inserted_timestamp,
SYSDATE() AS modified_timestamp,
'{{ invocation_id }}' AS _invocation_id
FROM
ft_transfers_method

View File

@ -3,7 +3,7 @@
incremental_predicates = ["COALESCE(DBT_INTERNAL_DEST.block_timestamp::DATE,'2099-12-31') >= (select min(block_timestamp::DATE) from " ~ generate_tmp_view_name(this) ~ ")"],
merge_exclude_columns = ["inserted_timestamp"],
cluster_by = ['block_timestamp::DATE','_modified_timestamp::Date'],
unique_key = 'action_id',
unique_key = 'transfers_liquidity_id',
incremental_strategy = 'merge',
tags = ['curated','scheduled_non_core']
) }}
@ -24,11 +24,13 @@ WITH actions_events AS (
logs,
receipt_succeeded,
_inserted_timestamp,
_modified_timestamp,
modified_timestamp as _modified_timestamp,
_partition_by_block_number
FROM
{{ ref('silver__token_transfer_base') }}
{% if is_incremental() %}
{% if var("MANUAL_FIX") %}
WHERE {{ partition_load_manual('no_buffer') }}
{% elif is_incremental() %}
WHERE _modified_timestamp >= (
SELECT
MAX(_modified_timestamp)
@ -81,6 +83,12 @@ add_liquidity AS (
logs [0] LIKE 'Liquidity added [%minted % shares'
)
SELECT
*
*,
{{ dbt_utils.generate_surrogate_key(
['action_id']
) }} AS transfers_liquidity_id,
SYSDATE() AS inserted_timestamp,
SYSDATE() AS modified_timestamp,
'{{ invocation_id }}' AS _invocation_id
FROM
add_liquidity

View File

@ -23,11 +23,13 @@ WITH actions_events AS (
logs,
receipt_succeeded,
_inserted_timestamp,
_modified_timestamp,
modified_timestamp as _modified_timestamp,
_partition_by_block_number
FROM
{{ ref('silver__token_transfer_base') }}
{% if is_incremental() %}
{% if var("MANUAL_FIX") %}
WHERE {{ partition_load_manual('no_buffer') }}
{% elif is_incremental() %}
WHERE _modified_timestamp >= (
SELECT
MAX(_modified_timestamp)
@ -89,10 +91,13 @@ ft_mints_final AS (
amount_unadjusted > 0
)
SELECT
{{ dbt_utils.generate_surrogate_key(
*,
{{ dbt_utils.generate_surrogate_key(
['action_id','rn']
) }} AS mint_id,
*
)}} AS mint_id,
SYSDATE() AS inserted_timestamp,
SYSDATE() AS modified_timestamp,
'{{ invocation_id }}' AS _invocation_id
FROM
ft_mints_final
WHERE

View File

@ -0,0 +1,175 @@
{{ config(
materialized = 'incremental',
incremental_predicates = ["COALESCE(DBT_INTERNAL_DEST.block_timestamp::DATE,'2099-12-31') >= (select min(block_timestamp::DATE) from " ~ generate_tmp_view_name(this) ~ ")"],
merge_exclude_columns = ["inserted_timestamp"],
cluster_by = ['block_timestamp::DATE','_modified_timestamp::Date'],
unique_key = 'transfers_non_native_id',
incremental_strategy = 'merge',
tags = ['curated','scheduled_non_core']
) }}
WITH nep_transfers AS (
SELECT
block_id,
block_timestamp,
tx_hash,
action_id,
contract_address,
from_address,
to_address,
amount_unadjusted,
memo,
rn,
_inserted_timestamp,
modified_timestamp as _modified_timestamp,
_partition_by_block_number
FROM
{{ref('silver__token_transfer_ft_transfers_method')}}
{% if var("MANUAL_FIX") %}
WHERE {{ partition_load_manual('no_buffer') }}
{% elif is_incremental() %}
WHERE _modified_timestamp >= (
SELECT
MAX(_modified_timestamp)
FROM
{{ this }}
)
{% endif %}
UNION ALL
SELECT
block_id,
block_timestamp,
tx_hash,
action_id,
contract_address,
from_address,
to_address,
amount_unadjusted,
memo,
rn,
_inserted_timestamp,
modified_timestamp as _modified_timestamp,
_partition_by_block_number
FROM
{{ref('silver__token_transfer_ft_transfers_event')}}
{% if var("MANUAL_FIX") %}
WHERE {{ partition_load_manual('no_buffer') }}
{% elif is_incremental() %}
WHERE _modified_timestamp >= (
SELECT
MAX(_modified_timestamp)
FROM
{{ this }}
)
{% endif %}
UNION ALL
SELECT
block_id,
block_timestamp,
tx_hash,
action_id,
contract_address,
from_address,
to_address,
amount_unadjusted,
memo,
rn,
_inserted_timestamp,
modified_timestamp as _modified_timestamp,
_partition_by_block_number
FROM
{{ref('silver__token_transfer_mints')}}
{% if var("MANUAL_FIX") %}
WHERE {{ partition_load_manual('no_buffer') }}
{% elif is_incremental() %}
WHERE _modified_timestamp >= (
SELECT
MAX(_modified_timestamp)
FROM
{{ this }}
)
{% endif %}
UNION ALL
SELECT
block_id,
block_timestamp,
tx_hash,
action_id,
contract_address,
from_address,
to_address,
amount_unadjusted,
memo,
rn,
_inserted_timestamp,
modified_timestamp as _modified_timestamp,
_partition_by_block_number
FROM
{{ref('silver__token_transfer_orders')}}
{% if var("MANUAL_FIX") %}
WHERE {{ partition_load_manual('no_buffer') }}
{% elif is_incremental() %}
WHERE _modified_timestamp >= (
SELECT
MAX(_modified_timestamp)
FROM
{{ this }}
)
{% endif %}
UNION ALL
SELECT
block_id,
block_timestamp,
tx_hash,
action_id,
contract_address,
from_address,
to_address,
amount_unadjusted,
memo,
rn,
_inserted_timestamp,
modified_timestamp as _modified_timestamp,
_partition_by_block_number
FROM
{{ref('silver__token_transfer_liquidity')}}
{% if var("MANUAL_FIX") %}
WHERE {{ partition_load_manual('no_buffer') }}
{% elif is_incremental() %}
WHERE _modified_timestamp >= (
SELECT
MAX(_modified_timestamp)
FROM
{{ this }}
)
{% endif %}
),
FINAL AS (
SELECT
block_id,
block_timestamp,
tx_hash,
action_id,
contract_address,
from_address,
to_address,
memo,
rn :: STRING AS rn,
'nep141' AS transfer_type,
amount_unadjusted :: STRING AS amount_raw,
amount_unadjusted :: FLOAT AS amount_raw_precise,
_inserted_timestamp,
_modified_timestamp,
_partition_by_block_number
FROM
nep_transfers
)
SELECT
*,
{{ dbt_utils.generate_surrogate_key(
['action_id']
) }} AS transfers_non_native_id,
SYSDATE() AS inserted_timestamp,
SYSDATE() AS modified_timestamp,
'{{ invocation_id }}' AS _invocation_id
FROM
FINAL

View File

@ -3,7 +3,7 @@
incremental_predicates = ["COALESCE(DBT_INTERNAL_DEST.block_timestamp::DATE,'2099-12-31') >= (select min(block_timestamp::DATE) from " ~ generate_tmp_view_name(this) ~ ")"],
merge_exclude_columns = ["inserted_timestamp"],
cluster_by = ['block_timestamp::DATE','_modified_timestamp::Date'],
unique_key = 'action_id',
unique_key = 'transfers_orders_id',
incremental_strategy = 'merge',
tags = ['curated','scheduled_non_core']
) }}
@ -23,11 +23,13 @@ WITH actions_events AS (
logs,
receipt_succeeded,
_inserted_timestamp,
_modified_timestamp,
modified_timestamp as _modified_timestamp,
_partition_by_block_number
FROM
{{ ref('silver__token_transfer_base') }}
{% if is_incremental() %}
{% if var("MANUAL_FIX") %}
WHERE {{ partition_load_manual('no_buffer') }}
{% elif is_incremental() %}
WHERE _modified_timestamp >= (
SELECT
MAX(_modified_timestamp)
@ -83,6 +85,12 @@ orders_final AS (
amount_unadjusted > 0
)
SELECT
*
*,
{{ dbt_utils.generate_surrogate_key(
['action_id']
) }} AS transfers_orders_id,
SYSDATE() AS inserted_timestamp,
SYSDATE() AS modified_timestamp,
'{{ invocation_id }}' AS _invocation_id
FROM
orders_final

View File

@ -0,0 +1,92 @@
{{ config(
materialized = 'incremental',
incremental_predicates = ["COALESCE(DBT_INTERNAL_DEST.block_timestamp::DATE,'2099-12-31') >= (select min(block_timestamp::DATE) from " ~ generate_tmp_view_name(this) ~ ")"],
merge_exclude_columns = ["inserted_timestamp"],
cluster_by = ['block_timestamp::DATE','modified_timestamp::Date'],
unique_key = 'transfers_id',
incremental_strategy = 'merge',
post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION ON EQUALITY(tx_hash,action_id,contract_address,from_address,to_address);",
tags = ['curated','scheduled_non_core']
) }}
WITH native_transfers AS (
SELECT
block_id,
block_timestamp,
tx_hash,
action_id,
rn,
contract_address,
from_address,
to_address,
memo,
amount_raw,
amount_raw_precise,
transfer_type,
_inserted_timestamp,
modified_timestamp as _modified_timestamp,
_partition_by_block_number
FROM
{{ref('silver__token_transfer_native')}}
{% if var("MANUAL_FIX") %}
WHERE {{ partition_load_manual('no_buffer') }}
{% elif is_incremental() %}
WHERE _modified_timestamp >= (
SELECT
MAX(_modified_timestamp)
FROM
{{ this }}
)
{% endif %}
),
non_native AS (
SELECT
block_id,
block_timestamp,
tx_hash,
action_id,
rn,
contract_address,
from_address,
to_address,
memo,
amount_raw,
amount_raw_precise,
transfer_type,
_inserted_timestamp,
modified_timestamp as _modified_timestamp,
_partition_by_block_number
FROM
{{ref('silver__token_transfer_non_native_complete')}}
{% if var("MANUAL_FIX") %}
WHERE {{ partition_load_manual('no_buffer') }}
{% elif is_incremental() %}
WHERE _modified_timestamp >= (
SELECT
MAX(_modified_timestamp)
FROM
{{ this }}
)
{% endif %}
),
FINAL AS (
SELECT
*
FROM
non_native
UNION ALL
SELECT
*
FROM
native_transfers
)
SELECT
*,
{{ dbt_utils.generate_surrogate_key(
['tx_hash', 'action_id','contract_address','amount_raw','from_address','to_address','memo','rn']
) }} AS transfers_id,
SYSDATE() AS inserted_timestamp,
SYSDATE() AS modified_timestamp,
'{{ invocation_id }}' AS _invocation_id
FROM
FINAL

View File

@ -1,7 +1,7 @@
version: 2
models:
- name: silver__token_transfers
- name: silver__token_transfers_complete
tests:
- dbt_utils.unique_combination_of_columns:
combination_of_columns: