This commit is contained in:
Austin 2024-05-28 14:07:53 -04:00 committed by GitHub
parent bda66f87be
commit 57410d3dfa
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 634 additions and 1 deletions

View File

@ -0,0 +1,44 @@
# Manually-triggered (workflow_dispatch) job that runs the dbt models
# tagged `overflowed_traces` to backfill trace responses that overflowed.
# NOTE(review): indentation restored to standard GitHub Actions layout —
# the committed structure (keys/values) is unchanged.
name: dbt_run_overflowed_traces
run-name: dbt_run_overflowed_traces

on:
  workflow_dispatch:
    branches:
      - "main"

# Snowflake connection settings consumed by the dbt profile.
env:
  DBT_PROFILES_DIR: ./
  ACCOUNT: "${{ vars.ACCOUNT }}"
  ROLE: "${{ vars.ROLE }}"
  USER: "${{ vars.USER }}"
  PASSWORD: "${{ secrets.PASSWORD }}"
  REGION: "${{ vars.REGION }}"
  DATABASE: "${{ vars.DATABASE }}"
  WAREHOUSE: "${{ vars.WAREHOUSE }}"
  SCHEMA: "${{ vars.SCHEMA }}"

# Only one run of this workflow at a time.
concurrency:
  group: ${{ github.workflow }}

jobs:
  run_dbt_jobs:
    runs-on: ubuntu-latest
    environment:
      name: workflow_prod
    steps:
      - uses: actions/checkout@v3
      - uses: actions/setup-python@v4
        with:
          python-version: "3.10"
          cache: "pip"
      - name: install dependencies
        run: |
          pip install -r requirements.txt
          dbt deps
      - name: Run DBT Jobs
        run: |
          dbt run -m "polygon_models,tag:overflowed_traces" --vars '{"OVERFLOWED_TRACES":True}'

View File

@ -64,6 +64,8 @@ vars:
UPDATE_SNOWFLAKE_TAGS: True
WAIT: 0
OBSERV_FULL_TEST: False
OVERFLOWED_RECEIPTS: False
OVERFLOWED_TRACES: False
BALANCES_START: 0
BALANCES_END: 15000000
HEAL_MODEL: False

View File

@ -0,0 +1,80 @@
-- Re-reads trace responses that overflowed the normal ingestion path and
-- flattens them to one row per scalar JSON leaf (index_vals, path, key, value_).
-- Generates 10 UNION ALL'd branches keyed by row_no (1..10) so each oversized
-- source file is handled by its own UDF/UDTF invocation — presumably to stay
-- within per-call response limits; TODO confirm against the UDF contract.
{{ config (
materialized = "view",
tags = ['overflowed_traces']
) }}
{% for item in range(
1,
11
) %}
SELECT
o.file_name,
f.block_number,
f.index_vals,
f.path,
f.key,
f.value_
FROM
(
SELECT
file_name,
file_url,
index_cols,
-- pair identifying the overflowed entry inside the file
[overflowed_block, overflowed_tx] AS index_vals
FROM
(
SELECT
block_number,
POSITION,
file_name,
file_url,
index_cols,
VALUE [0] AS overflowed_block,
VALUE [1] AS overflowed_tx,
-- TRUE when the detected overflow matches this row's (block, position)
block_number = overflowed_block
AND POSITION = overflowed_tx AS missing
FROM
(
SELECT
block_number,
POSITION,
file_name,
file_url,
index_cols,
-- UDF scans the raw file and returns the index pairs whose responses
-- overflowed — assumed from its name; TODO confirm semantics
utils.udf_detect_overflowed_responses(
file_url,
index_cols
) AS index_vals
FROM
{{ ref("bronze__potential_overflowed_traces") }}
WHERE
row_no = {{ item }}
),
-- one row per detected (block, tx) pair
LATERAL FLATTEN (
input => index_vals
)
)
WHERE
missing = TRUE
) o,
-- UDTF re-reads the file and emits flattened key/value rows for the
-- overflowed entry only
TABLE(
utils.udtf_flatten_overflowed_responses(
o.file_url,
o.index_cols,
[o.index_vals]
)
) f
WHERE
-- keep scalar leaves only; containers and JSON nulls are excluded
NOT IS_OBJECT(
f.value_
)
AND NOT IS_ARRAY(
f.value_
)
AND NOT IS_NULL_VALUE(
f.value_
) {% if not loop.last %}
UNION ALL
{% endif %}
{% endfor %}

