Merge pull request #459 from FlipsideCrypto/quickfix-incr-logic-fix

quickfix/incr logic - ft transfers method+
This commit is contained in:
Jack Forgash 2025-05-02 10:41:42 -06:00 committed by GitHub
commit f3244e35da
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 171 additions and 42 deletions

View File

@ -8,6 +8,60 @@
tags = ['curated', 'scheduled_non_core'],
) }}
{% if execute %}
{% if is_incremental() and not var("MANUAL_FIX") %}
{% do log("Incremental and not MANUAL_FIX", info=True) %}
{% set max_mod_query %}
SELECT
MAX(modified_timestamp) modified_timestamp
FROM
{{ this }}
{% endset %}
{% set max_mod = run_query(max_mod_query) [0] [0] %}
{% if not max_mod or max_mod == 'None' %}
{% set max_mod = '2099-01-01' %}
{% endif %}
{% do log("max_mod: " ~ max_mod, info=True) %}
{% set min_block_date_query %}
SELECT
MIN(
block_timestamp :: DATE
)
FROM
(
SELECT
MIN(block_timestamp) block_timestamp
FROM
{{ ref('core__ez_actions') }} A
WHERE
modified_timestamp >= '{{max_mod}}'
UNION ALL
SELECT
MIN(block_timestamp) block_timestamp
FROM
{{ ref('silver__token_transfer_native') }} A
WHERE
modified_timestamp >= '{{max_mod}}'
)
{% endset %}
{% set min_bd = run_query(min_block_date_query) [0] [0] %}
{% if not min_bd or min_bd == 'None' %}
{% set min_bd = '2099-01-01' %}
{% endif %}
{% do log("min_bd: " ~ min_bd, info=True) %}
{% endif %}
{% endif %}
WITH
lockup_actions AS (
SELECT
@ -21,7 +75,8 @@ lockup_actions AS (
action_data,
tx_succeeded,
receipt_succeeded,
_partition_by_block_number
_partition_by_block_number,
modified_timestamp
FROM
{{ ref('core__ez_actions') }}
WHERE
@ -42,13 +97,8 @@ lockup_actions AS (
{% if var("MANUAL_FIX") %}
AND {{ partition_load_manual('no_buffer') }}
{% else %}
{% if is_incremental() %}
AND modified_timestamp >= (
SELECT
MAX(modified_timestamp)
FROM
{{ this }}
)
{% if is_incremental() %}
AND block_timestamp :: DATE >= '{{min_bd}}'
{% endif %}
{% endif %}
@ -61,20 +111,15 @@ xfers AS (
block_id,
amount_unadj :: INT AS deposit,
_partition_by_block_number,
_inserted_timestamp
modified_timestamp
FROM
{{ ref('silver__token_transfer_native') }}
{% if var("MANUAL_FIX") %}
WHERE {{ partition_load_manual('no_buffer') }}
{% else %}
{% if is_incremental() %}
WHERE modified_timestamp >= (
SELECT
MAX(modified_timestamp)
FROM
{{ this }}
)
{% if is_incremental() %}
WHERE block_timestamp :: DATE >= '{{min_bd}}'
{% endif %}
{% endif %}
),
@ -98,7 +143,8 @@ agg_arguments AS (
COUNT(
DISTINCT action_data :method_name :: STRING
) AS method_count,
MIN(_partition_by_block_number) AS _partition_by_block_number
MIN(_partition_by_block_number) AS _partition_by_block_number,
MIN(modified_timestamp) AS modified_timestamp
FROM
lockup_actions
GROUP BY
@ -106,22 +152,19 @@ agg_arguments AS (
),
lockup_xfers AS (
SELECT
tx_hash,
receipt_id,
block_timestamp,
block_id,
deposit,
_partition_by_block_number,
_inserted_timestamp
xfers.tx_hash,
xfers.receipt_id,
xfers.block_timestamp,
xfers.block_id,
xfers.deposit,
xfers._partition_by_block_number,
xfers.modified_timestamp
FROM
xfers
LEFT JOIN lockup_actions l
ON xfers.tx_hash = l.tx_hash
WHERE
tx_hash IN (
SELECT
DISTINCT tx_hash
FROM
lockup_actions
)
l.tx_hash IS NOT NULL
),
parse_args_json AS (
SELECT
@ -164,6 +207,13 @@ parse_args_json AS (
agg_arguments A
LEFT JOIN lockup_xfers x
ON A.receipt_ids :new :: STRING = x.receipt_id
{% if is_incremental() and not var("MANUAL_FIX") %}
WHERE
GREATEST(
COALESCE(A.modified_timestamp, '1970-01-01'),
COALESCE(x.modified_timestamp, '1970-01-01')
) >= '{{max_mod}}'
{% endif %}
),
FINAL AS (
SELECT

View File

@ -8,6 +8,61 @@
tags = ['curated','scheduled_non_core']
) }}
{% if execute %}
{% if is_incremental() and not var("MANUAL_FIX") %}
{% do log("Incremental and not MANUAL_FIX", info=True) %}
{% set max_mod_query %}
SELECT
MAX(modified_timestamp) modified_timestamp
FROM
{{ this }}
{% endset %}
{% set max_mod = run_query(max_mod_query) [0] [0] %}
{% if not max_mod or max_mod == 'None' %}
{% set max_mod = '2099-01-01' %}
{% endif %}
{% do log("max_mod: " ~ max_mod, info=True) %}
{% set min_block_date_query %}
SELECT
MIN(
block_timestamp :: DATE
)
FROM
(
SELECT
MIN(block_timestamp) block_timestamp
FROM
{{ ref('core__ez_actions') }} A
WHERE
modified_timestamp >= '{{max_mod}}'
UNION ALL
SELECT
MIN(block_timestamp) block_timestamp
FROM
{{ ref('silver__logs_s3') }} A
WHERE
modified_timestamp >= '{{max_mod}}'
)
{% endset %}
{% set min_bd = run_query(min_block_date_query) [0] [0] %}
{% if not min_bd or min_bd == 'None' %}
{% set min_bd = '2099-01-01' %}
{% endif %}
{% do log("min_bd: " ~ min_bd, info=True) %}
{% endif %}
{% endif %}
WITH ft_transfer_actions AS (
SELECT
block_id,
@ -19,7 +74,8 @@ WITH ft_transfer_actions AS (
receipt_predecessor_id AS predecessor_id,
receipt_signer_id AS signer_id,
receipt_succeeded,
_partition_by_block_number
_partition_by_block_number,
modified_timestamp
FROM
{{ ref('core__ez_actions') }}
WHERE
@ -31,12 +87,30 @@ WITH ft_transfer_actions AS (
AND {{ partition_load_manual('no_buffer') }}
{% else %}
{% if is_incremental() %}
AND modified_timestamp >= (
SELECT
MAX(modified_timestamp)
FROM
{{ this }}
)
AND block_timestamp :: DATE >= '{{min_bd}}'
{% endif %}
{% endif %}
),
logs AS (
SELECT
block_id,
block_timestamp,
tx_hash,
receipt_id,
log_index,
clean_log AS log_value,
_partition_by_block_number,
modified_timestamp
FROM
{{ ref('silver__logs_s3') }}
WHERE
receipt_succeeded
{% if var("MANUAL_FIX") %}
AND
{{ partition_load_manual('no_buffer') }}
{% else %}
{% if is_incremental() %}
AND block_timestamp :: DATE >= '{{min_bd}}'
{% endif %}
{% endif %}
),
@ -47,19 +121,24 @@ ft_transfer_logs AS (
l.tx_hash,
l.receipt_id,
l.log_index,
l.clean_log AS log_value,
l.log_value,
l._partition_by_block_number,
a.contract_address,
a.predecessor_id,
a.signer_id,
a.action_index
FROM
{{ ref('silver__logs_s3') }} l
INNER JOIN ft_transfer_actions a
logs l
INNER JOIN ft_transfer_actions a
ON l.tx_hash = a.tx_hash
AND l.receipt_id = a.receipt_id
WHERE
l.receipt_succeeded
{% if is_incremental() and not var("MANUAL_FIX") %}
WHERE
GREATEST(
COALESCE(l.modified_timestamp, '1970-01-01'),
COALESCE(a.modified_timestamp, '1970-01-01')
) >= '{{max_mod}}'
{% endif %}
),
ft_transfers_final AS (
SELECT