AN-5218/overflow traces/AN-4956 macro traces (#205)

* stash

* fr

* source

* traces2 workflow

* fr flag
Austin 2024-09-11 13:23:12 -04:00 committed by GitHub
parent 95a45c2ace
commit bc3c7a0de0
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
26 changed files with 472 additions and 142 deletions

View File

@@ -0,0 +1,46 @@
name: dbt_run_overflowed_traces2
run-name: dbt_run_overflowed_traces2
on:
  workflow_dispatch:
    branches:
      - "main"
env:
  DBT_PROFILES_DIR: ./
  ACCOUNT: "${{ vars.ACCOUNT }}"
  ROLE: "${{ vars.ROLE }}"
  USER: "${{ vars.USER }}"
  PASSWORD: "${{ secrets.PASSWORD }}"
  REGION: "${{ vars.REGION }}"
  DATABASE: "${{ vars.DATABASE }}"
  WAREHOUSE: "${{ vars.WAREHOUSE }}"
  SCHEMA: "${{ vars.SCHEMA }}"
concurrency:
  group: ${{ github.workflow }}
jobs:
  run_dbt_jobs:
    runs-on: ubuntu-latest
    environment:
      name: workflow_prod
    steps:
      - uses: actions/checkout@v3
      - uses: actions/setup-python@v4
        with:
          python-version: "3.10"
          cache: "pip"
      - name: install dependencies
        run: |
          pip install -r requirements.txt
          dbt deps
      - name: Run DBT Jobs
        run: |
          dbt run -m "base_models,tag:overflowed_traces2" --vars '{"OVERFLOWED_TRACES":True}'

View File

@@ -0,0 +1,50 @@
name: dbt_run_temp_traces2
run-name: dbt_run_temp_traces2
on:
  workflow_dispatch:
  schedule:
    # Runs “At minute 12 past every hour.” (see https://crontab.guru)
    - cron: '12 * * * *'
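    # the hourly cadence drives the traces_reload-tagged reload model added in this commit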
env:
  DBT_PROFILES_DIR: ./
  ACCOUNT: "${{ vars.ACCOUNT }}"
  ROLE: "${{ vars.ROLE }}"
  USER: "${{ vars.USER }}"
  PASSWORD: "${{ secrets.PASSWORD }}"
  REGION: "${{ vars.REGION }}"
  DATABASE: "${{ vars.DATABASE }}"
  WAREHOUSE: "${{ vars.WAREHOUSE }}"
  SCHEMA: "${{ vars.SCHEMA }}"
concurrency:
  group: ${{ github.workflow }}
jobs:
  run_dbt_jobs:
    runs-on: ubuntu-latest
    environment:
      name: workflow_prod_2xl
    steps:
      - uses: actions/checkout@v3
      - uses: actions/setup-python@v4
        with:
          python-version: "3.10"
          cache: "pip"
      - name: install dependencies
        run: |
          pip install -r requirements.txt
          dbt deps
      - name: Run DBT Jobs
        run: |
          dbt run -m "base_models,tag:traces_reload"

View File

@@ -63,6 +63,7 @@ vars:
  UPDATE_UDFS_AND_SPS: False
  UPDATE_SNOWFLAKE_TAGS: True
  OBSERV_FULL_TEST: False
  OVERFLOWED_TRACES: False
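  # flipped to True by dbt_run_overflowed_traces2.yml via --vars; also swaps the snowflake_warehouse in the overflowed traces model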
  WAIT: 0
  HEAL_MODEL: False
  HEAL_MODELS: []

View File

@@ -58,7 +58,7 @@
)
{% endmacro %}
{% macro streamline_external_table_FR_query(
{% macro streamline_external_table_fr_query(
model,
partition_function,
partition_name,

View File

@@ -0,0 +1,80 @@
{{ config (
materialized = "view",
tags = ['overflowed_traces2']
) }}
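-- range(1, 11) unions ten copies of the query below, one per row_no 1-10 from bronze__potential_overflowed_traces2, bounding how many overflowed files are processed per run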
{% for item in range(
1,
11
) %}
SELECT
o.file_name,
f.block_number,
f.index_vals,
f.path,
f.key,
f.value_
FROM
(
SELECT
file_name,
file_url,
index_cols,
[overflowed_block, overflowed_tx] AS index_vals
FROM
(
SELECT
block_number,
POSITION,
file_name,
file_url,
index_cols,
VALUE [0] AS overflowed_block,
VALUE [1] AS overflowed_tx,
block_number = overflowed_block
AND POSITION = overflowed_tx AS missing
FROM
(
SELECT
block_number,
POSITION,
file_name,
file_url,
index_cols,
utils.udf_detect_overflowed_responses(
file_url,
index_cols
) AS index_vals
FROM
{{ ref("bronze__potential_overflowed_traces2") }}
WHERE
row_no = {{ item }}
),
LATERAL FLATTEN (
input => index_vals
)
)
WHERE
missing = TRUE
) o,
TABLE(
utils.udtf_flatten_overflowed_responses(
o.file_url,
o.index_cols,
[o.index_vals]
)
) f
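-- flatten each overflowed response file into path/key/value_ rows for the affected (block, tx) index pair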
WHERE
NOT IS_OBJECT(
f.value_
)
AND NOT IS_ARRAY(
f.value_
)
AND NOT IS_NULL_VALUE(
f.value_
) {% if not loop.last %}
UNION ALL
{% endif %}
{% endfor %}

View File

@@ -0,0 +1,77 @@
{{ config (
materialized = "view",
tags = ['overflowed_traces2']
) }}
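-- transactions in blocks flagged by the latest completeness test that exist in silver__transactions but are missing from both silver__traces2 and the overflowed_traces2 source table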
WITH impacted_blocks AS (
SELECT
blocks_impacted_array
FROM
{{ ref("silver_observability__traces_completeness") }}
ORDER BY
test_timestamp DESC
LIMIT
1
), all_missing AS (
SELECT
DISTINCT VALUE :: INT AS block_number
FROM
impacted_blocks,
LATERAL FLATTEN (
input => blocks_impacted_array
)
),
all_txs AS (
SELECT
block_number,
POSITION AS tx_position,
tx_hash
FROM
{{ ref("silver__transactions") }}
JOIN all_missing USING (block_number)
),
missing_txs AS (
SELECT
DISTINCT txs.block_number,
txs.tx_position,
file_name
FROM
all_txs txs
LEFT JOIN {{ ref("silver__traces2") }}
tr2 USING (
block_number,
tx_position
)
JOIN {{ ref("streamline__complete_debug_traceBlockByNumber") }} USING (block_number)
LEFT JOIN {{ source(
'base_silver',
'overflowed_traces2'
) }}
ot USING (
block_number,
tx_position
)
WHERE
tr2.block_number IS NULL
AND ot.block_number IS NULL
)
SELECT
block_number,
tx_position AS POSITION,
file_name,
build_scoped_file_url(
@streamline.bronze.external_tables,
file_name
) AS file_url,
['block_number', 'array_index'] AS index_cols,
ROW_NUMBER() over (
ORDER BY
block_number ASC,
POSITION ASC
) AS row_no
FROM
missing_txs
ORDER BY
block_number ASC,
POSITION ASC

View File

@@ -70,7 +70,7 @@ WHERE
{{ this }}
)
{% else %}
{{ ref('bronze__streamline_FR_blocks') }}
{{ ref('bronze__streamline_fr_blocks') }}
{% endif %}
qualify(ROW_NUMBER() over (PARTITION BY block_number

View File

@@ -32,7 +32,7 @@ WHERE
{{ this }}
)
{% else %}
{{ ref('bronze__streamline_FR_confirm_blocks') }}
{{ ref('bronze__streamline_fr_confirm_blocks') }}
{% endif %}
qualify(ROW_NUMBER() over (PARTITION BY block_number

View File

@@ -0,0 +1,15 @@
{{ config (
materialized = "incremental",
incremental_strategy = 'delete+insert',
unique_key = ['block_number'],
cluster_by = "block_timestamp::date",
post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION",
tags = ['traces_reload'],
full_refresh = false
) }}
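-- the gold traces rebuild is handled by the fsc_evm package macro, parameterized below; full_refresh = false guards the table against dbt --full-refresh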
{{ fsc_evm.gold_traces_v1(
full_reload_start_block = 3000000,
full_reload_blocks = 1000000,
full_reload_mode = true,
uses_overflow_steps = true
) }}

View File

@@ -28,7 +28,7 @@ WHERE
)
AND IS_OBJECT(DATA)
{% else %}
{{ ref('bronze__streamline_FR_receipts') }}
{{ ref('bronze__streamline_fr_receipts') }}
WHERE
IS_OBJECT(DATA)
{% endif %}

View File

@@ -29,7 +29,7 @@ WHERE
)
AND DATA :result IS NOT NULL
{% else %}
{{ ref('bronze__streamline_FR_traces') }}
{{ ref('bronze__streamline_fr_traces') }}
WHERE
_partition_by_block_id <= 2300000
AND DATA :result IS NOT NULL

View File

@@ -8,129 +8,7 @@
full_refresh = false,
tags = ['core','non_realtime']
) }}
WITH bronze_traces AS (
SELECT
block_number,
_partition_by_block_id AS partition_key,
VALUE :array_index :: INT AS tx_position,
DATA :result AS full_traces,
_inserted_timestamp
FROM
{% if is_incremental() %}
{{ ref('bronze__streamline_traces') }}
WHERE
_inserted_timestamp >= (
SELECT
MAX(_inserted_timestamp) _inserted_timestamp
FROM
{{ this }}
)
AND DATA :result IS NOT NULL
{% else %}
{{ ref('bronze__streamline_FR_traces') }}
WHERE
_partition_by_block_id <= 2300000
AND DATA :result IS NOT NULL
{% endif %}
qualify(ROW_NUMBER() over (PARTITION BY block_number, tx_position
ORDER BY
_inserted_timestamp DESC)) = 1
),
flatten_traces AS (
SELECT
block_number,
tx_position,
partition_key,
IFF(
path IN (
'result',
'result.value',
'result.type',
'result.to',
'result.input',
'result.gasUsed',
'result.gas',
'result.from',
'result.output',
'result.error',
'result.revertReason',
'gasUsed',
'gas',
'type',
'to',
'from',
'value',
'input',
'error',
'output',
'revertReason'
),
'ORIGIN',
REGEXP_REPLACE(REGEXP_REPLACE(path, '[^0-9]+', '_'), '^_|_$', '')
) AS trace_address,
_inserted_timestamp,
OBJECT_AGG(
key,
VALUE
) AS trace_json,
CASE
WHEN trace_address = 'ORIGIN' THEN NULL
WHEN POSITION(
'_' IN trace_address
) = 0 THEN 'ORIGIN'
ELSE REGEXP_REPLACE(
trace_address,
'_[0-9]+$',
'',
1,
1
)
END AS parent_trace_address,
SPLIT(
trace_address,
'_'
) AS trace_address_array
FROM
bronze_traces txs,
TABLE(
FLATTEN(
input => PARSE_JSON(
txs.full_traces
),
recursive => TRUE
)
) f
WHERE
f.index IS NULL
AND f.key != 'calls'
AND f.path != 'result'
GROUP BY
block_number,
tx_position,
partition_key,
trace_address,
_inserted_timestamp
)
SELECT
block_number,
tx_position,
trace_address,
parent_trace_address,
trace_address_array,
trace_json,
partition_key,
_inserted_timestamp,
{{ dbt_utils.generate_surrogate_key(
['block_number', 'tx_position', 'trace_address']
) }} AS traces_id,
SYSDATE() AS inserted_timestamp,
SYSDATE() AS modified_timestamp,
'{{ invocation_id }}' AS _invocation_id
FROM
flatten_traces qualify(ROW_NUMBER() over(PARTITION BY traces_id
ORDER BY
_inserted_timestamp DESC)) = 1
{{ fsc_evm.silver_traces_v1(
full_reload_start_block = 2300000,
full_reload_blocks = 1000000
) }}
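-- the inlined bronze/flatten logic removed above now lives in the fsc-evm package pinned in packages.yml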

View File

@@ -27,7 +27,7 @@ WHERE
)
AND IS_OBJECT(DATA)
{% else %}
{{ ref('bronze__streamline_FR_transactions') }}
{{ ref('bronze__streamline_fr_transactions') }}
WHERE
IS_OBJECT(DATA)
{% endif %}

View File

@@ -0,0 +1,59 @@
{{ config(
materialized = 'incremental',
unique_key = 'test_timestamp',
tags = ['observability']
) }}
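-- read the latest traces completeness result (within 5 days) and, if any blocks were impacted, dispatch the overflow workflow; the LEFT JOIN at the bottom still records a 'skipped' row when nothing fires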
WITH base AS (
SELECT
blocks_impacted_count
FROM
{{ ref('silver_observability__traces_completeness') }}
WHERE
test_timestamp > DATEADD('day', -5, CURRENT_TIMESTAMP())
ORDER BY
test_timestamp DESC
LIMIT
1), run_model AS (
SELECT
blocks_impacted_count,
github_actions.workflow_dispatches(
'FlipsideCrypto',
'base-models',
'dbt_run_overflowed_traces2.yml',
NULL
) AS run_overflow_models
FROM
base
WHERE
blocks_impacted_count > 0
)
SELECT
dummy,
COALESCE(
blocks_impacted_count,
0
) AS blocks_impacted_count,
COALESCE(
run_overflow_models,
OBJECT_CONSTRUCT(
'status',
'skipped'
)
) AS run_overflow_models,
COALESCE(
run_overflow_models2,
OBJECT_CONSTRUCT(
'status',
'skipped'
)
) AS run_overflow_models2,
SYSDATE() AS test_timestamp
FROM
(
SELECT
1 AS dummy
)
LEFT JOIN run_model
ON 1 = 1

View File

@@ -0,0 +1,115 @@
-- depends_on: {{ ref('bronze__overflowed_traces2') }}
{% set warehouse = 'DBT_SNOWPARK' if var('OVERFLOWED_TRACES') else target.warehouse %}
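-- overflow-triggered runs are routed to the DBT_SNOWPARK warehouse (presumably sized for the overflow UDFs); all other runs keep the target warehouse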
{{ config (
materialized = "incremental",
incremental_strategy = 'delete+insert',
unique_key = ['block_number','tx_position'],
cluster_by = ['modified_timestamp::DATE','partition_key'],
tags = ['overflowed_traces2'],
full_refresh = false,
snowflake_warehouse = warehouse
) }}
{% if is_incremental() %}
WITH bronze_overflowed_traces AS (
SELECT
block_number :: INT AS block_number,
ROUND(
block_number,
-3
) AS partition_key,
index_vals [1] :: INT AS tx_position,
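-- e.g. path 'calls[0].calls[3].from' collapses to trace_address '0_3'; top-level result fields map to 'ORIGIN'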
IFF(
path IN (
'result',
'result.value',
'result.type',
'result.to',
'result.input',
'result.gasUsed',
'result.gas',
'result.from',
'result.output',
'result.error',
'result.revertReason',
'gasUsed',
'gas',
'type',
'to',
'from',
'value',
'input',
'error',
'output',
'revertReason'
),
'ORIGIN',
REGEXP_REPLACE(REGEXP_REPLACE(path, '[^0-9]+', '_'), '^_|_$', '')
) AS trace_address,
SYSDATE() :: timestamp_ltz AS _inserted_timestamp,
OBJECT_AGG(
key,
value_
) AS trace_json,
CASE
WHEN trace_address = 'ORIGIN' THEN NULL
WHEN POSITION(
'_' IN trace_address
) = 0 THEN 'ORIGIN'
ELSE REGEXP_REPLACE(
trace_address,
'_[0-9]+$',
'',
1,
1
)
END AS parent_trace_address,
SPLIT(
trace_address,
'_'
) AS trace_address_array
FROM
{{ ref("bronze__overflowed_traces2") }}
GROUP BY
block_number,
tx_position,
trace_address,
_inserted_timestamp
)
SELECT
block_number,
tx_position,
trace_address,
parent_trace_address,
trace_address_array,
trace_json,
partition_key,
_inserted_timestamp,
{{ dbt_utils.generate_surrogate_key(
['block_number', 'tx_position', 'trace_address']
) }} AS traces_id,
SYSDATE() AS inserted_timestamp,
SYSDATE() AS modified_timestamp,
'{{ invocation_id }}' AS _invocation_id
FROM
bronze_overflowed_traces qualify(ROW_NUMBER() over(PARTITION BY traces_id
ORDER BY
_inserted_timestamp DESC)) = 1
{% else %}
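-- first build creates a typed shell (a single all-NULL row); real rows only arrive on incremental runs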
SELECT
NULL :: INT AS block_number,
NULL :: INT AS tx_position,
NULL :: text AS trace_address,
NULL :: text AS parent_trace_address,
NULL :: ARRAY AS trace_address_array,
NULL :: OBJECT AS trace_json,
NULL :: INT AS partition_key,
NULL :: timestamp_ltz AS _inserted_timestamp,
{{ dbt_utils.generate_surrogate_key(
['block_number', 'tx_position', 'trace_address']
) }} AS traces_id,
SYSDATE() AS inserted_timestamp,
SYSDATE() AS modified_timestamp,
'{{ invocation_id }}' AS _invocation_id
{% endif %}

View File

@@ -68,3 +68,8 @@ sources:
    schema: defillama
    tables:
      - name: dim_chains
  - name: base_silver
    database: "{{ 'base' if target.database == 'BASE' else 'base_dev' }}"
    schema: silver
    tables:
      - name: overflowed_traces2

View File

@@ -3,7 +3,7 @@
) }}
{% set model = this.identifier.split("_") [-1] %}
{{ streamline_external_table_FR_query(
{{ streamline_external_table_fr_query(
model,
partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 3), '_', 1) AS INTEGER)",
partition_name = "_partition_by_block_id",

View File

@@ -1,7 +1,7 @@
{{ config (
materialized = 'view'
) }}
{{ streamline_external_table_FR_query(
{{ streamline_external_table_fr_query(
model = "confirm_blocks",
partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 4), '_', 1) AS INTEGER )",
partition_name = "_partition_by_block_id",

View File

@@ -3,7 +3,7 @@
) }}
{% set model = this.identifier.split("_") [-1] %}
{{ streamline_external_table_FR_query(
{{ streamline_external_table_fr_query(
model,
partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 3), '_', 1) AS INTEGER )",
partition_name = "_partition_by_block_id",

View File

@@ -2,7 +2,7 @@
materialized = 'view'
) }}
{{ streamline_external_table_FR_query(
{{ streamline_external_table_fr_query(
model = "debug_traceBlockByNumber",
partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 3), '_', 1) AS INTEGER )",
partition_name = "_partition_by_block_id",

View File

@@ -3,7 +3,7 @@
) }}
{% set model = this.identifier.split("_") [-1] %}
{{ streamline_external_table_FR_query(
{{ streamline_external_table_fr_query(
model,
partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 3), '_', 1) AS INTEGER )",
partition_name = "_partition_by_block_id",

View File

@@ -21,7 +21,7 @@ WHERE
FROM
{{ this }})
{% else %}
{{ ref('bronze__streamline_FR_confirm_blocks') }}
{{ ref('bronze__streamline_fr_confirm_blocks') }}
{% endif %}
qualify(ROW_NUMBER() over (PARTITION BY id

View File

@@ -25,7 +25,7 @@ WHERE
{{ this }}
)
{% else %}
{{ ref('bronze__streamline_FR_traces') }}
{{ ref('bronze__streamline_fr_traces') }}
{% endif %}
qualify(ROW_NUMBER() over (PARTITION BY id

View File

@@ -25,7 +25,7 @@ WHERE
{{ this }}
)
{% else %}
{{ ref('bronze__streamline_FR_blocks') }}
{{ ref('bronze__streamline_fr_blocks') }}
{% endif %}
qualify(ROW_NUMBER() over (PARTITION BY id

View File

@@ -9,8 +9,10 @@ packages:
    revision: eb33ac727af26ebc8a8cc9711d4a6ebc3790a107
  - package: get-select/dbt_snowflake_query_tags
    version: 2.5.0
  - git: https://github.com/FlipsideCrypto/fsc-evm.git
    revision: cd09f5c97aa2e09671ff5c3a60aab57b4b27e670
  - package: calogica/dbt_date
    version: 0.7.2
  - git: https://github.com/FlipsideCrypto/livequery-models.git
    revision: b024188be4e9c6bc00ed77797ebdc92d351d620e
sha1_hash: efa8844f7c3e54f84d660c43f887b9cb084dfd9f
sha1_hash: 2639021a873ca556b9399807a57d293a78f59dba

View File

@@ -8,4 +8,6 @@ packages:
  - git: https://github.com/FlipsideCrypto/fsc-utils.git
    revision: v1.29.0
  - package: get-select/dbt_snowflake_query_tags
    version: [">=2.0.0", "<3.0.0"]
    version: [">=2.0.0", "<3.0.0"]
  - git: https://github.com/FlipsideCrypto/fsc-evm.git
    revision: v1.1.0