diff --git a/models/bronze/bronze__omni_metadata.sql b/models/bronze/bronze__omni_metadata.sql new file mode 100644 index 0000000..f2023db --- /dev/null +++ b/models/bronze/bronze__omni_metadata.sql @@ -0,0 +1,9 @@ +{{ config ( + materialized = 'view', + tags = ['streamline_helper'] +) }} + +{{ streamline_external_table_FR_query_v2( + model = "omni_metadata", + partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 3), '_', 1) AS INTEGER )" +) }} diff --git a/models/descriptions/asset_identifier.md b/models/descriptions/asset_identifier.md new file mode 100644 index 0000000..2f074f8 --- /dev/null +++ b/models/descriptions/asset_identifier.md @@ -0,0 +1,5 @@ +{% docs asset_identifier %} + +The onchain representation of a token id, which may include source chain metadata if involved in a crosschain bridge event. Unique per token in the FT contract metadata table. + +{% enddocs %} \ No newline at end of file diff --git a/models/descriptions/crosschain_token_contract.md b/models/descriptions/crosschain_token_contract.md new file mode 100644 index 0000000..2dfa559 --- /dev/null +++ b/models/descriptions/crosschain_token_contract.md @@ -0,0 +1,5 @@ +{% docs crosschain_token_contract %} + +The contract address of the token on the source chain, where applicable. + +{% enddocs %} \ No newline at end of file diff --git a/models/descriptions/near_token_contract.md b/models/descriptions/near_token_contract.md new file mode 100644 index 0000000..039a08b --- /dev/null +++ b/models/descriptions/near_token_contract.md @@ -0,0 +1,5 @@ +{% docs near_token_contract %} + +The contract address of the token deployed to NEAR. + +{% enddocs %} \ No newline at end of file diff --git a/models/gold/core/core__dim_ft_contract_metadata.sql b/models/gold/core/core__dim_ft_contract_metadata.sql index d226fba..e230edc 100644 --- a/models/gold/core/core__dim_ft_contract_metadata.sql +++ b/models/gold/core/core__dim_ft_contract_metadata.sql @@ -3,21 +3,18 @@ tags = ['scheduled_non_core'] ) }} -WITH ft_contract_metadata AS ( - - SELECT - * - FROM - {{ ref('silver__ft_contract_metadata') }} -) SELECT - contract_address, + asset_identifier, + source_chain, + crosschain_token_contract, + near_token_contract AS contract_address, NAME, symbol, decimals, - DATA, ft_contract_metadata_id AS dim_ft_contract_metadata_id, inserted_timestamp, modified_timestamp FROM - ft_contract_metadata + {{ ref('silver__ft_contract_metadata') }} +WHERE + name is not null OR symbol is not null OR decimals is not null diff --git a/models/gold/core/core__dim_ft_contract_metadata.yml b/models/gold/core/core__dim_ft_contract_metadata.yml index d193f77..16bf9f8 100644 --- a/models/gold/core/core__dim_ft_contract_metadata.yml +++ b/models/gold/core/core__dim_ft_contract_metadata.yml @@ -6,46 +6,30 @@ models: Fungible Token contract metadata provided by the Nearblocks NFT endpoint. columns: - - name: CONTRACT_ADDRESS - description: "{{ doc('contract_address')}}" + - name: ASSET_IDENTIFIER + description: "{{ doc('asset_identifier') }}" tests: - not_null - + - unique + - name: SOURCE_CHAIN + description: "{{ doc('source_chain') }}" + - name: CROSSCHAIN_TOKEN_CONTRACT + description: "{{ doc('crosschain_token_contract') }}" + - name: CONTRACT_ADDRESS + description: "{{ doc('contract_address') }}" - name: NAME - description: "{{ doc('name')}}" - tests: - - not_null - + description: "{{ doc('name') }}" - name: SYMBOL - description: "{{ doc('symbol')}}" - tests: - - not_null - - - name: DATA - description: "{{ doc('data')}}" - tests: - - not_null - - dbt_expectations.expect_column_values_to_be_in_type_list: - column_type_list: - - VARIANT - - OBJECT - - ARRAY - + description: "{{ doc('symbol') }}" - name: DECIMALS - description: "{{ doc('decimals')}}" - - - name: DATA - description: "{{ doc('data')}}" - + description: "{{ doc('decimals') }}" - name: DIM_FT_CONTRACT_METADATA_ID description: "{{doc('id')}}" tests: - not_null - unique: where: inserted_timestamp >= SYSDATE() - INTERVAL '{{ var('DBT_TEST_LOOKBACK_DAYS', 14) }} days' - - name: INSERTED_TIMESTAMP description: "{{doc('inserted_timestamp')}}" - - name: MODIFIED_TIMESTAMP description: "{{doc('modified_timestamp')}}" diff --git a/models/gold/core/core__ez_token_transfers.sql b/models/gold/core/core__ez_token_transfers.sql index 0bac9f1..072c8c5 100644 --- a/models/gold/core/core__ez_token_transfers.sql +++ b/models/gold/core/core__ez_token_transfers.sql @@ -99,6 +99,7 @@ SELECT FROM {{ ref('core__fact_token_transfers') }} t + {% if is_incremental() %} ASOF JOIN hourly_prices p MATCH_CONDITION (t.block_timestamp >= p.HOUR) @@ -108,7 +109,8 @@ FROM ON (t.contract_address = p.token_address) AND date_trunc('hour', t.block_timestamp) = p.HOUR {% endif %} - LEFT JOIN {{ ref('silver__ft_contract_metadata') }} C USING (contract_address) + LEFT JOIN {{ ref('silver__ft_contract_metadata') }} C on (t.contract_address = C.asset_identifier) + {% if var("MANUAL_FIX") %} WHERE {{ partition_load_manual('no_buffer') }} diff --git a/models/gold/defi/defi__ez_bridge_activity.sql b/models/gold/defi/defi__ez_bridge_activity.sql index 12e4423..51b2384 100644 --- a/models/gold/defi/defi__ez_bridge_activity.sql +++ b/models/gold/defi/defi__ez_bridge_activity.sql @@ -31,7 +31,7 @@ WITH fact_bridging AS ( ), labels AS ( SELECT - contract_address, + asset_identifier AS contract_address, name, symbol, decimals diff --git a/models/gold/defi/defi__ez_dex_swaps.sql b/models/gold/defi/defi__ez_dex_swaps.sql index 9301b4b..55d23dc 100644 --- a/models/gold/defi/defi__ez_dex_swaps.sql +++ b/models/gold/defi/defi__ez_dex_swaps.sql @@ -29,7 +29,7 @@ WITH dex_swaps AS ( ), labels AS ( SELECT - contract_address, + asset_identifier AS contract_address, NAME, symbol, decimals diff --git a/models/gold/defi/defi__ez_intents.sql b/models/gold/defi/defi__ez_intents.sql index d530519..0ac349d 100644 --- a/models/gold/defi/defi__ez_intents.sql +++ b/models/gold/defi/defi__ez_intents.sql @@ -9,7 +9,6 @@ tags = ['scheduled_non_core'] ) }} -- depends on {{ ref('defi__fact_intents') }} --- depends on {{ ref('silver__defuse_tokens_metadata') }} -- depends on {{ ref('silver__ft_contract_metadata') }} -- depends on {{ ref('price__ez_prices_hourly') }} @@ -85,12 +84,12 @@ WITH intents AS ( token_id, REGEXP_SUBSTR( token_id, - 'nep(141|245):(.*)', + 'nep(141|171|245):(.*)', 1, 1, 'e', 2 - ) AS contract_address_raw, + ) AS asset_identifier, referral, dip4_version, gas_burnt, @@ -123,42 +122,19 @@ WITH intents AS ( ), labels AS ( SELECT - near_token_id AS contract_address_raw, - SPLIT( - defuse_asset_identifier, - ':' - ) [0] :: STRING AS ecosystem, - SPLIT( - defuse_asset_identifier, - ':' - ) [1] :: STRING AS chain_id, - SPLIT( - defuse_asset_identifier, - ':' - ) [2] :: STRING AS contract_address, - asset_name AS symbol, - decimals - FROM - {{ ref('silver__defuse_tokens_metadata') }} - UNION ALL - SELECT - contract_address AS contract_address_raw, - 'near' AS ecosystem, - '397' AS chain_id, - contract_address, + asset_identifier, + source_chain, + crosschain_token_contract, + near_token_contract, symbol, decimals FROM {{ ref('silver__ft_contract_metadata') }} - WHERE - contract_address not in ( - select distinct near_token_id - from {{ ref('silver__defuse_tokens_metadata') }} - ) ), prices AS ( SELECT token_address AS contract_address, + blockchain, symbol, price, is_native, @@ -227,12 +203,9 @@ FINAL AS ( gas_burnt, receipt_succeeded, fact_intents_id AS ez_intents_id, - COALESCE( - dl.short_name, - l.ecosystem - ) AS blockchain, - l.contract_address, - l.contract_address = 'native' AS is_native, + l.source_chain AS blockchain, + l.crosschain_token_contract AS contract_address, + l.crosschain_token_contract = 'native' AS is_native, l.symbol, l.decimals, amount_raw / pow( @@ -251,25 +224,51 @@ FINAL AS ( FROM intents i LEFT JOIN labels l - ON i.contract_address_raw = l.contract_address_raw - LEFT JOIN EXTERNAL.defillama.dim_chains dl - ON l.chain_id = dl.chain_id + ON i.asset_identifier = l.asset_identifier ASOF JOIN prices p match_condition ( i.block_timestamp >= p.hour ) ON ( - l.contract_address = p.contract_address + l.crosschain_token_contract = p.contract_address ) ASOF JOIN prices_native p2 match_condition ( i.block_timestamp >= p2.hour ) ON ( upper(l.symbol) = upper(p2.symbol) - AND (l.contract_address = 'native') = p2.is_native + AND (l.crosschain_token_contract = 'native') = p2.is_native ) ) SELECT - *, + block_timestamp, + block_id, + tx_hash, + receipt_id, + receiver_id, + predecessor_id, + log_event, + token_id, + symbol, + amount_adj, + amount_usd, + owner_id, + old_owner_id, + new_owner_id, + amount_raw, + blockchain, + contract_address, + is_native, + price, + decimals, + gas_burnt, + memo, + referral, + dip4_version, + log_index, + log_event_index, + amount_index, + receipt_succeeded, + ez_intents_id, SYSDATE() AS inserted_timestamp, SYSDATE() AS modified_timestamp FROM diff --git a/models/gold/defi/defi__ez_lending.sql b/models/gold/defi/defi__ez_lending.sql index 6a71f36..e56154e 100644 --- a/models/gold/defi/defi__ez_lending.sql +++ b/models/gold/defi/defi__ez_lending.sql @@ -26,7 +26,7 @@ WITH lending AS ( ), labels AS ( SELECT - contract_address, + asset_identifier AS contract_address, NAME, symbol, IFF( diff --git a/models/silver/defi/lending/silver__burrow_lending.sql b/models/silver/defi/lending/silver__burrow_lending.sql index e3f069c..dfa27f8 100644 --- a/models/silver/defi/lending/silver__burrow_lending.sql +++ b/models/silver/defi/lending/silver__burrow_lending.sql @@ -6,7 +6,7 @@ WITH metadata AS ( SELECT - contract_address, + asset_identifier as contract_address, NAME, symbol, decimals diff --git a/models/silver/labels/silver__defuse_tokens_metadata.sql b/models/silver/labels/external/silver__defuse_ft_metadata.sql similarity index 51% rename from models/silver/labels/silver__defuse_tokens_metadata.sql rename to models/silver/labels/external/silver__defuse_ft_metadata.sql index c52767d..c05e0ef 100644 --- a/models/silver/labels/silver__defuse_tokens_metadata.sql +++ b/models/silver/labels/external/silver__defuse_ft_metadata.sql @@ -1,7 +1,8 @@ {{ config( materialized = 'incremental', + unique_key = 'defuse_ft_metadata_id', + incremental_strategy = 'merge', merge_exclude_columns = ["inserted_timestamp"], - unique_key = 'defuse_asset_identifier', tags = ['scheduled_non_core'] ) }} @@ -11,25 +12,43 @@ WITH api_call AS ( response FROM {{ ref('streamline__defuse_token_ids_realtime') }} -) +), +flattened AS ( SELECT + VALUE :defuse_asset_identifier :: STRING AS defuse_asset_identifier, VALUE :asset_name :: STRING AS asset_name, VALUE :decimals :: INT AS decimals, - VALUE :defuse_asset_identifier :: STRING AS defuse_asset_identifier, VALUE :min_deposit_amount :: STRING AS min_deposit_amount, VALUE :min_withdrawal_amount :: STRING AS min_withdrawal_amount, - VALUE :near_token_id :: STRING AS near_token_id, - VALUE :withdrawal_fee :: STRING AS withdrawal_fee, - {{ dbt_utils.generate_surrogate_key( - ['defuse_asset_identifier'] - ) }} AS defuse_token_ids_id, - SYSDATE() AS inserted_timestamp, - SYSDATE() AS modified_timestamp, - '{{ invocation_id }}' AS _invocation_id + VALUE :near_token_id :: STRING AS near_token_contract, + VALUE :withdrawal_fee :: STRING AS withdrawal_fee FROM api_call, LATERAL FLATTEN( input => response :data :result :tokens :: ARRAY ) +) +SELECT + defuse_asset_identifier, + CASE + WHEN SPLIT_PART(defuse_asset_identifier, ':', 0) = 'near' THEN 'near' + WHEN SPLIT_PART(defuse_asset_identifier, ':', ARRAY_SIZE(SPLIT(defuse_asset_identifier, ':'))) = 'native' THEN SPLIT_PART(near_token_contract, '.', 0) :: STRING + ELSE SPLIT_PART(near_token_contract, '-', 0) :: STRING + END AS source_chain, + SPLIT_PART(defuse_asset_identifier, ':', ARRAY_SIZE(SPLIT(defuse_asset_identifier, ':'))) AS crosschain_token_contract, + asset_name, + decimals, + min_deposit_amount, + min_withdrawal_amount, + near_token_contract, + withdrawal_fee, + {{ dbt_utils.generate_surrogate_key( + ['defuse_asset_identifier'] + ) }} AS defuse_ft_metadata_id, + SYSDATE() AS inserted_timestamp, + SYSDATE() AS modified_timestamp, + '{{ invocation_id }}' AS _invocation_id +FROM + flattened qualify(row_number() over (partition by defuse_asset_identifier order by inserted_timestamp asc)) = 1 diff --git a/models/silver/labels/external/silver__nearblocks_ft_metadata.sql b/models/silver/labels/external/silver__nearblocks_ft_metadata.sql new file mode 100644 index 0000000..f024901 --- /dev/null +++ b/models/silver/labels/external/silver__nearblocks_ft_metadata.sql @@ -0,0 +1,71 @@ +{{ config( + materialized = 'incremental', + unique_key = 'nearblocks_ft_metadata_id', + incremental_strategy = 'merge', + merge_exclude_columns = ["inserted_timestamp"], + tags = ['scheduled_non_core'] +) }} + + + +WITH nearblocks AS ( + + SELECT + VALUE :CONTRACT_ADDRESS :: STRING AS contract_address, + DATA + FROM + {{ ref('bronze__nearblocks_ft_metadata')}} + WHERE + typeof(DATA) != 'NULL_VALUE' + + {% if is_incremental() %} + AND + _inserted_timestamp >= ( + SELECT + MAX(modified_timestamp) + FROM + {{ this }} + ) + {% endif %} +), +nearblocks_metadata AS ( + SELECT + VALUE :contract :: STRING AS near_token_contract, + VALUE :decimals :: INT AS decimals, + VALUE :name :: STRING AS NAME, + VALUE :symbol :: STRING AS symbol + FROM + nearblocks, + LATERAL FLATTEN( + input => DATA :contracts + ) +) +SELECT + near_token_contract, + decimals, + name, + symbol, + CASE + WHEN near_token_contract ilike 'sol-%.omft.near' THEN 'sol' + WHEN near_token_contract rlike '.*-0x[0-9a-fA-F]+\\.(duse|omft)\\.near' THEN SPLIT_PART(near_token_contract, '-', 1) :: STRING + WHEN near_token_contract ilike '%-native.duse.near' THEN SPLIT_PART(near_token_contract, '-', 1) :: STRING + ELSE 'near' + END AS source_chain, + IFF( + source_chain = 'near', + near_token_contract, + COALESCE( + REGEXP_SUBSTR(near_token_contract, '0x[a-fA-F0-9]{40}') :: STRING, + REGEXP_SUBSTR(near_token_contract, 'native') :: STRING, + REGEXP_SUBSTR(near_token_contract, 'sol-([^.]+)\\.omft\\.near', 1, 1, 'e', 1) + ) + ) AS crosschain_token_contract, + {{ dbt_utils.generate_surrogate_key( + ['near_token_contract'] + ) }} AS nearblocks_ft_metadata_id, + SYSDATE() AS inserted_timestamp, + SYSDATE() AS modified_timestamp, + '{{ invocation_id }}' AS _invocation_id +FROM nearblocks_metadata + +qualify(row_number() over (partition by near_token_contract order by modified_timestamp desc)) = 1 diff --git a/models/silver/labels/external/silver__omni_ft_metadata.sql b/models/silver/labels/external/silver__omni_ft_metadata.sql new file mode 100644 index 0000000..83669f3 --- /dev/null +++ b/models/silver/labels/external/silver__omni_ft_metadata.sql @@ -0,0 +1,66 @@ +{{ config( + materialized = 'incremental', + unique_key = 'omni_ft_metadata_id', + incremental_strategy = 'merge', + merge_exclude_columns = ["inserted_timestamp"], + tags = ['scheduled_non_core'] +) }} + +WITH omni AS ( + + SELECT + VALUE:OMNI_ASSET_IDENTIFIER::STRING AS omni_asset_identifier, + VALUE:data:result:result::ARRAY AS result_array, + DATA + FROM + {{ ref('bronze__omni_metadata') }} + {% if is_incremental() %} + WHERE + _inserted_timestamp >= ( + SELECT + MAX(modified_timestamp) + FROM + {{ this }} + ) + {% endif %} +), +try_decode_hex AS ( + SELECT + omni_asset_identifier, + b.value AS raw, + b.index, + LPAD(TRIM(to_char(b.value::INT, 'XXXXXXX'))::STRING, 2, '0') AS hex, + ROW_NUMBER() OVER (PARTITION BY omni_asset_identifier, raw, index, hex ORDER BY omni_asset_identifier) AS rn + FROM omni o, + TABLE(FLATTEN(o.result_array, recursive => TRUE)) b + WHERE IS_ARRAY(o.result_array) = TRUE + ORDER BY 1, 3 +), +decoded_response AS ( + SELECT + omni_asset_identifier, + ARRAY_TO_STRING(ARRAY_AGG(hex) WITHIN GROUP (ORDER BY index ASC), '') AS decoded_response + FROM try_decode_hex + GROUP BY omni_asset_identifier, rn + HAVING rn = 1 +), +conversion AS ( + SELECT + omni_asset_identifier, + TRIM(livequery.utils.udf_hex_to_string(decoded_response), '"') AS decoded_result + FROM decoded_response +) +SELECT + omni_asset_identifier, + SPLIT_PART(omni_asset_identifier, ':', 1) :: STRING AS source_chain, + SPLIT_PART(omni_asset_identifier, ':', 2) :: STRING AS crosschain_token_contract, + decoded_result AS near_token_contract, + {{ dbt_utils.generate_surrogate_key( + ['omni_asset_identifier'] + ) }} AS omni_ft_metadata_id, + SYSDATE() AS inserted_timestamp, + SYSDATE() AS modified_timestamp, + '{{ invocation_id }}' AS _invocation_id +FROM + conversion +qualify(row_number() over (partition by omni_asset_identifier order by inserted_timestamp asc)) = 1 diff --git a/models/silver/labels/silver__ft_contract_metadata.sql b/models/silver/labels/silver__ft_contract_metadata.sql index 6d363ae..a08f376 100644 --- a/models/silver/labels/silver__ft_contract_metadata.sql +++ b/models/silver/labels/silver__ft_contract_metadata.sql @@ -1,27 +1,33 @@ --- depends on: {{ ref('bronze__nearblocks_ft_metadata')}} +-- depends on: {{ ref('silver__nearblocks_ft_metadata')}} +-- depends on: {{ ref('silver__omni_ft_metadata')}} +-- depends on: {{ ref('silver__defuse_ft_metadata')}} +-- depends on: {{ ref('streamline__omni_tokenlist')}} {{ config( materialized = 'incremental', - unique_key = 'contract_address', + unique_key = 'ft_contract_metadata_id', incremental_strategy = 'merge', merge_exclude_columns = ["inserted_timestamp"], tags = ['scheduled_non_core'] ) }} -WITH bronze AS ( +WITH nearblocks AS ( SELECT - VALUE :CONTRACT_ADDRESS :: STRING AS contract_address, - DATA + near_token_contract, + decimals, + name, + symbol, + source_chain, + crosschain_token_contract, + 'nearblocks' AS source FROM - {{ ref('bronze__nearblocks_ft_metadata')}} - WHERE - typeof(DATA) != 'NULL_VALUE' + {{ ref('silver__nearblocks_ft_metadata')}} {% if is_incremental() %} - AND - _inserted_timestamp >= ( + WHERE + modified_timestamp >= ( SELECT MAX(modified_timestamp) FROM @@ -29,36 +35,179 @@ WITH bronze AS ( ) {% endif %} ), -flatten_results AS ( +omni AS ( SELECT - VALUE :contract :: STRING AS contract_address, - VALUE :decimals :: INT AS decimals, - VALUE :name :: STRING AS NAME, - VALUE :symbol :: STRING AS symbol, - VALUE AS DATA + omni_asset_identifier AS asset_identifier, + o.source_chain, + o.crosschain_token_contract, + o.near_token_contract, + NULL AS decimals, + NULL AS name, + NULL AS symbol, + 'omni' AS source FROM - bronze, - LATERAL FLATTEN( - input => DATA :contracts + {{ ref('silver__omni_ft_metadata')}} o + + {% if is_incremental() %} + WHERE + o.modified_timestamp >= ( + SELECT + MAX(modified_timestamp) + FROM + {{ this }} ) + {% endif %} +), +omni_unmapped AS ( + SELECT + omni_asset_identifier AS asset_identifier, + o.source_chain, + o.crosschain_token_contract, + n.near_token_contract, + COALESCE(n.decimals, c.decimals) :: INT AS decimals, + COALESCE(n.name, c.name) :: STRING AS name, + COALESCE(n.symbol, c.symbol) :: STRING AS symbol, + 'omni_unmapped' AS source + FROM + {{ ref('streamline__omni_tokenlist')}} o + LEFT JOIN {{ ref('silver__nearblocks_ft_metadata') }} n + ON o.crosschain_token_contract = n.near_token_contract + AND o.source_chain = 'near' + LEFT JOIN {{ source('crosschain_silver', 'complete_token_asset_metadata')}} c + ON o.crosschain_token_contract = c.token_address + AND c.blockchain = 'solana' + -- note, this does not give use the Near token contract. + -- could join on symbol, but some symbols have multiple contract records as symbol is not unique + WHERE + o.source_chain IN ('near', 'sol') + + {% if is_incremental() %} + AND + o.modified_timestamp >= ( + SELECT + MAX(modified_timestamp) + FROM + {{ this }} + ) + {% endif %} +), +defuse AS ( + SELECT + d.near_token_contract AS asset_identifier, + d.source_chain, + d.crosschain_token_contract, + d.near_token_contract, + d.decimals, + NULL AS name, + asset_name AS symbol, + 'defuse' AS source + FROM + {{ ref('silver__defuse_ft_metadata')}} d + + {% if is_incremental() %} + WHERE + d.modified_timestamp >= ( + SELECT + MAX(modified_timestamp) + FROM + {{ this }} + ) + {% endif %} +), +final AS ( + -- Nearblocks + SELECT + near_token_contract AS asset_identifier, + source_chain, + crosschain_token_contract, + near_token_contract, + decimals, + name, + symbol, + source + FROM + nearblocks + + UNION ALL + + -- Omni + SELECT + asset_identifier, + source_chain, + crosschain_token_contract, + near_token_contract, + decimals, + name, + symbol, + source + FROM + omni + + UNION ALL + + -- Omni unmapped + SELECT + asset_identifier, + source_chain, + crosschain_token_contract, + near_token_contract, + decimals, + name, + symbol, + source + FROM + omni_unmapped + + UNION ALL + + -- Defuse + SELECT + asset_identifier, + source_chain, + crosschain_token_contract, + near_token_contract, + decimals, + name, + symbol, + source + FROM + defuse +), +final_joined AS ( + SELECT + f.asset_identifier, + f.source_chain, + f.crosschain_token_contract, + f.near_token_contract, + COALESCE(f.decimals, n.decimals) AS decimals, + COALESCE(f.name, n.name) AS name, + COALESCE(f.symbol, n.symbol) AS symbol, + f.source AS metadata_provider + FROM final f + LEFT JOIN {{ ref('silver__nearblocks_ft_metadata') }} n + ON f.near_token_contract = n.near_token_contract + AND f.source IN ('omni', 'defuse') + AND NOT (f.source = 'omni_unmapped' AND f.source_chain != 'near') ) SELECT - contract_address, + asset_identifier, + source_chain, + crosschain_token_contract, + near_token_contract, decimals, - NAME, + name, symbol, - DATA, - {{ dbt_utils.generate_surrogate_key( - ['contract_address'] - ) }} AS ft_contract_metadata_id, + metadata_provider, + {{ dbt_utils.generate_surrogate_key(['asset_identifier']) }} AS ft_contract_metadata_id, SYSDATE() AS inserted_timestamp, SYSDATE() AS modified_timestamp, '{{ invocation_id }}' AS _invocation_id -FROM - flatten_results +FROM + final_joined qualify ROW_NUMBER() over ( - PARTITION BY contract_address - ORDER BY + PARTITION BY asset_identifier + ORDER BY + metadata_provider = 'defuse' DESC, -- prioritize defuse over nearblocks for those tokens modified_timestamp DESC ) = 1 diff --git a/models/silver/labels/silver__ft_contract_metadata.yml b/models/silver/labels/silver__ft_contract_metadata.yml index 22ef9e3..a964a0a 100644 --- a/models/silver/labels/silver__ft_contract_metadata.yml +++ b/models/silver/labels/silver__ft_contract_metadata.yml @@ -11,44 +11,33 @@ models: interval: 8 columns: - - name: CONTRACT_ADDRESS - description: "{{ doc('contract_address')}}" + - name: ASSET_IDENTIFIER + description: "{{ doc('asset_identifier') }}" tests: - not_null - unique - - dbt_expectations.expect_column_values_to_be_of_type: - column_type: VARCHAR - + - name: SOURCE_CHAIN + description: "{{ doc('source_chain') }}" + - name: CROSSCHAIN_TOKEN_CONTRACT + description: "{{ doc('crosschain_token_contract') }}" + - name: NEAR_TOKEN_CONTRACT + description: "{{ doc('near_token_contract') }}" - name: DECIMALS - description: "{{ doc('decimals')}}" - tests: - - not_null - + description: "{{ doc('decimals') }}" - name: NAME - description: "{{ doc('name')}}" - tests: - - not_null - - dbt_expectations.expect_column_values_to_be_of_type: - column_type: VARCHAR - + description: "{{ doc('name') }}" - name: SYMBOL - description: "{{ doc('symbol')}}" - tests: - - not_null - - dbt_expectations.expect_column_values_to_be_of_type: - column_type: VARCHAR - - - name: DATA - description: "{{ doc('data')}}" - + description: "{{ doc('symbol') }}" + - name: METADATA_PROVIDER + description: "Source of the metadata record (e.g., omni, nearblocks, defuse)." - name: FT_CONTRACT_METADATA_ID description: "{{doc('id')}}" - + tests: + - not_null + - unique - name: INSERTED_TIMESTAMP description: "{{doc('inserted_timestamp')}}" - - name: MODIFIED_TIMESTAMP description: "{{doc('modified_timestamp')}}" - - name: _INVOCATION_ID description: "{{doc('invocation_id')}}" diff --git a/models/silver/prices/silver__complete_token_prices.sql b/models/silver/prices/silver__complete_token_prices.sql index 8e71778..b845710 100644 --- a/models/silver/prices/silver__complete_token_prices.sql +++ b/models/silver/prices/silver__complete_token_prices.sql @@ -86,4 +86,4 @@ FROM complete_token_prices p LEFT JOIN {{ ref('silver__ft_contract_metadata') }} ft - ON p.token_address = ft.contract_address + ON p.token_address = ft.asset_identifier diff --git a/models/silver/transfers/bridging/silver__bridge_allbridge.sql b/models/silver/transfers/bridging/silver__bridge_allbridge.sql index 9214ca2..34afb26 100644 --- a/models/silver/transfers/bridging/silver__bridge_allbridge.sql +++ b/models/silver/transfers/bridging/silver__bridge_allbridge.sql @@ -41,7 +41,7 @@ WITH functioncall AS ( ), metadata AS ( SELECT - contract_address, + asset_identifier as contract_address, NAME, symbol, decimals diff --git a/models/sources.yml b/models/sources.yml index e9e4cd7..12dc903 100644 --- a/models/sources.yml +++ b/models/sources.yml @@ -43,3 +43,4 @@ sources: - name: chunks_v2 - name: transactions_v2 - name: nearblocks_ft_metadata + - name: omni_metadata diff --git a/models/streamline/external/token_metadata/nearblocks/streamline__ft_tokenlist.sql b/models/streamline/external/token_metadata/nearblocks/streamline__ft_tokenlist.sql index f9f798a..936467f 100644 --- a/models/streamline/external/token_metadata/nearblocks/streamline__ft_tokenlist.sql +++ b/models/streamline/external/token_metadata/nearblocks/streamline__ft_tokenlist.sql @@ -1,7 +1,6 @@ {{ config ( materialized = "incremental", incremental_strategy = 'merge', - incremental_predicates = ["dynamic_range_predicate","modified_timestamp::date"], unique_key = "contract_address", cluster_by = ['modified_timestamp::DATE'], post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION on equality(contract_address)", diff --git a/models/streamline/external/token_metadata/omni/complete/streamline__omni_complete.sql b/models/streamline/external/token_metadata/omni/complete/streamline__omni_complete.sql new file mode 100644 index 0000000..7da8ad8 --- /dev/null +++ b/models/streamline/external/token_metadata/omni/complete/streamline__omni_complete.sql @@ -0,0 +1,35 @@ +-- depends_on: {{ ref('bronze__omni_metadata') }} +{{ config( + materialized = "incremental", + unique_key = "omni_complete_id", + merge_exclude_columns = ["inserted_timestamp"], + tags = ['streamline_non_core'] +) }} + +SELECT + VALUE:OMNI_ASSET_IDENTIFIER :: STRING AS omni_asset_identifier, + partition_key, + _inserted_timestamp, + VALUE:OMNI_ASSET_IDENTIFIER :: STRING AS omni_complete_id, + SYSDATE() AS inserted_timestamp, + SYSDATE() AS modified_timestamp, + '{{ invocation_id }}' AS _invocation_id +FROM + {{ ref('bronze__omni_metadata') }} +WHERE + typeof(DATA) != 'NULL_VALUE' + +{% if is_incremental() %} +AND + _inserted_timestamp >= COALESCE( + ( + SELECT + MAX(_inserted_timestamp) _inserted_timestamp + FROM + {{ this }} + ), + '1900-01-01' :: timestamp_ntz + ) +{% endif %} + +qualify(row_number() over (partition by omni_asset_identifier order by _inserted_timestamp desc)) = 1 diff --git a/models/streamline/external/token_metadata/omni/realtime/streamline__omni_realtime.sql b/models/streamline/external/token_metadata/omni/realtime/streamline__omni_realtime.sql new file mode 100644 index 0000000..8407203 --- /dev/null +++ b/models/streamline/external/token_metadata/omni/realtime/streamline__omni_realtime.sql @@ -0,0 +1,54 @@ +-- depends_on: {{ ref('streamline__omni_tokenlist') }} +{{ config( + materialized = 'view', + post_hook = fsc_utils.if_data_call_function_v2( + func = 'streamline.udf_bulk_rest_api_v2', + target = "{{this.schema}}.{{this.identifier}}", + params = { + "external_table": "omni_metadata", + "sql_limit": "30", + "producer_batch_size": "5", + "worker_batch_size": "5", + "sql_source": "{{this.identifier}}" + } + ), + tags = ['streamline_non_core'] +) }} + +WITH omni_token AS ( + + SELECT + omni_asset_identifier + FROM + {{ ref('streamline__omni_tokenlist') }} + WHERE + source_chain NOT IN ('near', 'sol') + EXCEPT + SELECT + omni_asset_identifier + FROM + {{ ref('streamline__omni_complete')}} +) +SELECT + omni_asset_identifier, + DATE_PART('EPOCH', SYSDATE()) :: INTEGER AS partition_key, + {{ target.database }}.LIVE.UDF_API( + 'POST', + '{Service}', + OBJECT_CONSTRUCT('Content-Type', 'application/json'), + OBJECT_CONSTRUCT( + 'jsonrpc', '2.0', + 'id', 'dontcare', + 'method', 'query', + 'params', OBJECT_CONSTRUCT( + 'request_type', 'call_function', + 'finality', 'final', + 'account_id', 'omni.bridge.near', + 'method_name', 'get_token_id', + 'args_base64', BASE64_ENCODE(OBJECT_CONSTRUCT('address', omni_asset_identifier) :: STRING) + ) + ), + 'Vault/prod/near/quicknode/mainnet' + ) AS request +FROM + omni_token diff --git a/models/streamline/external/token_metadata/omni/streamline__omni_tokenlist.sql b/models/streamline/external/token_metadata/omni/streamline__omni_tokenlist.sql new file mode 100644 index 0000000..6b9150d --- /dev/null +++ b/models/streamline/external/token_metadata/omni/streamline__omni_tokenlist.sql @@ -0,0 +1,38 @@ +{{ config ( + materialized = "incremental", + incremental_strategy = 'merge', + unique_key = "omni_asset_identifier", + cluster_by = ['modified_timestamp::DATE'], + post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION ON EQUALITY(omni_asset_identifier);", + tags = ['streamline_non_core'] +) }} + +WITH omni_token AS ( + SELECT + DISTINCT raw_token_id AS omni_asset_identifier + FROM + {{ ref('silver__bridge_omni') }} + +{% if is_incremental() %} +WHERE modified_timestamp >= ( + SELECT + COALESCE(MAX(modified_timestamp), '1970-01-01') + FROM + {{ this }}) +{% endif %} +) +SELECT + omni_asset_identifier, + SPLIT_PART(omni_asset_identifier, ':', 1) :: STRING AS source_chain, + SPLIT_PART(omni_asset_identifier, ':', 2) :: STRING AS crosschain_token_contract, + {{ dbt_utils.generate_surrogate_key( + ['omni_asset_identifier'] + ) }} AS omni_tokenlist_id, + SYSDATE() AS inserted_timestamp, + SYSDATE() AS modified_timestamp, + '{{ invocation_id }}' AS _invocation_id +FROM + omni_token +QUALIFY (ROW_NUMBER() OVER (PARTITION BY omni_asset_identifier + ORDER BY + modified_timestamp ASC) = 1)