From 213acecd405f9d102d84b01b2abe13a0b9d7d0da Mon Sep 17 00:00:00 2001 From: shah Date: Mon, 14 Oct 2024 18:24:23 -0700 Subject: [PATCH] STREAM-1051 add ez_native_transfers sql --- macros/evm/evm.yaml.sql | 3 +- macros/evm/evm_live_views.sql | 683 +++++++++++++++++++++++++++++----- 2 files changed, 586 insertions(+), 100 deletions(-) diff --git a/macros/evm/evm.yaml.sql b/macros/evm/evm.yaml.sql index 4b207bd..152fc32 100644 --- a/macros/evm/evm.yaml.sql +++ b/macros/evm/evm.yaml.sql @@ -409,9 +409,8 @@ signature: - [block_height, INTEGER, The start block height to get the transfers from] - [to_latest, BOOLEAN, Whether to continue fetching transfers until the latest block or not] - - [native_token_address, STRING, The address of the native token to get the transfers of] return_type: - - "TABLE(tx_hash STRING, block_number INTEGER, block_timestamp TIMESTAMP_NTZ, tx_position INTEGER, trace_index INTEGER, identifier STRING, origin_from_address STRING, origin_to_address STRING, origin_function_signature STRING, from_address STRING, to_address STRING, amount FLOAT, amount_precise_raw NUMBER, amount_precise FLOAT, amount_usd FLOAT, ez_native_transfers_id STRING, inserted_timestamp TIMESTAMP_NTZ, modified_timestamp TIMESTAMP_NTZ)" + - "TABLE(tx_hash STRING, block_number NUMBER(38,0), block_timestamp TIMESTAMP_NTZ(9), tx_position NUMBER(38,0), trace_index NUMBER(19,0), identifier STRING, origin_from_address STRING, origin_to_address STRING, origin_function_signature STRING, from_address STRING, to_address STRING, amount FLOAT, amount_precise_raw STRING, amount_precise STRING, amount_usd FLOAT, ez_native_transfers_id STRING, inserted_timestamp TIMESTAMP_NTZ(9), modified_timestamp TIMESTAMP_NTZ(9))" options: | NOT NULL RETURNS NULL ON NULL INPUT diff --git a/macros/evm/evm_live_views.sql b/macros/evm/evm_live_views.sql index 5605f06..ff4b03b 100644 --- a/macros/evm/evm_live_views.sql +++ b/macros/evm/evm_live_views.sql @@ -560,112 +560,599 @@ AND from_address IS NOT NULL {% endmacro %} {% macro evm_live_view_ez_native_transfers(schema, blockchain, network) %} -WITH spine AS ( - {{ evm_live_view_target_blocks(blockchain, network) }} -), - -raw_block_data AS ( +WITH heights AS ( SELECT - s.block_number AS block_number, - live.udf_api( - '{service}/{Authentication}', - utils.udf_json_rpc_call( - 'eth_getBlockByNumber', - [utils.udf_int_to_hex(s.block_number), true] + livequery_dev.live.udf_api( + 'https://indulgent-frosty-sanctuary.quiknode.pro/22555ab2563d38edce551aa3ab524e595d9ccba8/', + livequery_Dev.utils.udf_json_rpc_call( + 'eth_blockNumber', + [] ) - ):data AS block_data, - b.value AS tx_data, - TO_TIMESTAMP_NTZ(utils.udf_hex_to_int(block_data:result:timestamp::string)) AS block_timestamp, - tx_data:hash::string AS tx_hash, - tx_data:from::string AS from_address, - tx_data:to_address::string AS to_address, - TRY_TO_NUMBER(utils.udf_hex_to_int(tx_data:value::string), 38, 0) / 1e18 AS eth_value, - TRY_TO_NUMBER(utils.udf_hex_to_int(tx_data:value::string), 38, 0) AS eth_value_precise_raw, - TRY_TO_NUMBER(utils.udf_hex_to_int(tx_data:value::string), 38, 0) / 1e18 AS eth_value_precise, - tx_data:input::string AS input, - utils.udf_hex_to_int(tx_data:transactionIndex::string)::INTEGER AS tx_position, - 'CALL' AS TYPE, - 'SUCCESS' AS tx_status, - 'SUCCESS' AS trace_status, - CASE - WHEN LEFT(input, 10) = '0x' THEN SUBSTRING(input, 1, 10) - ELSE NULL - END AS origin_function_signature, - 'native_transfer' AS identifier, - NULL AS trace_index + ):data AS result, + livequery_dev.utils.udf_hex_to_int(result:result)::integer as latest_block_height, + coalesce( + arg_block_height, + latest_block_height + ) as min_height, + iff( + coalesce(arg_to_latest, false), + latest_block_height, + min_height + ) as max_height + ), + spine as ( + select + row_number() over ( + order by + null + ) -1 + coalesce(arg_block_height, 0)::integer as block_number, + min_height, + iff( + coalesce(arg_to_latest, false), + latest_block_height, + min_height + ) as max_height, + latest_block_height + from + table(generator(ROWCOUNT => 500)), + heights + qualify block_number between min_height and max_height + ), + raw_receipts as ( + SELECT + latest_block_height, + block_number, + livequery_dev.live.udf_api( + 'https://indulgent-frosty-sanctuary.quiknode.pro/22555ab2563d38edce551aa3ab524e595d9ccba8/', + livequery_Dev.utils.udf_json_rpc_call( + 'eth_getBlockReceipts', + [livequery_dev.utils.udf_int_to_hex(block_number)] + ) + ):data.result AS result, + v.value as DATA + from + spine, + lateral flatten(result) v + ), + raw_block_txs as ( + SELECT + block_number, + livequery_dev.live.udf_api( + 'https://indulgent-frosty-sanctuary.quiknode.pro/22555ab2563d38edce551aa3ab524e595d9ccba8/', + livequery_Dev.utils.udf_json_rpc_call( + 'eth_getBlockByNumber', + [livequery_dev.utils.udf_int_to_hex(block_number), true] + ) + ):data.result AS DATA + from + spine + ), + raw_txs as ( + SELECT + block_number, + v.value as DATA + from + raw_block_txs r, + lateral flatten(r.data:transactions) v + ), + blocks as ( + select + block_number, + livequery_dev.utils.udf_hex_to_int(DATA :baseFeePerGas::STRING)::INT AS base_fee_per_gas, + livequery_dev.utils.udf_hex_to_int(DATA :difficulty::STRING)::INT AS difficulty, + DATA :extraData::STRING AS extra_data, + livequery_dev.utils.udf_hex_to_int(DATA :gasLimit::STRING)::INT AS gas_limit, + livequery_dev.utils.udf_hex_to_int(DATA :gasUsed::STRING)::INT AS gas_used, + DATA :hash::STRING AS HASH, + DATA :logsBloom::STRING AS logs_bloom, + DATA :miner::STRING AS miner, + livequery_dev.utils.udf_hex_to_int(DATA :nonce::STRING)::INT AS nonce, + livequery_dev.utils.udf_hex_to_int(DATA :number::STRING)::INT AS NUMBER, + DATA :parentHash::STRING AS parent_hash, + DATA :receiptsRoot::STRING AS receipts_root, + DATA :sha3Uncles::STRING AS sha3_uncles, + livequery_dev.utils.udf_hex_to_int(DATA :size::STRING)::INT AS SIZE, + DATA :stateRoot::STRING AS state_root, + livequery_dev.utils.udf_hex_to_int(DATA :timestamp::STRING)::TIMESTAMP AS block_timestamp, + livequery_dev.utils.udf_hex_to_int(DATA :totalDifficulty::STRING)::INT AS total_difficulty, + ARRAY_SIZE(DATA :transactions) AS tx_count, + DATA :transactionsRoot::STRING AS transactions_root, + DATA :uncles AS uncles, + DATA :withdrawals AS withdrawals, + DATA :withdrawalsRoot::STRING AS withdrawals_root, + md5( + cast( + coalesce( + cast(block_number as TEXT), + '_dbt_utils_surrogate_key_null_' + ) as TEXT + ) + ) AS blocks_id, + livequery_dev.utils.udf_hex_to_int(DATA: blobGasUsed::STRING)::INT AS blob_gas_used, + livequery_dev.utils.udf_hex_to_int(DATA: excessBlobGas::STRING)::INT AS excess_blob_gas, + from + raw_block_txs + ), + receipts as ( + select + latest_block_height, + block_number, + DATA :blockHash::STRING AS block_hash, + livequery_dev.utils.udf_hex_to_int(DATA :blockNumber::STRING)::INT AS blockNumber, + livequery_dev.utils.udf_hex_to_int(DATA :cumulativeGasUsed::STRING)::INT AS cumulative_gas_used, + livequery_dev.utils.udf_hex_to_int(DATA :effectiveGasPrice::STRING)::INT / pow(10, 9) AS effective_gas_price, + DATA :from::STRING AS from_address, + livequery_dev.utils.udf_hex_to_int(DATA :gasUsed::STRING)::INT AS gas_used, + DATA :logs AS logs, + DATA :logsBloom::STRING AS logs_bloom, + livequery_dev.utils.udf_hex_to_int(DATA :status::STRING)::INT AS status, + CASE + WHEN status = 1 THEN TRUE + ELSE FALSE + END AS tx_success, + CASE + WHEN status = 1 THEN 'SUCCESS' + ELSE 'FAIL' + END AS tx_status, + DATA :to::STRING AS to_address1, + CASE + WHEN to_address1 = '' THEN NULL + ELSE to_address1 + END AS to_address, + DATA :transactionHash::STRING AS tx_hash, + livequery_dev.utils.udf_hex_to_int(DATA :transactionIndex::STRING)::INT AS POSITION, + livequery_dev.utils.udf_hex_to_int(DATA :type::STRING)::INT AS TYPE, + livequery_dev.utils.udf_hex_to_int(DATA :effectiveGasPrice::STRING)::INT AS blob_gas_price, + livequery_dev.utils.udf_hex_to_int(DATA :gasUsed::STRING)::INT AS blob_gas_used + from + raw_receipts + ), + txs as ( + select + A.block_number AS block_number, + A.data :blockHash::STRING AS block_hash, + livequery_dev.utils.udf_hex_to_int(A.data :blockNumber::STRING)::INT AS blockNumber, + livequery_dev.utils.udf_hex_to_int(A.data :chainId::STRING)::INT AS chain_id, + A.data :from::STRING AS from_address, + livequery_dev.utils.udf_hex_to_int(A.data :gas::STRING)::INT AS gas, + livequery_dev.utils.udf_hex_to_int(A.data :gasPrice::STRING)::INT / pow(10, 9) AS gas_price, + A.data :hash::STRING AS tx_hash, + A.data :input::STRING AS input_data, + SUBSTR(input_data, 1, 10) AS origin_function_signature, + livequery_dev.utils.udf_hex_to_int(A.data :maxFeePerGas::STRING)::INT / pow(10, 9) AS max_fee_per_gas, + livequery_dev.utils.udf_hex_to_int( + A.data :maxPriorityFeePerGas::STRING + )::INT / pow(10, 9) AS max_priority_fee_per_gas, + livequery_dev.utils.udf_hex_to_int(A.data :nonce::STRING)::INT AS nonce, + A.data :r::STRING AS r, + A.data :s::STRING AS s, + A.data :to::STRING AS to_address1, + livequery_dev.utils.udf_hex_to_int(A.data :transactionIndex::STRING)::INT AS POSITION, + A.data :type::STRING AS TYPE, + A.data :v::STRING AS v, + livequery_dev.utils.udf_hex_to_int(A.data :value::STRING) AS value_precise_raw, + value_precise_raw * power(10, -18) AS value_precise, + value_precise::FLOAT AS VALUE, + A.data :accessList AS access_list, + A.data, + A.data: blobVersionedHashes::ARRAY AS blob_versioned_hashes, + livequery_dev.utils.udf_hex_to_int(A.data: maxFeePerGas::STRING)::INT AS max_fee_per_blob_gas, + block_timestamp, + CASE + WHEN block_timestamp IS NULL + OR tx_status IS NULL THEN TRUE + ELSE FALSE + END AS is_pending, + r.gas_used, + tx_success, + tx_status, + cumulative_gas_used, + effective_gas_price, + livequery_dev.utils.udf_hex_to_int(A.data :gasPrice) * power(10, -18) * r.gas_used AS tx_fee_precise, + COALESCE(tx_fee_precise::FLOAT, 0) AS tx_fee, + r.type as tx_type, + r.blob_gas_used, + r.blob_gas_price, + from + raw_txs A + left join blocks b on b.block_number = A.block_number + left join receipts as r on r.tx_hash = A.data :hash::STRING + ), + raw_traces AS ( + SELECT + s.block_number, + v.index::INT AS tx_position, + v.value:result AS full_traces, + SYSDATE() AS _inserted_timestamp + FROM spine s, + LATERAL FLATTEN(input => PARSE_JSON( + livequery_dev.live.udf_api( + 'https://indulgent-frosty-sanctuary.quiknode.pro/22555ab2563d38edce551aa3ab524e595d9ccba8/', + livequery_Dev.utils.udf_json_rpc_call( + 'debug_traceBlockByNumber', + [livequery_dev.utils.udf_int_to_hex(s.block_number), {'tracer': 'callTracer'}] + ) + ):data.result + )) v + ), + flatten_traces AS ( + SELECT + block_number, + tx_position, + IFF( + path IN ( + 'result', + 'result.value', + 'result.type', + 'result.to', + 'result.input', + 'result.gasUsed', + 'result.gas', + 'result.from', + 'result.output', + 'result.error', + 'result.revertReason', + 'gasUsed', + 'gas', + 'type', + 'to', + 'from', + 'value', + 'input', + 'error', + 'output', + 'revertReason' + ), + 'ORIGIN', + REGEXP_REPLACE(REGEXP_REPLACE(path, '[^0-9]+', '_'), '^_|_$', '') + ) AS trace_address, + _inserted_timestamp, + OBJECT_AGG( + key, + VALUE + ) AS trace_json, + CASE + WHEN trace_address = 'ORIGIN' THEN NULL + WHEN POSITION( + '_' IN trace_address + ) = 0 THEN 'ORIGIN' + ELSE REGEXP_REPLACE( + trace_address, + '_[0-9]+$', + '', + 1, + 1 + ) + END AS parent_trace_address, + SPLIT( + trace_address, + '_' + ) AS str_array + FROM + raw_traces, + TABLE( + FLATTEN( + input => PARSE_JSON(full_traces), + recursive => TRUE + ) + ) f + WHERE + f.index IS NULL + AND f.key != 'calls' + AND f.path != 'result' + GROUP BY + block_number, + tx_position, + trace_address, + _inserted_timestamp + ), + sub_traces AS ( + SELECT + block_number, + tx_position, + parent_trace_address, + COUNT(*) AS sub_traces + FROM + flatten_traces + GROUP BY + block_number, + tx_position, + parent_trace_address + ), + num_array AS ( + SELECT + block_number, + tx_position, + trace_address, + ARRAY_AGG(flat_value) AS num_array + FROM + ( + SELECT + block_number, + tx_position, + trace_address, + IFF( + VALUE :: STRING = 'ORIGIN', + -1, + VALUE :: INT + ) AS flat_value + FROM + flatten_traces, + LATERAL FLATTEN ( + input => str_array + ) + ) + GROUP BY + block_number, + tx_position, + trace_address + ), + cleaned_traces AS ( + SELECT + b.block_number, + b.tx_position, + b.trace_address, + IFNULL( + sub_traces, + 0 + ) AS sub_traces, + num_array, + ROW_NUMBER() over ( + PARTITION BY b.block_number, + b.tx_position + ORDER BY + num_array ASC + ) - 1 AS trace_index, + trace_json, + b._inserted_timestamp + FROM + flatten_traces b + LEFT JOIN sub_traces s + ON b.block_number = s.block_number + AND b.tx_position = s.tx_position + AND b.trace_address = s.parent_trace_address + JOIN num_array n + ON b.block_number = n.block_number + AND b.tx_position = n.tx_position + AND b.trace_address = n.trace_address + ), + final_traces AS ( + SELECT + tx_position, + trace_index, + block_number, + trace_address, + trace_json :error :: STRING AS error_reason, + trace_json :from :: STRING AS from_address, + trace_json :to :: STRING AS to_address, + IFNULL( + utils.udf_hex_to_int( + trace_json :value :: STRING + ), + '0' + ) AS eth_value_precise_raw, + ethereum.utils.udf_decimal_adjust( + eth_value_precise_raw, + 18 + ) AS eth_value_precise, + eth_value_precise :: FLOAT AS eth_value, + utils.udf_hex_to_int( + trace_json :gas :: STRING + ) :: INT AS gas, + utils.udf_hex_to_int( + trace_json :gasUsed :: STRING + ) :: INT AS gas_used, + trace_json :input :: STRING AS input, + trace_json :output :: STRING AS output, + trace_json :type :: STRING AS TYPE, + concat_ws( + '_', + TYPE, + trace_address + ) AS identifier, + concat_ws( + '-', + block_number, + tx_position, + identifier + ) AS _call_id, + _inserted_timestamp, + trace_json AS DATA, + sub_traces + FROM + cleaned_traces + ), + new_records AS ( + SELECT + f.block_number, + t.tx_hash, + t.block_timestamp, + t.tx_status, + f.tx_position, + f.trace_index, + f.from_address, + f.to_address, + f.eth_value_precise_raw, + f.eth_value_precise, + f.eth_value, + f.gas, + f.gas_used, + f.input, + f.output, + f.type, + f.identifier, + f.sub_traces, + f.error_reason, + IFF( + f.error_reason IS NULL, + 'SUCCESS', + 'FAIL' + ) AS trace_status, + f.data, + IFF( + t.tx_hash IS NULL + OR t.block_timestamp IS NULL + OR t.tx_status IS NULL, + TRUE, + FALSE + ) AS is_pending, + f._call_id, + f._inserted_timestamp FROM - spine s, - LATERAL FLATTEN(input => block_data:result:transactions) b - WHERE - TRY_TO_NUMBER(utils.udf_hex_to_int(tx_data:value::string), 38, 0) > 0 -), -eth_base AS ( + final_traces f + LEFT OUTER JOIN ethereum.silver.transactions t + ON f.tx_position = t.position + AND f.block_number = t.block_number + ), + traces_final AS ( + SELECT + block_number, + tx_hash, + block_timestamp, + tx_status, + tx_position, + trace_index, + from_address, + to_address, + eth_value_precise_raw, + eth_value_precise, + eth_value, + gas, + gas_used, + input, + output, + TYPE, + identifier, + sub_traces, + error_reason, + trace_status, + DATA, + is_pending, + _call_id, + _inserted_timestamp + FROM + new_records + ), + eth_base AS ( + SELECT + tx_hash, + block_number, + block_timestamp, + identifier, + from_address, + to_address, + eth_value AS amount, + _call_id, + _inserted_timestamp, + eth_value_precise_raw AS amount_precise_raw, + eth_value_precise AS amount_precise, + tx_position, + trace_index + FROM + traces_final + WHERE + eth_value > 0 + AND tx_status = 'SUCCESS' + AND trace_status = 'SUCCESS' + AND TYPE NOT IN ( + 'DELEGATECALL', + 'STATICCALL' + ) + ), + tx_table AS ( + SELECT + block_number, + tx_hash, + from_address AS origin_from_address, + to_address1 AS origin_to_address, + origin_function_signature + FROM + txs + WHERE + tx_hash IN ( + SELECT + DISTINCT tx_hash + FROM + eth_base + ) + ), + native_transfers AS ( + SELECT + e.tx_hash, + e.block_number, + e.block_timestamp, + e.identifier, + t.origin_from_address, + t.origin_to_address, + t.origin_function_signature, + e.from_address, + e.to_address, + e.amount, + e.amount_precise_raw, + e.amount_precise, + ROUND( + e.amount * p.price, + 2 + ) AS amount_usd, + e._call_id, + e._inserted_timestamp, + e.tx_position, + e.trace_index, + md5( + cast( + coalesce(cast(e.tx_hash as TEXT), '_dbt_utils_surrogate_key_null_') + || '-' || coalesce(cast(e.trace_index as TEXT), '_dbt_utils_surrogate_key_null_') + as TEXT + ) + ) as native_transfers_id, + SYSDATE() as inserted_timestamp, + SYSDATE() as modified_timestamp + FROM + eth_base e + JOIN tx_table t ON e.tx_hash = t.tx_hash AND e.block_number = t.block_number + LEFT JOIN ETHEREUM.price.EZ_PRICES_HOURLY p + ON DATE_TRUNC('hour', e.block_timestamp) = p.HOUR + AND p.token_address = '0xc02aaa39b223fe8d0a0e5c4f27ead9083c756cc2' -- WETH address + ) SELECT tx_hash, block_number, block_timestamp, - identifier, - from_address, - to_address, - eth_value AS amount, - eth_value_precise_raw AS amount_precise_raw, - eth_value_precise AS amount_precise, tx_position, - trace_index + trace_index, + identifier AS trace_type, + origin_from_address, + origin_to_address, + origin_function_signature, + from_address AS trace_from_address, + to_address AS trace_to_address, + amount, + amount_precise_raw, + amount_precise, + amount_usd, + COALESCE( + native_transfers_id, + md5( + cast( + coalesce(cast(tx_hash as TEXT), '_dbt_utils_surrogate_key_null_') + || '-' || coalesce(cast(trace_index as TEXT), '_dbt_utils_surrogate_key_null_') + as TEXT + ) + ) + ) AS ez_native_transfers_id, + COALESCE( + inserted_timestamp, + '2000-01-01' + ) AS inserted_timestamp, + COALESCE( + modified_timestamp, + '2000-01-01' + ) AS modified_timestamp FROM - raw_block_data - WHERE - eth_value > 0 - AND tx_status = 'SUCCESS' - AND trace_status = 'SUCCESS' - AND TYPE NOT IN ('DELEGATECALL', 'STATICCALL') -), -tx_table AS ( - SELECT - block_number, - block_timestamp, - tx_hash, - from_address AS origin_from_address, - to_address AS origin_to_address, - origin_function_signature - FROM - raw_block_data - WHERE - tx_hash IN (SELECT DISTINCT tx_hash FROM eth_base) -), -price_data AS ( - SELECT - DATE_TRUNC('hour', e.block_timestamp) AS hour, - AVG(p.price) AS price - FROM - eth_base e - JOIN {{ blockchain }}.PRICE.EZ_PRICES_HOURLY p - ON DATE_TRUNC('hour', e.block_timestamp) = p.hour - AND p.token_address = native_token_address - GROUP BY 1 -) -SELECT - A.tx_hash, - A.block_number, - A.block_timestamp, - A.tx_position, - A.trace_index, - A.identifier, - T.origin_from_address, - T.origin_to_address, - T.origin_function_signature, - A.from_address, - A.to_address, - A.amount::FLOAT, - A.amount_precise_raw::NUMBER(38,0), - A.amount_precise::FLOAT, - ROUND(A.amount * P.price, 2)::FLOAT AS amount_usd, - MD5(CONCAT(A.tx_hash, '|', COALESCE(A.trace_index::STRING, ''))) AS ez_native_transfers_id, - SYSDATE() AS inserted_timestamp, - SYSDATE() AS modified_timestamp -FROM - eth_base A - LEFT JOIN price_data P ON DATE_TRUNC('hour', A.block_timestamp) = P.hour - JOIN tx_table T ON A.tx_hash = T.tx_hash AND A.block_number = T.block_number + native_transfers + QUALIFY (ROW_NUMBER() OVER ( + PARTITION BY block_number, tx_position, trace_index + ORDER BY _inserted_timestamp DESC + )) = 1 {% endmacro %}