Merge pull request #474 from FlipsideCrypto/AN-6258-TokenMetadata

AN-6258-TokenMetadata
This commit is contained in:
Stanley 2025-06-11 21:58:55 +07:00 committed by GitHub
commit ede332a5a7
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
24 changed files with 583 additions and 156 deletions

View File

@ -0,0 +1,9 @@
{{ config (
materialized = 'view',
tags = ['streamline_helper']
) }}
{{ streamline_external_table_FR_query_v2(
model = "omni_metadata",
partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 3), '_', 1) AS INTEGER )"
) }}

View File

@ -0,0 +1,5 @@
{% docs asset_identifier %}
The onchain representation of a token id, which may include source chain metadata if involved in a crosschain bridge event. Unique per token in the FT contract metadata table.
{% enddocs %}

View File

@ -0,0 +1,5 @@
{% docs crosschain_token_contract %}
The contract address of the token on the source chain, where applicable.
{% enddocs %}

View File

@ -0,0 +1,5 @@
{% docs near_token_contract %}
The contract address of the token deployed to NEAR.
{% enddocs %}

View File

@ -3,21 +3,18 @@
tags = ['scheduled_non_core']
) }}
WITH ft_contract_metadata AS (
SELECT
*
FROM
{{ ref('silver__ft_contract_metadata') }}
)
SELECT
contract_address,
asset_identifier,
source_chain,
crosschain_token_contract,
near_token_contract AS contract_address,
NAME,
symbol,
decimals,
DATA,
ft_contract_metadata_id AS dim_ft_contract_metadata_id,
inserted_timestamp,
modified_timestamp
FROM
ft_contract_metadata
{{ ref('silver__ft_contract_metadata') }}
WHERE
name is not null OR symbol is not null OR decimals is not null

View File

@ -6,46 +6,30 @@ models:
Fungible Token contract metadata provided by the Nearblocks NFT endpoint.
columns:
- name: CONTRACT_ADDRESS
description: "{{ doc('contract_address')}}"
- name: ASSET_IDENTIFIER
description: "{{ doc('asset_identifier') }}"
tests:
- not_null
- unique
- name: SOURCE_CHAIN
description: "{{ doc('source_chain') }}"
- name: CROSSCHAIN_TOKEN_CONTRACT
description: "{{ doc('crosschain_token_contract') }}"
- name: CONTRACT_ADDRESS
description: "{{ doc('contract_address') }}"
- name: NAME
description: "{{ doc('name')}}"
tests:
- not_null
description: "{{ doc('name') }}"
- name: SYMBOL
description: "{{ doc('symbol')}}"
tests:
- not_null
- name: DATA
description: "{{ doc('data')}}"
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- VARIANT
- OBJECT
- ARRAY
description: "{{ doc('symbol') }}"
- name: DECIMALS
description: "{{ doc('decimals')}}"
- name: DATA
description: "{{ doc('data')}}"
description: "{{ doc('decimals') }}"
- name: DIM_FT_CONTRACT_METADATA_ID
description: "{{doc('id')}}"
tests:
- not_null
- unique:
where: inserted_timestamp >= SYSDATE() - INTERVAL '{{ var('DBT_TEST_LOOKBACK_DAYS', 14) }} days'
- name: INSERTED_TIMESTAMP
description: "{{doc('inserted_timestamp')}}"
- name: MODIFIED_TIMESTAMP
description: "{{doc('modified_timestamp')}}"

View File

@ -99,6 +99,7 @@ SELECT
FROM
{{ ref('core__fact_token_transfers') }}
t
{% if is_incremental() %}
ASOF JOIN hourly_prices p
MATCH_CONDITION (t.block_timestamp >= p.HOUR)
@ -108,7 +109,8 @@ FROM
ON (t.contract_address = p.token_address)
AND date_trunc('hour', t.block_timestamp) = p.HOUR
{% endif %}
LEFT JOIN {{ ref('silver__ft_contract_metadata') }} C USING (contract_address)
LEFT JOIN {{ ref('silver__ft_contract_metadata') }} C on (t.contract_address = C.asset_identifier)
{% if var("MANUAL_FIX") %}
WHERE
{{ partition_load_manual('no_buffer') }}

View File

