This commit is contained in:
Austin 2025-01-22 15:47:25 -05:00
parent 90fc8a80cd
commit b8b56d2868
8 changed files with 219 additions and 66 deletions

View File

@ -1,8 +1,9 @@
{{ config (
materialized = 'view'
materialized = 'view',
tags = ['traces']
) }}
{{ streamline_external_table_FR_query_v2(
model = "evm_traces",
model = "evm_traces_v2",
partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 3), '_', 1) AS INTEGER )"
) }}

View File

@ -1,8 +1,9 @@
{{ config (
materialized = 'view'
materialized = 'view',
tags = ['traces']
) }}
{{ streamline_external_table_query_v2(
model = "evm_traces",
model = "evm_traces_v2",
partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 3), '_', 1) AS INTEGER )"
) }}

View File

@ -1,76 +1,143 @@
-- depends_on: {{ ref('bronze_evm__traces') }}
-- depends_on: {{ ref('bronze_evm__FR_traces') }}
{{ config(
materialized = 'incremental',
unique_key = "evm_traces_id",
incremental_strategy = 'merge',
merge_exclude_columns = ["inserted_timestamp"],
cluster_by = ['_inserted_timestamp :: DATE', '_partition_by_block_id'],
{{ config (
materialized = "incremental",
incremental_strategy = 'delete+insert',
unique_key = "block_number",
cluster_by = ['modified_timestamp::DATE','partition_key'],
post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION on equality(block_number)",
enabled = false,
tags = ['evm']
) }}
WITH traces AS (
SELECT
block_number,
DATA,
partition_key AS _partition_by_block_id,
_inserted_timestamp
FROM
{% if is_incremental() %}
{{ ref('bronze_evm__traces') }}
WHERE
_inserted_timestamp >= (
WITH base AS (
SELECT
MAX(_inserted_timestamp) _inserted_timestamp
block_number,
partition_key,
DATA :result AS full_traces,
_inserted_timestamp
FROM
{{ ref('bronze_evm__traces') }}
WHERE DATA :result IS NOT NULL
{% if is_incremental()%}
and _inserted_timestamp >= (
SELECT
COALESCE(MAX(_inserted_timestamp), '1900-01-01') _inserted_timestamp
FROM
{{ this }}
)
{% else %}
{{ ref('bronze_evm__FR_traces') }}
)
{% endif %}
qualify(ROW_NUMBER() over (PARTITION BY block_number
ORDER BY
_inserted_timestamp DESC)) = 1
qualify(ROW_NUMBER() over (PARTITION BY block_number ORDER BY _inserted_timestamp DESC)) = 1
),
bronze_traces AS (
select
block_number,
partition_key,
index as tx_position,
value:result as full_traces,
_inserted_timestamp
from base,
lateral flatten (input=>full_traces)
),
flatten_traces AS (
SELECT
block_number,
INDEX AS array_index,
VALUE :: variant AS trace_response,
_partition_by_block_id,
_inserted_timestamp
tx_position,
partition_key,
IFF(
path IN (
'result',
'result.value',
'result.type',
'result.to',
'result.input',
'result.gasUsed',
'result.gas',
'result.from',
'result.output',
'result.error',
'result.revertReason',
'result.time',
'gasUsed',
'gas',
'type',
'to',
'from',
'value',
'input',
'error',
'output',
'time',
'revertReason'
),
'ORIGIN',
REGEXP_REPLACE(REGEXP_REPLACE(path, '[^0-9]+', '_'), '^_|_$', '')
) AS trace_address,
_inserted_timestamp,
OBJECT_AGG(
key,
VALUE
) AS trace_json,
CASE
WHEN trace_address = 'ORIGIN' THEN NULL
WHEN POSITION(
'_' IN trace_address
) = 0 THEN 'ORIGIN'
ELSE REGEXP_REPLACE(
trace_address,
'_[0-9]+$',
'',
1,
1
)
END AS parent_trace_address,
SPLIT(
trace_address,
'_'
) AS trace_address_array
FROM
traces,
LATERAL FLATTEN (
DATA :result :: variant
)
bronze_traces txs,
TABLE(
FLATTEN(
input => PARSE_JSON(
txs.full_traces
),
recursive => TRUE
)
) f
WHERE
f.index IS NULL
AND f.key != 'calls'
AND f.path != 'result'
GROUP BY
block_number,
tx_position,
partition_key,
trace_address,
_inserted_timestamp
)
SELECT
block_number,
array_index,
trace_response :from :: STRING AS from_address,
utils.udf_hex_to_int(
trace_response :gas :: STRING
) AS gas,
utils.udf_hex_to_int(
trace_response :gasUsed :: STRING
) AS gas_used,
trace_response :input :: STRING AS input,
trace_response :to :: STRING AS to_address,
trace_response :type :: STRING AS trace_type,
utils.udf_hex_to_int(
trace_response :value :: STRING
) AS VALUE,
{{ dbt_utils.generate_surrogate_key(
['block_number', 'array_index']
) }} AS evm_traces_id,
_partition_by_block_id,
tx_position,
trace_address,
parent_trace_address,
trace_address_array,
trace_json,
partition_key,
_inserted_timestamp,
{{ dbt_utils.generate_surrogate_key(
['block_number'] +
['tx_position'] +
['trace_address']
) }} AS traces_id,
SYSDATE() AS inserted_timestamp,
SYSDATE() AS modified_timestamp,
'{{ invocation_id }}' AS _invocation_id
FROM
flatten_traces
flatten_traces qualify(ROW_NUMBER() over(PARTITION BY traces_id
ORDER BY
_inserted_timestamp DESC)) = 1

View File

@ -0,0 +1,11 @@
version: 2
models:
- name: silver_evm__traces
tests:
- dbt_utils.recency:
datepart: hour
field: inserted_timestamp
interval: 2
- dbt_utils.unique_combination_of_columns:
combination_of_columns:
- traces_id

View File

@ -10,7 +10,7 @@
) }}
SELECT
block_number,
value:"BLOCK_NUMBER"::INT AS block_number,
partition_key,
_inserted_timestamp,
{{ dbt_utils.generate_surrogate_key(

View File

@ -0,0 +1,75 @@
{{ config (
materialized = "view",
post_hook = fsc_utils.if_data_call_function_v2(
func = 'streamline.udf_bulk_rest_api_v2',
target = "{{this.schema}}.{{this.identifier}}",
params ={ "external_table" :"evm_traces_v2",
"sql_limit" :"1000000",
"producer_batch_size" :"2000",
"worker_batch_size" :"1000",
"sql_source" :"{{this.identifier}}",
"exploded_key": tojson(["result"])}
),
tags = ['streamline_history_evm']
) }}
WITH tbl AS (
SELECT
block_number
FROM
{{ ref('streamline__evm_blocks') }}
WHERE block_number IS NOT NULL
and block_number in (select distinct block_number from {{ ref('core_evm__fact_transactions') }})
EXCEPT
SELECT
block_number
FROM
{{ ref('streamline__complete_get_evm_traces') }}
),
ready_blocks AS (
SELECT
block_number
FROM
tbl
limit 5
)
SELECT
block_number,
DATE_PART(epoch_second, SYSDATE())::STRING AS request_timestamp,
'{{ invocation_id }}' AS _invocation_id,
ROUND(
block_number,
-3
) :: INT AS partition_key,
{{ target.database }}.live.udf_api(
'POST',
'{Service}',
OBJECT_CONSTRUCT(
'Content-Type',
'application/json'
),
OBJECT_CONSTRUCT(
'id',
block_number,
'jsonrpc',
'2.0',
'method',
'debug_traceBlockByNumber',
'params',
ARRAY_CONSTRUCT(
utils.udf_int_to_hex(block_number),
OBJECT_CONSTRUCT(
'tracer', 'callTracer',
'timeout', '180s'
)
)
),
'Vault/{{ target.name }}/flow/evm/mainnet'
) AS request
FROM
ready_blocks
ORDER BY
block_number DESC
limit 1000000

View File

@ -3,11 +3,12 @@
post_hook = fsc_utils.if_data_call_function_v2(
func = 'streamline.udf_bulk_rest_api_v2',
target = "{{this.schema}}.{{this.identifier}}",
params ={ "external_table" :"evm_traces",
params ={ "external_table" :"evm_traces_v2",
"sql_limit" :"25000",
"producer_batch_size" :"5000",
"worker_batch_size" :"1000",
"sql_source" :"{{this.identifier}}" }
"sql_source" :"{{this.identifier}}",
"exploded_key": tojson(["result"])}
),
tags = ['streamline_realtime_evm']
) }}
@ -59,11 +60,6 @@ ready_blocks AS (
block_number
FROM
tbl
{# UNION ALL
SELECT
block_number
FROM
{{ ref("_missing_traces") }} #}
)
SELECT
block_number,
@ -92,7 +88,7 @@ SELECT
utils.udf_int_to_hex(block_number),
OBJECT_CONSTRUCT(
'tracer', 'callTracer',
'timeout', '30s'
'timeout', '180s'
)
)
),
@ -102,3 +98,4 @@ FROM
ready_blocks
ORDER BY
block_number DESC
limit 25000

View File

@ -130,6 +130,7 @@ sources:
- name: points_transfers
- name: minting_assets
- name: contract_abis
- name: evm_traces_v2
- name: crosschain_silver
database: crosschain