Merge pull request #287 from FlipsideCrypto/upd-models

add _partition col and manual fix logic to transfers models
This commit is contained in:
Jack Forgash 2024-04-30 16:31:00 -06:00 committed by GitHub
commit 4e48007ffe
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 61 additions and 27 deletions

View File

@ -18,12 +18,16 @@ WITH actions_events AS (
receiver_id,
logs,
_inserted_timestamp,
modified_timestamp as _modified_timestamp
modified_timestamp as _modified_timestamp,
_partition_by_block_number
FROM
{{ ref('silver__actions_events_function_call_s3') }}
WHERE
receipt_succeeded = TRUE
AND logs [0] IS NOT NULL
{% if var("MANUAL_FIX") %}
AND {{ partition_load_manual('no_buffer') }}
{% else %}
{% if is_incremental() %}
AND modified_timestamp >= (
SELECT
@ -32,6 +36,7 @@ WITH actions_events AS (
{{ this }}
)
{% endif %}
{% endif %}
),
-------------------------------- NFT Transfers --------------------------------
@ -46,7 +51,8 @@ nft_logs AS (
receiver_id AS contract_id,
b.index as logs_rn,
_inserted_timestamp,
_modified_timestamp
_modified_timestamp,
_partition_by_block_number
FROM
actions_events
JOIN LATERAL FLATTEN(
@ -78,7 +84,8 @@ nft_transfers AS (
token_ids [0] :: STRING AS token_id,
logs_rn + A.index as transfer_rn,
_inserted_timestamp,
_modified_timestamp
_modified_timestamp,
_partition_by_block_number
FROM
nft_logs
JOIN LATERAL FLATTEN(
@ -100,7 +107,8 @@ nft_final AS (
B.value :: STRING AS token_id,
transfer_rn + B.index as rn,
_inserted_timestamp,
_modified_timestamp
_modified_timestamp,
_partition_by_block_number
FROM
nft_transfers
JOIN LATERAL FLATTEN(
@ -119,7 +127,8 @@ FINAL AS (
to_address,
token_id,
_inserted_timestamp,
_modified_timestamp
_modified_timestamp,
_partition_by_block_number
FROM
nft_final
)

View File

@ -24,12 +24,16 @@ WITH actions_events AS (
logs,
receipt_succeeded,
_inserted_timestamp,
modified_timestamp as _modified_timestamp
modified_timestamp as _modified_timestamp,
_partition_by_block_number
FROM
{{ ref('silver__actions_events_function_call_s3') }}
WHERE
receipt_succeeded = TRUE
AND logs [0] IS NOT NULL
{% if var("MANUAL_FIX") %}
AND {{ partition_load_manual('no_buffer') }}
{% else %}
{% if is_incremental() %}
AND _modified_timestamp >= (
SELECT
@ -38,6 +42,7 @@ WITH actions_events AS (
{{ this }}
)
{% endif %}
{% endif %}
),
swaps_raw AS (
SELECT
@ -53,18 +58,23 @@ swaps_raw AS (
amount_in_raw,
amount_out_raw,
_inserted_timestamp,
modified_timestamp AS _modified_timestamp
modified_timestamp AS _modified_timestamp,
_partition_by_block_number
FROM
{{ ref('silver__dex_swaps_v2') }}
{% if is_incremental() %}
WHERE
_modified_timestamp >= (
SELECT
MAX(modified_timestamp)
FROM
{{ this }}
)
{% endif %}
{% if var("MANUAL_FIX") %}
AND {{ partition_load_manual('no_buffer') }}
{% else %}
{% if is_incremental() %}
WHERE
_modified_timestamp >= (
SELECT
MAX(modified_timestamp)
FROM
{{ this }}
)
{% endif %}
{% endif %}
),
---------------------------- Native Token Transfers ------------------------------
native_transfers AS (
@ -80,11 +90,15 @@ native_transfers AS (
--numeric validation (there are some exceptions that needs to be ignored)
receipt_succeeded,
_inserted_timestamp,
_modified_timestamp
_modified_timestamp,
_partition_by_block_number
FROM
{{ ref('silver__transfers_s3') }}
WHERE
status = TRUE AND deposit != 0
{% if var("MANUAL_FIX") %}
AND {{ partition_load_manual('no_buffer') }}
{% else %}
{% if is_incremental() %}
AND inserted_timestamp >= (
SELECT
@ -93,6 +107,7 @@ native_transfers AS (
{{ this }}
)
{% endif %}
{% endif %}
),
------------------------------ NEAR Tokens (NEP 141) --------------------------------
swaps AS (
@ -108,7 +123,8 @@ swaps AS (
'swap' AS memo,
swap_index as rn,
_inserted_timestamp,
_modified_timestamp
_modified_timestamp,
_partition_by_block_number
FROM
swaps_raw
UNION ALL
@ -124,7 +140,8 @@ swaps AS (
'swap' AS memo,
swap_index + 1 as rn,
_inserted_timestamp,
_modified_timestamp
_modified_timestamp,
_partition_by_block_number
FROM
swaps_raw
),
@ -139,7 +156,8 @@ orders AS (
DATA :event :: STRING AS event,
g.index as rn,
_inserted_timestamp,
_modified_timestamp
_modified_timestamp,
_partition_by_block_number
FROM
actions_events
JOIN LATERAL FLATTEN(
@ -163,7 +181,8 @@ orders_final AS (
'order' AS memo,
f.index as rn,
_inserted_timestamp,
_modified_timestamp
_modified_timestamp,
_partition_by_block_number
FROM
orders
JOIN LATERAL FLATTEN(
@ -199,7 +218,8 @@ add_liquidity AS (
'add_liquidity' AS memo,
index as rn,
_inserted_timestamp,
_modified_timestamp
_modified_timestamp,
_partition_by_block_number
FROM
actions_events,
LATERAL FLATTEN (
@ -224,7 +244,8 @@ ft_transfers_mints AS (
b.index as logs_rn,
receiver_id AS contract_address,
_inserted_timestamp,
_modified_timestamp
_modified_timestamp,
_partition_by_block_number
FROM
actions_events
JOIN LATERAL FLATTEN(
@ -255,7 +276,8 @@ ft_transfers_mints_final AS (
f.value :memo :: STRING AS memo,
logs_rn + f.index as rn,
_inserted_timestamp,
_modified_timestamp
_modified_timestamp,
_partition_by_block_number
FROM
ft_transfers_mints
JOIN LATERAL FLATTEN(
@ -302,7 +324,8 @@ native_final AS (
amount_unadjusted :: STRING AS amount_raw,
amount_unadjusted :: FLOAT AS amount_raw_precise,
_inserted_timestamp,
_modified_timestamp
_modified_timestamp,
_partition_by_block_number
FROM
native_transfers
),
@ -322,7 +345,8 @@ nep_final AS (
amount_unadjusted :: STRING AS amount_raw,
amount_unadjusted :: FLOAT AS amount_raw_precise,
_inserted_timestamp,
_modified_timestamp
_modified_timestamp,
_partition_by_block_number
FROM
nep_transfers
),
@ -354,7 +378,8 @@ FINAL AS (
amount_raw_precise,
transfer_type,
_inserted_timestamp,
_modified_timestamp
_modified_timestamp,
_partition_by_block_number
FROM
transfer_union