AN-4682/overflow models v2 (#250)

* load into silver directly

* tags

* tags

* better deps

* heal txs

* tags

* break out logs

* traces update and is pending logic

* tags

* source

* new triggers

* receipts
This commit is contained in:
Austin 2024-04-01 17:41:36 -04:00 committed by GitHub
parent 254e036b08
commit be2d16cfc9
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
16 changed files with 984 additions and 22 deletions

View File

@ -0,0 +1,44 @@
# Manually-dispatched workflow that runs the dbt "overflowed receipts" healing
# models (everything tagged `overflowed_receipts` in the bsc_models project),
# with the OVERFLOWED_RECEIPTS var enabled so the incremental models take the
# overflow-healing code path.
name: dbt_run_overflow_models_v2
run-name: dbt_run_overflow_models_v2

on:
  # NOTE(review): the original nested a `branches: ["main"]` filter under
  # workflow_dispatch. GitHub Actions does not support a branch filter for the
  # workflow_dispatch trigger (it is ignored), so it is removed here; a manual
  # dispatch always runs against the ref selected at dispatch time.
  workflow_dispatch:

# Snowflake connection settings consumed by the dbt profile.
env:
  DBT_PROFILES_DIR: ./
  ACCOUNT: "${{ vars.ACCOUNT }}"
  ROLE: "${{ vars.ROLE }}"
  USER: "${{ vars.USER }}"
  PASSWORD: "${{ secrets.PASSWORD }}"
  REGION: "${{ vars.REGION }}"
  DATABASE: "${{ vars.DATABASE }}"
  WAREHOUSE: "${{ vars.WAREHOUSE }}"
  SCHEMA: "${{ vars.SCHEMA }}"

# Allow only one run of this workflow at a time.
concurrency:
  group: ${{ github.workflow }}

jobs:
  run_dbt_jobs:
    runs-on: ubuntu-latest
    environment:
      name: workflow_prod
    steps:
      - uses: actions/checkout@v3
      - uses: actions/setup-python@v4
        with:
          python-version: "3.10"
          cache: "pip"
      - name: install dependencies
        run: |
          pip install -r requirements.txt
          dbt deps
      - name: Run DBT Jobs
        run: |
          dbt run -m "bsc_models,tag:overflowed_receipts" --vars '{"OVERFLOWED_RECEIPTS":True}'

View File

@ -74,6 +74,7 @@ vars:
UPDATE_SNOWFLAKE_TAGS: True
WAIT: 0
OBSERV_FULL_TEST: False
OVERFLOWED_RECEIPTS: False
BALANCES_START: 0
BALANCES_END: 10000000
HEAL_MODEL: False

View File

@ -0,0 +1,80 @@
-- Bronze view that recovers log data from raw response files whose JSON
-- payloads overflowed. Candidates come from bronze__potential_overflowed_logs;
-- each candidate row is re-read via external UDF/UDTF helpers and the
-- recovered JSON is emitted as flattened path/key/value rows.
-- Tagged 'overflowed_receipts' so it only runs in the dedicated overflow job.
{{ config (
materialized = "view",
tags = ['overflowed_receipts']
) }}
-- The upstream view assigns each candidate a row_no; iterate rows 1..20 and
-- UNION ALL the per-row results so each UDF call handles a single file/row.
{% for item in range(
1,
21
) %}
SELECT
o.file_name,
f.block_number,
f.index_vals,
f.path,
f.key,
f.value_
FROM
(
SELECT
file_name,
file_url,
index_cols,
-- [block, tx] pair identifying the overflowed response within the file
[overflowed_block, overflowed_tx] AS index_vals
FROM
(
SELECT
block_number,
POSITION,
file_name,
file_url,
index_cols,
VALUE [0] AS overflowed_block,
VALUE [1] AS overflowed_tx,
-- TRUE only when the detected overflow matches the row being healed
block_number = overflowed_block
AND POSITION = overflowed_tx AS missing
FROM
(
SELECT
block_number,
POSITION,
file_name,
file_url,
index_cols,
-- UDF presumably scans the staged file and returns the [block, tx]
-- index pairs whose responses overflowed -- confirm UDF contract
utils.udf_detect_overflowed_responses(
file_url,
index_cols
) AS index_vals
FROM
{{ ref("bronze__potential_overflowed_logs") }}
WHERE
row_no = {{ item }}
),
-- one row per detected [block, tx] pair
LATERAL FLATTEN (
input => index_vals
)
)
WHERE
missing = TRUE
) o,
-- UDTF re-reads the file and returns the overflowed responses flattened
-- to (block_number, index_vals, path, key, value_) rows
TABLE(
utils.udtf_flatten_overflowed_responses(
o.file_url,
o.index_cols,
[o.index_vals]
)
) f
WHERE
-- keep scalar leaf values only; containers and JSON nulls are dropped
NOT IS_OBJECT(
f.value_
)
AND NOT IS_ARRAY(
f.value_
)
AND NOT IS_NULL_VALUE(
f.value_
) {% if not loop.last %}
UNION ALL
{% endif %}
{% endfor %}

View File

@ -0,0 +1,80 @@
-- Bronze view that recovers receipt data from raw response files whose JSON
-- payloads overflowed. Mirrors bronze__overflowed_logs but sources candidates
-- from bronze__potential_overflowed_receipts.
-- Tagged 'overflowed_receipts' so it only runs in the dedicated overflow job.
{{ config (
materialized = "view",
tags = ['overflowed_receipts']
) }}
-- The upstream view assigns each candidate a row_no; iterate rows 1..20 and
-- UNION ALL the per-row results so each UDF call handles a single file/row.
{% for item in range(
1,
21
) %}
SELECT
o.file_name,
f.block_number,
f.index_vals,
f.path,
f.key,
f.value_
FROM
(
SELECT
file_name,
file_url,
index_cols,
-- [block, tx] pair identifying the overflowed response within the file
[overflowed_block, overflowed_tx] AS index_vals
FROM
(
SELECT
block_number,
POSITION,
file_name,
file_url,
index_cols,
VALUE [0] AS overflowed_block,
VALUE [1] AS overflowed_tx,
-- TRUE only when the detected overflow matches the row being healed
block_number = overflowed_block
AND POSITION = overflowed_tx AS missing
FROM
(
SELECT
block_number,
POSITION,
file_name,
file_url,
index_cols,
-- UDF presumably scans the staged file and returns the [block, tx]
-- index pairs whose responses overflowed -- confirm UDF contract
utils.udf_detect_overflowed_responses(
file_url,
index_cols
) AS index_vals
FROM
{{ ref("bronze__potential_overflowed_receipts") }}
WHERE
row_no = {{ item }}
),
-- one row per detected [block, tx] pair
LATERAL FLATTEN (
input => index_vals
)
)
WHERE
missing = TRUE
) o,
-- UDTF re-reads the file and returns the overflowed responses flattened
-- to (block_number, index_vals, path, key, value_) rows
TABLE(
utils.udtf_flatten_overflowed_responses(
o.file_url,
o.index_cols,
[o.index_vals]
)
) f
WHERE
-- keep scalar leaf values only; containers and JSON nulls are dropped
NOT IS_OBJECT(
f.value_
)
AND NOT IS_ARRAY(
f.value_
)
AND NOT IS_NULL_VALUE(
f.value_
) {% if not loop.last %}
UNION ALL
{% endif %}
{% endfor %}

View File

@ -0,0 +1,46 @@
-- View listing receipt rows suspected of having overflowed log data:
-- receipts flagged `overflowed` that have no matching row in silver__logs.
-- Each row carries the staged file URL and index columns needed by the
-- overflow-detection UDFs, plus a row_no used to batch downstream calls.
{{ config (
materialized = "view",
tags = ['overflowed_receipts']
) }}
WITH missing_txs AS (
SELECT
r.block_number,
r.position,
r.tx_hash,
cr.file_name
FROM
{{ ref("silver__receipts") }}
r
JOIN {{ ref("streamline__complete_receipts") }}
cr
ON r.block_number = cr.block_number
-- anti-join: keep receipts with no log rows at all in silver__logs
LEFT JOIN {{ ref("silver__logs") }}
l
ON r.block_number = l.block_number
AND r.tx_hash = l.tx_hash
WHERE
-- `overflowed` flag is carried on silver__receipts rows
overflowed
AND l.tx_hash IS NULL
)
SELECT
block_number,
POSITION,
tx_hash,
file_name,
-- scoped URL lets the overflow UDF/UDTF read the raw file from the stage
build_scoped_file_url(
@streamline.bronze.external_tables,
file_name
) AS file_url,
['block_number', 'array_index'] AS index_cols,
-- row_no drives the 1..20 per-row loop in bronze__overflowed_logs
ROW_NUMBER() over (
ORDER BY
block_number ASC,
POSITION ASC
) AS row_no
FROM
missing_txs
ORDER BY
block_number ASC,
POSITION ASC

View File

@ -0,0 +1,68 @@
-- View listing transactions suspected of having overflowed receipt data:
-- for the blocks flagged by the most recent receipts-completeness test,
-- find transactions with no matching row in silver__receipts and attach the
-- staged file info needed by the overflow-detection UDFs.
{{ config (
materialized = "view",
tags = ['overflowed_receipts']
) }}
-- blocks_impacted_array from the latest observability run, one block per row
WITH impacted_blocks AS (
SELECT
VALUE :: INT AS block_number
FROM
(
SELECT
blocks_impacted_array
FROM
{{ ref("silver_observability__receipts_completeness") }}
ORDER BY
test_timestamp DESC
LIMIT
1
), LATERAL FLATTEN (
input => blocks_impacted_array
)
),
-- every transaction in the impacted blocks
all_txs AS (
SELECT
t.block_number,
t.position,
t.tx_hash
FROM
{{ ref("silver__transactions") }}
t
JOIN impacted_blocks USING (block_number)
),
-- anti-join: transactions with no receipt row; file_name presumably comes
-- from streamline__complete_receipts -- confirm column origin
missing_txs AS (
SELECT
DISTINCT block_number,
POSITION,
file_name
FROM
all_txs
LEFT JOIN {{ ref("silver__receipts") }}
tr USING (
block_number,
tx_hash
)
JOIN {{ ref("streamline__complete_receipts") }} USING (block_number)
WHERE
tr.tx_hash IS NULL
)
SELECT
block_number,
POSITION,
file_name,
-- scoped URL lets the overflow UDF/UDTF read the raw file from the stage
build_scoped_file_url(
@streamline.bronze.external_tables,
file_name
) AS file_url,
['block_number', 'array_index'] AS index_cols,
-- row_no drives the 1..20 per-row loop in bronze__overflowed_receipts
ROW_NUMBER() over (
ORDER BY
block_number ASC,
POSITION ASC
) AS row_no
FROM
missing_txs
ORDER BY
block_number ASC,
POSITION ASC

View File

@ -109,22 +109,7 @@ SELECT
blocks_tested,
blocks_impacted_count,
blocks_impacted_array,
CURRENT_TIMESTAMP() AS test_timestamp,
IFF(
blocks_impacted_count > 0,
TRUE,
FALSE
) AS overflowed,
IFF(
blocks_impacted_count > 0,
github_actions.workflow_dispatches(
'FlipsideCrypto',
'bsc-models',
'dbt_run_overflow_models.yml',
NULL
) :status_code :: INT,
NULL
) AS trigger_workflow
CURRENT_TIMESTAMP() AS test_timestamp
FROM
summary_stats
JOIN impacted_blocks

View File

@ -4,7 +4,7 @@
unique_key = "block_number",
cluster_by = "block_timestamp::date, _inserted_timestamp::date",
post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION",
tags = ['core','non_realtime'],
tags = ['core','non_realtime','overflowed_receipts'],
full_refresh = false
) }}
@ -131,6 +131,79 @@ missing_data AS (
t.is_pending
)
{% endif %},
{% if is_incremental() and var(
'OVERFLOWED_RECEIPTS',
) %}
overflowed_logs AS (
SELECT
block_number,
block_timestamp,
tx_hash,
origin_from_address,
origin_to_address,
origin_function_signature,
tx_status,
contract_address,
block_hash,
DATA,
event_index,
event_removed,
topics,
_inserted_timestamp,
CONCAT(
tx_hash :: STRING,
'-',
event_index :: STRING
) AS _log_id,
CASE
WHEN block_timestamp IS NULL
OR origin_function_signature IS NULL THEN TRUE
ELSE FALSE
END AS is_pending
FROM
{{ source(
'bsc_silver',
'overflowed_logs'
) }}
-- source works around circular dependency
LEFT JOIN {{ ref('silver__transactions') }}
txs USING (
block_number,
tx_hash
)
),
existing_blocks AS (
SELECT
block_number,
block_timestamp,
tx_hash,
origin_from_address,
origin_to_address,
origin_function_signature,
tx_status,
contract_address,
block_hash,
DATA,
event_index,
event_removed,
topics,
_inserted_timestamp,
_log_id,
is_pending
FROM
{{ this }}
JOIN (
SELECT
DISTINCT block_number
FROM
overflowed_logs
) USING (
block_number
)
),
{% endif %}
FINAL AS (
SELECT
block_number,
@ -174,6 +247,51 @@ SELECT
FROM
missing_data
{% endif %}
{% if is_incremental() and var(
'OVERFLOWED_RECEIPTS',
) %}
UNION ALL
SELECT
block_number,
block_timestamp,
tx_hash,
origin_from_address,
origin_to_address,
origin_function_signature,
tx_status,
contract_address,
block_hash,
DATA,
event_index,
event_removed,
topics,
_inserted_timestamp,
_log_id,
is_pending
FROM
overflowed_logs
UNION ALL
SELECT
block_number,
block_timestamp,
tx_hash,
origin_from_address,
origin_to_address,
origin_function_signature,
tx_status,
contract_address,
block_hash,
DATA,
event_index,
event_removed,
topics,
_inserted_timestamp,
_log_id,
is_pending
FROM
existing_blocks
{% endif %}
)
SELECT
*,

