Merge pull request #492 from FlipsideCrypto/AN-6447-Intents-ez-bridging

AN-6447 Intents-ez-bridging
This commit is contained in:
Stanley 2025-08-11 22:31:18 +07:00 committed by GitHub
commit 7ceabc8d52
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
15 changed files with 1017 additions and 67 deletions

3
.gitignore vendored
View File

@ -24,3 +24,6 @@ local*
.cursorignore
.cursorrules
.cursor/mcp.json
CLAUDE.local.md
__pycache__/

View File

@ -0,0 +1,5 @@
{% docs _invocation_id %}
Unique identifier for the dbt invocation that created or last updated this record. This field is used for tracking and debugging purposes, allowing identification of which dbt run processed each record.
{% enddocs %}

View File

@ -0,0 +1,5 @@
{% docs bridge_address %}
The address of the bridge contract that facilitated the cross-chain transfer. This field identifies the specific bridge protocol or contract responsible for executing the bridge intent operation, allowing for analysis of bridge usage patterns and protocol-specific metrics.
{% enddocs %}

View File

@ -0,0 +1,5 @@
{% docs token_address %}
The parsed and standardized token contract address extracted from the raw token identifier. This field contains the clean contract address without blockchain prefixes or additional formatting, making it suitable for joins with other token-related tables and cross-chain analysis.
{% enddocs %}

View File

@ -0,0 +1,5 @@
{% docs token_address_raw %}
The raw token identifier as it appears on-chain, including the full token ID with blockchain prefix and contract address. For NEAR tokens, this includes the full token ID format (e.g., 'nep141:token.near.omft.near'). This field preserves the original token representation before any parsing or standardization is applied.
{% enddocs %}

View File

