STREAM-1051 add ez_native_transfers sql

This commit is contained in:
shah 2024-10-14 18:24:23 -07:00 committed by Julius Remigio
parent a844411ef2
commit 213acecd40
2 changed files with 586 additions and 100 deletions

View File

@ -409,9 +409,8 @@
signature:
- [block_height, INTEGER, The start block height to get the transfers from]
- [to_latest, BOOLEAN, Whether to continue fetching transfers until the latest block or not]
- [native_token_address, STRING, The address of the native token to get the transfers of]
return_type:
- "TABLE(tx_hash STRING, block_number INTEGER, block_timestamp TIMESTAMP_NTZ, tx_position INTEGER, trace_index INTEGER, identifier STRING, origin_from_address STRING, origin_to_address STRING, origin_function_signature STRING, from_address STRING, to_address STRING, amount FLOAT, amount_precise_raw NUMBER, amount_precise FLOAT, amount_usd FLOAT, ez_native_transfers_id STRING, inserted_timestamp TIMESTAMP_NTZ, modified_timestamp TIMESTAMP_NTZ)"
- "TABLE(tx_hash STRING, block_number NUMBER(38,0), block_timestamp TIMESTAMP_NTZ(9), tx_position NUMBER(38,0), trace_index NUMBER(19,0), identifier STRING, origin_from_address STRING, origin_to_address STRING, origin_function_signature STRING, from_address STRING, to_address STRING, amount FLOAT, amount_precise_raw STRING, amount_precise STRING, amount_usd FLOAT, ez_native_transfers_id STRING, inserted_timestamp TIMESTAMP_NTZ(9), modified_timestamp TIMESTAMP_NTZ(9))"
options: |
NOT NULL
RETURNS NULL ON NULL INPUT

View File

