simplify complete models and add HEAL_MODELS like in crosschain

This commit is contained in:
Jack Forgash 2024-09-17 12:54:20 -06:00
parent ecaacc008e
commit bb073e002e
17 changed files with 254 additions and 487 deletions

View File

@ -64,6 +64,7 @@ vars:
STREAMLINE_LOAD_LOOKBACK_HOURS: 3
RECEIPT_MAP_LOOKBACK_HOURS: 6
IS_MIGRATION: False
HEAL_MODELS: []
dispatch:
- macro_namespace: dbt

View File

@ -3,3 +3,9 @@
An unadjusted amount (of tokens, price, etc.) for the relevant record. This is the number as it appears on chain and is not decimal adjusted.
{% enddocs %}
{% docs amount_unadj %}
An unadjusted amount (of tokens, price, etc.) for the relevant record. This is the number as it appears on chain and is not decimal adjusted.
{% enddocs %}

View File

@ -1,7 +1,7 @@
{{ config(
materialized = 'incremental',
incremental_predicates = ["COALESCE(DBT_INTERNAL_DEST.block_timestamp::DATE,'2099-12-31') >= (select min(block_timestamp::DATE) from " ~ generate_tmp_view_name(this) ~ ")"],
unique_key = "fact_token_transfers_id",
unique_key = "ez_token_transfers_id",
incremental_strategy = 'merge',
merge_exclude_columns = ["inserted_timestamp"],
cluster_by = ['block_timestamp::DATE'],
@ -32,8 +32,8 @@ SELECT
from_address,
to_address,
memo,
amount_raw,
amount_raw_precise,
amount_unadjusted :: STRING AS amount_raw,
amount_unadjusted :: FLOAT AS amount_raw_precise,
IFF(
C.decimals IS NOT NULL,
utils.udf_decimal_adjust(
@ -53,12 +53,9 @@ SELECT
C.symbol AS symbol,
price AS token_price,
transfer_type,
{{ dbt_utils.generate_surrogate_key(
['transfers_id']
) }} AS fact_token_transfers_id,
transfers_complete_id AS ez_token_transfers_id,
SYSDATE() AS inserted_timestamp,
SYSDATE() AS modified_timestamp,
'{{ invocation_id }}' AS _invocation_id
SYSDATE() AS modified_timestamp
FROM
{{ ref('silver__token_transfers_complete') }}
t

View File

@ -3,6 +3,12 @@ version: 2
models:
- name: core__ez_token_transfers
description: This table records all native token transfers and nep-141
tests:
- dbt_utils.recency:
datepart: hour
field: block_timestamp
interval: 2
columns:
- name: BLOCK_ID
description: "{{ doc('block_id')}}"
@ -51,7 +57,7 @@ models:
- name: HAS_PRICE
description: "Boolean value indicating if the token has a price"
- name: FACT_TOKEN_TRANSFERS_ID
- name: EZ_TOKEN_TRANSFERS_ID
description: "{{doc('id')}}"
tests:
- unique:

View File

@ -1,157 +0,0 @@
{{ config(
materialized = 'incremental',
merge_exclude_columns = ["inserted_timestamp"],
cluster_by = ['block_timestamp::DATE'],
unique_key = 'action_id',
incremental_strategy = 'merge',
tags = ['curated','scheduled_non_core'],
enabled = False
) }}
{# TODO - test and apply, where applicable.
Model disabled but will keep in repo for ref #}
{# Note - multisource model #}
{% if execute %}
{% if is_incremental() %}
{% set max_modified_query %}
SELECT
MAX(_modified_timestamp) AS _modified_timestamp
FROM
{{ this }}
{% endset %}
{% set max_modified_timestamp = run_query(max_modified_query).columns [0].values() [0] %}
{% endif %}
{% set query = """ CREATE OR REPLACE TEMPORARY TABLE silver.transfers_s3_temp_dates as SELECT MIN(COALESCE(b._modified_timestamp,'2050-01-01') ) as _modified_timestamp FROM """ ~ ref('silver__actions_events_s3') ~ """ a join """ ~ ref('silver__streamline_transactions_final') ~ """ b ON A.tx_hash = b.tx_hash WHERE a.action_name = 'Transfer' """ %}
{% set incr = "" %}
{% if is_incremental() %}
{% set incr = """ AND GREATEST(a._modified_timestamp,b._modified_timestamp) >= '""" ~ max_modified_timestamp ~ """' """ %}
{% endif %}
{% do run_query(
query ~ incr
) %}
{% endif %}
WITH action_events AS(
SELECT
tx_hash,
block_id,
block_timestamp,
action_id,
action_data :deposit :: INT AS deposit,
predecessor_id,
receiver_id,
signer_id,
receipt_succeeded,
gas_price,
gas_burnt,
tokens_burnt,
_partition_by_block_number,
_inserted_timestamp,
modified_timestamp AS _modified_timestamp
FROM
{{ ref('silver__actions_events_s3') }}
WHERE
action_name = 'Transfer' {% if var("MANUAL_FIX") %}
AND {{ partition_load_manual('no_buffer') }}
{% else %}
{% if is_incremental() %}
AND _modified_timestamp >= '{{ max_modified_timestamp }}'
{% endif %}
{% endif %}
),
txs AS (
SELECT
tx_hash,
tx :receipt :: ARRAY AS tx_receipt,
block_id,
block_timestamp,
tx_receiver,
tx_signer,
transaction_fee,
gas_used,
tx_succeeded,
_partition_by_block_number,
_inserted_timestamp,
modified_timestamp AS _modified_timestamp
FROM
{{ ref('silver__streamline_transactions_final') }}
{% if var("MANUAL_FIX") %}
WHERE
{{ partition_load_manual('no_buffer') }}
{% else %}
{% if is_incremental() %}
WHERE
_modified_timestamp >= (
SELECT
MAX(_modified_timestamp)
FROM
silver.transfers_s3_temp_dates
)
{% endif %}
{% endif %}
),
actions AS (
SELECT
A.tx_hash,
A.action_id,
A.block_id,
A.block_timestamp,
t.tx_signer,
t.tx_receiver,
A.predecessor_id,
A.receiver_id,
A.signer_id,
A.deposit,
t.transaction_fee,
A.gas_burnt AS gas_used,
A.receipt_succeeded,
t.tx_succeeded,
A._partition_by_block_number,
A._inserted_timestamp,
A._modified_timestamp
FROM
action_events A
INNER JOIN txs t
ON A.tx_hash = t.tx_hash
),
FINAL AS (
SELECT
block_id,
block_timestamp,
action_id,
deposit,
tx_hash,
tx_signer,
tx_receiver,
predecessor_id,
signer_id,
receiver_id,
transaction_fee,
gas_used,
tx_succeeded,
receipt_succeeded,
array_min([tx_succeeded, receipt_succeeded]) :: BOOLEAN AS status,
_partition_by_block_number,
_inserted_timestamp,
_modified_timestamp
FROM
actions
)
SELECT
*,
{{ dbt_utils.generate_surrogate_key(
['action_id']
) }} AS transfers_id,
SYSDATE() AS inserted_timestamp,
SYSDATE() AS modified_timestamp,
'{{ invocation_id }}' AS _invocation_id
FROM
FINAL

View File

@ -1,70 +0,0 @@
{{ config(
materialized = 'incremental',
incremental_predicates = ["COALESCE(DBT_INTERNAL_DEST.block_timestamp::DATE,'2099-12-31') >= (select min(block_timestamp::DATE) from " ~ generate_tmp_view_name(this) ~ ")"],
merge_exclude_columns = ["inserted_timestamp"],
cluster_by = ['block_timestamp::DATE','_modified_timestamp::Date'],
unique_key = 'transfers_native_id',
incremental_strategy = 'merge',
tags = ['curated','scheduled_non_core']
) }}
WITH native_transfers AS (
SELECT
block_id,
block_timestamp,
tx_hash,
action_id,
predecessor_id AS from_address,
receiver_id AS to_address,
IFF(REGEXP_LIKE(deposit, '^[0-9]+$'), deposit, NULL) AS amount_unadjusted,
--numeric validation (there are some exceptions that needs to be ignored)
receipt_succeeded,
_inserted_timestamp,
modified_timestamp AS _modified_timestamp,
_partition_by_block_number
FROM
{{ ref('silver__transfers_s3') }}
WHERE
status = TRUE
AND deposit != 0
{% if var("MANUAL_FIX") %}
AND {{ partition_load_manual('no_buffer') }}
{% elif is_incremental() %}
AND _modified_timestamp >= (
SELECT
MAX(_modified_timestamp)
FROM
{{ this }}
)
{% endif %}
),
FINAL AS (
SELECT
block_id,
block_timestamp,
tx_hash,
action_id,
'wrap.near' AS contract_address,
from_address :: STRING AS from_address,
to_address :: STRING AS to_address,
NULL AS memo,
'0' AS rn,
'native' AS transfer_type,
amount_unadjusted :: STRING AS amount_raw,
amount_unadjusted :: FLOAT AS amount_raw_precise,
_inserted_timestamp,
_modified_timestamp,
_partition_by_block_number
FROM
native_transfers
)
SELECT
*,
{{ dbt_utils.generate_surrogate_key(
['action_id']
) }} AS transfers_native_id,
SYSDATE() AS inserted_timestamp,
SYSDATE() AS modified_timestamp,
'{{ invocation_id }}' AS _invocation_id
FROM
FINAL

View File

@ -5,7 +5,7 @@
cluster_by = ['block_timestamp::DATE'],
unique_key = 'action_id',
incremental_strategy = 'merge',
post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION ON EQUALITY(tx_hash,action_id, tx_hash,tx_signer,tx_receiver,predecessor_id,signer_id,receiver_id);",
post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION ON EQUALITY(tx_hash,action_id,predecessor_id,receiver_id);",
tags = ['curated','scheduled_non_core']
) }}
{# Note - multisource model #}

View File

@ -15,6 +15,7 @@ WITH nep141 AS (
block_timestamp,
tx_hash,
action_id,
predecessor_id,
signer_id,
receiver_id,
action_name,
@ -54,4 +55,4 @@ SELECT
SYSDATE() AS modified_timestamp,
'{{ invocation_id }}' AS _invocation_id
FROM
nep141
nep141

View File

@ -1,10 +1,9 @@
{{ config(
materialized = 'incremental',
incremental_predicates = ["COALESCE(DBT_INTERNAL_DEST.block_timestamp::DATE,'2099-12-31') >= (select min(block_timestamp::DATE) from " ~ generate_tmp_view_name(this) ~ ")"],
merge_exclude_columns = ["inserted_timestamp"],
cluster_by = ['block_timestamp::DATE','_modified_timestamp::Date'],
unique_key = 'transfers_event_id',
incremental_strategy = 'merge',
incremental_strategy = 'delete+insert',
tags = ['curated','scheduled_non_core']
) }}

View File

@ -2,10 +2,9 @@
{{ config(
materialized = 'incremental',
incremental_predicates = ["COALESCE(DBT_INTERNAL_DEST.block_timestamp::DATE,'2099-12-31') >= (select min(block_timestamp::DATE) from " ~ generate_tmp_view_name(this) ~ ")"],
merge_exclude_columns = ["inserted_timestamp"],
cluster_by = ['block_timestamp::DATE','_modified_timestamp::Date'],
unique_key = 'action_id',
incremental_strategy = 'merge',
incremental_strategy = 'delete+insert',
tags = ['curated','scheduled_non_core']
) }}
@ -92,4 +91,4 @@ SELECT
SYSDATE() AS modified_timestamp,
'{{ invocation_id }}' AS _invocation_id
FROM
ft_transfers_method
ft_transfers_method

View File

@ -1,10 +1,9 @@
{{ config(
materialized = 'incremental',
incremental_predicates = ["COALESCE(DBT_INTERNAL_DEST.block_timestamp::DATE,'2099-12-31') >= (select min(block_timestamp::DATE) from " ~ generate_tmp_view_name(this) ~ ")"],
merge_exclude_columns = ["inserted_timestamp"],
cluster_by = ['block_timestamp::DATE','_modified_timestamp::Date'],
unique_key = 'transfers_liquidity_id',
incremental_strategy = 'merge',
incremental_strategy = 'delete+insert',
tags = ['curated','scheduled_non_core']
) }}
@ -91,4 +90,4 @@ SELECT
SYSDATE() AS modified_timestamp,
'{{ invocation_id }}' AS _invocation_id
FROM
add_liquidity
add_liquidity

View File

@ -1,10 +1,9 @@
{{ config(
materialized = 'incremental',
incremental_predicates = ["COALESCE(DBT_INTERNAL_DEST.block_timestamp::DATE,'2099-12-31') >= (select min(block_timestamp::DATE) from " ~ generate_tmp_view_name(this) ~ ")"],
merge_exclude_columns = ["inserted_timestamp"],
cluster_by = ['block_timestamp::DATE','_modified_timestamp::Date'],
unique_key = 'mint_id',
incremental_strategy = 'merge',
incremental_strategy = 'delete+insert',
tags = ['curated','scheduled_non_core']
) }}
@ -101,4 +100,4 @@ SELECT
FROM
ft_mints_final
WHERE
mint_id IS NOT NULL
mint_id IS NOT NULL

View File

@ -1,175 +0,0 @@
{{ config(
materialized = 'incremental',
incremental_predicates = ["COALESCE(DBT_INTERNAL_DEST.block_timestamp::DATE,'2099-12-31') >= (select min(block_timestamp::DATE) from " ~ generate_tmp_view_name(this) ~ ")"],
merge_exclude_columns = ["inserted_timestamp"],
cluster_by = ['block_timestamp::DATE','_modified_timestamp::Date'],
unique_key = 'transfers_non_native_id',
incremental_strategy = 'merge',
tags = ['curated','scheduled_non_core']
) }}
WITH nep_transfers AS (
SELECT
block_id,
block_timestamp,
tx_hash,
action_id,
contract_address,
from_address,
to_address,
amount_unadjusted,
memo,
rn,
_inserted_timestamp,
modified_timestamp as _modified_timestamp,
_partition_by_block_number
FROM
{{ref('silver__token_transfer_ft_transfers_method')}}
{% if var("MANUAL_FIX") %}
WHERE {{ partition_load_manual('no_buffer') }}
{% elif is_incremental() %}
WHERE _modified_timestamp >= (
SELECT
MAX(_modified_timestamp)
FROM
{{ this }}
)
{% endif %}
UNION ALL
SELECT
block_id,
block_timestamp,
tx_hash,
action_id,
contract_address,
from_address,
to_address,
amount_unadjusted,
memo,
rn,
_inserted_timestamp,
modified_timestamp as _modified_timestamp,
_partition_by_block_number
FROM
{{ref('silver__token_transfer_ft_transfers_event')}}
{% if var("MANUAL_FIX") %}
WHERE {{ partition_load_manual('no_buffer') }}
{% elif is_incremental() %}
WHERE _modified_timestamp >= (
SELECT
MAX(_modified_timestamp)
FROM
{{ this }}
)
{% endif %}
UNION ALL
SELECT
block_id,
block_timestamp,
tx_hash,
action_id,
contract_address,
from_address,
to_address,
amount_unadjusted,
memo,
rn,
_inserted_timestamp,
modified_timestamp as _modified_timestamp,
_partition_by_block_number
FROM
{{ref('silver__token_transfer_mints')}}
{% if var("MANUAL_FIX") %}
WHERE {{ partition_load_manual('no_buffer') }}
{% elif is_incremental() %}
WHERE _modified_timestamp >= (
SELECT
MAX(_modified_timestamp)
FROM
{{ this }}
)
{% endif %}
UNION ALL
SELECT
block_id,
block_timestamp,
tx_hash,
action_id,
contract_address,
from_address,
to_address,
amount_unadjusted,
memo,
rn,
_inserted_timestamp,
modified_timestamp as _modified_timestamp,
_partition_by_block_number
FROM
{{ref('silver__token_transfer_orders')}}
{% if var("MANUAL_FIX") %}
WHERE {{ partition_load_manual('no_buffer') }}
{% elif is_incremental() %}
WHERE _modified_timestamp >= (
SELECT
MAX(_modified_timestamp)
FROM
{{ this }}
)
{% endif %}
UNION ALL
SELECT
block_id,
block_timestamp,
tx_hash,
action_id,
contract_address,
from_address,
to_address,
amount_unadjusted,
memo,
rn,
_inserted_timestamp,
modified_timestamp as _modified_timestamp,
_partition_by_block_number
FROM
{{ref('silver__token_transfer_liquidity')}}
{% if var("MANUAL_FIX") %}
WHERE {{ partition_load_manual('no_buffer') }}
{% elif is_incremental() %}
WHERE _modified_timestamp >= (
SELECT
MAX(_modified_timestamp)
FROM
{{ this }}
)
{% endif %}
),
FINAL AS (
SELECT
block_id,
block_timestamp,
tx_hash,
action_id,
contract_address,
from_address,
to_address,
memo,
rn :: STRING AS rn,
'nep141' AS transfer_type,
amount_unadjusted :: STRING AS amount_raw,
amount_unadjusted :: FLOAT AS amount_raw_precise,
_inserted_timestamp,
_modified_timestamp,
_partition_by_block_number
FROM
nep_transfers
)
SELECT
*,
{{ dbt_utils.generate_surrogate_key(
['action_id']
) }} AS transfers_non_native_id,
SYSDATE() AS inserted_timestamp,
SYSDATE() AS modified_timestamp,
'{{ invocation_id }}' AS _invocation_id
FROM
FINAL

View File

@ -1,10 +1,9 @@
{{ config(
materialized = 'incremental',
incremental_predicates = ["COALESCE(DBT_INTERNAL_DEST.block_timestamp::DATE,'2099-12-31') >= (select min(block_timestamp::DATE) from " ~ generate_tmp_view_name(this) ~ ")"],
merge_exclude_columns = ["inserted_timestamp"],
cluster_by = ['block_timestamp::DATE','_modified_timestamp::Date'],
unique_key = 'transfers_orders_id',
incremental_strategy = 'merge',
incremental_strategy = 'delete+insert',
tags = ['curated','scheduled_non_core']
) }}
@ -93,4 +92,4 @@ SELECT
SYSDATE() AS modified_timestamp,
'{{ invocation_id }}' AS _invocation_id
FROM
orders_final
orders_final

View File

@ -3,13 +3,48 @@
incremental_predicates = ["COALESCE(DBT_INTERNAL_DEST.block_timestamp::DATE,'2099-12-31') >= (select min(block_timestamp::DATE) from " ~ generate_tmp_view_name(this) ~ ")"],
merge_exclude_columns = ["inserted_timestamp"],
cluster_by = ['block_timestamp::DATE','modified_timestamp::Date'],
unique_key = 'transfers_id',
unique_key = 'transfers_complete_id',
incremental_strategy = 'merge',
post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION ON EQUALITY(tx_hash,action_id,contract_address,from_address,to_address);",
post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION ON EQUALITY(tx_hash,contract_address,from_address,to_address);",
tags = ['curated','scheduled_non_core']
) }}
WITH native_transfers AS (
SELECT
block_id,
block_timestamp,
tx_hash,
action_id,
'0' AS rn,
'wrap.near' AS contract_address,
predecessor_id AS from_address,
receiver_id AS to_address,
NULL AS memo,
IFF(REGEXP_LIKE(deposit, '^[0-9]+$'), deposit, NULL) AS amount_unadjusted,
-- unadj, raw, adj?
'native' AS transfer_type,
_inserted_timestamp,
modified_timestamp AS _modified_timestamp,
_partition_by_block_number
FROM
{{ ref('silver__transfers_s3') }}
{% if var("MANUAL_FIX") %}
WHERE
{{ partition_load_manual('no_buffer') }}
{% elif is_incremental() and 'native' not in var('HEAL_MODELS') %}
WHERE
_modified_timestamp >= (
SELECT
MAX(_modified_timestamp)
FROM
{{ this }}
)
{% endif %}
),
ft_transfers_method AS (
SELECT
block_id,
block_timestamp,
@ -20,27 +55,30 @@ WITH native_transfers AS (
from_address,
to_address,
memo,
amount_raw,
amount_raw_precise,
transfer_type,
amount_unadjusted,
'nep141' AS transfer_type,
_inserted_timestamp,
modified_timestamp as _modified_timestamp,
modified_timestamp AS _modified_timestamp,
_partition_by_block_number
FROM
{{ref('silver__token_transfer_native')}}
{% if var("MANUAL_FIX") %}
WHERE {{ partition_load_manual('no_buffer') }}
{% elif is_incremental() %}
WHERE _modified_timestamp >= (
SELECT
MAX(_modified_timestamp)
FROM
{{ this }}
)
{% endif %}
{{ ref('silver__token_transfer_ft_transfers_method') }}
{% if var("MANUAL_FIX") %}
WHERE
{{ partition_load_manual('no_buffer') }}
{% elif is_incremental() and 'ft_transfers_method' not in var('HEAL_MODELS') %}
WHERE
_modified_timestamp >= (
SELECT
MAX(_modified_timestamp)
FROM
{{ this }}
)
{% endif %}
),
non_native AS (
SELECT
ft_transfers_event AS (
SELECT
block_id,
block_timestamp,
tx_hash,
@ -50,41 +88,176 @@ non_native AS (
from_address,
to_address,
memo,
amount_raw,
amount_raw_precise,
transfer_type,
amount_unadjusted,
'nep141' AS transfer_type,
_inserted_timestamp,
modified_timestamp as _modified_timestamp,
modified_timestamp AS _modified_timestamp,
_partition_by_block_number
FROM
{{ref('silver__token_transfer_non_native_complete')}}
{% if var("MANUAL_FIX") %}
WHERE {{ partition_load_manual('no_buffer') }}
{% elif is_incremental() %}
WHERE _modified_timestamp >= (
SELECT
MAX(_modified_timestamp)
FROM
{{ this }}
)
{% endif %}
{{ ref('silver__token_transfer_ft_transfers_event') }}
{% if var("MANUAL_FIX") %}
WHERE
{{ partition_load_manual('no_buffer') }}
{% elif is_incremental() and 'ft_transfers_event' not in var('HEAL_MODELS') %}
WHERE
_modified_timestamp >= (
SELECT
MAX(_modified_timestamp)
FROM
{{ this }}
)
{% endif %}
),
mints AS (
SELECT
block_id,
block_timestamp,
tx_hash,
action_id,
rn,
contract_address,
from_address,
to_address,
memo,
amount_unadjusted,
'nep141' AS transfer_type,
_inserted_timestamp,
modified_timestamp AS _modified_timestamp,
_partition_by_block_number
FROM
{{ ref('silver__token_transfer_mints') }}
{% if var("MANUAL_FIX") %}
WHERE
{{ partition_load_manual('no_buffer') }}
{% elif is_incremental() and 'mints' not in var('HEAL_MODELS') %}
WHERE
_modified_timestamp >= (
SELECT
MAX(_modified_timestamp)
FROM
{{ this }}
)
{% endif %}
),
orders AS (
SELECT
block_id,
block_timestamp,
tx_hash,
action_id,
rn,
contract_address,
from_address,
to_address,
memo,
amount_unadjusted,
'nep141' AS transfer_type,
_inserted_timestamp,
modified_timestamp AS _modified_timestamp,
_partition_by_block_number
FROM
{{ ref('silver__token_transfer_orders') }}
{% if var("MANUAL_FIX") %}
WHERE
{{ partition_load_manual('no_buffer') }}
{% elif is_incremental() and 'orders' not in var('HEAL_MODELS') %}
WHERE
_modified_timestamp >= (
SELECT
MAX(_modified_timestamp)
FROM
{{ this }}
)
{% endif %}
),
liquidity AS (
SELECT
block_id,
block_timestamp,
tx_hash,
action_id,
rn,
contract_address,
from_address,
to_address,
memo,
amount_unadjusted,
'nep141' AS transfer_type,
_inserted_timestamp,
modified_timestamp AS _modified_timestamp,
_partition_by_block_number
FROM
{{ ref('silver__token_transfer_liquidity') }}
{% if var("MANUAL_FIX") %}
WHERE
{{ partition_load_manual('no_buffer') }}
{% elif is_incremental() and 'liquidity' not in var('HEAL_MODELS') %}
WHERE
_modified_timestamp >= (
SELECT
MAX(_modified_timestamp)
FROM
{{ this }}
)
{% endif %}
),
FINAL AS (
SELECT
*
FROM
non_native
native_transfers
UNION ALL
SELECT
*
FROM
native_transfers
ft_transfers_method
UNION ALL
SELECT
*
FROM
ft_transfers_event
UNION ALL
SELECT
*
FROM
mints
UNION ALL
SELECT
*
FROM
orders
UNION ALL
SELECT
*
FROM
liquidity
)
SELECT
*,
block_id,
block_timestamp,
tx_hash,
action_id,
rn,
contract_address,
from_address,
to_address,
memo,
amount_unadjusted,
transfer_type,
_inserted_timestamp,
_modified_timestamp,
_partition_by_block_number,
{{ dbt_utils.generate_surrogate_key(
['tx_hash', 'action_id','contract_address','amount_raw','from_address','to_address','memo','rn']
) }} AS transfers_id,
['action_id','contract_address','amount_unadjusted','from_address','to_address','rn']
) }} AS transfers_complete_id,
SYSDATE() AS inserted_timestamp,
SYSDATE() AS modified_timestamp,
'{{ invocation_id }}' AS _invocation_id

View File

@ -2,17 +2,7 @@ version: 2
models:
- name: silver__token_transfers_complete
tests:
- dbt_utils.unique_combination_of_columns:
combination_of_columns:
- tx_hash
- action_id
- contract_address
- amount_raw
- from_address
- to_address
- memo
- rn
description: |-
This table records all the Native Token + FTs Transfers of the Near blockchain.
columns:
@ -23,7 +13,7 @@ models:
description: "{{ doc('block_timestamp')}}"
tests:
- not_null:
where: _inserted_timestamp <= current_timestamp - interval '10 hour'
where: _inserted_timestamp <= SYSDATE() - interval '2 hours'
- name: TX_HASH
description: "{{ doc('tx_hash')}}"
@ -37,20 +27,17 @@ models:
- name: FROM_ADDRESS
description: "{{ doc('from_address')}}"
- name: RN
description: "Row number"
- name: TO_ADDRESS
description: "{{ doc('to_address')}}"
- name: RN
description: "Row number"
- name: MEMO
description: "{{ doc('memo')}}"
- name: AMOUNT_RAW
description: "{{ doc('amount_raw')}}"
- name: AMOUNT_RAW_PRECISE
description: "{{ doc('amount_adj')}}"
- name: AMOUNT_UNADJ
description: "{{ doc('amount_unadj')}}"
- name: TRANSFER_TYPE
description: "{{ doc('transfer_type')}}"
@ -61,8 +48,11 @@ models:
- name: _MODIFIED_TIMESTAMP
description: "{{ doc('_modified_timestamp')}}"
- name: TRANSFERS_ID
- name: TRANSFERS_COMPLETE_ID
description: "{{doc('id')}}"
tests:
- not_null
- unique
- name: INSERTED_TIMESTAMP
description: "{{doc('inserted_timestamp')}}"