An 4376/overflow-traces (#226)

* stash

* txhash

* workflow

* prod

* drop dev
Austin 2024-01-04 13:29:26 -05:00 committed by GitHub
parent b6b9643374
commit 28a7897706
13 changed files with 526 additions and 21 deletions

View File

@@ -0,0 +1,44 @@
name: dbt_run_overflow_models
run-name: dbt_run_overflow_models
on:
  workflow_dispatch:
    branches:
      - "main"
env:
  DBT_PROFILES_DIR: ./
  ACCOUNT: "${{ vars.ACCOUNT }}"
  ROLE: "${{ vars.ROLE }}"
  USER: "${{ vars.USER }}"
  PASSWORD: "${{ secrets.PASSWORD }}"
  REGION: "${{ vars.REGION }}"
  DATABASE: "${{ vars.DATABASE }}"
  WAREHOUSE: "${{ vars.WAREHOUSE }}"
  SCHEMA: "${{ vars.SCHEMA }}"
concurrency:
  group: ${{ github.workflow }}
jobs:
  run_dbt_jobs:
    runs-on: ubuntu-latest
    environment:
      name: workflow_prod
    steps:
      - uses: actions/checkout@v3
      - uses: actions/setup-python@v4
        with:
          python-version: "3.10"
          cache: "pip"
      - name: install dependencies
        run: |
          pip install -r requirements.txt
          dbt deps
      - name: Run DBT Jobs
        run: |
          dbt run -m "optimism_models,tag:overflow"

View File

@@ -51,6 +51,15 @@ query-comment:
models:
  +copy_grants: true
  +on_schema_change: "append_new_columns"
  optimism_models:
    silver:
      overflow:
        silver__overflowed_traces:
          +snowflake_warehouse: "DBT_SNOWPARK"
    bronze:
      overflow:
        bronze__overflowed_traces:
          +snowflake_warehouse: "DBT_SNOWPARK"
# In this example config, we tell dbt to build all models in the example/ directory
# as tables. These settings can be overridden in the individual model files

View File

@@ -0,0 +1,79 @@
{{ config (
materialized = "view"
) }}
{% for item in range(
1,
11
) %}
SELECT
o.file_name,
f.block_number,
f.index_vals,
f.path,
f.key,
f.value_
FROM
(
SELECT
file_name,
file_url,
index_cols,
[overflowed_block, overflowed_tx] AS index_vals
FROM
(
SELECT
block_number,
POSITION,
file_name,
file_url,
index_cols,
VALUE [0] AS overflowed_block,
VALUE [1] AS overflowed_tx,
block_number = overflowed_block
AND POSITION = overflowed_tx AS missing
FROM
(
SELECT
block_number,
POSITION,
file_name,
file_url,
index_cols,
utils.udf_detect_overflowed_responses(
file_url,
index_cols
) AS index_vals
FROM
{{ ref("bronze__potential_overflowed_traces") }}
WHERE
row_no = {{ item }}
),
LATERAL FLATTEN (
input => index_vals
)
)
WHERE
missing = TRUE
) o,
TABLE(
utils.udtf_flatten_overflowed_responses(
o.file_url,
o.index_cols,
[o.index_vals]
)
) f
WHERE
NOT IS_OBJECT(
f.value_
)
AND NOT IS_ARRAY(
f.value_
)
AND NOT IS_NULL_VALUE(
f.value_
) {% if not loop.last %}
UNION ALL
{% endif %}
{% endfor %}
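Note on the template above: Jinja's range(1, 11) is exclusive of the upper bound, so dbt compiles this view into ten UNION ALL branches, one per row_no. A minimal sketch of the same pattern (illustrative names, not from this repo):

{% for item in range(1, 3) %}
SELECT {{ item }} AS row_no
{% if not loop.last %}
UNION ALL
{% endif %}
{% endfor %}

-- compiles to:
-- SELECT 1 AS row_no
-- UNION ALL
-- SELECT 2 AS row_no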

View File

@@ -0,0 +1,67 @@
{{ config (
materialized = "view"
) }}
WITH impacted_blocks AS (
SELECT
VALUE :: INT AS block_number
FROM
(
SELECT
blocks_impacted_array
FROM
{{ ref("silver_observability__traces_completeness") }}
ORDER BY
test_timestamp DESC
LIMIT
1
), LATERAL FLATTEN (
input => blocks_impacted_array
)
),
all_txs AS (
SELECT
t.block_number,
t.position,
t.tx_hash
FROM
{{ ref("silver__transactions") }}
t
JOIN impacted_blocks USING (block_number)
),
missing_txs AS (
SELECT
DISTINCT block_number,
POSITION,
file_name
FROM
all_txs
LEFT JOIN {{ ref("core__fact_traces") }}
tr USING (
block_number,
tx_hash
)
JOIN {{ ref("streamline__complete_debug_traceBlockByNumber") }} USING (block_number)
WHERE
tr.tx_hash IS NULL
)
SELECT
block_number,
POSITION,
file_name,
build_scoped_file_url(
@streamline.bronze.external_tables,
file_name
) AS file_url,
['block_number', 'array_index'] AS index_cols,
ROW_NUMBER() over (
ORDER BY
block_number ASC,
POSITION ASC
) AS row_no
FROM
missing_txs
ORDER BY
block_number ASC,
POSITION ASC
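
Note: BUILD_SCOPED_FILE_URL is a built-in Snowflake function that returns a temporary, privilege-scoped URL for a file on a stage; the overflow UDFs above then fetch and parse the file behind that URL. A standalone sketch with an illustrative stage and path (not the repo's):

-- Returns an encoded, expiring URL scoped to the caller's privileges.
SELECT
    BUILD_SCOPED_FILE_URL(@my_db.my_schema.my_stage, 'traces/000000123.json') AS file_url;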

View File

@@ -44,3 +44,47 @@ SELECT
eth_value_precise
FROM
{{ ref('silver__traces') }}
UNION ALL
SELECT
tx_hash,
block_number,
block_timestamp,
from_address,
to_address,
eth_value AS VALUE,
eth_value_precise_raw AS value_precise_raw,
eth_value_precise AS value_precise,
gas,
gas_used,
input,
output,
TYPE,
identifier,
DATA,
tx_status,
sub_traces,
trace_status,
error_reason,
trace_index,
COALESCE (
traces_id,
{{ dbt_utils.generate_surrogate_key(
['tx_hash', 'trace_index']
) }}
) AS fact_traces_id,
COALESCE(
inserted_timestamp,
'2000-01-01'
) AS inserted_timestamp,
COALESCE(
modified_timestamp,
'2000-01-01'
) AS modified_timestamp,
eth_value,
eth_value_precise_raw,
eth_value_precise
FROM
{{ source(
'optimism_silver',
'overflowed_traces'
) }}
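Note: the COALESCE pairs above preserve the identifiers and timestamps of rows that already exist and only synthesize values for newly backfilled overflow rows. As a rough sketch (dbt_utils 1.x; exact compiled SQL varies by adapter and version), dbt_utils.generate_surrogate_key(['tx_hash', 'trace_index']) expands on Snowflake to approximately:

md5(cast(
    coalesce(cast(tx_hash as varchar), '_dbt_utils_surrogate_key_null_')
    || '-' ||
    coalesce(cast(trace_index as varchar), '_dbt_utils_surrogate_key_null_')
as varchar))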

View File

@@ -81,7 +81,7 @@ broken_blocks AS (
FROM
{{ ref("silver__transactions") }}
tx
LEFT JOIN {{ ref("silver__traces") }}
LEFT JOIN {{ ref("core__fact_traces") }}
tr USING (
block_number,
tx_hash

View File

@@ -6,11 +6,11 @@
SELECT
*
FROM
-{{ ref('silver__traces') }}
+{{ ref('core__fact_traces') }}
WHERE
block_number NOT IN (
SELECT
block_number
FROM
{{ ref('silver_observability__excluded_receipt_blocks') }}
)

View File

@@ -4,8 +4,7 @@ models:
    tests:
      - dbt_utils.unique_combination_of_columns:
          combination_of_columns:
-           - BLOCK_NUMBER
-           - TX_POSITION
            - TX_HASH
+           - TRACE_INDEX
  columns:
    - name: BLOCK_NUMBER
@@ -17,8 +16,7 @@ models:
            - FLOAT
    - name: BLOCK_TIMESTAMP
      tests:
-       - not_null:
-           where: NOT IS_PENDING
+       - not_null
        - dbt_expectations.expect_row_values_to_have_recent_data:
            datepart: day
            interval: 1
@@ -28,8 +26,7 @@ models:
            - TIMESTAMP_NTZ
    - name: TX_HASH
      tests:
-       - not_null:
-           where: NOT IS_PENDING
+       - not_null
        - dbt_expectations.expect_column_values_to_match_regex:
            regex: 0[xX][0-9a-fA-F]+
    - name: FROM_ADDRESS
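
Note: the pattern 0[xX][0-9a-fA-F]+ accepts 0x-prefixed hex strings. A quick sanity check on Snowflake (REGEXP_LIKE matches against the whole string; the dbt_expectations test may anchor the pattern differently, so this is only an approximation of the check):

SELECT
    REGEXP_LIKE('0xdeadbeef', '0[xX][0-9a-fA-F]+') AS has_prefix,   -- TRUE
    REGEXP_LIKE('deadbeef', '0[xX][0-9a-fA-F]+') AS no_prefix;      -- FALSE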

View File

@@ -13,7 +13,7 @@ WITH last_3_days AS (
SELECT
*
FROM
-{{ ref('silver__traces') }}
+{{ ref('core__fact_traces') }}
WHERE
block_number >= (
SELECT

View File

@@ -4,8 +4,7 @@ models:
    tests:
      - dbt_utils.unique_combination_of_columns:
          combination_of_columns:
-           - BLOCK_NUMBER
-           - TX_POSITION
            - TX_HASH
+           - TRACE_INDEX
  columns:
    - name: BLOCK_NUMBER
@@ -17,8 +16,7 @@ models:
            - FLOAT
    - name: BLOCK_TIMESTAMP
      tests:
-       - not_null:
-           where: NOT IS_PENDING
+       - not_null
        - dbt_expectations.expect_row_values_to_have_recent_data:
            datepart: day
            interval: 1
@@ -28,8 +26,7 @@ models:
            - TIMESTAMP_NTZ
    - name: TX_HASH
      tests:
-       - not_null:
-           where: NOT IS_PENDING
+       - not_null
        - dbt_expectations.expect_column_values_to_match_regex:
            regex: 0[xX][0-9a-fA-F]+

View File

@@ -0,0 +1,267 @@
{{ config (
materialized = 'incremental',
incremental_strategy = 'delete+insert',
unique_key = ['block_number','tx_position'],
cluster_by = 'block_timestamp::date, _inserted_timestamp::date',
tags = ['overflow'],
full_refresh = false
) }}
WITH bronze_overflowed_traces AS (
SELECT
block_number :: INT AS block_number,
index_vals [1] :: INT AS tx_position,
IFF(
path IN (
'result',
'result.value',
'result.type',
'result.to',
'result.input',
'result.gasUsed',
'result.gas',
'result.from',
'result.output',
'result.error',
'result.revertReason',
'gasUsed',
'gas',
'type',
'to',
'from',
'value',
'input',
'error',
'output',
'revertReason',
'txHash',
'result.txHash'
),
'ORIGIN',
REGEXP_REPLACE(REGEXP_REPLACE(path, '[^0-9]+', '_'), '^_|_$', '')
) AS trace_address,
SYSDATE() :: timestamp_ltz AS _inserted_timestamp,
OBJECT_AGG(
key,
value_
) AS trace_json,
CASE
WHEN trace_address = 'ORIGIN' THEN NULL
WHEN POSITION(
'_' IN trace_address
) = 0 THEN 'ORIGIN'
ELSE REGEXP_REPLACE(
trace_address,
'_[0-9]+$',
'',
1,
1
)
END AS parent_trace_address,
SPLIT(
trace_address,
'_'
) AS str_array
FROM
{{ ref("bronze__overflowed_traces") }}
GROUP BY
block_number,
tx_position,
trace_address,
_inserted_timestamp
),
sub_traces AS (
SELECT
block_number,
tx_position,
parent_trace_address,
COUNT(*) AS sub_traces
FROM
bronze_overflowed_traces
GROUP BY
block_number,
tx_position,
parent_trace_address
),
num_array AS (
SELECT
block_number,
tx_position,
trace_address,
ARRAY_AGG(flat_value) AS num_array
FROM
(
SELECT
block_number,
tx_position,
trace_address,
IFF(
VALUE :: STRING = 'ORIGIN',
-1,
VALUE :: INT
) AS flat_value
FROM
bronze_overflowed_traces,
LATERAL FLATTEN (
input => str_array
)
)
GROUP BY
block_number,
tx_position,
trace_address
),
cleaned_traces AS (
SELECT
b.block_number,
b.tx_position,
b.trace_address,
IFNULL(
sub_traces,
0
) AS sub_traces,
num_array,
ROW_NUMBER() over (
PARTITION BY b.block_number,
b.tx_position
ORDER BY
num_array ASC
) - 1 AS trace_index,
trace_json,
b._inserted_timestamp
FROM
bronze_overflowed_traces b
LEFT JOIN sub_traces s
ON b.block_number = s.block_number
AND b.tx_position = s.tx_position
AND b.trace_address = s.parent_trace_address
JOIN num_array n
ON b.block_number = n.block_number
AND b.tx_position = n.tx_position
AND b.trace_address = n.trace_address
),
final_traces AS (
SELECT
tx_position,
trace_index,
block_number,
trace_address,
trace_json :error :: STRING AS error_reason,
trace_json :from :: STRING AS from_address,
trace_json :to :: STRING AS to_address,
IFNULL(
utils.udf_hex_to_int(
trace_json :value :: STRING
),
'0'
) AS eth_value_precise_raw,
utils.udf_decimal_adjust(
eth_value_precise_raw,
18
) AS eth_value_precise,
eth_value_precise :: FLOAT AS eth_value,
utils.udf_hex_to_int(
trace_json :gas :: STRING
) :: INT AS gas,
utils.udf_hex_to_int(
trace_json :gasUsed :: STRING
) :: INT AS gas_used,
trace_json :input :: STRING AS input,
trace_json :output :: STRING AS output,
trace_json :type :: STRING AS TYPE,
concat_ws(
'_',
TYPE,
trace_address
) AS identifier,
concat_ws(
'-',
block_number,
tx_position,
identifier
) AS _call_id,
_inserted_timestamp,
trace_json AS DATA,
sub_traces
FROM
cleaned_traces
),
new_records AS (
SELECT
f.block_number,
t.tx_hash,
t.block_timestamp,
t.tx_status,
f.tx_position,
f.trace_index,
f.from_address,
f.to_address,
f.eth_value_precise_raw,
f.eth_value_precise,
f.eth_value,
f.gas,
f.gas_used,
f.input,
f.output,
f.type,
f.identifier,
f.sub_traces,
f.error_reason,
IFF(
f.error_reason IS NULL,
'SUCCESS',
'FAIL'
) AS trace_status,
f.data,
IFF(
t.tx_hash IS NULL
OR t.block_timestamp IS NULL
OR t.tx_status IS NULL,
TRUE,
FALSE
) AS is_pending,
f._call_id,
f._inserted_timestamp
FROM
final_traces f
LEFT OUTER JOIN {{ ref('silver__transactions') }}
t
ON f.tx_position = t.position
AND f.block_number = t.block_number
)
SELECT
block_number,
tx_hash,
block_timestamp,
tx_status,
tx_position,
trace_index,
from_address,
to_address,
eth_value_precise,
eth_value,
gas,
gas_used,
input,
output,
TYPE,
identifier,
sub_traces,
error_reason,
trace_status,
DATA,
is_pending,
_call_id,
_inserted_timestamp,
eth_value_precise_raw,
{{ dbt_utils.generate_surrogate_key(
['tx_hash', 'trace_index']
) }} AS traces_id,
SYSDATE() AS inserted_timestamp,
SYSDATE() AS modified_timestamp,
'{{ invocation_id }}' AS _invocation_id
FROM
new_records qualify(ROW_NUMBER() over(PARTITION BY block_number, tx_position, trace_index
ORDER BY
_inserted_timestamp DESC, is_pending ASC)) = 1
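
Note: the trace_address / parent_trace_address derivation is what rebuilds the call tree from flattened JSON paths. A worked example of the two REGEXP_REPLACE steps (values illustrative, not from the repo):

SELECT
    -- 'calls[0].calls[2].gasUsed' -> '_0_2_' -> '0_2'
    REGEXP_REPLACE(
        REGEXP_REPLACE('calls[0].calls[2].gasUsed', '[^0-9]+', '_'),
        '^_|_$',
        ''
    ) AS trace_address,
    -- stripping the trailing '_<n>' gives the parent call: '0_2' -> '0'
    REGEXP_REPLACE('0_2', '_[0-9]+$', '', 1, 1) AS parent_trace_address;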

View File

@@ -60,6 +60,7 @@ sources:
    schema: silver
    tables:
      - name: verified_abis
+     - name: overflowed_traces
  - name: optimism_bronze_api
    database: optimism
    schema: bronze_api

View File

@@ -6,6 +6,6 @@ packages:
  - package: dbt-labs/dbt_utils
    version: 1.0.0
  - git: https://github.com/FlipsideCrypto/fsc-utils.git
-   revision: v1.13.1
+   revision: v1.14.1
  - package: get-select/dbt_snowflake_query_tags
    version: [">=2.0.0", "<3.0.0"]