View File

@ -0,0 +1,139 @@
{{ config (
    materialized = "view",
    tags = ['overflowed_traces']
) }}
-- Finds transactions in known-impacted blocks that have no rows in
-- silver__traces (their debug_traceBlockByNumber responses overflowed) and
-- exposes the scoped file URL + index columns needed to re-read them.
WITH impacted_blocks AS (
    -- Hard-coded list of impacted block numbers (all values distinct);
    -- a single VALUES scan replaces the original 24-branch UNION chain.
    SELECT
        block_number
    FROM
        (
            VALUES
                (51073769),
                (50888353),
                (51103282),
                (50888044),
                (51105228),
                (51073794),
                (51102645),
                (50020883),
                (51073411),
                (51073884),
                (51104249),
                (51073379),
                (51105014),
                (51073239),
                (51096591),
                (50890486),
                (51055050),
                (50020889),
                (51103946),
                (51068789),
                (50891139),
                (51073292),
                (51073493),
                (50891095)
        ) AS v (block_number)
    {# remove after filling
SELECT
VALUE :: INT AS block_number
FROM
(
SELECT
blocks_impacted_array
FROM
{{ ref("silver_observability__traces_completeness") }}
ORDER BY
test_timestamp DESC
LIMIT
1
), LATERAL FLATTEN (
input => blocks_impacted_array
) #}
),
all_txs AS (
    -- every transaction in the impacted blocks
    SELECT
        t.block_number,
        t.position,
        t.tx_hash
    FROM
        {{ ref("silver__transactions") }} t
        JOIN impacted_blocks USING (block_number)
),
missing_txs AS (
    -- transactions with no trace row at all: the overflowed responses
    SELECT
        DISTINCT block_number,
        POSITION,
        file_name
    FROM
        all_txs
        LEFT JOIN {{ ref("silver__traces") }} tr USING (
            block_number,
            tx_hash
        )
        JOIN {{ ref("streamline__complete_debug_traceBlockByNumber") }} USING (block_number)
    WHERE
        tr.tx_hash IS NULL
)
SELECT
    block_number,
    POSITION,
    file_name,
    -- scoped URL for reading the raw response back from the external stage
    build_scoped_file_url(
        @streamline.bronze.external_tables,
        file_name
    ) AS file_url,
    ['block_number', 'array_index'] AS index_cols,
    -- deterministic 1-based id; bronze__overflowed_traces consumes row_no 1..10
    ROW_NUMBER() over (
        ORDER BY
            block_number ASC,
            POSITION ASC
    ) AS row_no
FROM
    missing_txs
ORDER BY
    block_number ASC,
    POSITION ASC

View File

@ -0,0 +1,183 @@
-- Rebuilds structured trace rows from the flattened key/value leaves produced
-- by bronze__overflowed_traces: regroups leaves into one JSON object per
-- trace, derives trace/parent addresses from the JSON path, orders traces
-- within each transaction, and decodes hex value/gas fields.
{{ config (
materialized = 'view',
tags = ['overflowed_traces']
) }}
WITH bronze_overflowed_traces AS (
SELECT
block_number :: INT AS block_number,
index_vals [1] :: INT AS tx_position,
-- Paths with no call index belong to the top-level call ('ORIGIN');
-- otherwise keep only the numeric call indexes from the path, joined
-- by '_' (e.g. result.calls[0].calls[2].* -> '0_2').
IFF(
path IN (
'result',
'result.value',
'result.type',
'result.to',
'result.input',
'result.gasUsed',
'result.gas',
'result.from',
'result.output',
'result.error',
'result.revertReason',
'gasUsed',
'gas',
'type',
'to',
'from',
'value',
'input',
'error',
'output',
'revertReason',
'txHash',
'result.txHash'
),
'ORIGIN',
REGEXP_REPLACE(REGEXP_REPLACE(path, '[^0-9]+', '_'), '^_|_$', '')
) AS trace_address,
-- load timestamp for this rebuild pass
SYSDATE() :: timestamp_ltz AS _inserted_timestamp,
-- regroup the flattened leaves into one JSON object per trace
OBJECT_AGG(
key,
value_
) AS trace_json,
-- parent address = drop the trailing index; a single-index trace's
-- parent is the top-level call; the top-level call has no parent
CASE
WHEN trace_address = 'ORIGIN' THEN NULL
WHEN POSITION(
'_' IN trace_address
) = 0 THEN 'ORIGIN'
ELSE REGEXP_REPLACE(
trace_address,
'_[0-9]+$',
'',
1,
1
)
END AS parent_trace_address,
-- address components as an array, used below for numeric ordering
SPLIT(
trace_address,
'_'
) AS str_array
FROM
{{ ref("bronze__overflowed_traces") }}
GROUP BY
block_number,
tx_position,
trace_address,
_inserted_timestamp
),
-- number of direct children per trace address
sub_traces AS (
SELECT
block_number,
tx_position,
parent_trace_address,
COUNT(*) AS sub_traces
FROM
bronze_overflowed_traces
GROUP BY
block_number,
tx_position,
parent_trace_address
),
-- numeric form of the trace address; 'ORIGIN' maps to -1 so the
-- top-level call sorts before every indexed trace
num_array AS (
SELECT
block_number,
tx_position,
trace_address,
ARRAY_AGG(flat_value) AS num_array
FROM
(
SELECT
block_number,
tx_position,
trace_address,
IFF(
VALUE :: STRING = 'ORIGIN',
-1,
VALUE :: INT
) AS flat_value
FROM
bronze_overflowed_traces,
LATERAL FLATTEN (
input => str_array
)
)
GROUP BY
block_number,
tx_position,
trace_address
),
cleaned_traces AS (
SELECT
b.block_number,
b.tx_position,
b.trace_address,
IFNULL(
sub_traces,
0
) AS sub_traces,
num_array,
-- 0-based index per transaction, ordered by the numeric address array
-- (Snowflake array comparison) — presumably depth-first order; confirm
ROW_NUMBER() over (
PARTITION BY b.block_number,
b.tx_position
ORDER BY
num_array ASC
) - 1 AS trace_index,
trace_json,
b._inserted_timestamp
FROM
bronze_overflowed_traces b
LEFT JOIN sub_traces s
ON b.block_number = s.block_number
AND b.tx_position = s.tx_position
AND b.trace_address = s.parent_trace_address
JOIN num_array n
ON b.block_number = n.block_number
AND b.tx_position = n.tx_position
AND b.trace_address = n.trace_address
)
SELECT
tx_position,
trace_index,
block_number,
trace_address,
trace_json :error :: STRING AS error_reason,
trace_json :from :: STRING AS from_address,
trace_json :to :: STRING AS to_address,
-- hex wei value -> decimal string; missing value decodes as '0'
IFNULL(
utils.udf_hex_to_int(
trace_json :value :: STRING
),
'0'
) AS matic_value_precise_raw,
-- shift 18 decimals: raw wei -> MATIC as exact decimal string
utils.udf_decimal_adjust(
matic_value_precise_raw,
18
) AS matic_value_precise,
matic_value_precise :: FLOAT AS matic_value,
utils.udf_hex_to_int(
trace_json :gas :: STRING
) :: INT AS gas,
utils.udf_hex_to_int(
trace_json :gasUsed :: STRING
) :: INT AS gas_used,
trace_json :input :: STRING AS input,
trace_json :output :: STRING AS output,
trace_json :type :: STRING AS TYPE,
-- e.g. 'CALL_0_2'; 'ORIGIN' traces yield just the call type
concat_ws(
'_',
TYPE,
trace_address
) AS identifier,
-- unique id: block-position-identifier
concat_ws(
'-',
block_number,
tx_position,
identifier
) AS _call_id,
_inserted_timestamp,
trace_json AS DATA,
sub_traces
FROM
cleaned_traces

View File

@ -0,0 +1,52 @@
-- Incremental gate model: inspects the most recent traces-completeness test
-- and, when impacted blocks exist, dispatches the dbt_run_overflowed_traces
-- GitHub Actions workflow via an external function. Appends one audit row
-- per run (unique_key = test_timestamp).
{{ config(
materialized = 'incremental',
unique_key = 'test_timestamp',
tags = ['observability']
) }}
-- latest completeness result from the past 5 days (at most one row)
WITH base AS (
SELECT
blocks_impacted_count
FROM
{{ ref('silver_observability__traces_completeness') }}
WHERE
test_timestamp > DATEADD('day', -5, CURRENT_TIMESTAMP())
ORDER BY
test_timestamp DESC
LIMIT
1), run_model AS (
-- SIDE EFFECT: evaluating this external function dispatches the workflow.
-- The CTE yields a row only when the latest test found impacted blocks.
SELECT
blocks_impacted_count,
github_actions.workflow_dispatches(
'FlipsideCrypto',
'polygon-models',
'dbt_run_overflowed_traces.yml',
NULL
) AS run_overflow_models
FROM
base
WHERE
blocks_impacted_count > 0
)
-- always emit exactly one audit row; when no dispatch ran, record a
-- zero count and a {'status': 'skipped'} result object
SELECT
dummy,
COALESCE(
blocks_impacted_count,
0
) AS blocks_impacted_count,
COALESCE(
run_overflow_models,
OBJECT_CONSTRUCT(
'status',
'skipped'
)
) AS run_overflow_models,
SYSDATE() AS test_timestamp
FROM
(
SELECT
1 AS dummy
)
LEFT JOIN run_model
ON 1 = 1

View File

@ -1,4 +1,5 @@
-- depends_on: {{ ref('bronze__streamline_traces') }}
{% set warehouse = 'DBT_SNOWPARK' if var('OVERFLOWED_TRACES') else target.warehouse %}
{{ config (
materialized = "incremental",
incremental_strategy = 'delete+insert',
@ -6,7 +7,8 @@
cluster_by = "block_timestamp::date, _inserted_timestamp::date",
post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION",
full_refresh = false,
tags = ['non_realtime']
tags = ['non_realtime', 'overflowed_traces'],
snowflake_warehouse = warehouse
) }}
WITH bronze_traces AS (
@ -321,6 +323,59 @@ missing_data AS (
t.is_pending
)
{% endif %},
{% if is_incremental() and var(
'OVERFLOWED_TRACES',
) %}
overflowed_traces AS (
SELECT
t.block_number,
txs.tx_hash,
txs.block_timestamp,
txs.tx_status,
t.tx_position,
t.trace_index,
t.from_address,
t.to_address,
t.matic_value_precise_raw,
t.matic_value_precise,
t.matic_value,
t.gas,
t.gas_used,
t.input,
t.output,
t.type,
t.identifier,
t.sub_traces,
t.error_reason,
IFF(
t.error_reason IS NULL,
'SUCCESS',
'FAIL'
) AS trace_status,
t.data,
IFF(
txs.tx_hash IS NULL
OR txs.block_timestamp IS NULL
OR txs.tx_status IS NULL,
TRUE,
FALSE
) AS is_pending,
t._call_id,
txs._inserted_timestamp AS _inserted_timestamp
FROM
{{ source(
'polygon_silver',
'overflowed_traces'
) }}
t
LEFT JOIN {{ ref('silver__transactions') }}
txs
ON t.tx_position = txs.position
AND t.block_number = txs.block_number
),
{% endif %}
FINAL AS (
SELECT
block_number,
@ -379,6 +434,83 @@ SELECT
_inserted_timestamp
FROM
missing_data
UNION ALL
SELECT
block_number,
tx_hash,
block_timestamp,
tx_status,
tx_position,
trace_index,
from_address,
to_address,
matic_value_precise_raw,
matic_value_precise,
matic_value,
gas,
gas_used,
input,
output,
TYPE,
identifier,
sub_traces,
error_reason,
trace_status,
DATA,
is_pending,
_call_id,
_inserted_timestamp
FROM
{{ this }}
INNER JOIN (
SELECT
DISTINCT block_number
FROM
missing_data
{% if is_incremental() and var(
'OVERFLOWED_TRACES',
) %}
UNION
SELECT
DISTINCT block_number
FROM
overflowed_traces
{% endif %}
) USING (block_number)
{% endif %}
{% if is_incremental() and var(
'OVERFLOWED_TRACES',
) %}
UNION ALL
SELECT
block_number,
tx_hash,
block_timestamp,
tx_status,
tx_position,
trace_index,
from_address,
to_address,
matic_value_precise_raw,
matic_value_precise,
matic_value,
gas,
gas_used,
input,
output,
TYPE,
identifier,
sub_traces,
error_reason,
trace_status,
DATA,
is_pending,
_call_id,
_inserted_timestamp
FROM
overflowed_traces
{% endif %}
)
SELECT

View File

@ -59,6 +59,7 @@ sources:
schema: silver
tables:
- name: verified_abis
- name: overflowed_traces
- name: polygon_bronze_api
database: polygon
schema: bronze_api