AN5386/new-decoded-traces & traces history macro (#976)

* add

* traces

* add macros

* delete comments

* delete test

* add improvement

* change to gold traces

* delete

* packagelock
Sam 2024-11-13 23:34:36 +08:00 committed by GitHub
parent 896c983fb9
commit d10f34da13
13 changed files with 557 additions and 442 deletions


@@ -1,12 +1,12 @@
-name: dbt_run_streamline_traces_decoder_history_range_0
-run-name: dbt_run_streamline_traces_decoder_history_range_0
+name: dbt_run_streamline_decoded_traces_history
+run-name: dbt_run_streamline_decoded_traces_history
on:
workflow_dispatch:
schedule:
-# Runs “At 01:18.” (see https://crontab.guru)
-- cron: '18 1 * * *'
+# Runs “At 22:05 every Saturday.” (see https://crontab.guru)
+- cron: '5 22 * * 6'
env:
DBT_PROFILES_DIR: ./
@@ -22,12 +22,10 @@ env:
concurrency:
group: ${{ github.workflow }}
jobs:
run_dbt_jobs:
runs-on: ubuntu-latest
-environment:
+environment:
name: workflow_prod
steps:
@@ -42,6 +40,11 @@ jobs:
run: |
pip install -r requirements.txt
dbt deps
-- name: Run DBT Jobs
+- name: Update complete table
run: |
-dbt run --threads 6 --vars '{"STREAMLINE_INVOKE_STREAMS":True,"WAIT":150,"row_limit":10000000}' -m "ethereum_models,tag:streamline_decoded_traces_history_range_0" "ethereum_models,tag:streamline_decoded_traces_complete"
+dbt run -m "fsc_evm,tag:streamline_decoded_traces_complete"
+- name: Decode historical traces
+run: |
+dbt run-operation decoded_traces_history --args '{"backfill_mode": false}' --vars '{"STREAMLINE_INVOKE_STREAMS":True}'
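Since the consolidated workflow keeps the workflow_dispatch trigger, it can also be launched ad hoc. A minimal sketch using the GitHub CLI, assuming access to the repository (the file name is the one the dispatcher macro below targets):

gh workflow run dbt_run_streamline_decoded_traces_history.yml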


@@ -1,47 +0,0 @@
name: dbt_run_streamline_traces_decoder_history_range_1
run-name: dbt_run_streamline_traces_decoder_history_range_1
on:
workflow_dispatch:
schedule:
# Runs “At 07:18.” (see https://crontab.guru)
- cron: '18 7 * * *'
env:
DBT_PROFILES_DIR: ./
ACCOUNT: "${{ vars.ACCOUNT }}"
ROLE: "${{ vars.ROLE }}"
USER: "${{ vars.USER }}"
PASSWORD: "${{ secrets.PASSWORD }}"
REGION: "${{ vars.REGION }}"
DATABASE: "${{ vars.DATABASE }}"
WAREHOUSE: "${{ vars.WAREHOUSE }}"
SCHEMA: "${{ vars.SCHEMA }}"
concurrency:
group: ${{ github.workflow }}
jobs:
run_dbt_jobs:
runs-on: ubuntu-latest
environment:
name: workflow_prod
steps:
- uses: actions/checkout@v3
- uses: actions/setup-python@v4
with:
python-version: "3.10"
cache: "pip"
- name: install dependencies
run: |
pip install -r requirements.txt
dbt deps
- name: Run DBT Jobs
run: |
dbt run --threads 6 --vars '{"STREAMLINE_INVOKE_STREAMS":True,"WAIT":150,"row_limit":10000000}' -m "ethereum_models,tag:streamline_decoded_traces_history_range_1" "ethereum_models,tag:streamline_decoded_traces_complete"


@@ -1,47 +0,0 @@
name: dbt_run_streamline_traces_decoder_history_range_2
run-name: dbt_run_streamline_traces_decoder_history_range_2
on:
workflow_dispatch:
schedule:
# Runs “At 13:18.” (see https://crontab.guru)
- cron: '18 13 * * *'
env:
DBT_PROFILES_DIR: ./
ACCOUNT: "${{ vars.ACCOUNT }}"
ROLE: "${{ vars.ROLE }}"
USER: "${{ vars.USER }}"
PASSWORD: "${{ secrets.PASSWORD }}"
REGION: "${{ vars.REGION }}"
DATABASE: "${{ vars.DATABASE }}"
WAREHOUSE: "${{ vars.WAREHOUSE }}"
SCHEMA: "${{ vars.SCHEMA }}"
concurrency:
group: ${{ github.workflow }}
jobs:
run_dbt_jobs:
runs-on: ubuntu-latest
environment:
name: workflow_prod
steps:
- uses: actions/checkout@v3
- uses: actions/setup-python@v4
with:
python-version: "3.10"
cache: "pip"
- name: install dependencies
run: |
pip install -r requirements.txt
dbt deps
- name: Run DBT Jobs
run: |
dbt run --threads 6 --vars '{"STREAMLINE_INVOKE_STREAMS":True,"WAIT":150,"row_limit":10000000}' -m "ethereum_models,tag:streamline_decoded_traces_history_range_2" "ethereum_models,tag:streamline_decoded_traces_complete"


@@ -1,47 +0,0 @@
name: dbt_run_streamline_traces_decoder_history_range_3
run-name: dbt_run_streamline_traces_decoder_history_range_3
on:
workflow_dispatch:
schedule:
# Runs “At 19:18.” (see https://crontab.guru)
- cron: '18 19 * * *'
env:
DBT_PROFILES_DIR: ./
ACCOUNT: "${{ vars.ACCOUNT }}"
ROLE: "${{ vars.ROLE }}"
USER: "${{ vars.USER }}"
PASSWORD: "${{ secrets.PASSWORD }}"
REGION: "${{ vars.REGION }}"
DATABASE: "${{ vars.DATABASE }}"
WAREHOUSE: "${{ vars.WAREHOUSE }}"
SCHEMA: "${{ vars.SCHEMA }}"
concurrency:
group: ${{ github.workflow }}
jobs:
run_dbt_jobs:
runs-on: ubuntu-latest
environment:
name: workflow_prod
steps:
- uses: actions/checkout@v3
- uses: actions/setup-python@v4
with:
python-version: "3.10"
cache: "pip"
- name: install dependencies
run: |
pip install -r requirements.txt
dbt deps
- name: Run DBT Jobs
run: |
dbt run --threads 6 --vars '{"STREAMLINE_INVOKE_STREAMS":True,"WAIT":150,"row_limit":10000000}' -m "ethereum_models,tag:streamline_decoded_traces_history_range_3" "ethereum_models,tag:streamline_decoded_traces_complete"

.gitignore vendored

@@ -16,4 +16,5 @@ logs/
.vscode/
.env
.user.yml
-dbt-env/
+dbt-env/
+package-lock.yml


@@ -3,4 +3,47 @@ SHELL := /bin/bash
dbt-console:
docker-compose run dbt_console
.PHONY: dbt-console
refresh_package:
rm -f package-lock.yml
dbt clean
dbt deps
dbt run-operation fsc_utils.create_evm_streamline_udfs --vars '{UPDATE_UDFS_AND_SPS: true}' --target dev-admin
realtime:
dbt run -m models/streamline/silver/decoder/realtime/streamline__decode_traces_realtime.sql --vars '{"STREAMLINE_INVOKE_STREAMS":True,"STREAMLINE_USE_DEV_FOR_EXTERNAL_TABLES":True}' --target dev-admin
dbt run -m models/streamline/bronze/decoder/bronze__streamline_decoded_traces.sql --full-refresh --vars '{"STREAMLINE_USE_DEV_FOR_EXTERNAL_TABLES":True}' --target dev-admin
dbt run -m models/silver/core/silver__decoded_traces.sql
realtime_logs:
dbt run -m models/streamline/silver/decoder/realtime/streamline__decode_logs_realtime.sql --vars '{"STREAMLINE_INVOKE_STREAMS":True,"STREAMLINE_USE_DEV_FOR_EXTERNAL_TABLES":True}' --target dev-admin
dbt run -m models/streamline/bronze/decoder/bronze__streamline_decoded_logs.sql --full-refresh --vars '{"STREAMLINE_USE_DEV_FOR_EXTERNAL_TABLES":True}' --target dev-admin
dbt run -m models/silver/core/silver__decoded_logs.sql
history:
dbt run -m models/streamline/silver/decoder/history/traces/range_1/streamline__decode_traces_history_011667449_011706397.sql --vars '{"STREAMLINE_INVOKE_STREAMS":True,"STREAMLINE_USE_DEV_FOR_EXTERNAL_TABLES":True}' --target dev-admin
dbt run -m models/streamline/bronze/decoder/bronze__streamline_decoded_traces.sql --full-refresh --vars '{"STREAMLINE_USE_DEV_FOR_EXTERNAL_TABLES":True}' --target dev-admin
dbt run -m models/silver/core/silver__decoded_traces.sql
history_logs:
dbt run -m models/streamline/silver/decoder/history/event_logs/range_0/streamline__decode_logs_history_016532020_016560020.sql --vars '{"STREAMLINE_INVOKE_STREAMS":True,"STREAMLINE_USE_DEV_FOR_EXTERNAL_TABLES":True}' --target dev-admin
dbt run -m models/streamline/bronze/decoder/bronze__streamline_decoded_logs.sql --full-refresh --vars '{"STREAMLINE_USE_DEV_FOR_EXTERNAL_TABLES":True}' --target dev-admin
dbt run -m models/silver/core/silver__decoded_logs.sql
load_new:
dbt run -m models/silver/core/silver__blocks.sql
dbt run -m models/silver/core/silver__transactions.sql
dbt run -m models/silver/core/silver__receipts.sql
dbt run -m models/silver/core/silver__logs.sql
dbt run -m models/silver/core/silver__traces.sql
load_abi:
dbt run -m models/silver/core/silver__relevant_contracts.sql
dbt run -m models/silver/core/silver__created_contracts.sql
dbt run -m models/silver/abis --exclude models/silver/abis/event_logs
load_new_and_abi:
make load_new
make load_abi
.PHONY: dbt-console refresh_package realtime realtime_logs history history_logs load_new load_abi load_new_and_abi
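For reference, a typical local sequence with the new targets might be (a sketch, assuming a working dbt profile and that the dev-admin target exists):

make refresh_package
make load_new_and_abi
make history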


@@ -0,0 +1,310 @@
{% macro decoded_traces_history(
backfill_mode = false
) %}
{%- set params = { "sql_limit": var(
"DECODED_TRACES_HISTORY_SQL_LIMIT",
1000000
),
"producer_batch_size": var(
"DECODED_TRACES_HISTORY_PRODUCER_BATCH_SIZE",
400000
),
"worker_batch_size": var(
"DECODED_TRACES_HISTORY_WORKER_BATCH_SIZE",
200000
),
"lookback_days": var(
"DECODED_TRACES_HISTORY_LOOKBACK_DAYS",
10
) } -%}
{% set wait_time = var(
"DECODED_TRACES_HISTORY_WAIT_TIME",
60
) %}
{% set find_months_query %}
WITH base AS (
SELECT
t.block_number,
DATE_TRUNC(
'month',
t.block_timestamp
) :: DATE AS MONTH,
concat_ws(
'-',
t.block_number,
t.tx_position,
t.identifier
) AS _call_id
FROM
{{ ref('core__fact_traces') }}
t
INNER JOIN {{ ref('silver__flat_function_abis') }}
f
ON t.to_address = f.contract_address
AND LEFT(
input,
10
) = LEFT(
f.function_signature,
10
)
WHERE
1 = 1 {% if not backfill_mode %}
AND f._inserted_timestamp > DATEADD('day',- {{ params.lookback_days }}, SYSDATE())
{% endif %}),
ranges AS (
SELECT
MIN(block_number) AS min_block_number,
MAX(block_number) AS max_block_number
FROM
base
),
exclusions AS (
SELECT
_call_id
FROM
{{ ref('streamline__complete_decoded_traces') }}
INNER JOIN ranges
WHERE
block_number BETWEEN min_block_number
AND max_block_number
)
SELECT
DISTINCT MONTH
FROM
base t
WHERE
NOT EXISTS (
SELECT
1
FROM
exclusions e
WHERE
t._call_id = e._call_id
)
ORDER BY
MONTH ASC {% endset %}
{% set results = run_query(find_months_query) %}
{% if execute %}
{% set months = results.columns[0].values() %}
{% for month in months %}
{% set view_name = 'decoded_traces_history_' ~ month.strftime('%Y_%m') %}
{% set create_view_query %}
CREATE
OR REPLACE VIEW streamline.{{ view_name }} AS (
WITH target_blocks AS (
SELECT
MIN(block_number) AS min_block_number,
MAX(block_number) AS max_block_number
FROM
{{ ref('core__fact_blocks') }}
WHERE
DATE_TRUNC(
'month',
block_timestamp
) = '{{month}}' :: TIMESTAMP
),
existing_traces_to_exclude AS (
SELECT
_call_id
FROM
{{ ref('streamline__complete_decoded_traces') }}
INNER JOIN target_blocks
WHERE
block_number BETWEEN min_block_number
AND max_block_number
),
raw_traces AS (
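-- identifiers look like 'CALL_0_1_2' (an upper-case type prefix plus the call path);
-- stripping the '[A-Z]+_' prefix yields trace_address ('ORIGIN' for top-level calls).
-- a parent advertises parent_of = trace_address || '_' (e.g. '0_1' -> '0_1_') and each
-- child derives child_of by dropping its final character (e.g. '0_1_2' -> '0_1_'),
-- so children line up with their parent on that shared key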
SELECT
block_number,
tx_hash,
trace_index,
from_address,
to_address,
TYPE,
REGEXP_REPLACE(
identifier,
'[A-Z]+_',
''
) AS trace_address,
sub_traces,
CASE
WHEN sub_traces > 0
AND trace_address = 'ORIGIN' THEN 'ORIGIN'
WHEN sub_traces > 0
AND trace_address != 'ORIGIN' THEN trace_address || '_'
ELSE NULL
END AS parent_of,
IFF(REGEXP_REPLACE(trace_address, '.$', '') = '', 'ORIGIN', REGEXP_REPLACE(trace_address, '.$', '')) AS child_of,
input,
output,
concat_ws(
'-',
t.block_number,
t.tx_position,
t.identifier
) AS _call_id
FROM
target_blocks
INNER JOIN {{ ref('core__fact_traces') }}
t
WHERE
block_number BETWEEN min_block_number
AND max_block_number
AND DATE_TRUNC(
'month',
t.block_timestamp
) = '{{month}}' :: TIMESTAMP
),
PARENT AS (
SELECT
tx_hash,
parent_of AS child_of,
input
FROM
raw_traces
WHERE
sub_traces > 0
),
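-- a DELEGATECALL child executes the callee's code in the parent's storage context,
-- so the first DELEGATECALL child's to_address is taken as the implementation
-- address whose ABI should decode the parent call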
effective_contract AS (
SELECT
tx_hash,
TYPE AS child_type,
to_address AS child_to_address,
child_of AS parent_of,
input
FROM
raw_traces t
INNER JOIN PARENT USING (
tx_hash,
child_of,
input
)
WHERE
TYPE = 'DELEGATECALL' qualify ROW_NUMBER() over (
PARTITION BY t.tx_hash,
t.child_of
ORDER BY
t.trace_index ASC
) = 1
),
final_traces AS (
SELECT
block_number,
tx_hash,
trace_index,
from_address,
to_address,
TYPE,
trace_address,
sub_traces,
parent_of,
child_of,
input,
output,
child_type,
child_to_address,
IFF(
child_type = 'DELEGATECALL'
AND child_to_address IS NOT NULL,
child_to_address,
to_address
) AS effective_contract_address,
_call_id
FROM
raw_traces
LEFT JOIN effective_contract USING (
tx_hash,
parent_of,
input
)
)
SELECT
t.block_number,
t.tx_hash,
t.trace_index,
_call_id,
f.abi AS abi,
f.function_name,
t.effective_contract_address AS abi_address,
t.input,
COALESCE(
t.output,
'0x'
) AS output
FROM
final_traces t
LEFT JOIN {{ ref('silver__flat_function_abis') }}
f
ON t.effective_contract_address = f.contract_address
AND LEFT(
t.input,
10
) = LEFT(
f.function_signature,
10
)
WHERE
f.abi IS NOT NULL {% if not backfill_mode %}
AND f._inserted_timestamp > DATEADD('day',- {{ params.lookback_days }}, SYSDATE())
{% endif %}
AND NOT EXISTS (
SELECT
1
FROM
existing_traces_to_exclude e
WHERE
e._call_id = t._call_id
)
LIMIT
{{ params.sql_limit }}
) {% endset %}
{# Create the view #}
{% do run_query(create_view_query) %}
{{ log(
"Created view for month " ~ month.strftime('%Y-%m'),
info = True
) }}
{% if var(
"STREAMLINE_INVOKE_STREAMS",
false
) %}
{# Invoke streamline, if rows exist to decode #}
{% set decode_query %}
SELECT
streamline.udf_bulk_decode_traces_v2(
PARSE_JSON(
$${ "external_table": "decoded_traces",
"producer_batch_size": {{ params.producer_batch_size }},
"sql_limit": {{ params.sql_limit }},
"sql_source": "{{view_name}}",
"worker_batch_size": {{ params.worker_batch_size }} }$$
)
)
WHERE
EXISTS(
SELECT
1
FROM
streamline.{{ view_name }}
LIMIT
1
) {% endset %}
{% do run_query(decode_query) %}
{{ log(
"Triggered decoding for month " ~ month.strftime('%Y-%m'),
info = True
) }}
{# Call wait to avoid queueing up too many jobs #}
{% do run_query(
"call system$wait(" ~ wait_time ~ ")"
) %}
{{ log(
"Completed wait after decoding for month " ~ month.strftime('%Y-%m'),
info = True
) }}
{% endif %}
{% endfor %}
{% endif %}
{% endmacro %}
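A minimal sketch of invoking the macro by hand against a configured target; the vars below are the macro's own overridable defaults, with illustrative values:

dbt run-operation decoded_traces_history --args '{"backfill_mode": true}' --vars '{"STREAMLINE_INVOKE_STREAMS": True, "DECODED_TRACES_HISTORY_SQL_LIMIT": 500000, "DECODED_TRACES_HISTORY_LOOKBACK_DAYS": 30}'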


@@ -0,0 +1,30 @@
{% macro run_decoded_traces_history() %}
{% set blockchain = var('GLOBAL_PROD_DB_NAME','ethereum').lower() %}
{% set check_for_new_user_abis_query %}
select 1
from {{ ref('silver__user_verified_abis') }}
where _inserted_timestamp::date = sysdate()::date
limit 1
{% endset %}
{% set results = run_query(check_for_new_user_abis_query) %}
{% if execute %}
{% set new_user_abis = results.columns[0].values() | length > 0 %}
{% if new_user_abis %}
{% set invoke_workflow_query %}
SELECT
github_actions.workflow_dispatches(
'FlipsideCrypto',
'{{ blockchain }}' ~ '-models',
'dbt_run_streamline_decoded_traces_history.yml',
NULL
)
{% endset %}
{% do run_query(invoke_workflow_query) %}
{% endif %}
{% endif %}
{% endmacro %}
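The dispatcher is intended to run on a schedule; invoked by hand it would look like this (a sketch; GLOBAL_PROD_DB_NAME falls back to 'ethereum' inside the macro):

dbt run-operation run_decoded_traces_history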


@@ -1,270 +0,0 @@
{{ config (
materialized = 'incremental',
unique_key = ['parent_contract_address','function_signature','start_block'],
merge_exclude_columns = ["inserted_timestamp"],
post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION",
tags = ['abis']
) }}
WITH new_abis AS (
SELECT
DISTINCT contract_address
FROM
{{ ref('silver__flat_function_abis') }}
{% if is_incremental() %}
WHERE
_inserted_timestamp >= (
SELECT
MAX(_inserted_timestamp) - INTERVAL '18 hours'
FROM
{{ this }}
)
UNION
-- catches any late arriving proxies
SELECT
DISTINCT proxy_address AS contract_address
FROM
{{ ref('silver__proxies') }}
WHERE
start_timestamp >= (
SELECT
MAX(_inserted_timestamp) - INTERVAL '18 hours'
FROM
{{ this }}
)
{% endif %}
),
proxies AS (
SELECT
p0.created_block,
p0.proxy_created_block,
p0.contract_address,
p0.proxy_address,
p0.start_block,
p0._id,
p0._inserted_timestamp
FROM
{{ ref('silver__proxies') }}
p0
JOIN new_abis na0
ON p0.contract_address = na0.contract_address
UNION
SELECT
p1.created_block,
p1.proxy_created_block,
p1.contract_address,
p1.proxy_address,
p1.start_block,
p1._id,
p1._inserted_timestamp
FROM
{{ ref('silver__proxies') }}
p1
JOIN new_abis na1
ON p1.proxy_address = na1.contract_address
),
all_relevant_contracts AS (
SELECT
DISTINCT contract_address
FROM
proxies
UNION
SELECT
DISTINCT proxy_address AS contract_address
FROM
proxies
UNION
SELECT
contract_address
FROM
new_abis
),
flat_abis AS (
SELECT
contract_address,
function_name,
abi,
simple_function_name,
function_signature,
inputs,
outputs,
inputs_type,
outputs_type,
_inserted_timestamp
FROM
{{ ref('silver__flat_function_abis') }}
JOIN all_relevant_contracts USING (contract_address)
),
base AS (
SELECT
ea.contract_address,
function_name,
abi,
simple_function_name,
function_signature,
inputs,
outputs,
inputs_type,
outputs_type,
ea._inserted_timestamp,
pb._inserted_timestamp AS proxy_inserted_timestamp,
pb.start_block,
pb.proxy_created_block,
pb.contract_address AS base_contract_address,
1 AS priority
FROM
flat_abis ea
JOIN proxies pb
ON ea.contract_address = pb.proxy_address
UNION ALL
SELECT
eab.contract_address,
function_name,
abi,
simple_function_name,
function_signature,
inputs,
outputs,
inputs_type,
outputs_type,
eab._inserted_timestamp,
pbb._inserted_timestamp AS proxy_inserted_timestamp,
pbb.created_block AS start_block,
pbb.proxy_created_block,
pbb.contract_address AS base_contract_address,
2 AS priority
FROM
flat_abis eab
JOIN (
SELECT
DISTINCT contract_address,
created_block,
proxy_created_block,
_inserted_timestamp
FROM
proxies
) pbb
ON eab.contract_address = pbb.contract_address
UNION ALL
SELECT
contract_address,
function_name,
abi,
simple_function_name,
function_signature,
inputs,
outputs,
inputs_type,
outputs_type,
_inserted_timestamp,
NULL AS proxy_inserted_timestamp,
0 AS start_block,
NULL AS proxy_created_block,
contract_address AS base_contract_address,
3 AS priority
FROM
flat_abis eac
WHERE
contract_address NOT IN (
SELECT
DISTINCT contract_address
FROM
proxies
)
),
new_records AS (
SELECT
base_contract_address AS parent_contract_address,
contract_address AS implementation_contract,
function_name,
abi,
start_block,
proxy_created_block,
simple_function_name,
function_signature,
inputs,
outputs,
inputs_type,
outputs_type,
_inserted_timestamp,
proxy_inserted_timestamp
FROM
base qualify ROW_NUMBER() over (
PARTITION BY parent_contract_address,
function_name,
inputs_type,
simple_function_name,
start_block
ORDER BY
priority ASC,
_inserted_timestamp DESC,
proxy_created_block DESC nulls last,
proxy_inserted_timestamp DESC nulls last
) = 1
),
FINAL AS (
SELECT
parent_contract_address,
implementation_contract,
function_name,
abi,
start_block,
proxy_created_block,
simple_function_name,
function_signature,
IFNULL(LEAD(start_block) over (PARTITION BY parent_contract_address, function_signature
ORDER BY
start_block) -1, 1e18) AS end_block,
_inserted_timestamp,
proxy_inserted_timestamp,
SYSDATE() AS _updated_timestamp,
{{ dbt_utils.generate_surrogate_key(
['parent_contract_address','function_signature','start_block']
) }} AS complete_event_abis_id,
SYSDATE() AS inserted_timestamp,
SYSDATE() AS modified_timestamp,
'{{ invocation_id }}' AS _invocation_id
FROM
new_records qualify ROW_NUMBER() over (
PARTITION BY parent_contract_address,
function_name,
function_signature,
start_block
ORDER BY
_inserted_timestamp DESC
) = 1
)
SELECT
parent_contract_address,
implementation_contract,
function_name,
abi,
start_block,
proxy_created_block,
simple_function_name,
function_signature,
end_block,
_inserted_timestamp,
proxy_inserted_timestamp,
_updated_timestamp,
complete_event_abis_id,
inserted_timestamp,
modified_timestamp,
_invocation_id
FROM
FINAL
{% if is_incremental() %}
LEFT JOIN {{ this }}
t USING (
parent_contract_address,
function_name,
function_signature,
start_block,
end_block
)
WHERE
t.function_signature IS NULL
{% endif %}


@@ -1,9 +0,0 @@
version: 2
models:
- name: silver__complete_function_abis
tests:
- dbt_utils.unique_combination_of_columns:
combination_of_columns:
- PARENT_CONTRACT_ADDRESS
- FUNCTION_SIGNATURE
- START_BLOCK


@@ -29,11 +29,7 @@ GROUP BY
),
function_calls AS (
SELECT
-IFF(
-TYPE = 'DELEGATECALL',
-from_address,
-to_address
-) AS contract_address,
+to_address AS contract_address,
COUNT(*) AS function_call_count,
MAX(_inserted_timestamp) AS max_inserted_timestamp_traces,
MAX(block_number) AS latest_call_block


@@ -12,6 +12,158 @@
fsc_utils.if_data_call_wait()],
tags = ['streamline_decoded_traces_realtime']
) }}
-{{ fsc_evm.streamline_decoded_traces_requests(
-model_type = 'realtime'
-) }}
WITH look_back AS (
SELECT
block_number
FROM
{{ ref("_24_hour_lookback") }}
),
raw_traces AS (
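-- same parent/child derivation as the decoded_traces_history macro: strip the
-- '[A-Z]+_' type prefix from identifier to get trace_address, parents expose
-- parent_of = trace_address || '_', and children drop their final character to
-- get child_of (e.g. '0_1_2' -> '0_1_')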
SELECT
block_number,
tx_hash,
trace_index,
from_address,
to_address,
TYPE,
REGEXP_REPLACE(
identifier,
'[A-Z]+_',
''
) AS trace_address,
sub_traces,
CASE
WHEN sub_traces > 0
AND trace_address = 'ORIGIN' THEN 'ORIGIN'
WHEN sub_traces > 0
AND trace_address != 'ORIGIN' THEN trace_address || '_'
ELSE NULL
END AS parent_of,
IFF(REGEXP_REPLACE(trace_address, '.$', '') = '', 'ORIGIN', REGEXP_REPLACE(trace_address, '.$', '')) AS child_of,
input,
output,
concat_ws(
'-',
t.block_number,
t.tx_position,
t.identifier
) AS _call_id
FROM
{{ ref("core__fact_traces") }}
t
WHERE
t.block_number >= (
SELECT
block_number
FROM
look_back
)
AND t.block_timestamp >= DATEADD('day', -2, CURRENT_DATE())
AND _call_id NOT IN (
SELECT
_call_id
FROM
{{ ref("streamline__complete_decoded_traces") }}
WHERE
block_number >= (
SELECT
block_number
FROM
look_back
)
AND modified_timestamp >= DATEADD('day', -2, CURRENT_DATE()))
),
PARENT AS (
SELECT
tx_hash,
parent_of AS child_of,
input
FROM
raw_traces
WHERE
sub_traces > 0
),
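-- as in the decoded_traces_history macro: the first DELEGATECALL child's to_address
-- becomes the implementation address whose ABI decodes the parent call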
effective_contract AS (
SELECT
tx_hash,
TYPE AS child_type,
to_address AS child_to_address,
child_of AS parent_of,
input
FROM
raw_traces t
INNER JOIN PARENT USING (
tx_hash,
child_of,
input
)
WHERE
TYPE = 'DELEGATECALL' qualify ROW_NUMBER() over (
PARTITION BY t.tx_hash,
t.child_of
ORDER BY
t.trace_index ASC
) = 1
),
final_traces AS (
SELECT
block_number,
tx_hash,
trace_index,
from_address,
to_address,
TYPE,
trace_address,
sub_traces,
parent_of,
child_of,
input,
output,
child_type,
child_to_address,
IFF(
child_type = 'DELEGATECALL'
AND child_to_address IS NOT NULL,
child_to_address,
to_address
) AS effective_contract_address,
_call_id
FROM
raw_traces
LEFT JOIN effective_contract USING (
tx_hash,
parent_of,
input
)
)
SELECT
t.block_number,
t.tx_hash,
t.trace_index,
_call_id,
f.abi AS abi,
f.function_name,
t.effective_contract_address AS abi_address,
t.input,
COALESCE(
t.output,
'0x'
) AS output
FROM
final_traces t
LEFT JOIN {{ ref("silver__flat_function_abis") }}
f
ON t.effective_contract_address = f.contract_address
AND LEFT(
t.input,
10
) = LEFT(
f.function_signature,
10
)
WHERE f.abi IS NOT NULL


@@ -8,11 +8,11 @@ packages:
- git: https://github.com/FlipsideCrypto/fsc-utils.git
revision: 8c99db499671ff6f514bd0695f7b1f20bce8d80d
- git: https://github.com/FlipsideCrypto/fsc-evm.git
-revision: 2a41facb272eba867486813d29dad177b913e59f
+revision: 7f8e05df3f2408f4186af2975e0501a344113013
- package: get-select/dbt_snowflake_query_tags
version: 2.5.0
- package: calogica/dbt_date
version: 0.7.2
- git: https://github.com/FlipsideCrypto/livequery-models.git
revision: b024188be4e9c6bc00ed77797ebdc92d351d620e
-sha1_hash: 0a7662309787040f0824f4e98f018e024d01fca1
+sha1_hash: 8c7bfc3041f203d73521610b9e50148f69aa7b0e