add createpool add inc predicates

This commit is contained in:
Eric Laurello 2025-05-28 10:20:14 -04:00
parent 2d26ec2110
commit a0bcf39ccc
2 changed files with 208 additions and 46 deletions

View File

@ -0,0 +1,134 @@
{{ config(
materialized = 'incremental',
incremental_strategy = 'delete+insert',
unique_key = "block_number",
tags = ['silver_bridge','defi','bridge','curated']
) }}
WITH base_contracts AS (
SELECT
tx_hash,
block_number,
block_timestamp,
from_address,
to_address AS contract_address,
concat_ws(
'-',
block_number,
tx_position,
CONCAT(
TYPE,
'_',
trace_address
)
) AS _call_id,
modified_timestamp AS _inserted_timestamp
FROM
{{ ref('core_evm__fact_traces') }}
WHERE
from_address = '0x1d7c6783328c145393e84fb47a7f7c548f5ee28d'
AND TYPE ILIKE 'create%'
AND tx_succeeded
AND trace_succeeded
{% if is_incremental() %}
AND _inserted_timestamp >= (
SELECT
MAX(_inserted_timestamp) - INTERVAL '12 hours'
FROM
{{ this }}
)
AND to_address NOT IN (
SELECT
DISTINCT pool_address
FROM
{{ this }}
)
{% endif %}
),
function_sigs AS (
SELECT
'0xfc0c546a' AS function_sig,
'token' AS function_name
),
inputs AS (
SELECT
contract_address,
block_number,
function_sig,
function_name,
0 AS function_input,
CONCAT(
function_sig,
LPAD(
function_input,
64,
0
)
) AS DATA
FROM
base_contracts
JOIN function_sigs
ON 1 = 1
),
contract_reads AS (
SELECT
contract_address,
block_number,
function_sig,
function_name,
function_input,
DATA,
utils.udf_json_rpc_call(
'eth_call',
[{ 'to': contract_address, 'from': null, 'data': data }, utils.udf_int_to_hex(block_number) ]
) AS rpc_request,
live.udf_api(
'POST',
CONCAT(
'{service}',
'/',
'{Authentication}'
),{},
rpc_request,
'Vault/prod/ethereum/quicknode/mainnet'
) AS read_output,
SYSDATE() AS _inserted_timestamp
FROM
inputs
),
reads_flat AS (
SELECT
read_output,
read_output :data :id :: STRING AS read_id,
read_output :data :result :: STRING AS read_result,
SPLIT(
read_id,
'-'
) AS read_id_object,
function_sig,
function_name,
function_input,
DATA,
contract_address,
block_number,
_inserted_timestamp
FROM
contract_reads
)
SELECT
read_output,
read_id,
read_result,
read_id_object,
function_sig,
function_name,
function_input,
DATA,
block_number,
contract_address AS pool_address,
CONCAT('0x', SUBSTR(read_result, 27, 40)) AS token_address,
_inserted_timestamp
FROM
reads_flat

View File