@ -33,43 +33,57 @@ labels AS (
SELECT
asset_identifier AS contract_address,
name,
source_chain,
crosschain_token_contract,
near_token_contract,
symbol,
decimals
decimals
FROM
{{ ref('silver__ft_contract_metadata') }}
WHERE crosschain_token_contract IS NOT NULL
QUALIFY(ROW_NUMBER() OVER (
PARTITION BY asset_identifier
ORDER BY asset_identifier
) = 1)
),
prices AS (
SELECT
DATE_TRUNC(
'hour',
hour
) AS block_timestamp,
token_address AS contract_address,
AVG(price) AS price_usd,
MAX(symbol) AS symbol,
MAX(is_verified) AS token_is_verified
FROM
{{ ref('silver__complete_token_prices') }}
GROUP BY
1,
2
),
prices_mapping AS (
SELECT
block_timestamp,
CASE
WHEN contract_address = '0xf7413489c474ca4399eee604716c72879eea3615' THEN 'apys.token.a11bd.near'
WHEN contract_address = '0x3294395e62f4eb6af3f1fcf89f5602d90fb3ef69' THEN 'celo.token.a11bd.near'
WHEN contract_address = '0xd2877702675e6ceb975b4a1dff9fb7baf4c91ea9' THEN 'luna.token.a11bd.near'
WHEN contract_address = '0xa47c8bf37f92abed4a126bda807a7b7498661acd' THEN 'ust.token.a11bd.near'
WHEN contract_address = '0xc02aaa39b223fe8d0a0e5c4f27ead9083c756cc2' THEN 'aurora'
ELSE contract_address
END AS contract_address,
prices_crosschain AS (
SELECT DISTINCT
token_address,
blockchain,
symbol,
price_usd,
token_is_verified
price,
decimals,
is_native,
is_verified,
hour
FROM
prices
{{ source('crosschain_price', 'ez_prices_hourly') }}
WHERE
NOT is_native
AND is_verified
QUALIFY(ROW_NUMBER() OVER (
PARTITION BY token_address, DATE_TRUNC('hour', hour)
ORDER BY hour DESC
) = 1)
),
prices_native AS (
SELECT DISTINCT
'native' AS token_address,
symbol,
price,
decimals,
is_native,
is_verified,
hour
FROM
{{ source('crosschain_price', 'ez_prices_hourly') }}
WHERE
is_native
AND is_verified
QUALIFY(ROW_NUMBER() OVER (
PARTITION BY name, DATE_TRUNC('hour', hour)
ORDER BY hour DESC
) = 1)
),
FINAL AS (
SELECT
@ -78,14 +92,14 @@ FINAL AS (
b.tx_hash,
COALESCE(w.near_contract_address, b.token_address) AS token_address,
b.amount_unadj,
b.amount_adj,
COALESCE(w.symbol, l1.symbol) as symbol,
COALESCE(w.symbol, l1.symbol, p1.symbol, p2.symbol) as symbol,
b.amount_adj / pow(
10,
l1.decimals
COALESCE(l1.decimals, p1.decimals, p2.decimals)
) AS amount,
amount * p1.price_usd AS amount_usd,
p1.token_is_verified AS token_is_verified,
COALESCE(p1.price, p2.price) AS price,
amount * COALESCE(p1.price, p2.price) AS amount_usd,
COALESCE(p1.is_verified, p2.is_verified, FALSE) AS token_is_verified,
b.destination_address,
b.source_address,
b.platform,
@ -101,14 +115,20 @@ FINAL AS (
FROM fact_bridging b
LEFT JOIN {{ ref('seeds__portalbridge_tokenids') }} w
ON b.token_address = w.wormhole_contract_address
LEFT JOIN labels l1
ON COALESCE(w.near_contract_address, b.token_address) = l1.contract_address
LEFT JOIN prices_mapping p1
ON COALESCE(w.near_contract_address, b.token_address) = p1.contract_address
AND DATE_TRUNC('hour', b.block_timestamp) = p1.block_timestamp
LEFT JOIN labels l1 ON (
COALESCE(w.near_contract_address, b.token_address) = l1.contract_address
)
LEFT JOIN prices_crosschain p1 ON (
COALESCE(l1.crosschain_token_contract, b.token_address) = p1.token_address
AND DATE_TRUNC('hour', b.block_timestamp) = DATE_TRUNC('hour', p1.hour)
)
LEFT JOIN prices_native p2 ON (
UPPER(l1.symbol) = UPPER(p2.symbol)
AND l1.crosschain_token_contract = p2.token_address
AND DATE_TRUNC('hour', b.block_timestamp) = DATE_TRUNC('hour', p2.hour)
)
)
SELECT
*,
COALESCE(token_is_verified, FALSE) AS token_is_verified
*
FROM
FINAL

View File

@ -39,12 +39,6 @@ models:
- not_null:
where: receipt_succeeded
- name: AMOUNT_ADJ
description: "{{ doc('amount_adj')}}"
tests:
- not_null:
where: receipt_succeeded
- name: SYMBOL
description: "{{ doc('symbol')}}"

View File

@ -27,6 +27,34 @@ WITH dex_swaps AS (
FROM
{{ ref('silver__dex_swaps_v2') }}
),
intents_swaps AS (
SELECT
tx_hash,
receipt_id,
block_id,
block_timestamp,
receiver_id,
signer_id,
swap_index,
amount_out_raw,
token_out,
amount_in_raw,
token_in,
swap_input_data,
log AS LOG,
intents_swap_id AS ez_dex_swaps_id,
inserted_timestamp,
modified_timestamp
FROM
{{ ref('silver__swap_intents') }}
),
all_swaps AS (
SELECT * FROM dex_swaps
UNION ALL
SELECT * FROM intents_swaps
),
labels AS (
SELECT
asset_identifier AS contract_address,
@ -85,7 +113,7 @@ FINAL AS (
s.inserted_timestamp,
s.modified_timestamp
FROM
dex_swaps s
all_swaps s
LEFT JOIN labels l1
ON s.token_out = l1.contract_address
LEFT JOIN labels l2

View File

@ -14,6 +14,7 @@
-- depends on {{ ref('silver__bridge_multichain') }}
-- depends on {{ ref('silver__bridge_allbridge') }}
-- depends on {{ ref('silver__bridge_omni') }}
-- depends on {{ ref('silver__bridge_intents') }}
{% if execute %}
@ -182,6 +183,36 @@ omni AS (
FROM
{{ ref('silver__bridge_omni') }}
),
intents AS (
SELECT
block_id,
block_timestamp,
tx_hash,
token_address,
amount_unadj AS amount_raw,
amount_adj,
destination_address,
source_address,
platform,
bridge_address,
destination_chain,
source_chain,
method_name,
direction,
receipt_succeeded,
bridge_intents_id AS fact_bridge_activity_id,
inserted_timestamp,
modified_timestamp
FROM
{{ ref('silver__bridge_intents') }}
{% if var('MANUAL_FIX') %}
WHERE {{ partition_load_manual('no_buffer', 'floor(block_id, -3)') }}
{% else %}
{% if is_incremental() %}
WHERE modified_timestamp > '{{ max_mod }}'
{% endif %}
{% endif %}
),
FINAL AS (
SELECT
*
@ -207,6 +238,11 @@ FINAL AS (
*
FROM
omni
UNION ALL
SELECT
*
FROM
intents
)
SELECT
block_id,

View File

@ -10,6 +10,74 @@
meta ={ 'database_tags':{ 'table':{ 'PURPOSE': 'DEFI, SWAPS' }} },
) }}
WITH dex_swaps AS (
SELECT
tx_hash,
receipt_id,
block_id,
block_timestamp,
receiver_id,
signer_id,
swap_index,
amount_out_raw,
token_out,
amount_in_raw,
token_in,
swap_input_data,
LOG,
dex_swaps_v2_id AS fact_dex_swaps_id,
modified_timestamp
FROM
{{ ref('silver__dex_swaps_v2') }}
{% if var("MANUAL_FIX") %}
WHERE {{ partition_load_manual('no_buffer', 'floor(block_id, -3)') }}
{% else %}
{% if is_incremental() %}
WHERE modified_timestamp > (
SELECT MAX(modified_timestamp) FROM {{ this }}
)
{% endif %}
{% endif %}
),
intents_swaps AS (
SELECT
tx_hash,
receipt_id,
block_id,
block_timestamp,
receiver_id,
signer_id,
swap_index,
amount_out_raw,
token_out,
amount_in_raw,
token_in,
swap_input_data,
log AS LOG,
intents_swap_id AS fact_dex_swaps_id,
modified_timestamp
FROM
{{ ref('silver__swap_intents') }}
{% if var("MANUAL_FIX") %}
WHERE {{ partition_load_manual('no_buffer', 'block_timestamp::date') }}
{% else %}
{% if is_incremental() %}
WHERE modified_timestamp > (
SELECT MAX(modified_timestamp) FROM {{ this }}
)
{% endif %}
{% endif %}
),
FINAL AS (
SELECT * FROM dex_swaps
UNION ALL
SELECT * FROM intents_swaps
)
SELECT
tx_hash,
receipt_id,
@ -24,18 +92,8 @@ SELECT
token_in,
swap_input_data,
LOG,
dex_swaps_v2_id AS fact_dex_swaps_id,
fact_dex_swaps_id,
SYSDATE() AS inserted_timestamp,
SYSDATE() AS modified_timestamp
FROM
{{ ref('silver__dex_swaps_v2') }}
{% if var("MANUAL_FIX") %}
WHERE {{ partition_load_manual('no_buffer', 'floor(block_id, -3)') }}
{% else %}
{% if is_incremental() %}
WHERE modified_timestamp > (
SELECT MAX(modified_timestamp) FROM {{ this }}
)
{% endif %}
{% endif %}
FINAL

View File

@ -149,7 +149,7 @@ swap_outcome AS (
LOG,
'.*Swapped (\\d+) (.*) for (\\d+) (.*)',
'\\1'
) :: INT AS amount_in_raw,
) :: NUMERIC(38,0) AS amount_in_raw,
REGEXP_REPLACE(
LOG,
'.*Swapped \\d+ (\\S+) for (\\d+) (.*)',
@ -159,7 +159,7 @@ swap_outcome AS (
LOG,
'.*Swapped \\d+ \\S+ for (\\d+) (.*)',
'\\1'
) :: INT AS amount_out_raw,
) :: NUMERIC(38,0) AS amount_out_raw,
REGEXP_REPLACE(
LOG,
'.*Swapped \\d+ \\S+ for \\d+ (.*)',
@ -187,9 +187,9 @@ rhea_swap_outcome AS (
log_index ASC
) - 1 AS swap_index, -- keeping this as fallback but Rhea logs typically contain single swap / receipt
clean_log AS LOG,
TRY_PARSE_JSON(clean_log):data[0]:amount_in::INT AS amount_in_raw,
TRY_PARSE_JSON(clean_log):data[0]:amount_in::NUMERIC(38,0) AS amount_in_raw,
TRY_PARSE_JSON(clean_log):data[0]:token_in::STRING AS token_in,
TRY_PARSE_JSON(clean_log):data[0]:amount_out::INT AS amount_out_raw,
TRY_PARSE_JSON(clean_log):data[0]:amount_out::NUMERIC(38,0) AS amount_out_raw,
TRY_PARSE_JSON(clean_log):data[0]:token_out::STRING AS token_out,
_partition_by_block_number,
modified_timestamp,
@ -347,4 +347,4 @@ SELECT
SYSDATE() AS modified_timestamp,
'{{ invocation_id }}' AS _invocation_id
FROM
FINAL
FINAL

View File

@ -0,0 +1,442 @@
{{ config(
materialized = 'incremental',
incremental_strategy = 'merge',
incremental_predicates = ["dynamic_range_predicate_custom","block_timestamp::date"],
merge_exclude_columns = ["inserted_timestamp"],
unique_key = 'intents_swap_id',
cluster_by = ['block_timestamp::DATE'],
tags = ['scheduled_non_core']
) }}
-- depends on {{ ref('silver__logs_s3') }}
-- depends on {{ ref('core__ez_actions') }}
-- depends on {{ ref('defi__fact_intents') }}
WITH intents_actions AS (
SELECT
block_id,
block_timestamp,
tx_hash,
tx_succeeded,
tx_signer,
receipt_id,
receipt_predecessor_id,
receipt_receiver_id,
receipt_succeeded,
action_index,
action_name,
action_data,
action_data:method_name::STRING AS method_name,
action_data:args::STRING AS args,
actions_id,
inserted_timestamp,
modified_timestamp
FROM
{{ ref('core__ez_actions') }}
WHERE
receipt_receiver_id = 'intents.near'
AND receipt_succeeded
AND tx_succeeded
{% if var("MANUAL_FIX") %}
AND {{ partition_load_manual('no_buffer', 'block_timestamp::date') }}
{% else %}
{% if is_incremental() %}
AND modified_timestamp > (
SELECT MAX(modified_timestamp) FROM {{ this }}
)
{% endif %}
{% endif %}
),
intents_logs AS (
SELECT
tx_hash,
receipt_id,
block_id,
block_timestamp,
receiver_id,
predecessor_id,
signer_id,
log_index,
clean_log,
TRY_PARSE_JSON(clean_log) AS log_json,
TRY_PARSE_JSON(clean_log):event::STRING AS log_event,
TRY_PARSE_JSON(clean_log):standard::STRING AS log_standard,
receipt_succeeded,
inserted_timestamp,
modified_timestamp,
_partition_by_block_number
FROM
{{ ref('silver__logs_s3') }}
WHERE
receiver_id = 'intents.near'
AND receipt_succeeded
AND TRY_PARSE_JSON(clean_log):standard::STRING IN ('nep245', 'dip4')
{% if var("MANUAL_FIX") %}
AND {{ partition_load_manual('no_buffer', 'block_timestamp::date') }}
{% else %}
{% if is_incremental() %}
AND modified_timestamp > (
SELECT MAX(modified_timestamp) FROM {{ this }}
)
{% endif %}
{% endif %}
),
token_diff_events AS (
SELECT
tx_hash,
receipt_id,
block_id,
block_timestamp,
log_index,
log_json:data AS data_array,
evt.value:intent_hash::STRING AS intent_hash,
evt.value:account_id::STRING AS account_id,
evt.value:diff AS token_diff,
evt.value:referral::STRING AS referral,
evt.index AS event_index,
inserted_timestamp,
modified_timestamp,
_partition_by_block_number
FROM
intents_logs,
LATERAL FLATTEN(input => log_json:data) AS evt
WHERE
log_event = 'token_diff'
AND log_standard = 'dip4'
),
token_diff_flattened AS (
SELECT
tx_hash,
receipt_id,
block_id,
block_timestamp,
log_index,
intent_hash,
account_id,
referral,
event_index,
token_key.key::STRING AS token_id,
token_key.value::STRING AS token_amount_raw,
CASE
WHEN token_key.value::NUMERIC < 0 THEN 'token_in'
WHEN token_key.value::NUMERIC > 0 THEN 'token_out'
ELSE 'no_change'
END AS token_direction,
ABS(token_key.value::NUMERIC)::STRING AS abs_amount_raw,
inserted_timestamp,
modified_timestamp,
_partition_by_block_number
FROM
token_diff_events,
LATERAL FLATTEN(input => token_diff) AS token_key
WHERE
token_key.value::NUMERIC != 0
),
intents_events AS (
SELECT
tx_hash,
receipt_id,
block_id,
block_timestamp,
receiver_id,
predecessor_id,
log_event,
token_id,
owner_id,
old_owner_id,
new_owner_id,
amount_raw::STRING AS amount_raw,
token_id AS contract_address,
log_index,
log_event_index,
amount_index,
fact_intents_id,
inserted_timestamp,
modified_timestamp
FROM
{{ ref('defi__fact_intents') }}
WHERE
token_id IS NOT NULL
{% if var("MANUAL_FIX") %}
AND {{ partition_load_manual('no_buffer', 'block_timestamp::date') }}
{% else %}
{% if is_incremental() %}
AND modified_timestamp > (
SELECT MAX(modified_timestamp) FROM {{ this }}
)
{% endif %}
{% endif %}
),
intents_joined AS (
SELECT
e.tx_hash,
e.receipt_id,
e.block_id,
e.block_timestamp,
e.receiver_id,
e.predecessor_id,
e.log_event,
e.token_id,
e.owner_id,
e.old_owner_id,
e.new_owner_id,
e.amount_raw,
e.contract_address,
e.log_index,
e.log_event_index,
e.amount_index,
e.fact_intents_id,
e.inserted_timestamp,
e.modified_timestamp,
a.tx_signer,
a.action_name,
a.method_name,
a.args,
a.action_data,
l.clean_log,
l.log_json,
l.log_event AS parsed_log_event,
l.log_standard,
l._partition_by_block_number
FROM intents_events e
LEFT JOIN intents_actions a ON e.tx_hash = a.tx_hash AND e.receipt_id = a.receipt_id
LEFT JOIN intents_logs l ON e.tx_hash = l.tx_hash AND e.receipt_id = l.receipt_id
WHERE e.tx_hash IN (
SELECT DISTINCT tx_hash
FROM intents_logs
WHERE receiver_id = 'intents.near'
AND receipt_succeeded
AND TRY_PARSE_JSON(clean_log):event::STRING = 'token_diff'
)
),
intents_raw AS (
SELECT
ij.tx_hash,
ij.receipt_id,
ij.block_id,
ij.block_timestamp,
ij.receiver_id,
ij.predecessor_id,
ij.log_event,
ij.token_id,
ij.owner_id,
ij.old_owner_id,
ij.new_owner_id,
ij.amount_raw,
ij.contract_address,
ij.log_index,
ij.log_event_index,
ij.amount_index,
ij.fact_intents_id,
ij.inserted_timestamp,
ij.modified_timestamp,
-- Include joined data from actions and logs
ij.tx_signer,
ij.action_name,
ij.method_name,
ij.args,
ij.action_data,
ij.clean_log,
ij.log_json,
ij.parsed_log_event,
ij.log_standard,
ij._partition_by_block_number
FROM intents_joined ij
),
intents_order AS (
SELECT
tx_hash,
receipt_id,
block_id,
block_timestamp,
receiver_id,
predecessor_id,
token_id,
contract_address,
amount_raw::STRING AS amount_raw,
owner_id,
old_owner_id,
log_event,
log_index,
log_event_index,
amount_index,
inserted_timestamp,
modified_timestamp,
-- Include joined data from actions and logs
tx_signer,
action_name,
method_name,
args,
action_data,
clean_log,
log_json,
parsed_log_event,
log_standard,
_partition_by_block_number,
-- identify first and last tokens
ROW_NUMBER() OVER (
PARTITION BY tx_hash
ORDER BY log_index, log_event_index, amount_index
) AS token_order_asc,
ROW_NUMBER() OVER (
PARTITION BY tx_hash
ORDER BY log_index DESC, log_event_index DESC, amount_index DESC
) AS token_order_desc
FROM intents_raw
),
intents_swap AS (
SELECT
tx_hash,
-- Swap ordering
MAX(CASE WHEN token_order_asc = 1 THEN token_id END) AS token_in_id,
MAX(CASE WHEN token_order_desc = 1 THEN token_id END) AS token_out_id,
MAX(CASE WHEN token_order_asc = 1 THEN contract_address END) AS token_in_address,
MAX(CASE WHEN token_order_desc = 1 THEN contract_address END) AS token_out_address,
-- Total amount
SUM(CASE WHEN token_order_asc = 1 THEN amount_raw::NUMERIC ELSE 0 END)::STRING AS total_amount_in_raw,
SUM(CASE WHEN token_order_desc = 1 THEN amount_raw::NUMERIC ELSE 0 END)::STRING AS total_amount_out_raw
FROM intents_order
GROUP BY tx_hash
),
token_aggregated AS (
SELECT
tdf.tx_hash,
tdf.receipt_id,
tdf.intent_hash,
tdf.account_id,
tdf.referral,
tdf.token_id,
SUM(tdf.token_amount_raw::NUMERIC) AS net_amount,
MIN(tdf.block_id) AS block_id,
MIN(tdf.block_timestamp) AS block_timestamp,
MIN(tdf._partition_by_block_number) AS _partition_by_block_number
FROM token_diff_flattened tdf
WHERE tdf.intent_hash IS NOT NULL
GROUP BY tdf.tx_hash, tdf.receipt_id, tdf.intent_hash, tdf.account_id, tdf.referral, tdf.token_id
),
intents_token_flows AS (
SELECT
ta.tx_hash,
ta.receipt_id,
ta.intent_hash,
ta.account_id,
ta.referral,
MIN(ta.block_id) AS block_id,
MIN(ta.block_timestamp) AS block_timestamp,
MIN(ta.account_id) AS signer_id,
MIN(ta._partition_by_block_number) AS _partition_by_block_number,
MAX(il.clean_log) AS log,
MAX(il.log_standard) AS standard,
OBJECT_CONSTRUCT(
'intent_type', 'token_diff',
'intent_hash', ta.intent_hash,
'account_id', ta.account_id,
'referral', COALESCE(ta.referral, ''),
'standard', COALESCE(MAX(il.log_standard), ''),
'token_in', MAX(CASE WHEN ta.net_amount > 0 THEN ta.token_id END),
'token_out', MAX(CASE WHEN ta.net_amount < 0 THEN ta.token_id END),
'amount_in', MAX(CASE WHEN ta.net_amount > 0 THEN ta.net_amount END),
'amount_out', MAX(CASE WHEN ta.net_amount < 0 THEN ABS(ta.net_amount) END)
) AS swap_input_data,
MAX(CASE WHEN ta.net_amount > 0 THEN ta.token_id END) AS token_in,
MAX(CASE WHEN ta.net_amount > 0 THEN ta.net_amount END) AS amount_in_raw,
MAX(CASE WHEN ta.net_amount < 0 THEN ta.token_id END) AS token_out,
MAX(CASE WHEN ta.net_amount < 0 THEN ABS(ta.net_amount) END) AS amount_out_raw
FROM token_aggregated ta
LEFT JOIN intents_logs il ON ta.tx_hash = il.tx_hash AND ta.receipt_id = il.receipt_id AND il.log_event = 'token_diff'
GROUP BY ta.tx_hash, ta.receipt_id, ta.intent_hash, ta.account_id, ta.referral
),
intents_swaps_identified AS (
SELECT
tx_hash,
receipt_id,
block_id,
block_timestamp,
signer_id,
swap_input_data,
log,
_partition_by_block_number,
intent_hash,
account_id,
referral,
token_in,
amount_in_raw,
token_out,
amount_out_raw
FROM intents_token_flows
WHERE token_in IS NOT NULL
AND token_out IS NOT NULL
AND amount_in_raw > 0
AND amount_out_raw > 0
),
intents_mapped AS (
SELECT
tx_hash,
receipt_id,
block_id,
block_timestamp,
'intents.near' AS receiver_id,
signer_id,
intent_hash,
account_id,
referral,
ROW_NUMBER() OVER (PARTITION BY tx_hash ORDER BY intent_hash, receipt_id) - 1 AS swap_index,
amount_out_raw::NUMERIC(38,0) AS amount_out_raw,
token_out,
amount_in_raw::NUMERIC(38,0) AS amount_in_raw,
token_in,
swap_input_data,
log,
_partition_by_block_number
FROM intents_swaps_identified
WHERE token_in IS NOT NULL
AND token_out IS NOT NULL
AND amount_in_raw > 0
AND amount_out_raw > 0
AND intent_hash IS NOT NULL
),
FINAL AS (
SELECT
tx_hash,
receipt_id,
block_id,
block_timestamp,
receiver_id,
signer_id,
intent_hash,
account_id,
referral,
swap_index,
amount_out_raw,
token_out,
amount_in_raw,
token_in,
swap_input_data,
log,
_partition_by_block_number
FROM
intents_mapped
)
SELECT
*,
{{ dbt_utils.generate_surrogate_key(
['tx_hash', 'intent_hash', 'swap_index']
) }} AS intents_swap_id,
SYSDATE() AS inserted_timestamp,
SYSDATE() AS modified_timestamp,
'{{ invocation_id }}' AS _invocation_id
FROM
FINAL

View File

@ -0,0 +1,100 @@
version: 2
models:
- name: silver__swap_intents
description: Silver layer model that processes intent-based swap events from NEAR blockchain, specifically tracking token exchanges through the Intents protocol on intents.near.
columns:
- name: BLOCK_ID
description: "{{ doc('block_id') }}"
tests:
- not_null
- name: BLOCK_TIMESTAMP
description: "{{ doc('block_timestamp') }}"
tests:
- not_null
- name: TX_HASH
description: "{{ doc('tx_hash') }}"
tests:
- not_null
- name: RECEIPT_ID
description: "{{ doc('receipt_id') }}"
tests:
- not_null
- name: RECEIVER_ID
description: "{{ doc('receiver_id') }}"
tests:
- not_null
- name: SIGNER_ID
description: "{{ doc('signer_id') }}"
tests:
- not_null
- name: INTENT_HASH
description: "Unique identifier for the intent transaction, used to track the specific swap intent"
tests:
- not_null
- name: ACCOUNT_ID
description: "Account ID of the user who initiated the intent swap"
tests:
- not_null
- name: REFERRAL
description: "Referral account ID for the swap transaction, if applicable"
- name: SWAP_INDEX
description: "Index of the swap within the transaction, starting from 0"
tests:
- not_null
- name: AMOUNT_OUT_RAW
description: "{{ doc('amount_unadj') }}"
tests:
- not_null
- name: TOKEN_OUT
description: "Token contract address for the output token (token being sent to user)"
tests:
- not_null
- name: AMOUNT_IN_RAW
description: "{{ doc('amount_unadj') }}"
tests:
- not_null
- name: TOKEN_IN
description: "Token contract address for the input token (token being received by platform)"
tests:
- not_null
- name: SWAP_INPUT_DATA
description: "JSON object containing detailed swap information including intent metadata, token details, and amounts"
tests:
- not_null
- name: LOG
description: "Raw log data from the blockchain event"
tests:
- not_null
- name: INTENTS_SWAP_ID
description: "{{ doc('id') }}"
tests:
- not_null
- unique
- name: INSERTED_TIMESTAMP
description: "{{ doc('_inserted_timestamp') }}"
tests:
- not_null
- name: MODIFIED_TIMESTAMP
description: "{{ doc('_modified_timestamp') }}"
tests:
- not_null

View File

@ -0,0 +1,142 @@
{{ config(
materialized = 'incremental',
incremental_strategy = 'merge',
unique_key = ['bridge_intents_id'],
merge_exclude_columns = ['inserted_timestamp'],
cluster_by = ['block_timestamp::DATE'],
incremental_predicates = ["dynamic_range_predicate_custom","block_timestamp::date"],
tags = ['scheduled_non_core']
) }}
-- depends on {{ ref('defi__fact_intents') }}
{% if execute %}
{% if is_incremental() %}
{% set max_mod_query %}
SELECT
MAX(modified_timestamp) modified_timestamp
FROM
{{ this }}
{% endset %}
{% set max_mod = run_query(max_mod_query) [0] [0] %}
{% if not max_mod or max_mod == 'None' %}
{% set max_mod = '2099-01-01' %}
{% endif %}
{% endif %}
{% endif %}
WITH bridge_intents AS (
SELECT
block_id,
block_timestamp,
tx_hash,
receipt_id,
receiver_id,
predecessor_id,
log_event,
log_index,
log_event_index,
owner_id,
token_id,
amount_raw,
memo,
gas_burnt,
receipt_succeeded,
fact_intents_id,
modified_timestamp
FROM {{ ref('defi__fact_intents') }}
WHERE
log_event IN ('mt_burn', 'mt_mint')
AND memo IN ('deposit', 'withdraw')
AND RIGHT(token_id, 10) = '.omft.near'
{% if var('MANUAL_FIX') %}
AND {{ partition_load_manual('no_buffer', 'floor(block_id, -3)') }}
{% else %}
{% if is_incremental() %}
AND modified_timestamp > '{{ max_mod }}'
{% endif %}
{% endif %}
),
bridge_intents_parsed AS (
SELECT
*,
REGEXP_SUBSTR(token_id, 'nep141:([^-\\.]+)', 1, 1, 'e', 1) AS blockchain,
CASE
WHEN token_id LIKE '%-%' THEN
REGEXP_SUBSTR(token_id, 'nep141:[^-]+-(.+)\\.omft\\.near', 1, 1, 'e', 1)
ELSE SPLIT(token_id, ':')[1]
END AS contract_address_raw
FROM bridge_intents
),
bridge_intents_mapped AS (
SELECT
block_id,
block_timestamp,
tx_hash,
receipt_id,
receiver_id,
predecessor_id,
log_event,
'execute_intents' AS method_name, -- Unnecessary to join actions just to get this
log_index,
log_event_index,
owner_id,
contract_address_raw AS token_address,
token_id AS token_address_raw,
amount_raw AS amount_unadj,
amount_raw AS amount_adj,
owner_id AS destination_address,
owner_id AS source_address,
'intents' AS platform,
receiver_id AS bridge_address,
CASE
WHEN log_event = 'mt_mint' AND memo = 'deposit' THEN LOWER(blockchain)
ELSE 'near'
END AS source_chain,
CASE
WHEN log_event = 'mt_burn' AND memo = 'withdraw' THEN LOWER(blockchain)
ELSE 'near'
END AS destination_chain,
CASE
WHEN log_event = 'mt_mint' AND memo = 'deposit' THEN 'inbound'
WHEN log_event = 'mt_burn' AND memo = 'withdraw' THEN 'outbound'
END AS direction,
receipt_succeeded,
memo,
fact_intents_id,
modified_timestamp
FROM bridge_intents_parsed
WHERE blockchain IS NOT NULL
AND blockchain != ''
AND blockchain != 'near'
)
SELECT
block_id,
block_timestamp,
tx_hash,
log_index,
log_event_index,
token_address,
token_address_raw,
amount_unadj,
amount_adj,
destination_address,
source_address,
platform,
bridge_address,
destination_chain,
source_chain,
method_name,
direction,
receipt_succeeded,
{{ dbt_utils.generate_surrogate_key(
['tx_hash', 'log_index', 'log_event_index', 'source_chain', 'destination_address', 'token_address', 'amount_unadj']
) }} AS bridge_intents_id,
SYSDATE() AS inserted_timestamp,
SYSDATE() AS modified_timestamp,
'{{ invocation_id }}' AS _invocation_id
FROM bridge_intents_mapped
ORDER BY block_timestamp DESC

View File

@ -0,0 +1,107 @@
version: 2
models:
- name: silver__bridge_intents
description: Silver layer model that processes bridge intent events from NEAR blockchain, specifically tracking mint/burn operations for cross-chain token transfers through the Intents protocol.
columns:
- name: BLOCK_ID
description: "{{ doc('block_id') }}"
tests:
- not_null
- name: BLOCK_TIMESTAMP
description: "{{ doc('block_timestamp') }}"
tests:
- not_null
- name: TX_HASH
description: "{{ doc('tx_hash') }}"
tests:
- not_null
- name: TOKEN_ADDRESS
description: "{{ doc('token_address') }}"
tests:
- not_null
- name: TOKEN_ADDRESS_RAW
description: "{{ doc('token_address_raw') }}"
tests:
- not_null
- name: AMOUNT_UNADJ
description: "{{ doc('amount_unadj') }}"
tests:
- not_null
- name: AMOUNT_ADJ
description: "{{ doc('amount_adj') }}"
tests:
- not_null
- name: DESTINATION_ADDRESS
description: "{{ doc('destination_address') }}"
tests:
- not_null
- name: SOURCE_ADDRESS
description: "{{ doc('source_address') }}"
tests:
- not_null
- name: PLATFORM
description: "{{ doc('platform') }}"
tests:
- not_null
- name: BRIDGE_ADDRESS
description: "{{ doc('bridge_address') }}"
tests:
- not_null
- name: DESTINATION_CHAIN
description: "{{ doc('destination_chain') }}"
tests:
- not_null
- name: SOURCE_CHAIN
description: "{{ doc('source_chain') }}"
tests:
- not_null
- name: METHOD_NAME
description: "{{ doc('method_name') }}"
tests:
- not_null
- name: DIRECTION
description: "{{ doc('direction') }}"
tests:
- not_null
- name: RECEIPT_SUCCEEDED
description: "{{ doc('receipt_succeeded') }}"
tests:
- not_null
- name: BRIDGE_INTENTS_ID
description: "{{ doc('id') }}"
tests:
- not_null
- unique
- name: INSERTED_TIMESTAMP
description: "{{ doc('_inserted_timestamp') }}"
tests:
- not_null
- name: MODIFIED_TIMESTAMP
description: "{{ doc('_modified_timestamp') }}"
tests:
- not_null
- name: _INVOCATION_ID
description: "{{ doc('_invocation_id') }}"
tests:
- not_null