@ -560,112 +560,599 @@ AND from_address IS NOT NULL
{% endmacro %}
{% macro evm_live_view_ez_native_transfers(schema, blockchain, network) %}
WITH spine AS (
{{ evm_live_view_target_blocks(blockchain, network) }}
),
raw_block_data AS (
WITH heights AS (
SELECT
s.block_number AS block_number,
live.udf_api(
'{service}/{Authentication}',
utils.udf_json_rpc_call(
'eth_getBlockByNumber',
[utils.udf_int_to_hex(s.block_number), true]
livequery_dev.live.udf_api(
'https://indulgent-frosty-sanctuary.quiknode.pro/22555ab2563d38edce551aa3ab524e595d9ccba8/',
livequery_Dev.utils.udf_json_rpc_call(
'eth_blockNumber',
[]
)
):data AS block_data,
b.value AS tx_data,
TO_TIMESTAMP_NTZ(utils.udf_hex_to_int(block_data:result:timestamp::string)) AS block_timestamp,
tx_data:hash::string AS tx_hash,
tx_data:from::string AS from_address,
tx_data:to_address::string AS to_address,
TRY_TO_NUMBER(utils.udf_hex_to_int(tx_data:value::string), 38, 0) / 1e18 AS eth_value,
TRY_TO_NUMBER(utils.udf_hex_to_int(tx_data:value::string), 38, 0) AS eth_value_precise_raw,
TRY_TO_NUMBER(utils.udf_hex_to_int(tx_data:value::string), 38, 0) / 1e18 AS eth_value_precise,
tx_data:input::string AS input,
utils.udf_hex_to_int(tx_data:transactionIndex::string)::INTEGER AS tx_position,
'CALL' AS TYPE,
'SUCCESS' AS tx_status,
'SUCCESS' AS trace_status,
CASE
WHEN LEFT(input, 10) = '0x' THEN SUBSTRING(input, 1, 10)
ELSE NULL
END AS origin_function_signature,
'native_transfer' AS identifier,
NULL AS trace_index
):data AS result,
livequery_dev.utils.udf_hex_to_int(result:result)::integer as latest_block_height,
coalesce(
arg_block_height,
latest_block_height
) as min_height,
iff(
coalesce(arg_to_latest, false),
latest_block_height,
min_height
) as max_height
),
spine as (
select
row_number() over (
order by
null
) -1 + coalesce(arg_block_height, 0)::integer as block_number,
min_height,
iff(
coalesce(arg_to_latest, false),
latest_block_height,
min_height
) as max_height,
latest_block_height
from
table(generator(ROWCOUNT => 500)),
heights
qualify block_number between min_height and max_height
),
raw_receipts as (
SELECT
latest_block_height,
block_number,
livequery_dev.live.udf_api(
'https://indulgent-frosty-sanctuary.quiknode.pro/22555ab2563d38edce551aa3ab524e595d9ccba8/',
livequery_Dev.utils.udf_json_rpc_call(
'eth_getBlockReceipts',
[livequery_dev.utils.udf_int_to_hex(block_number)]
)
):data.result AS result,
v.value as DATA
from
spine,
lateral flatten(result) v
),
raw_block_txs as (
SELECT
block_number,
livequery_dev.live.udf_api(
'https://indulgent-frosty-sanctuary.quiknode.pro/22555ab2563d38edce551aa3ab524e595d9ccba8/',
livequery_Dev.utils.udf_json_rpc_call(
'eth_getBlockByNumber',
[livequery_dev.utils.udf_int_to_hex(block_number), true]
)
):data.result AS DATA
from
spine
),
raw_txs as (
SELECT
block_number,
v.value as DATA
from
raw_block_txs r,
lateral flatten(r.data:transactions) v
),
blocks as (
select
block_number,
livequery_dev.utils.udf_hex_to_int(DATA :baseFeePerGas::STRING)::INT AS base_fee_per_gas,
livequery_dev.utils.udf_hex_to_int(DATA :difficulty::STRING)::INT AS difficulty,
DATA :extraData::STRING AS extra_data,
livequery_dev.utils.udf_hex_to_int(DATA :gasLimit::STRING)::INT AS gas_limit,
livequery_dev.utils.udf_hex_to_int(DATA :gasUsed::STRING)::INT AS gas_used,
DATA :hash::STRING AS HASH,
DATA :logsBloom::STRING AS logs_bloom,
DATA :miner::STRING AS miner,
livequery_dev.utils.udf_hex_to_int(DATA :nonce::STRING)::INT AS nonce,
livequery_dev.utils.udf_hex_to_int(DATA :number::STRING)::INT AS NUMBER,
DATA :parentHash::STRING AS parent_hash,
DATA :receiptsRoot::STRING AS receipts_root,
DATA :sha3Uncles::STRING AS sha3_uncles,
livequery_dev.utils.udf_hex_to_int(DATA :size::STRING)::INT AS SIZE,
DATA :stateRoot::STRING AS state_root,
livequery_dev.utils.udf_hex_to_int(DATA :timestamp::STRING)::TIMESTAMP AS block_timestamp,
livequery_dev.utils.udf_hex_to_int(DATA :totalDifficulty::STRING)::INT AS total_difficulty,
ARRAY_SIZE(DATA :transactions) AS tx_count,
DATA :transactionsRoot::STRING AS transactions_root,
DATA :uncles AS uncles,
DATA :withdrawals AS withdrawals,
DATA :withdrawalsRoot::STRING AS withdrawals_root,
md5(
cast(
coalesce(
cast(block_number as TEXT),
'_dbt_utils_surrogate_key_null_'
) as TEXT
)
) AS blocks_id,
livequery_dev.utils.udf_hex_to_int(DATA: blobGasUsed::STRING)::INT AS blob_gas_used,
livequery_dev.utils.udf_hex_to_int(DATA: excessBlobGas::STRING)::INT AS excess_blob_gas,
from
raw_block_txs
),
receipts as (
select
latest_block_height,
block_number,
DATA :blockHash::STRING AS block_hash,
livequery_dev.utils.udf_hex_to_int(DATA :blockNumber::STRING)::INT AS blockNumber,
livequery_dev.utils.udf_hex_to_int(DATA :cumulativeGasUsed::STRING)::INT AS cumulative_gas_used,
livequery_dev.utils.udf_hex_to_int(DATA :effectiveGasPrice::STRING)::INT / pow(10, 9) AS effective_gas_price,
DATA :from::STRING AS from_address,
livequery_dev.utils.udf_hex_to_int(DATA :gasUsed::STRING)::INT AS gas_used,
DATA :logs AS logs,
DATA :logsBloom::STRING AS logs_bloom,
livequery_dev.utils.udf_hex_to_int(DATA :status::STRING)::INT AS status,
CASE
WHEN status = 1 THEN TRUE
ELSE FALSE
END AS tx_success,
CASE
WHEN status = 1 THEN 'SUCCESS'
ELSE 'FAIL'
END AS tx_status,
DATA :to::STRING AS to_address1,
CASE
WHEN to_address1 = '' THEN NULL
ELSE to_address1
END AS to_address,
DATA :transactionHash::STRING AS tx_hash,
livequery_dev.utils.udf_hex_to_int(DATA :transactionIndex::STRING)::INT AS POSITION,
livequery_dev.utils.udf_hex_to_int(DATA :type::STRING)::INT AS TYPE,
livequery_dev.utils.udf_hex_to_int(DATA :effectiveGasPrice::STRING)::INT AS blob_gas_price,
livequery_dev.utils.udf_hex_to_int(DATA :gasUsed::STRING)::INT AS blob_gas_used
from
raw_receipts
),
txs as (
select
A.block_number AS block_number,
A.data :blockHash::STRING AS block_hash,
livequery_dev.utils.udf_hex_to_int(A.data :blockNumber::STRING)::INT AS blockNumber,
livequery_dev.utils.udf_hex_to_int(A.data :chainId::STRING)::INT AS chain_id,
A.data :from::STRING AS from_address,
livequery_dev.utils.udf_hex_to_int(A.data :gas::STRING)::INT AS gas,
livequery_dev.utils.udf_hex_to_int(A.data :gasPrice::STRING)::INT / pow(10, 9) AS gas_price,
A.data :hash::STRING AS tx_hash,
A.data :input::STRING AS input_data,
SUBSTR(input_data, 1, 10) AS origin_function_signature,
livequery_dev.utils.udf_hex_to_int(A.data :maxFeePerGas::STRING)::INT / pow(10, 9) AS max_fee_per_gas,
livequery_dev.utils.udf_hex_to_int(
A.data :maxPriorityFeePerGas::STRING
)::INT / pow(10, 9) AS max_priority_fee_per_gas,
livequery_dev.utils.udf_hex_to_int(A.data :nonce::STRING)::INT AS nonce,
A.data :r::STRING AS r,
A.data :s::STRING AS s,
A.data :to::STRING AS to_address1,
livequery_dev.utils.udf_hex_to_int(A.data :transactionIndex::STRING)::INT AS POSITION,
A.data :type::STRING AS TYPE,
A.data :v::STRING AS v,
livequery_dev.utils.udf_hex_to_int(A.data :value::STRING) AS value_precise_raw,
value_precise_raw * power(10, -18) AS value_precise,
value_precise::FLOAT AS VALUE,
A.data :accessList AS access_list,
A.data,
A.data: blobVersionedHashes::ARRAY AS blob_versioned_hashes,
livequery_dev.utils.udf_hex_to_int(A.data: maxFeePerGas::STRING)::INT AS max_fee_per_blob_gas,
block_timestamp,
CASE
WHEN block_timestamp IS NULL
OR tx_status IS NULL THEN TRUE
ELSE FALSE
END AS is_pending,
r.gas_used,
tx_success,
tx_status,
cumulative_gas_used,
effective_gas_price,
livequery_dev.utils.udf_hex_to_int(A.data :gasPrice) * power(10, -18) * r.gas_used AS tx_fee_precise,
COALESCE(tx_fee_precise::FLOAT, 0) AS tx_fee,
r.type as tx_type,
r.blob_gas_used,
r.blob_gas_price,
from
raw_txs A
left join blocks b on b.block_number = A.block_number
left join receipts as r on r.tx_hash = A.data :hash::STRING
),
raw_traces AS (
SELECT
s.block_number,
v.index::INT AS tx_position,
v.value:result AS full_traces,
SYSDATE() AS _inserted_timestamp
FROM spine s,
LATERAL FLATTEN(input => PARSE_JSON(
livequery_dev.live.udf_api(
'https://indulgent-frosty-sanctuary.quiknode.pro/22555ab2563d38edce551aa3ab524e595d9ccba8/',
livequery_Dev.utils.udf_json_rpc_call(
'debug_traceBlockByNumber',
[livequery_dev.utils.udf_int_to_hex(s.block_number), {'tracer': 'callTracer'}]
)
):data.result
)) v
),
flatten_traces AS (
SELECT
block_number,
tx_position,
IFF(
path IN (
'result',
'result.value',
'result.type',
'result.to',
'result.input',
'result.gasUsed',
'result.gas',
'result.from',
'result.output',
'result.error',
'result.revertReason',
'gasUsed',
'gas',
'type',
'to',
'from',
'value',
'input',
'error',
'output',
'revertReason'
),
'ORIGIN',
REGEXP_REPLACE(REGEXP_REPLACE(path, '[^0-9]+', '_'), '^_|_$', '')
) AS trace_address,
_inserted_timestamp,
OBJECT_AGG(
key,
VALUE
) AS trace_json,
CASE
WHEN trace_address = 'ORIGIN' THEN NULL
WHEN POSITION(
'_' IN trace_address
) = 0 THEN 'ORIGIN'
ELSE REGEXP_REPLACE(
trace_address,
'_[0-9]+$',
'',
1,
1
)
END AS parent_trace_address,
SPLIT(
trace_address,
'_'
) AS str_array
FROM
raw_traces,
TABLE(
FLATTEN(
input => PARSE_JSON(full_traces),
recursive => TRUE
)
) f
WHERE
f.index IS NULL
AND f.key != 'calls'
AND f.path != 'result'
GROUP BY
block_number,
tx_position,
trace_address,
_inserted_timestamp
),
sub_traces AS (
SELECT
block_number,
tx_position,
parent_trace_address,
COUNT(*) AS sub_traces
FROM
flatten_traces
GROUP BY
block_number,
tx_position,
parent_trace_address
),
num_array AS (
SELECT
block_number,
tx_position,
trace_address,
ARRAY_AGG(flat_value) AS num_array
FROM
(
SELECT
block_number,
tx_position,
trace_address,
IFF(
VALUE :: STRING = 'ORIGIN',
-1,
VALUE :: INT
) AS flat_value
FROM
flatten_traces,
LATERAL FLATTEN (
input => str_array
)
)
GROUP BY
block_number,
tx_position,
trace_address
),
cleaned_traces AS (
SELECT
b.block_number,
b.tx_position,
b.trace_address,
IFNULL(
sub_traces,
0
) AS sub_traces,
num_array,
ROW_NUMBER() over (
PARTITION BY b.block_number,
b.tx_position
ORDER BY
num_array ASC
) - 1 AS trace_index,
trace_json,
b._inserted_timestamp
FROM
flatten_traces b
LEFT JOIN sub_traces s
ON b.block_number = s.block_number
AND b.tx_position = s.tx_position
AND b.trace_address = s.parent_trace_address
JOIN num_array n
ON b.block_number = n.block_number
AND b.tx_position = n.tx_position
AND b.trace_address = n.trace_address
),
final_traces AS (
SELECT
tx_position,
trace_index,
block_number,
trace_address,
trace_json :error :: STRING AS error_reason,
trace_json :from :: STRING AS from_address,
trace_json :to :: STRING AS to_address,
IFNULL(
utils.udf_hex_to_int(
trace_json :value :: STRING
),
'0'
) AS eth_value_precise_raw,
ethereum.utils.udf_decimal_adjust(
eth_value_precise_raw,
18
) AS eth_value_precise,
eth_value_precise :: FLOAT AS eth_value,
utils.udf_hex_to_int(
trace_json :gas :: STRING
) :: INT AS gas,
utils.udf_hex_to_int(
trace_json :gasUsed :: STRING
) :: INT AS gas_used,
trace_json :input :: STRING AS input,
trace_json :output :: STRING AS output,
trace_json :type :: STRING AS TYPE,
concat_ws(
'_',
TYPE,
trace_address
) AS identifier,
concat_ws(
'-',
block_number,
tx_position,
identifier
) AS _call_id,
_inserted_timestamp,
trace_json AS DATA,
sub_traces
FROM
cleaned_traces
),
new_records AS (
SELECT
f.block_number,
t.tx_hash,
t.block_timestamp,
t.tx_status,
f.tx_position,
f.trace_index,
f.from_address,
f.to_address,
f.eth_value_precise_raw,
f.eth_value_precise,
f.eth_value,
f.gas,
f.gas_used,
f.input,
f.output,
f.type,
f.identifier,
f.sub_traces,
f.error_reason,
IFF(
f.error_reason IS NULL,
'SUCCESS',
'FAIL'
) AS trace_status,
f.data,
IFF(
t.tx_hash IS NULL
OR t.block_timestamp IS NULL
OR t.tx_status IS NULL,
TRUE,
FALSE
) AS is_pending,
f._call_id,
f._inserted_timestamp
FROM
spine s,
LATERAL FLATTEN(input => block_data:result:transactions) b
WHERE
TRY_TO_NUMBER(utils.udf_hex_to_int(tx_data:value::string), 38, 0) > 0
),
eth_base AS (
final_traces f
LEFT OUTER JOIN ethereum.silver.transactions t
ON f.tx_position = t.position
AND f.block_number = t.block_number
),
traces_final AS (
SELECT
block_number,
tx_hash,
block_timestamp,
tx_status,
tx_position,
trace_index,
from_address,
to_address,
eth_value_precise_raw,
eth_value_precise,
eth_value,
gas,
gas_used,
input,
output,
TYPE,
identifier,
sub_traces,
error_reason,
trace_status,
DATA,
is_pending,
_call_id,
_inserted_timestamp
FROM
new_records
),
eth_base AS (
SELECT
tx_hash,
block_number,
block_timestamp,
identifier,
from_address,
to_address,
eth_value AS amount,
_call_id,
_inserted_timestamp,
eth_value_precise_raw AS amount_precise_raw,
eth_value_precise AS amount_precise,
tx_position,
trace_index
FROM
traces_final
WHERE
eth_value > 0
AND tx_status = 'SUCCESS'
AND trace_status = 'SUCCESS'
AND TYPE NOT IN (
'DELEGATECALL',
'STATICCALL'
)
),
tx_table AS (
SELECT
block_number,
tx_hash,
from_address AS origin_from_address,
to_address1 AS origin_to_address,
origin_function_signature
FROM
txs
WHERE
tx_hash IN (
SELECT
DISTINCT tx_hash
FROM
eth_base
)
),
native_transfers AS (
SELECT
e.tx_hash,
e.block_number,
e.block_timestamp,
e.identifier,
t.origin_from_address,
t.origin_to_address,
t.origin_function_signature,
e.from_address,
e.to_address,
e.amount,
e.amount_precise_raw,
e.amount_precise,
ROUND(
e.amount * p.price,
2
) AS amount_usd,
e._call_id,
e._inserted_timestamp,
e.tx_position,
e.trace_index,
md5(
cast(
coalesce(cast(e.tx_hash as TEXT), '_dbt_utils_surrogate_key_null_')
|| '-' || coalesce(cast(e.trace_index as TEXT), '_dbt_utils_surrogate_key_null_')
as TEXT
)
) as native_transfers_id,
SYSDATE() as inserted_timestamp,
SYSDATE() as modified_timestamp
FROM
eth_base e
JOIN tx_table t ON e.tx_hash = t.tx_hash AND e.block_number = t.block_number
LEFT JOIN ETHEREUM.price.EZ_PRICES_HOURLY p
ON DATE_TRUNC('hour', e.block_timestamp) = p.HOUR
AND p.token_address = '0xc02aaa39b223fe8d0a0e5c4f27ead9083c756cc2' -- WETH address
)
SELECT
tx_hash,
block_number,
block_timestamp,
identifier,
from_address,
to_address,
eth_value AS amount,
eth_value_precise_raw AS amount_precise_raw,
eth_value_precise AS amount_precise,
tx_position,
trace_index
trace_index,
identifier AS trace_type,
origin_from_address,
origin_to_address,
origin_function_signature,
from_address AS trace_from_address,
to_address AS trace_to_address,
amount,
amount_precise_raw,
amount_precise,
amount_usd,
COALESCE(
native_transfers_id,
md5(
cast(
coalesce(cast(tx_hash as TEXT), '_dbt_utils_surrogate_key_null_')
|| '-' || coalesce(cast(trace_index as TEXT), '_dbt_utils_surrogate_key_null_')
as TEXT
)
)
) AS ez_native_transfers_id,
COALESCE(
inserted_timestamp,
'2000-01-01'
) AS inserted_timestamp,
COALESCE(
modified_timestamp,
'2000-01-01'
) AS modified_timestamp
FROM
raw_block_data
WHERE
eth_value > 0
AND tx_status = 'SUCCESS'
AND trace_status = 'SUCCESS'
AND TYPE NOT IN ('DELEGATECALL', 'STATICCALL')
),
tx_table AS (
SELECT
block_number,
block_timestamp,
tx_hash,
from_address AS origin_from_address,
to_address AS origin_to_address,
origin_function_signature
FROM
raw_block_data
WHERE
tx_hash IN (SELECT DISTINCT tx_hash FROM eth_base)
),
price_data AS (
SELECT
DATE_TRUNC('hour', e.block_timestamp) AS hour,
AVG(p.price) AS price
FROM
eth_base e
JOIN {{ blockchain }}.PRICE.EZ_PRICES_HOURLY p
ON DATE_TRUNC('hour', e.block_timestamp) = p.hour
AND p.token_address = native_token_address
GROUP BY 1
)
SELECT
A.tx_hash,
A.block_number,
A.block_timestamp,
A.tx_position,
A.trace_index,
A.identifier,
T.origin_from_address,
T.origin_to_address,
T.origin_function_signature,
A.from_address,
A.to_address,
A.amount::FLOAT,
A.amount_precise_raw::NUMBER(38,0),
A.amount_precise::FLOAT,
ROUND(A.amount * P.price, 2)::FLOAT AS amount_usd,
MD5(CONCAT(A.tx_hash, '|', COALESCE(A.trace_index::STRING, ''))) AS ez_native_transfers_id,
SYSDATE() AS inserted_timestamp,
SYSDATE() AS modified_timestamp
FROM
eth_base A
LEFT JOIN price_data P ON DATE_TRUNC('hour', A.block_timestamp) = P.hour
JOIN tx_table T ON A.tx_hash = T.tx_hash AND A.block_number = T.block_number
native_transfers
QUALIFY (ROW_NUMBER() OVER (
PARTITION BY block_number, tx_position, trace_index
ORDER BY _inserted_timestamp DESC
)) = 1
{% endmacro %}