View File

@ -5,7 +5,7 @@
unique_key = "block_number",
cluster_by = "ROUND(block_number, -3)",
post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION on equality(tx_hash)",
tags = ['core','non_realtime'],
tags = ['core','non_realtime','overflowed_receipts'],
full_refresh = false
) }}
@ -78,14 +78,181 @@ FINAL AS (
utils.udf_hex_to_int(
DATA :type :: STRING
) :: INT AS TYPE,
_inserted_timestamp
_inserted_timestamp,
FALSE AS overflowed
FROM
base
)
{% if is_incremental() and var(
'OVERFLOWED_RECEIPTS',
) %},
overflowed_receipts AS (
SELECT
block_number,
block_hash,
blockNumber,
cumulative_gas_used,
effective_gas_price,
from_address,
gas_used,
[] :: variant AS logs,
logs_bloom,
status,
tx_success,
tx_status,
to_address1,
to_address,
tx_hash,
POSITION,
TYPE
FROM
{{ source(
'bsc_silver',
'overflowed_receipts'
) }}
-- source works around circular dependency
),
existing_blocks AS (
SELECT
block_number,
block_hash,
blockNumber,
cumulative_gas_used,
effective_gas_price,
from_address,
gas_used,
logs,
logs_bloom,
status,
tx_success,
tx_status,
to_address1,
to_address,
tx_hash,
POSITION,
TYPE,
_inserted_timestamp
FROM
{{ this }}
INNER JOIN (
SELECT
DISTINCT block_number
FROM
overflowed_receipts
) USING(block_number)
),
final_overflowed AS (
SELECT
block_number,
block_hash,
blockNumber,
cumulative_gas_used,
effective_gas_price,
from_address,
gas_used,
logs,
logs_bloom,
status,
tx_success,
tx_status,
to_address1,
to_address,
tx_hash,
POSITION,
TYPE,
_inserted_timestamp,
FALSE AS overflowed
FROM
FINAL
UNION ALL
SELECT
block_number,
block_hash,
blockNumber,
cumulative_gas_used,
effective_gas_price,
from_address,
gas_used,
logs,
logs_bloom,
status,
tx_success,
tx_status,
to_address1,
to_address,
tx_hash,
POSITION,
TYPE,
_inserted_timestamp,
FALSE AS overflowed
FROM
existing_blocks
UNION ALL
SELECT
block_number,
block_hash,
blockNumber,
cumulative_gas_used,
effective_gas_price,
from_address,
gas_used,
logs,
logs_bloom,
status,
tx_success,
tx_status,
to_address1,
to_address,
tx_hash,
POSITION,
TYPE,
_inserted_timestamp,
TRUE AS overflowed
FROM
overflowed_receipts
INNER JOIN (
SELECT
block_number,
MAX(_inserted_timestamp) AS _inserted_timestamp
FROM
existing_blocks
GROUP BY
block_number
) USING(
block_number
)
)
{% endif %}
SELECT
*
block_number,
block_hash,
blockNumber,
cumulative_gas_used,
effective_gas_price,
from_address,
gas_used,
logs,
logs_bloom,
status,
tx_success,
tx_status,
to_address1,
to_address,
tx_hash,
POSITION,
TYPE,
_inserted_timestamp,
overflowed
FROM
{% if is_incremental() and var(
'OVERFLOWED_RECEIPTS',
) %}
final_overflowed
{% else %}
FINAL
{% endif %}
WHERE
tx_hash IS NOT NULL qualify(ROW_NUMBER() over (PARTITION BY block_number, POSITION
ORDER BY

View File

@ -302,7 +302,13 @@ missing_data AS (
t.error_reason,
t.trace_status,
t.data,
FALSE AS is_pending,
IFF(
txs.tx_hash IS NULL
OR txs.block_timestamp IS NULL
OR txs.tx_status IS NULL,
TRUE,
FALSE
) AS is_pending,
t._call_id,
GREATEST(
t._inserted_timestamp,
@ -377,6 +383,40 @@ SELECT
_inserted_timestamp
FROM
missing_data
UNION ALL
SELECT
block_number,
tx_hash,
block_timestamp,
tx_status,
tx_position,
trace_index,
from_address,
to_address,
bnb_value_precise_raw,
bnb_value_precise,
bnb_value,
gas,
gas_used,
input,
output,
TYPE,
identifier,
sub_traces,
error_reason,
trace_status,
DATA,
is_pending,
_call_id,
_inserted_timestamp
FROM
{{ this }}
INNER JOIN (
SELECT
DISTINCT block_number
FROM
missing_data
) USING (block_number)
{% endif %}
)
SELECT

View File

@ -186,7 +186,11 @@ missing_data AS (
t.value_precise,
t.value,
b.block_timestamp,
FALSE AS is_pending,
CASE
WHEN b.block_timestamp IS NULL
OR r.tx_status IS NULL THEN TRUE
ELSE FALSE
END AS is_pending,
r.gas_used,
r.tx_success,
r.tx_status,
@ -297,6 +301,48 @@ SELECT
DATA
FROM
missing_data
UNION ALL
SELECT
block_number,
block_hash,
from_address,
gas,
gas_price,
tx_hash,
input_data,
origin_function_signature,
max_fee_per_gas,
max_priority_fee_per_gas,
nonce,
r,
s,
to_address,
POSITION,
TYPE,
v,
VALUE,
value_precise_raw,
value_precise,
block_timestamp,
is_pending,
gas_used,
tx_success,
tx_status,
cumulative_gas_used,
effective_gas_price,
tx_fee,
tx_fee_precise,
tx_type,
_inserted_timestamp,
DATA
FROM
{{ this }}
JOIN (
SELECT
DISTINCT block_number
FROM
missing_data
) USING (block_number)
{% endif %}
)
SELECT

View File

@ -0,0 +1,52 @@
-- Observability trigger model: if the most recent traces-completeness test
-- (within the last 5 days) reported impacted blocks, dispatch the
-- overflow-healing GitHub Actions workflow; otherwise append a 'skipped' row.
-- Incremental on test_timestamp so each dbt run appends one audit row.
{{ config(
materialized = 'incremental',
unique_key = 'test_timestamp',
tags = ['observability']
) }}
-- latest recent completeness result (at most one row)
WITH base AS (
SELECT
blocks_impacted_count
FROM
{{ ref('silver_observability__traces_completeness') }}
WHERE
test_timestamp > DATEADD('day', -5, CURRENT_TIMESTAMP())
ORDER BY
test_timestamp DESC
LIMIT
1), run_model AS (
SELECT
blocks_impacted_count,
-- external function call that dispatches the GitHub Actions workflow;
-- only evaluated when blocks_impacted_count > 0
github_actions.workflow_dispatches(
'FlipsideCrypto',
'bsc-models',
'dbt_run_overflow_models.yml',
NULL
) AS run_overflow_models
FROM
base
WHERE
blocks_impacted_count > 0
)
SELECT
dummy,
COALESCE(
blocks_impacted_count,
0
) AS blocks_impacted_count,
-- when no dispatch happened, record an explicit 'skipped' status object
COALESCE(
run_overflow_models,
OBJECT_CONSTRUCT(
'status',
'skipped'
)
) AS run_overflow_models,
SYSDATE() AS test_timestamp
FROM
-- single dummy row guarantees exactly one output row even when run_model
-- is empty
(
SELECT
1 AS dummy
)
LEFT JOIN run_model
ON 1 = 1

View File

@ -0,0 +1,52 @@
-- Observability trigger model: if the most recent receipts-completeness test
-- (within the last 5 days) reported impacted blocks, dispatch the v2
-- overflow-healing GitHub Actions workflow; otherwise append a 'skipped' row.
-- Incremental on test_timestamp so each dbt run appends one audit row.
{{ config(
materialized = 'incremental',
unique_key = 'test_timestamp',
tags = ['observability']
) }}
-- latest recent completeness result (at most one row)
WITH base AS (
SELECT
blocks_impacted_count
FROM
{{ ref('silver_observability__receipts_completeness') }}
WHERE
test_timestamp > DATEADD('day', -5, CURRENT_TIMESTAMP())
ORDER BY
test_timestamp DESC
LIMIT
1), run_model AS (
SELECT
blocks_impacted_count,
-- external function call that dispatches the GitHub Actions workflow;
-- only evaluated when blocks_impacted_count > 0
github_actions.workflow_dispatches(
'FlipsideCrypto',
'bsc-models',
'dbt_run_overflow_models_v2.yml',
NULL
) AS run_overflow_models
FROM
base
WHERE
blocks_impacted_count > 0
)
SELECT
dummy,
COALESCE(
blocks_impacted_count,
0
) AS blocks_impacted_count,
-- when no dispatch happened, record an explicit 'skipped' status object
COALESCE(
run_overflow_models,
OBJECT_CONSTRUCT(
'status',
'skipped'
)
) AS run_overflow_models,
SYSDATE() AS test_timestamp
FROM
-- single dummy row guarantees exactly one output row even when run_model
-- is empty
(
SELECT
1 AS dummy
)
LEFT JOIN run_model
ON 1 = 1

View File

@ -0,0 +1,115 @@
-- Reassembles the flattened path/key/value rows from bronze__overflowed_logs
-- into per-log JSON objects, splits receipt-level metadata from individual
-- log entries, and emits healed log rows for silver__logs.
{{ config(
materialized = 'view',
tags = ['overflowed_receipts']
) }}
-- One JSON object per (block, tx, log-index). id_ is the numeric index parsed
-- from paths like 'logs[3].data'; receipt-level fields (no '[' in path) are
-- forced to the sentinel 'logs[-1]' so they group under id_ = -1.
WITH base AS (
SELECT
block_number,
index_vals [1] :: INT AS tx_position,
REPLACE(
REPLACE(SPLIT(IFF(path LIKE '%[%', path, 'logs[-1]'), '.') [0] :: STRING, 'logs['),
']'
) :: INT AS id_,
SYSDATE() AS _inserted_timestamp,
-- rows inside a log entry have a key; receipt-level rows fall back to the
-- field name parsed from the path
OBJECT_AGG(IFNULL(key, SPLIT(path, '.') [1]), value_) AS json_data
FROM
{{ ref("bronze__overflowed_logs") }}
GROUP BY
ALL
),
-- receipt-level (id_ = -1) metadata decoded from hex
receipt_info AS (
SELECT
block_number,
tx_position,
json_data :blockHash :: STRING AS block_hash,
utils.udf_hex_to_int(
json_data :blockNumber :: STRING
) :: INT AS blockNumber,
utils.udf_hex_to_int(
json_data :cumulativeGasUsed :: STRING
) :: INT AS cumulative_gas_used,
-- gas price converted from wei to gwei
utils.udf_hex_to_int(
json_data :effectiveGasPrice :: STRING
) :: INT / pow(
10,
9
) AS effective_gas_price,
json_data :from :: STRING AS from_address,
utils.udf_hex_to_int(
json_data :gasUsed :: STRING
) :: INT AS gas_used,
json_data :logsBloom :: STRING AS logs_bloom,
utils.udf_hex_to_int(
json_data :status :: STRING
) :: INT AS status,
CASE
WHEN status = 1 THEN TRUE
ELSE FALSE
END AS tx_success,
CASE
WHEN status = 1 THEN 'SUCCESS'
ELSE 'FAIL'
END AS tx_status,
json_data :to :: STRING AS to_address1,
-- normalize empty-string recipient (contract creation) to NULL
CASE
WHEN to_address1 = '' THEN NULL
ELSE to_address1
END AS to_address,
json_data :transactionHash :: STRING AS tx_hash,
utils.udf_hex_to_int(
json_data :type :: STRING
) :: INT AS TYPE
FROM
base
WHERE
id_ = -1
),
-- individual log entries (id_ >= 0) joined back to their receipt metadata
flat_logs AS (
SELECT
block_number,
tx_position,
id_,
json_data :address :: STRING AS contract_address,
json_data :blockHash :: STRING AS block_hash,
json_data :data :: STRING AS DATA,
utils.udf_hex_to_int(
json_data :logIndex :: STRING
) :: INT AS event_index,
json_data :removed :: BOOLEAN AS event_removed,
json_data :transactionHash :: STRING AS tx_hash,
-- NOTE(review): this alias duplicates the tx_position column already
-- selected from base above -- confirm this shadowing is intentional
json_data :transactionIndex :: STRING AS tx_position,
-- topics arrive as separate keyed fields; absent topics yield NULL slots
ARRAY_CONSTRUCT(
json_data :"topics[0]",
json_data :"topics[1]",
json_data :"topics[2]",
json_data :"topics[3]"
) AS topics,
_inserted_timestamp,
from_address AS origin_from_address,
to_address AS origin_to_address,
tx_status
FROM
base
JOIN receipt_info USING (
block_number,
tx_position
)
WHERE
id_ >= 0
)
-- NOTE(review): _inserted_timestamp is computed but not projected here --
-- verify downstream consumers do not expect it from this view
SELECT
block_number,
tx_hash,
origin_from_address,
origin_to_address,
tx_status,
contract_address,
block_hash,
DATA,
event_index,
event_removed,
topics
FROM
flat_logs

View File

@ -0,0 +1,66 @@
-- Reassembles the flattened key/value rows from bronze__overflowed_receipts
-- into one JSON object per receipt, then decodes the hex-encoded receipt
-- fields into typed columns for silver__receipts.
{{ config (
materialized = 'view',
tags = ['overflowed_receipts']
) }}
-- One JSON object per (block, tx); rows whose path contains '[' belong to
-- nested arrays (e.g. logs) and are excluded here.
WITH base AS (
SELECT
block_number,
-- NOTE(review): tx_position is computed but never referenced below --
-- POSITION is re-derived from DATA:transactionIndex instead; confirm
index_vals [1] :: INT AS tx_position,
OBJECT_AGG(
key,
value_
) AS DATA
FROM
{{ ref("bronze__overflowed_receipts") }}
WHERE
path NOT LIKE '%[%'
GROUP BY
ALL
)
SELECT
block_number,
DATA :blockHash :: STRING AS block_hash,
utils.udf_hex_to_int(
DATA :blockNumber :: STRING
) :: INT AS blockNumber,
utils.udf_hex_to_int(
DATA :cumulativeGasUsed :: STRING
) :: INT AS cumulative_gas_used,
-- gas price converted from wei to gwei
utils.udf_hex_to_int(
DATA :effectiveGasPrice :: STRING
) :: INT / pow(
10,
9
) AS effective_gas_price,
DATA :from :: STRING AS from_address,
utils.udf_hex_to_int(
DATA :gasUsed :: STRING
) :: INT AS gas_used,
DATA :logsBloom :: STRING AS logs_bloom,
utils.udf_hex_to_int(
DATA :status :: STRING
) :: INT AS status,
CASE
WHEN status = 1 THEN TRUE
ELSE FALSE
END AS tx_success,
CASE
WHEN status = 1 THEN 'SUCCESS'
ELSE 'FAIL'
END AS tx_status,
DATA :to :: STRING AS to_address1,
-- normalize empty-string recipient (contract creation) to NULL
CASE
WHEN to_address1 = '' THEN NULL
ELSE to_address1
END AS to_address,
DATA :transactionHash :: STRING AS tx_hash,
utils.udf_hex_to_int(
DATA :transactionIndex :: STRING
) :: INT AS POSITION,
utils.udf_hex_to_int(
DATA :type :: STRING
) :: INT AS TYPE
FROM
base
View File

@ -48,6 +48,8 @@ sources:
tables:
- name: verified_abis
- name: overflowed_traces
- name: overflowed_receipts
- name: overflowed_logs
- name: bsc_bronze_api
database: bsc
schema: bronze_api