Fix traces2/ethereum (#1002)

* add in traces fix

* add workflow
This commit is contained in:
Sam 2024-12-18 22:58:25 +08:00 committed by GitHub
parent 5637212c64
commit d197a75612
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 209 additions and 0 deletions

View File

@ -0,0 +1,45 @@
name: dbt_run_traces_fix
run-name: dbt_run_traces_fix
on:
workflow_dispatch:
schedule:
# every 30 minutes (see https://crontab.guru)
- cron: "*/30 * * * *"
env:
DBT_PROFILES_DIR: ./
ACCOUNT: "${{ vars.ACCOUNT }}"
ROLE: "${{ vars.ROLE }}"
USER: "${{ vars.USER }}"
PASSWORD: "${{ secrets.PASSWORD }}"
REGION: "${{ vars.REGION }}"
DATABASE: "${{ vars.DATABASE }}"
WAREHOUSE: "${{ vars.WAREHOUSE }}"
SCHEMA: "${{ vars.SCHEMA }}"
concurrency:
group: ${{ github.workflow }}
jobs:
run_dbt_jobs:
runs-on: ubuntu-latest
environment:
name: workflow_prod_2xl
steps:
- uses: actions/checkout@v3
- uses: actions/setup-python@v4
with:
python-version: "3.10"
cache: "pip"
- name: install dependencies
run: |
pip install -r requirements.txt
dbt deps
- name: Run DBT Jobs
run: |
dbt run -m "ethereum_models,tag:traces_fix"

View File

@ -0,0 +1,164 @@
{{ config (
materialized = "incremental",
incremental_strategy = 'delete+insert',
unique_key = ["block_number", "tx_position", "trace_address"],
tags = ['traces_fix']
) }}
{% set batch_query %}
SELECT
MAX(next_batch_id) AS next_batch_id
FROM
(
SELECT
1 AS next_batch_id
{% if is_incremental() %}
UNION ALL
SELECT
COALESCE(MAX(batch_id), 0) + 1 AS next_batch_id
FROM
{{ this }}
{% endif %}) {% endset %}
{% if execute %}
{% set result = run_query(batch_query) %}
{{ log(
"Debug - Batch Query result: " ~ result,
info = True
) }}
{% set batch_id = result.columns [0] [0] %}
{% if batch_id > 43 %}
{{ exceptions.raise_compiler_error("Processing complete - reached max batch_id of 43") }}
{% endif %}
{% set block_size = 500000 %}
{% set block_start = 1 + (
batch_id - 1
) * block_size %}
{% set block_end = batch_id * block_size %}
{{ log(
"Processing batch_id: " ~ batch_id ~ ", blocks: " ~ block_start ~ " to " ~ block_end,
info = True
) }}
{% endif %}
WITH silver_traces AS (
SELECT
block_number,
tx_position,
trace_address,
parent_trace_address,
trace_json
FROM
{{ ref('silver__traces2') }}
WHERE
block_number BETWEEN {{ block_start }}
AND {{ block_end }}
),
errored_traces AS (
SELECT
block_number,
tx_position,
trace_address,
trace_json
FROM
silver_traces
WHERE
trace_json :error :: STRING IS NOT NULL
),
error_logic AS (
SELECT
b0.block_number,
b0.tx_position,
b0.trace_address,
b0.trace_json :error :: STRING AS error,
b1.trace_json :error :: STRING AS any_error,
b2.trace_json :error :: STRING AS origin_error
FROM
silver_traces b0
LEFT JOIN errored_traces b1
ON b0.block_number = b1.block_number
AND b0.tx_position = b1.tx_position
AND b0.trace_address RLIKE CONCAT(
'^',
b1.trace_address,
'(_[0-9]+)*$'
)
LEFT JOIN errored_traces b2
ON b0.block_number = b2.block_number
AND b0.tx_position = b2.tx_position
AND b2.trace_address = 'ORIGIN'
),
aggregated_errors AS (
SELECT
block_number,
tx_position,
trace_address,
error,
IFF(MAX(any_error) IS NULL
AND error IS NULL
AND origin_error IS NULL, TRUE, FALSE) AS trace_succeeded
FROM
error_logic
GROUP BY
block_number,
tx_position,
trace_address,
error,
origin_error),
prod AS (
SELECT
block_number,
tx_position,
tx_hash,
trace_address,
trace_succeeded AS prod_trace_succeeded
FROM
{{ ref('silver__fact_traces2') }}
WHERE
block_number BETWEEN {{ block_start }}
AND {{ block_end }}
),
final_errors AS (
SELECT
block_number,
tx_position,
trace_address,
error,
trace_succeeded,
prod_trace_succeeded
FROM
aggregated_errors
INNER JOIN prod USING (
block_number,
tx_position,
trace_address
)
WHERE
prod_trace_succeeded != trace_succeeded
UNION ALL
SELECT
NULL AS block_number,
NULL AS tx_position,
NULL AS trace_address,
NULL AS error,
NULL AS trace_succeeded,
NULL AS prod_trace_succeeded
),
batch AS (
SELECT
{{ batch_id }} AS batch_id
)
SELECT
batch_id,
block_number,
tx_position,
trace_address,
error,
trace_succeeded,
prod_trace_succeeded
FROM
batch
CROSS JOIN final_errors