@ -31,7 +31,7 @@ WITH fact_bridging AS (
),
labels AS (
SELECT
contract_address,
asset_identifier AS contract_address,
name,
symbol,
decimals

View File

@ -29,7 +29,7 @@ WITH dex_swaps AS (
),
labels AS (
SELECT
contract_address,
asset_identifier AS contract_address,
NAME,
symbol,
decimals

View File

@ -9,7 +9,6 @@
tags = ['scheduled_non_core']
) }}
-- depends on {{ ref('defi__fact_intents') }}
-- depends on {{ ref('silver__defuse_tokens_metadata') }}
-- depends on {{ ref('silver__ft_contract_metadata') }}
-- depends on {{ ref('price__ez_prices_hourly') }}
@ -85,12 +84,12 @@ WITH intents AS (
token_id,
REGEXP_SUBSTR(
token_id,
'nep(141|245):(.*)',
'nep(141|171|245):(.*)',
1,
1,
'e',
2
) AS contract_address_raw,
) AS asset_identifier,
referral,
dip4_version,
gas_burnt,
@ -123,42 +122,19 @@ WITH intents AS (
),
labels AS (
SELECT
near_token_id AS contract_address_raw,
SPLIT(
defuse_asset_identifier,
':'
) [0] :: STRING AS ecosystem,
SPLIT(
defuse_asset_identifier,
':'
) [1] :: STRING AS chain_id,
SPLIT(
defuse_asset_identifier,
':'
) [2] :: STRING AS contract_address,
asset_name AS symbol,
decimals
FROM
{{ ref('silver__defuse_tokens_metadata') }}
UNION ALL
SELECT
contract_address AS contract_address_raw,
'near' AS ecosystem,
'397' AS chain_id,
contract_address,
asset_identifier,
source_chain,
crosschain_token_contract,
near_token_contract,
symbol,
decimals
FROM
{{ ref('silver__ft_contract_metadata') }}
WHERE
contract_address not in (
select distinct near_token_id
from {{ ref('silver__defuse_tokens_metadata') }}
)
),
prices AS (
SELECT
token_address AS contract_address,
blockchain,
symbol,
price,
is_native,
@ -227,12 +203,9 @@ FINAL AS (
gas_burnt,
receipt_succeeded,
fact_intents_id AS ez_intents_id,
COALESCE(
dl.short_name,
l.ecosystem
) AS blockchain,
l.contract_address,
l.contract_address = 'native' AS is_native,
l.source_chain AS blockchain,
l.crosschain_token_contract AS contract_address,
l.crosschain_token_contract = 'native' AS is_native,
l.symbol,
l.decimals,
amount_raw / pow(
@ -251,25 +224,51 @@ FINAL AS (
FROM
intents i
LEFT JOIN labels l
ON i.contract_address_raw = l.contract_address_raw
LEFT JOIN EXTERNAL.defillama.dim_chains dl
ON l.chain_id = dl.chain_id
ON i.asset_identifier = l.asset_identifier
ASOF JOIN prices p match_condition (
i.block_timestamp >= p.hour
)
ON (
l.contract_address = p.contract_address
l.crosschain_token_contract = p.contract_address
)
ASOF JOIN prices_native p2 match_condition (
i.block_timestamp >= p2.hour
)
ON (
upper(l.symbol) = upper(p2.symbol)
AND (l.contract_address = 'native') = p2.is_native
AND (l.crosschain_token_contract = 'native') = p2.is_native
)
)
SELECT
*,
block_timestamp,
block_id,
tx_hash,
receipt_id,
receiver_id,
predecessor_id,
log_event,
token_id,
symbol,
amount_adj,
amount_usd,
owner_id,
old_owner_id,
new_owner_id,
amount_raw,
blockchain,
contract_address,
is_native,
price,
decimals,
gas_burnt,
memo,
referral,
dip4_version,
log_index,
log_event_index,
amount_index,
receipt_succeeded,
ez_intents_id,
SYSDATE() AS inserted_timestamp,
SYSDATE() AS modified_timestamp
FROM

View File

@ -26,7 +26,7 @@ WITH lending AS (
),
labels AS (
SELECT
contract_address,
asset_identifier AS contract_address,
NAME,
symbol,
IFF(

View File

@ -6,7 +6,7 @@
WITH metadata AS (
SELECT
contract_address,
asset_identifier as contract_address,
NAME,
symbol,
decimals

View File

@ -1,7 +1,8 @@
{{ config(
materialized = 'incremental',
unique_key = 'defuse_ft_metadata_id',
incremental_strategy = 'merge',
merge_exclude_columns = ["inserted_timestamp"],
unique_key = 'defuse_asset_identifier',
tags = ['scheduled_non_core']
) }}
@ -11,25 +12,43 @@ WITH api_call AS (
response
FROM
{{ ref('streamline__defuse_token_ids_realtime') }}
)
),
flattened AS (
SELECT
VALUE :defuse_asset_identifier :: STRING AS defuse_asset_identifier,
VALUE :asset_name :: STRING AS asset_name,
VALUE :decimals :: INT AS decimals,
VALUE :defuse_asset_identifier :: STRING AS defuse_asset_identifier,
VALUE :min_deposit_amount :: STRING AS min_deposit_amount,
VALUE :min_withdrawal_amount :: STRING AS min_withdrawal_amount,
VALUE :near_token_id :: STRING AS near_token_id,
VALUE :withdrawal_fee :: STRING AS withdrawal_fee,
{{ dbt_utils.generate_surrogate_key(
['defuse_asset_identifier']
) }} AS defuse_token_ids_id,
SYSDATE() AS inserted_timestamp,
SYSDATE() AS modified_timestamp,
'{{ invocation_id }}' AS _invocation_id
VALUE :near_token_id :: STRING AS near_token_contract,
VALUE :withdrawal_fee :: STRING AS withdrawal_fee
FROM
api_call,
LATERAL FLATTEN(
input => response :data :result :tokens :: ARRAY
)
)
SELECT
defuse_asset_identifier,
CASE
WHEN SPLIT_PART(defuse_asset_identifier, ':', 0) = 'near' THEN 'near'
WHEN SPLIT_PART(defuse_asset_identifier, ':', ARRAY_SIZE(SPLIT(defuse_asset_identifier, ':'))) = 'native' THEN SPLIT_PART(near_token_contract, '.', 0) :: STRING
ELSE SPLIT_PART(near_token_contract, '-', 0) :: STRING
END AS source_chain,
SPLIT_PART(defuse_asset_identifier, ':', ARRAY_SIZE(SPLIT(defuse_asset_identifier, ':'))) AS crosschain_token_contract,
asset_name,
decimals,
min_deposit_amount,
min_withdrawal_amount,
near_token_contract,
withdrawal_fee,
{{ dbt_utils.generate_surrogate_key(
['defuse_asset_identifier']
) }} AS defuse_ft_metadata_id,
SYSDATE() AS inserted_timestamp,
SYSDATE() AS modified_timestamp,
'{{ invocation_id }}' AS _invocation_id
FROM
flattened
qualify(row_number() over (partition by defuse_asset_identifier order by inserted_timestamp asc)) = 1

View File

@ -0,0 +1,71 @@
{{ config(
materialized = 'incremental',
unique_key = 'nearblocks_ft_metadata_id',
incremental_strategy = 'merge',
merge_exclude_columns = ["inserted_timestamp"],
tags = ['scheduled_non_core']
) }}
WITH nearblocks AS (
SELECT
VALUE :CONTRACT_ADDRESS :: STRING AS contract_address,
DATA
FROM
{{ ref('bronze__nearblocks_ft_metadata')}}
WHERE
typeof(DATA) != 'NULL_VALUE'
{% if is_incremental() %}
AND
_inserted_timestamp >= (
SELECT
MAX(modified_timestamp)
FROM
{{ this }}
)
{% endif %}
),
nearblocks_metadata AS (
SELECT
VALUE :contract :: STRING AS near_token_contract,
VALUE :decimals :: INT AS decimals,
VALUE :name :: STRING AS NAME,
VALUE :symbol :: STRING AS symbol
FROM
nearblocks,
LATERAL FLATTEN(
input => DATA :contracts
)
)
SELECT
near_token_contract,
decimals,
name,
symbol,
CASE
WHEN near_token_contract ilike 'sol-%.omft.near' THEN 'sol'
WHEN near_token_contract rlike '.*-0x[0-9a-fA-F]+\\.(duse|omft)\\.near' THEN SPLIT_PART(near_token_contract, '-', 1) :: STRING
WHEN near_token_contract ilike '%-native.duse.near' THEN SPLIT_PART(near_token_contract, '-', 1) :: STRING
ELSE 'near'
END AS source_chain,
IFF(
source_chain = 'near',
near_token_contract,
COALESCE(
REGEXP_SUBSTR(near_token_contract, '0x[a-fA-F0-9]{40}') :: STRING,
REGEXP_SUBSTR(near_token_contract, 'native') :: STRING,
REGEXP_SUBSTR(near_token_contract, 'sol-([^.]+)\\.omft\\.near', 1, 1, 'e', 1)
)
) AS crosschain_token_contract,
{{ dbt_utils.generate_surrogate_key(
['near_token_contract']
) }} AS nearblocks_ft_metadata_id,
SYSDATE() AS inserted_timestamp,
SYSDATE() AS modified_timestamp,
'{{ invocation_id }}' AS _invocation_id
FROM nearblocks_metadata
qualify(row_number() over (partition by near_token_contract order by modified_timestamp desc)) = 1

View File

@ -0,0 +1,66 @@
{{ config(
materialized = 'incremental',
unique_key = 'omni_ft_metadata_id',
incremental_strategy = 'merge',
merge_exclude_columns = ["inserted_timestamp"],
tags = ['scheduled_non_core']
) }}
WITH omni AS (
SELECT
VALUE:OMNI_ASSET_IDENTIFIER::STRING AS omni_asset_identifier,
VALUE:data:result:result::ARRAY AS result_array,
DATA
FROM
{{ ref('bronze__omni_metadata') }}
{% if is_incremental() %}
WHERE
_inserted_timestamp >= (
SELECT
MAX(modified_timestamp)
FROM
{{ this }}
)
{% endif %}
),
try_decode_hex AS (
SELECT
omni_asset_identifier,
b.value AS raw,
b.index,
LPAD(TRIM(to_char(b.value::INT, 'XXXXXXX'))::STRING, 2, '0') AS hex,
ROW_NUMBER() OVER (PARTITION BY omni_asset_identifier, raw, index, hex ORDER BY omni_asset_identifier) AS rn
FROM omni o,
TABLE(FLATTEN(o.result_array, recursive => TRUE)) b
WHERE IS_ARRAY(o.result_array) = TRUE
ORDER BY 1, 3
),
decoded_response AS (
SELECT
omni_asset_identifier,
ARRAY_TO_STRING(ARRAY_AGG(hex) WITHIN GROUP (ORDER BY index ASC), '') AS decoded_response
FROM try_decode_hex
GROUP BY omni_asset_identifier, rn
HAVING rn = 1
),
conversion AS (
SELECT
omni_asset_identifier,
TRIM(livequery.utils.udf_hex_to_string(decoded_response), '"') AS decoded_result
FROM decoded_response
)
SELECT
omni_asset_identifier,
SPLIT_PART(omni_asset_identifier, ':', 1) :: STRING AS source_chain,
SPLIT_PART(omni_asset_identifier, ':', 2) :: STRING AS crosschain_token_contract,
decoded_result AS near_token_contract,
{{ dbt_utils.generate_surrogate_key(
['omni_asset_identifier']
) }} AS omni_ft_metadata_id,
SYSDATE() AS inserted_timestamp,
SYSDATE() AS modified_timestamp,
'{{ invocation_id }}' AS _invocation_id
FROM
conversion
qualify(row_number() over (partition by omni_asset_identifier order by inserted_timestamp asc)) = 1

View File

@ -1,27 +1,33 @@
-- depends on: {{ ref('bronze__nearblocks_ft_metadata')}}
-- depends on: {{ ref('silver__nearblocks_ft_metadata')}}
-- depends on: {{ ref('silver__omni_ft_metadata')}}
-- depends on: {{ ref('silver__defuse_ft_metadata')}}
-- depends on: {{ ref('streamline__omni_tokenlist')}}
{{ config(
materialized = 'incremental',
unique_key = 'contract_address',
unique_key = 'ft_contract_metadata_id',
incremental_strategy = 'merge',
merge_exclude_columns = ["inserted_timestamp"],
tags = ['scheduled_non_core']
) }}
WITH bronze AS (
WITH nearblocks AS (
SELECT
VALUE :CONTRACT_ADDRESS :: STRING AS contract_address,
DATA
near_token_contract,
decimals,
name,
symbol,
source_chain,
crosschain_token_contract,
'nearblocks' AS source
FROM
{{ ref('bronze__nearblocks_ft_metadata')}}
WHERE
typeof(DATA) != 'NULL_VALUE'
{{ ref('silver__nearblocks_ft_metadata')}}
{% if is_incremental() %}
AND
_inserted_timestamp >= (
WHERE
modified_timestamp >= (
SELECT
MAX(modified_timestamp)
FROM
@ -29,36 +35,179 @@ WITH bronze AS (
)
{% endif %}
),
flatten_results AS (
omni AS (
SELECT
VALUE :contract :: STRING AS contract_address,
VALUE :decimals :: INT AS decimals,
VALUE :name :: STRING AS NAME,
VALUE :symbol :: STRING AS symbol,
VALUE AS DATA
omni_asset_identifier AS asset_identifier,
o.source_chain,
o.crosschain_token_contract,
o.near_token_contract,
NULL AS decimals,
NULL AS name,
NULL AS symbol,
'omni' AS source
FROM
bronze,
LATERAL FLATTEN(
input => DATA :contracts
{{ ref('silver__omni_ft_metadata')}} o
{% if is_incremental() %}
WHERE
o.modified_timestamp >= (
SELECT
MAX(modified_timestamp)
FROM
{{ this }}
)
{% endif %}
),
omni_unmapped AS (
SELECT
omni_asset_identifier AS asset_identifier,
o.source_chain,
o.crosschain_token_contract,
n.near_token_contract,
COALESCE(n.decimals, c.decimals) :: INT AS decimals,
COALESCE(n.name, c.name) :: STRING AS name,
COALESCE(n.symbol, c.symbol) :: STRING AS symbol,
'omni_unmapped' AS source
FROM
{{ ref('streamline__omni_tokenlist')}} o
LEFT JOIN {{ ref('silver__nearblocks_ft_metadata') }} n
ON o.crosschain_token_contract = n.near_token_contract
AND o.source_chain = 'near'
LEFT JOIN {{ source('crosschain_silver', 'complete_token_asset_metadata')}} c
ON o.crosschain_token_contract = c.token_address
AND c.blockchain = 'solana'
-- note, this does not give use the Near token contract.
-- could join on symbol, but some symbols have multiple contract records as symbol is not unique
WHERE
o.source_chain IN ('near', 'sol')
{% if is_incremental() %}
AND
o.modified_timestamp >= (
SELECT
MAX(modified_timestamp)
FROM
{{ this }}
)
{% endif %}
),
defuse AS (
SELECT
d.near_token_contract AS asset_identifier,
d.source_chain,
d.crosschain_token_contract,
d.near_token_contract,
d.decimals,
NULL AS name,
asset_name AS symbol,
'defuse' AS source
FROM
{{ ref('silver__defuse_ft_metadata')}} d
{% if is_incremental() %}
WHERE
d.modified_timestamp >= (
SELECT
MAX(modified_timestamp)
FROM
{{ this }}
)
{% endif %}
),
final AS (
-- Nearblocks
SELECT
near_token_contract AS asset_identifier,
source_chain,
crosschain_token_contract,
near_token_contract,
decimals,
name,
symbol,
source
FROM
nearblocks
UNION ALL
-- Omni
SELECT
asset_identifier,
source_chain,
crosschain_token_contract,
near_token_contract,
decimals,
name,
symbol,
source
FROM
omni
UNION ALL
-- Omni unmapped
SELECT
asset_identifier,
source_chain,
crosschain_token_contract,
near_token_contract,
decimals,
name,
symbol,
source
FROM
omni_unmapped
UNION ALL
-- Defuse
SELECT
asset_identifier,
source_chain,
crosschain_token_contract,
near_token_contract,
decimals,
name,
symbol,
source
FROM
defuse
),
final_joined AS (
SELECT
f.asset_identifier,
f.source_chain,
f.crosschain_token_contract,
f.near_token_contract,
COALESCE(f.decimals, n.decimals) AS decimals,
COALESCE(f.name, n.name) AS name,
COALESCE(f.symbol, n.symbol) AS symbol,
f.source AS metadata_provider
FROM final f
LEFT JOIN {{ ref('silver__nearblocks_ft_metadata') }} n
ON f.near_token_contract = n.near_token_contract
AND f.source IN ('omni', 'defuse')
AND NOT (f.source = 'omni_unmapped' AND f.source_chain != 'near')
)
SELECT
contract_address,
asset_identifier,
source_chain,
crosschain_token_contract,
near_token_contract,
decimals,
NAME,
name,
symbol,
DATA,
{{ dbt_utils.generate_surrogate_key(
['contract_address']
) }} AS ft_contract_metadata_id,
metadata_provider,
{{ dbt_utils.generate_surrogate_key(['asset_identifier']) }} AS ft_contract_metadata_id,
SYSDATE() AS inserted_timestamp,
SYSDATE() AS modified_timestamp,
'{{ invocation_id }}' AS _invocation_id
FROM
flatten_results
FROM
final_joined
qualify ROW_NUMBER() over (
PARTITION BY contract_address
ORDER BY
PARTITION BY asset_identifier
ORDER BY
metadata_provider = 'defuse' DESC, -- prioritize defuse over nearblocks for those tokens
modified_timestamp DESC
) = 1

View File

@ -11,44 +11,33 @@ models:
interval: 8
columns:
- name: CONTRACT_ADDRESS
description: "{{ doc('contract_address')}}"
- name: ASSET_IDENTIFIER
description: "{{ doc('asset_identifier') }}"
tests:
- not_null
- unique
- dbt_expectations.expect_column_values_to_be_of_type:
column_type: VARCHAR
- name: SOURCE_CHAIN
description: "{{ doc('source_chain') }}"
- name: CROSSCHAIN_TOKEN_CONTRACT
description: "{{ doc('crosschain_token_contract') }}"
- name: NEAR_TOKEN_CONTRACT
description: "{{ doc('near_token_contract') }}"
- name: DECIMALS
description: "{{ doc('decimals')}}"
tests:
- not_null
description: "{{ doc('decimals') }}"
- name: NAME
description: "{{ doc('name')}}"
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_of_type:
column_type: VARCHAR
description: "{{ doc('name') }}"
- name: SYMBOL
description: "{{ doc('symbol')}}"
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_of_type:
column_type: VARCHAR
- name: DATA
description: "{{ doc('data')}}"
description: "{{ doc('symbol') }}"
- name: METADATA_PROVIDER
description: "Source of the metadata record (e.g., omni, nearblocks, defuse)."
- name: FT_CONTRACT_METADATA_ID
description: "{{doc('id')}}"
tests:
- not_null
- unique
- name: INSERTED_TIMESTAMP
description: "{{doc('inserted_timestamp')}}"
- name: MODIFIED_TIMESTAMP
description: "{{doc('modified_timestamp')}}"
- name: _INVOCATION_ID
description: "{{doc('invocation_id')}}"

View File

@ -86,4 +86,4 @@ FROM
complete_token_prices p
LEFT JOIN {{ ref('silver__ft_contract_metadata') }}
ft
ON p.token_address = ft.contract_address
ON p.token_address = ft.asset_identifier

View File

@ -41,7 +41,7 @@ WITH functioncall AS (
),
metadata AS (
SELECT
contract_address,
asset_identifier as contract_address,
NAME,
symbol,
decimals

View File

@ -43,3 +43,4 @@ sources:
- name: chunks_v2
- name: transactions_v2
- name: nearblocks_ft_metadata
- name: omni_metadata

View File

@ -1,7 +1,6 @@
{{ config (
materialized = "incremental",
incremental_strategy = 'merge',
incremental_predicates = ["dynamic_range_predicate","modified_timestamp::date"],
unique_key = "contract_address",
cluster_by = ['modified_timestamp::DATE'],
post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION on equality(contract_address)",

View File

@ -0,0 +1,35 @@
-- depends_on: {{ ref('bronze__omni_metadata') }}
{{ config(
materialized = "incremental",
unique_key = "omni_complete_id",
merge_exclude_columns = ["inserted_timestamp"],
tags = ['streamline_non_core']
) }}
SELECT
VALUE:OMNI_ASSET_IDENTIFIER :: STRING AS omni_asset_identifier,
partition_key,
_inserted_timestamp,
VALUE:OMNI_ASSET_IDENTIFIER :: STRING AS omni_complete_id,
SYSDATE() AS inserted_timestamp,
SYSDATE() AS modified_timestamp,
'{{ invocation_id }}' AS _invocation_id
FROM
{{ ref('bronze__omni_metadata') }}
WHERE
typeof(DATA) != 'NULL_VALUE'
{% if is_incremental() %}
AND
_inserted_timestamp >= COALESCE(
(
SELECT
MAX(_inserted_timestamp) _inserted_timestamp
FROM
{{ this }}
),
'1900-01-01' :: timestamp_ntz
)
{% endif %}
qualify(row_number() over (partition by omni_asset_identifier order by _inserted_timestamp desc)) = 1

View File

@ -0,0 +1,54 @@
-- depends_on: {{ ref('streamline__omni_tokenlist') }}
{{ config(
materialized = 'view',
post_hook = fsc_utils.if_data_call_function_v2(
func = 'streamline.udf_bulk_rest_api_v2',
target = "{{this.schema}}.{{this.identifier}}",
params = {
"external_table": "omni_metadata",
"sql_limit": "30",
"producer_batch_size": "5",
"worker_batch_size": "5",
"sql_source": "{{this.identifier}}"
}
),
tags = ['streamline_non_core']
) }}
WITH omni_token AS (
SELECT
omni_asset_identifier
FROM
{{ ref('streamline__omni_tokenlist') }}
WHERE
source_chain NOT IN ('near', 'sol')
EXCEPT
SELECT
omni_asset_identifier
FROM
{{ ref('streamline__omni_complete')}}
)
SELECT
omni_asset_identifier,
DATE_PART('EPOCH', SYSDATE()) :: INTEGER AS partition_key,
{{ target.database }}.LIVE.UDF_API(
'POST',
'{Service}',
OBJECT_CONSTRUCT('Content-Type', 'application/json'),
OBJECT_CONSTRUCT(
'jsonrpc', '2.0',
'id', 'dontcare',
'method', 'query',
'params', OBJECT_CONSTRUCT(
'request_type', 'call_function',
'finality', 'final',
'account_id', 'omni.bridge.near',
'method_name', 'get_token_id',
'args_base64', BASE64_ENCODE(OBJECT_CONSTRUCT('address', omni_asset_identifier) :: STRING)
)
),
'Vault/prod/near/quicknode/mainnet'
) AS request
FROM
omni_token

View File

@ -0,0 +1,38 @@
{{ config (
materialized = "incremental",
incremental_strategy = 'merge',
unique_key = "omni_asset_identifier",
cluster_by = ['modified_timestamp::DATE'],
post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION ON EQUALITY(omni_asset_identifier);",
tags = ['streamline_non_core']
) }}
WITH omni_token AS (
SELECT
DISTINCT raw_token_id AS omni_asset_identifier
FROM
{{ ref('silver__bridge_omni') }}
{% if is_incremental() %}
WHERE modified_timestamp >= (
SELECT
COALESCE(MAX(modified_timestamp), '1970-01-01')
FROM
{{ this }})
{% endif %}
)
SELECT
omni_asset_identifier,
SPLIT_PART(omni_asset_identifier, ':', 1) :: STRING AS source_chain,
SPLIT_PART(omni_asset_identifier, ':', 2) :: STRING AS crosschain_token_contract,
{{ dbt_utils.generate_surrogate_key(
['omni_asset_identifier']
) }} AS omni_tokenlist_id,
SYSDATE() AS inserted_timestamp,
SYSDATE() AS modified_timestamp,
'{{ invocation_id }}' AS _invocation_id
FROM
omni_token
QUALIFY (ROW_NUMBER() OVER (PARTITION BY omni_asset_identifier
ORDER BY
modified_timestamp ASC) = 1)