@ -1,7 +1,9 @@
{{ config(
materialized = 'incremental',
incremental_strategy = 'delete+insert',
cluster_by = ['_inserted_timestamp::date'],
incremental_strategy = 'merge',
merge_exclude_columns = ['inserted_timestamp'],
incremental_predicates = ["COALESCE(DBT_INTERNAL_DEST.block_timestamp::DATE,'2099-12-31') >= (select min(block_timestamp::DATE) from " ~ generate_tmp_view_name(this) ~ ")"],
cluster_by = ['modified_timestamp::date'],
unique_key = 'tx_id',
tags = ['bridge', 'scheduled', 'streamline_scheduled', 'scheduled_non_core', 'stargate']
) }}
@ -44,15 +46,14 @@ WITH events AS (
AND event_data IS NOT NULL
{% if is_incremental() %}
AND modified_timestamp >= (
SELECT
MAX(modified_timestamp)
FROM
{{ this }}
)
AND modified_timestamp >= (
SELECT
MAX(modified_timestamp)
FROM
{{ this }}
)
{% endif %}
),
-- Process OFTSent events (outbound transfers)
oft_sent_events AS (
SELECT
@ -61,13 +62,18 @@ oft_sent_events AS (
block_height,
event_index,
event_contract AS bridge_contract,
event_data:amountSentLD::DOUBLE AS sent_amount,
event_data:amountReceivedLD::DOUBLE AS received_amount,
COALESCE(sent_amount - received_amount, 0) AS fee_amount,
LOWER(event_data:fromAddress::STRING) AS flow_wallet_address,
event_data:dstEid::NUMBER AS dst_endpoint_id,
event_data :amountSentLD :: DOUBLE AS sent_amount,
event_data :amountReceivedLD :: DOUBLE AS received_amount,
COALESCE(
sent_amount - received_amount,
0
) AS fee_amount,
LOWER(
event_data :fromAddress :: STRING
) AS flow_wallet_address,
event_data :dstEid :: NUMBER AS dst_endpoint_id,
30362 AS src_endpoint_id,
event_data:guid::STRING AS transfer_guid,
event_data :guid :: STRING AS transfer_guid,
'outbound' AS direction,
'stargate' AS bridge,
_inserted_timestamp
@ -76,7 +82,6 @@ oft_sent_events AS (
WHERE
event_type = 'OFTSent'
),
-- Process OFTReceived events (inbound transfers)
oft_received_events AS (
SELECT
@ -85,13 +90,15 @@ oft_received_events AS (
block_height,
event_index,
event_contract AS bridge_contract,
event_data:amountReceivedLD::DOUBLE AS received_amount,
event_data :amountReceivedLD :: DOUBLE AS received_amount,
0 AS fee_amount,
received_amount AS net_amount,
LOWER(event_data:toAddress::STRING) AS flow_wallet_address,
LOWER(
event_data :toAddress :: STRING
) AS flow_wallet_address,
NULL AS dst_endpoint_id,
event_data:srcEid::NUMBER AS src_endpoint_id,
event_data:guid::STRING AS transfer_guid,
event_data :srcEid :: NUMBER AS src_endpoint_id,
event_data :guid :: STRING AS transfer_guid,
'inbound' AS direction,
'stargate' AS bridge,
_inserted_timestamp
@ -100,7 +107,6 @@ oft_received_events AS (
WHERE
event_type = 'OFTReceived'
),
combined_events AS (
SELECT
tx_id,
@ -120,9 +126,7 @@ combined_events AS (
_inserted_timestamp
FROM
oft_sent_events
UNION ALL
SELECT
tx_id,
block_timestamp,
@ -142,50 +146,69 @@ combined_events AS (
FROM
oft_received_events
),
-- Join with token transfer data to get token information
token_transfers AS (
SELECT
tx_hash AS tx_id,
contract_address AS token_address,
name AS token_name,
raw_amount AS amount,
NAME AS token_name,
symbol AS token_symbol,
decimals,
event_index AS token_event_index,
ROW_NUMBER() OVER (PARTITION BY tx_hash ORDER BY event_index) AS rn
ROW_NUMBER() over (
PARTITION BY tx_hash
ORDER BY
event_index
) AS rn
FROM
{{ ref('core_evm__ez_token_transfers') }}
WHERE
tx_hash IN (SELECT tx_id FROM combined_events)
tx_hash IN (
SELECT
tx_id
FROM
combined_events
)
{% if is_incremental() %}
AND block_timestamp :: DATE >= '{{min_block_date}}'
{% endif %}
),
endpoint_ids AS (
SELECT endpoint_id, LOWER(blockchain) AS blockchain
FROM {{ ref('seeds__layerzero_endpoint_ids') }}
SELECT
endpoint_id,
LOWER(blockchain) AS blockchain
FROM
{{ ref('seeds__layerzero_endpoint_ids') }}
),
final AS (
FINAL AS (
SELECT
ce.tx_id,
ce.block_timestamp,
ce.block_height,
ce.bridge_contract AS bridge_address,
COALESCE(tt.token_address, NULL) AS token_address,
COALESCE(
tt.token_address,
NULL
) AS token_address,
ce.gross_amount,
ce.fee_amount AS amount_fee,
ce.net_amount,
ce.flow_wallet_address,
CASE
WHEN ce.direction = 'outbound' THEN 'flow_evm'
ELSE COALESCE(src.blockchain, 'other_chains')
CASE
WHEN ce.direction = 'outbound' THEN 'flow_evm'
ELSE COALESCE(
src.blockchain,
'other_chains'
)
END AS source_chain,
CASE
WHEN ce.direction = 'inbound' THEN 'flow_evm'
ELSE COALESCE(dst.blockchain, 'other_chains')
CASE
WHEN ce.direction = 'inbound' THEN 'flow_evm'
ELSE COALESCE(
dst.blockchain,
'other_chains'
)
END AS destination_chain,
ce.direction,
ce.bridge AS platform,
@ -197,10 +220,15 @@ final AS (
'{{ invocation_id }}' AS _invocation_id
FROM
combined_events ce
LEFT JOIN token_transfers tt ON ce.tx_id = tt.tx_id AND ce.gross_amount = tt.amount
LEFT JOIN endpoint_ids src ON src.endpoint_id = ce.src_endpoint_id
LEFT JOIN endpoint_ids dst ON dst.endpoint_id = ce.dst_endpoint_id
LEFT JOIN token_transfers tt
ON ce.tx_id = tt.tx_id
AND ce.gross_amount = tt.amount
LEFT JOIN endpoint_ids src
ON src.endpoint_id = ce.src_endpoint_id
LEFT JOIN endpoint_ids dst
ON dst.endpoint_id = ce.dst_endpoint_id
)
SELECT *
FROM final
SELECT
*
FROM
FINAL