Austin 2024-06-27 10:36:21 -04:00 committed by GitHub
parent 3adc3e0ccb
commit 0c662e093a
5 changed files with 325 additions and 0 deletions


@@ -0,0 +1,44 @@
name: dbt_run_overflowed_traces2
run-name: dbt_run_overflowed_traces2

on:
  workflow_dispatch:
    branches:
      - "main"

env:
  DBT_PROFILES_DIR: ./
  ACCOUNT: "${{ vars.ACCOUNT }}"
  ROLE: "${{ vars.ROLE }}"
  USER: "${{ vars.USER }}"
  PASSWORD: "${{ secrets.PASSWORD }}"
  REGION: "${{ vars.REGION }}"
  DATABASE: "${{ vars.DATABASE }}"
  WAREHOUSE: "${{ vars.WAREHOUSE }}"
  SCHEMA: "${{ vars.SCHEMA }}"

concurrency:
  group: ${{ github.workflow }}

jobs:
  run_dbt_jobs:
    runs-on: ubuntu-latest
    environment:
      name: workflow_prod
    steps:
      - uses: actions/checkout@v3
      - uses: actions/setup-python@v4
        with:
          python-version: "3.10"
          cache: "pip"
      - name: install dependencies
        run: |
          pip install -r requirements.txt
          dbt deps
      - name: Run DBT Jobs
        run: |
          dbt run -m "optimism_models,tag:overflowed_traces2" --vars '{"OVERFLOWED_TRACES":True}'
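
Editorial note: the --vars '{"OVERFLOWED_TRACES":True}' flag passed above is what the incremental model later in this commit keys its warehouse choice on. A minimal sketch of that pattern (the false default is an editorial assumption; the committed model calls var('OVERFLOWED_TRACES') with no fallback):

-- Sketch only: run on the dedicated DBT_SNOWPARK warehouse when the workflow
-- passes OVERFLOWED_TRACES, otherwise stay on the target's default warehouse.
{% set warehouse = 'DBT_SNOWPARK' if var('OVERFLOWED_TRACES', false) else target.warehouse %}
{{ config(snowflake_warehouse = warehouse) }}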


@@ -0,0 +1,80 @@
{{ config (
materialized = "view",
tags = ['overflowed_traces2']
) }}
{% for item in range(
1,
11
) %}
SELECT
o.file_name,
f.block_number,
f.index_vals,
f.path,
f.key,
f.value_
FROM
(
SELECT
file_name,
file_url,
index_cols,
[overflowed_block, overflowed_tx] AS index_vals
FROM
(
SELECT
block_number,
POSITION,
file_name,
file_url,
index_cols,
VALUE [0] AS overflowed_block,
VALUE [1] AS overflowed_tx,
block_number = overflowed_block
AND POSITION = overflowed_tx AS missing
FROM
(
SELECT
block_number,
POSITION,
file_name,
file_url,
index_cols,
utils.udf_detect_overflowed_responses(
file_url,
index_cols
) AS index_vals
FROM
{{ ref("bronze__potential_overflowed_traces2") }}
WHERE
row_no = {{ item }}
),
LATERAL FLATTEN (
input => index_vals
)
)
WHERE
missing = TRUE
) o,
TABLE(
utils.udtf_flatten_overflowed_responses(
o.file_url,
o.index_cols,
[o.index_vals]
)
) f
WHERE
NOT IS_OBJECT(
f.value_
)
AND NOT IS_ARRAY(
f.value_
)
AND NOT IS_NULL_VALUE(
f.value_
) {% if not loop.last %}
UNION ALL
{% endif %}
{% endfor %}
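
Editorial note (illustrative, not part of the committed models): utils.udf_detect_overflowed_responses appears to return an array of [overflowed_block, overflowed_tx] pairs per file, which the LATERAL FLATTEN above unpacks into one row per pair before the UDTF expands the overflowed responses for just those positions. A minimal Snowflake sketch of that unpacking, with hypothetical sample values:

SELECT
    f.value [0] :: INT AS overflowed_block,
    f.value [1] :: INT AS overflowed_tx
FROM
    (
        SELECT
            [[117000001, 3], [117000001, 7]] AS index_vals -- hypothetical pairs
    ),
    LATERAL FLATTEN (
        input => index_vals
    ) f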


@@ -0,0 +1,83 @@
{{ config (
materialized = "view",
tags = ['overflowed_traces2']
) }}
WITH impacted_blocks AS (
SELECT
blocks_impacted_array
FROM
{{ ref("silver_observability__traces_completeness") }}
WHERE
blocks_impacted_count > 0 {#
SELECT
blocks_impacted_array
FROM
{{ ref("silver_observability__traces_completeness") }}
ORDER BY
test_timestamp DESC
LIMIT
1 #}
), all_missing AS (
SELECT
DISTINCT VALUE :: INT AS block_number
FROM
impacted_blocks,
LATERAL FLATTEN (
input => blocks_impacted_array
)
),
all_txs AS (
SELECT
block_number,
POSITION AS tx_position,
tx_hash
FROM
{{ ref("silver__transactions") }}
JOIN all_missing USING (block_number)
),
missing_txs AS (
SELECT
DISTINCT txs.block_number,
txs.tx_position,
file_name
FROM
all_txs txs
LEFT JOIN {{ ref("silver__traces2") }}
tr2 USING (
block_number,
tx_position
)
JOIN {{ ref("streamline__complete_debug_traceBlockByNumber") }} USING (block_number)
LEFT JOIN {{ source(
'optimism_silver',
'overflowed_traces2'
) }}
ot USING (
block_number,
tx_position
)
WHERE
tr2.block_number IS NULL
AND ot.block_number IS NULL
)
SELECT
block_number,
tx_position AS POSITION,
file_name,
build_scoped_file_url(
@streamline.bronze.external_tables,
file_name
) AS file_url,
['block_number', 'array_index'] AS index_cols,
ROW_NUMBER() over (
ORDER BY
block_number ASC,
POSITION ASC
) AS row_no
FROM
missing_txs
ORDER BY
block_number ASC,
POSITION ASC
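
Editorial note: the row_no assigned here is what the range(1, 11) loop in the view model above iterates over. Jinja's range(1, 11) yields 1 through 10, so each run reprocesses at most ten overflowed transactions; rows ranked beyond 10 should fall to a later run once earlier ones land in overflowed_traces2. A minimal sketch of how that loop unrolls (illustrative only):

{% for item in range(1, 11) %}
SELECT
    {{ item }} AS row_no {% if not loop.last %}
UNION ALL
{% endif %}
{% endfor %}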


@@ -0,0 +1,117 @@
-- depends_on: {{ ref('bronze__overflowed_traces2') }}
{% set warehouse = 'DBT_SNOWPARK' if var('OVERFLOWED_TRACES') else target.warehouse %}
{{ config (
materialized = "incremental",
incremental_strategy = 'delete+insert',
unique_key = ['block_number','tx_position'],
cluster_by = ['modified_timestamp::DATE','partition_key'],
tags = ['overflowed_traces2'],
full_refresh = false,
snowflake_warehouse = warehouse
) }}
{% if is_incremental() %}
WITH bronze_overflowed_traces AS (
SELECT
block_number :: INT AS block_number,
ROUND(
block_number,
-3
) AS partition_key,
index_vals [1] :: INT AS tx_position,
IFF(
path IN (
'result',
'result.value',
'result.type',
'result.to',
'result.input',
'result.gasUsed',
'result.gas',
'result.from',
'result.output',
'result.error',
'result.revertReason',
'result.time',
'gasUsed',
'gas',
'type',
'to',
'from',
'value',
'input',
'error',
'output',
'revertReason',
'time'
),
'ORIGIN',
REGEXP_REPLACE(REGEXP_REPLACE(path, '[^0-9]+', '_'), '^_|_$', '')
) AS trace_address,
SYSDATE() :: timestamp_ltz AS _inserted_timestamp,
OBJECT_AGG(
key,
value_
) AS trace_json,
CASE
WHEN trace_address = 'ORIGIN' THEN NULL
WHEN POSITION(
'_' IN trace_address
) = 0 THEN 'ORIGIN'
ELSE REGEXP_REPLACE(
trace_address,
'_[0-9]+$',
'',
1,
1
)
END AS parent_trace_address,
SPLIT(
trace_address,
'_'
) AS trace_address_array
FROM
{{ ref("bronze__overflowed_traces2") }}
GROUP BY
block_number,
tx_position,
trace_address,
_inserted_timestamp
)
SELECT
block_number,
tx_position,
trace_address,
parent_trace_address,
trace_address_array,
trace_json,
partition_key,
_inserted_timestamp,
{{ dbt_utils.generate_surrogate_key(
['block_number', 'tx_position', 'trace_address']
) }} AS traces_id,
SYSDATE() AS inserted_timestamp,
SYSDATE() AS modified_timestamp,
'{{ invocation_id }}' AS _invocation_id
FROM
bronze_overflowed_traces qualify(ROW_NUMBER() over(PARTITION BY traces_id
ORDER BY
_inserted_timestamp DESC)) = 1
{% else %}
SELECT
NULL :: INT AS block_number,
NULL :: INT AS tx_position,
NULL :: text AS trace_address,
NULL :: text AS parent_trace_address,
NULL :: ARRAY AS trace_address_array,
NULL :: OBJECT AS trace_json,
NULL :: INT AS partition_key,
NULL :: timestamp_ltz AS _inserted_timestamp,
{{ dbt_utils.generate_surrogate_key(
['block_number', 'tx_position', 'trace_address']
) }} AS traces_id,
SYSDATE() AS inserted_timestamp,
SYSDATE() AS modified_timestamp,
'{{ invocation_id }}' AS _invocation_id
{% endif %}
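
Editorial note: the nested REGEXP_REPLACE calls above reduce each flattened JSON path to its numeric indexes, and the parent address is derived by stripping the trailing _N; the CASE expression then maps top-level traces to a NULL parent and single-index traces to 'ORIGIN'. An illustrative Snowflake snippet with hypothetical path strings (not part of the committed model):

SELECT
    path,
    REGEXP_REPLACE(
        REGEXP_REPLACE(path, '[^0-9]+', '_'),
        '^_|_$',
        ''
    ) AS trace_address, -- 'calls[0].calls[2].gasUsed' -> '0_2'; 'calls[0].from' -> '0'
    REGEXP_REPLACE(
        trace_address,
        '_[0-9]+$',
        '',
        1,
        1
    ) AS parent_candidate -- '0_2' -> '0'; '0' is unchanged (the model's CASE maps it to 'ORIGIN')
FROM
    (
        SELECT
            'calls[0].calls[2].gasUsed' AS path -- hypothetical example path
        UNION ALL
        SELECT
            'calls[0].from'
    )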


@@ -52,6 +52,7 @@ sources:
    tables:
      - name: verified_abis
      - name: overflowed_traces
      - name: overflowed_traces2
  - name: optimism_bronze_api
    database: optimism
    schema: bronze_api