STREAM-1053 Refactor fact_transactions to use reusable LiveView CTE's

This commit is contained in:
shah 2024-10-16 20:25:43 -07:00 committed by Julius Remigio
parent 1da9b610fb
commit 3594bdbda6

View File

@ -468,235 +468,72 @@ LEFT JOIN _silver_decoded_logs AS C USING (block_number, _log_id)
{% macro evm_live_view_fact_transactions(schema, blockchain, network) %}
WITH heights AS (
SELECT
{{ schema }}.udf_rpc('eth_blockNumber', []) as result,
livequery_dev.utils.udf_hex_to_int(result:result)::integer as latest_block_height,
coalesce(
block_height,
latest_block_height
) as min_height,
iff(
coalesce(to_latest, false),
latest_block_height,
min_height
) as max_height
),
spine as (
select
row_number() over (
order by
null
) -1 + coalesce(block_height, 0)::integer as block_number,
min_height,
iff(
coalesce(to_latest, false),
latest_block_height,
min_height
) as max_height,
latest_block_height
from
table(generator(ROWCOUNT => 1000)),
heights
qualify block_number between min_height and max_height
),
raw_receipts as (
SELECT
latest_block_height,
block_number,
{{ schema }}.udf_rpc(
'eth_getBlockReceipts',
[utils.udf_int_to_hex(block_number)]) AS result,
v.value as DATA
from
spine,
lateral flatten(result) v
),
raw_block_txs as (
SELECT
block_number,
{{ schema }}.udf_rpc(
'eth_getBlockByNumber',
[utils.udf_int_to_hex(block_number), true]) AS DATA
from
spine
),
raw_txs as (
SELECT
block_number,
v.value as DATA
from
raw_block_txs r,
lateral flatten(r.data:transactions) v
),
blocks as (
select
block_number,
livequery_dev.utils.udf_hex_to_int(DATA :baseFeePerGas::STRING)::INT AS base_fee_per_gas,
livequery_dev.utils.udf_hex_to_int(DATA :difficulty::STRING)::INT AS difficulty,
DATA :extraData::STRING AS extra_data,
livequery_dev.utils.udf_hex_to_int(DATA :gasLimit::STRING)::INT AS gas_limit,
livequery_dev.utils.udf_hex_to_int(DATA :gasUsed::STRING)::INT AS gas_used,
DATA :hash::STRING AS HASH,
DATA :logsBloom::STRING AS logs_bloom,
DATA :miner::STRING AS miner,
livequery_dev.utils.udf_hex_to_int(DATA :nonce::STRING)::INT AS nonce,
livequery_dev.utils.udf_hex_to_int(DATA :number::STRING)::INT AS NUMBER,
DATA :parentHash::STRING AS parent_hash,
DATA :receiptsRoot::STRING AS receipts_root,
DATA :sha3Uncles::STRING AS sha3_uncles,
livequery_dev.utils.udf_hex_to_int(DATA :size::STRING)::INT AS SIZE,
DATA :stateRoot::STRING AS state_root,
livequery_dev.utils.udf_hex_to_int(DATA :timestamp::STRING)::TIMESTAMP AS block_timestamp,
livequery_dev.utils.udf_hex_to_int(DATA :totalDifficulty::STRING)::INT AS total_difficulty,
ARRAY_SIZE(DATA :transactions) AS tx_count,
DATA :transactionsRoot::STRING AS transactions_root,
DATA :uncles AS uncles,
DATA :withdrawals AS withdrawals,
DATA :withdrawalsRoot::STRING AS withdrawals_root,
md5(
cast(
coalesce(
cast(block_number as TEXT),
'_dbt_utils_surrogate_key_null_'
) as TEXT
)
) AS blocks_id,
livequery_dev.utils.udf_hex_to_int(DATA: blobGasUsed::STRING)::INT AS blob_gas_used,
livequery_dev.utils.udf_hex_to_int(DATA: excessBlobGas::STRING)::INT AS excess_blob_gas,
from
raw_block_txs
),
receipts as (
select
latest_block_height,
block_number,
DATA :blockHash::STRING AS block_hash,
livequery_dev.utils.udf_hex_to_int(DATA :blockNumber::STRING)::INT AS blockNumber,
livequery_dev.utils.udf_hex_to_int(DATA :cumulativeGasUsed::STRING)::INT AS cumulative_gas_used,
livequery_dev.utils.udf_hex_to_int(DATA :effectiveGasPrice::STRING)::INT / pow(10, 9) AS effective_gas_price,
DATA :from::STRING AS from_address,
livequery_dev.utils.udf_hex_to_int(DATA :gasUsed::STRING)::INT AS gas_used,
DATA :logs AS logs,
DATA :logsBloom::STRING AS logs_bloom,
livequery_dev.utils.udf_hex_to_int(DATA :status::STRING)::INT AS status,
CASE
WHEN status = 1 THEN TRUE
ELSE FALSE
END AS tx_success,
CASE
WHEN status = 1 THEN 'SUCCESS'
ELSE 'FAIL'
END AS tx_status,
DATA :to::STRING AS to_address1,
CASE
WHEN to_address1 = '' THEN NULL
ELSE to_address1
END AS to_address,
DATA :transactionHash::STRING AS tx_hash,
livequery_dev.utils.udf_hex_to_int(DATA :transactionIndex::STRING)::INT AS POSITION,
livequery_dev.utils.udf_hex_to_int(DATA :type::STRING)::INT AS TYPE,
livequery_dev.utils.udf_hex_to_int(DATA :effectiveGasPrice::STRING)::INT AS blob_gas_price,
livequery_dev.utils.udf_hex_to_int(DATA :gasUsed::STRING)::INT AS blob_gas_used
from
raw_receipts
),
txs as (
select
A.block_number AS block_number,
A.data :blockHash::STRING AS block_hash,
livequery_dev.utils.udf_hex_to_int(A.data :blockNumber::STRING)::INT AS blockNumber,
livequery_dev.utils.udf_hex_to_int(A.data :chainId::STRING)::INT AS chain_id,
A.data :from::STRING AS from_address,
livequery_dev.utils.udf_hex_to_int(A.data :gas::STRING)::INT AS gas,
livequery_dev.utils.udf_hex_to_int(A.data :gasPrice::STRING)::INT / pow(10, 9) AS gas_price,
A.data :hash::STRING AS tx_hash,
A.data :input::STRING AS input_data,
SUBSTR(input_data, 1, 10) AS origin_function_signature,
livequery_dev.utils.udf_hex_to_int(A.data :maxFeePerGas::STRING)::INT / pow(10, 9) AS max_fee_per_gas,
livequery_dev.utils.udf_hex_to_int(
A.data :maxPriorityFeePerGas::STRING
)::INT / pow(10, 9) AS max_priority_fee_per_gas,
livequery_dev.utils.udf_hex_to_int(A.data :nonce::STRING)::INT AS nonce,
A.data :r::STRING AS r,
A.data :s::STRING AS s,
A.data :to::STRING AS to_address,
livequery_dev.utils.udf_hex_to_int(A.data :transactionIndex::STRING)::INT AS POSITION,
A.data :type::STRING AS TYPE,
A.data :v::STRING AS v,
livequery_dev.utils.udf_hex_to_int(A.data :value::STRING) AS value_precise_raw,
value_precise_raw * power(10, -18) AS value_precise,
value_precise::FLOAT AS VALUE,
A.data :accessList AS access_list,
A.data,
A.data: blobVersionedHashes::ARRAY AS blob_versioned_hashes,
livequery_dev.utils.udf_hex_to_int(A.data: maxFeePerGas::STRING)::INT AS max_fee_per_blob_gas,
block_timestamp,
CASE
WHEN block_timestamp IS NULL
OR tx_status IS NULL THEN TRUE
ELSE FALSE
END AS is_pending,
r.gas_used,
tx_success,
tx_status,
cumulative_gas_used,
effective_gas_price,
livequery_dev.utils.udf_hex_to_int(A.data :gasPrice) * power(10, -18) * r.gas_used AS tx_fee_precise,
COALESCE(tx_fee_precise::FLOAT, 0) AS tx_fee,
r.type as tx_type,
r.blob_gas_used,
r.blob_gas_price,
from
raw_txs A
left join blocks b on b.block_number = A.block_number
left join receipts as r on r.tx_hash = A.data :hash::STRING
)
SELECT
block_number,
block_timestamp,
block_hash,
tx_hash,
nonce,
POSITION,
origin_function_signature,
from_address,
to_address,
VALUE,
value_precise_raw,
value_precise::STRING as value_precise,
tx_fee,
tx_fee_precise::STRING as tx_fee_precise,
gas_price,
gas AS gas_limit,
gas_used,
cumulative_gas_used,
input_data,
tx_status AS status,
effective_gas_price,
max_fee_per_gas,
max_priority_fee_per_gas,
r,
s,
v,
tx_type,
chain_id,
blob_versioned_hashes,
max_fee_per_blob_gas,
blob_gas_used,
blob_gas_price,
md5(
cast(
coalesce(
cast(tx_hash as TEXT),
'_dbt_utils_surrogate_key_null_'
) as TEXT
)
) AS fact_transactions_id,
SYSDATE() inserted_timestamp,
SYSDATE() AS modified_timestamp
FROM
txs
WITH spine AS (
{{ evm_live_view_target_blocks(schema, blockchain, network) | indent(4) -}}
),
raw_receipts AS (
{{ evm_live_view_bronze_receipts(schema, 'spine') | indent(4) -}}
),
raw_block_txs AS (
{{ evm_live_view_bronze_blocks(schema, 'spine') | indent(4) -}}
),
raw_transactions AS (
{{ evm_live_view_bronze_transactions('raw_block_txs') | indent(4) -}}
),
blocks AS (
{{ evm_live_view_silver_blocks('raw_block_txs') | indent(4) -}}
),
receipts AS (
{{ evm_live_view_silver_receipts('raw_receipts') | indent(4) -}}
),
transactions AS (
{{ evm_live_view_silver_transactions('raw_transactions', 'blocks', 'receipts') | indent(4) -}}
),
SELECT
block_number,
block_timestamp,
block_hash,
tx_hash,
nonce,
POSITION,
origin_function_signature,
from_address,
to_address,
VALUE,
value_precise_raw,
value_precise::STRING as value_precise,
tx_fee,
tx_fee_precise::STRING as tx_fee_precise,
gas_price,
gas AS gas_limit,
gas_used,
cumulative_gas_used,
input_data,
tx_status AS status,
effective_gas_price,
max_fee_per_gas,
max_priority_fee_per_gas,
r,
s,
v,
tx_type,
chain_id,
blob_versioned_hashes,
max_fee_per_blob_gas,
blob_gas_used,
blob_gas_price,
md5(
cast(
coalesce(
cast(tx_hash as TEXT),
'_dbt_utils_surrogate_key_null_'
) as TEXT
)
) AS fact_transactions_id,
SYSDATE() inserted_timestamp,
SYSDATE() AS modified_timestamp
FROM
transactions
{% endmacro %}