omni integration (temp)

This commit is contained in:
gregoriustanleyy 2025-05-23 18:16:42 +07:00
parent 4a6a90220f
commit b516ebac3b
2 changed files with 126 additions and 13 deletions

View File

@ -1,4 +1,5 @@
-- depends on: {{ ref('bronze__nearblocks_ft_metadata')}}
-- depends on: {{ ref('bronze__omni_metadata')}}
{{ config(
materialized = 'incremental',
@ -9,7 +10,7 @@
) }}
WITH bronze AS (
WITH nearblocks AS (
SELECT
VALUE :CONTRACT_ADDRESS :: STRING AS contract_address,
@ -29,7 +30,7 @@ WITH bronze AS (
)
{% endif %}
),
flatten_results AS (
nearblocks_metadata AS (
SELECT
VALUE :contract :: STRING AS contract_address,
VALUE :decimals :: INT AS decimals,
@ -37,28 +38,74 @@ flatten_results AS (
VALUE :symbol :: STRING AS symbol,
VALUE AS DATA
FROM
bronze,
nearblocks,
LATERAL FLATTEN(
input => DATA :contracts
)
),
omni AS (
SELECT
omni_address,
contract_address,
FROM
{{ ref('silver__omni_metadata')}}
{% if is_incremental() %}
WHERE
_inserted_timestamp >= (
SELECT
MAX(modified_timestamp)
FROM
{{ this }}
)
{% endif %}
),
final AS (
-- Omni
SELECT
o.omni_address,
o.contract_address,
n.decimals,
n.name,
n.symbol,
n.data,
'omni' AS source
FROM
omni o
LEFT JOIN nearblocks_metadata n
ON o.contract_address = n.contract_address
UNION ALL
-- Nearblocks
SELECT
NULL AS omni_address,
n.contract_address,
n.decimals,
n.name,
n.symbol,
n.data,
'nearblocks' AS source
FROM
nearblocks_metadata n
WHERE n.contract_address NOT IN (SELECT contract_address FROM omni)
)
SELECT
contract_address,
omni_address,
decimals,
NAME,
name,
symbol,
DATA,
{{ dbt_utils.generate_surrogate_key(
['contract_address']
) }} AS ft_contract_metadata_id,
data,
source,
{{ dbt_utils.generate_surrogate_key(['omni_address', 'contract_address']) }} AS ft_contract_metadata_id,
SYSDATE() AS inserted_timestamp,
SYSDATE() AS modified_timestamp,
'{{ invocation_id }}' AS _invocation_id
FROM
flatten_results
FROM final
qualify ROW_NUMBER() over (
PARTITION BY contract_address
ORDER BY
modified_timestamp DESC
PARTITION BY omni_address, contract_address
ORDER BY modified_timestamp DESC
) = 1

View File

@ -0,0 +1,66 @@
{{ config(
materialized = 'incremental',
merge_exclude_columns = ["inserted_timestamp"],
unique_key = 'omni_address',
tags = ['scheduled_non_core']
) }}
WITH omni AS (
SELECT
VALUE:CONTRACT_ADDRESS::STRING AS omni_address,
DATA
FROM
{{ ref('bronze__omni_metadata')}}
WHERE
typeof(DATA) != 'NULL_VALUE'
{% if is_incremental() %}
AND
_inserted_timestamp >= (
SELECT
MAX(modified_timestamp)
FROM
{{ this }}
)
{% endif %}
),
flattened AS (
SELECT
omni_address,
value::INT AS byte_val,
index
FROM
omni,
LATERAL FLATTEN(input => DATA:result)
),
hex_strings AS (
SELECT
omni_address,
LISTAGG(LPAD(TO_CHAR(byte_val, 'XX'), 2, '0'), '') WITHIN GROUP (ORDER BY index) AS hex_string
FROM
flattened
GROUP BY
omni_address
),
decode AS (
SELECT
omni_address,
TRY_PARSE_JSON(
livequery.utils.udf_hex_to_string(hex_string)
) AS decoded_result
FROM
hex_strings
)
SELECT
omni_address,
decoded_result AS contract_address,
{{ dbt_utils.generate_surrogate_key(
['omni_address']
) }} AS omni_metadata_id,
SYSDATE() AS inserted_timestamp,
SYSDATE() AS modified_timestamp,
'{{ invocation_id }}' AS _invocation_id
FROM
decode
qualify(row_number() over (partition by omni_address order by inserted_timestamp asc)) = 1