migrate decoding and update history (#166)

This commit is contained in:
Austin 2023-05-18 13:26:44 -04:00 committed by GitHub
parent daa56c1ed7
commit d64cea07cc
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
16 changed files with 467 additions and 150 deletions

View File

@ -0,0 +1,47 @@
name: dbt_run_abi_refresh
run-name: dbt_run_abi_refresh

on:
  workflow_dispatch:
  schedule:
    # "At minute 0 past every 12th hour." (see https://crontab.guru)
    - cron: '0 */12 * * *'

env:
  DBT_PROFILES_DIR: ./
  ACCOUNT: "${{ vars.ACCOUNT }}"
  ROLE: "${{ vars.ROLE }}"
  USER: "${{ vars.USER }}"
  PASSWORD: "${{ secrets.PASSWORD }}"
  REGION: "${{ vars.REGION }}"
  DATABASE: "${{ vars.DATABASE }}"
  WAREHOUSE: "${{ vars.WAREHOUSE }}"
  SCHEMA: "${{ vars.SCHEMA }}"

# Serialize runs of this workflow: a newly-triggered run waits for the
# in-flight one rather than running concurrently against the warehouse.
concurrency:
  group: ${{ github.workflow }}

jobs:
  run_dbt_jobs:
    runs-on: ubuntu-latest
    environment:
      name: workflow_prod
    steps:
      - uses: actions/checkout@v3
      # setup-python@v1 is deprecated and no longer works on current
      # GitHub-hosted runners; v4 is the maintained release with the
      # same `python-version` input.
      - uses: actions/setup-python@v4
        with:
          python-version: "3.7.x"
      - name: install dependencies
        run: |
          pip3 install dbt-snowflake==${{ vars.DBT_VERSION }} cli_passthrough requests click
          dbt deps
      - name: Run DBT Jobs
        run: |
          dbt run -m models/silver/abis

View File

@ -41,4 +41,4 @@ jobs:
dbt deps
- name: Run DBT Jobs
run: |
dbt run --exclude models/silver/API_udf models/silver/streamline/* models/silver/silver__decoded_logs.sql models/silver/silver__decoded_logs_full.sql models/bronze/api_udf/bronze_api__contract_abis.sql models/silver/core/tests
dbt run --exclude models/silver/abis models/silver/API_udf models/silver/streamline/* models/silver/silver__decoded_logs.sql models/silver/silver__decoded_logs_full.sql models/bronze/api_udf/bronze_api__contract_abis.sql models/silver/core/tests

View File

@ -9,6 +9,12 @@
{{ create_udtf_get_base_table(
schema = "streamline"
) }}
{{ create_udf_keccak(
schema = 'silver'
) }}
{{ create_udf_simple_event_names(
schema = 'silver'
) }}
{% endset %}
{% do run_query(sql) %}

View File

@ -0,0 +1,18 @@
{% macro create_udf_keccak(schema) %}
-- Creates a Snowpark Python UDF returning the 0x-prefixed keccak-256 hash of
-- an event signature string (e.g. 'Transfer(address,address,uint256)').
-- For non-anonymous EVM events this hash equals topics[0] of the emitted log.
--
-- Fixes:
--   * VARCHAR(255) -> VARCHAR: signatures built from tuple-heavy ABIs by
--     udf_simple_event_name can exceed 255 chars; a cap would truncate the
--     input and produce a wrong hash. Widening the type is backward compatible.
--   * runtime_version quoted as '3.8' for consistency with the sibling
--     create_udf_simple_event_names macro.
create or replace function {{ schema }}.udf_keccak(event_name VARCHAR)
RETURNS string
LANGUAGE python
runtime_version = '3.8'
packages = ('pycryptodome==3.15.0')
HANDLER = 'udf_encode'
AS
$$
from Crypto.Hash import keccak

def udf_encode(event_name):
    # keccak-256 (NOT standardized SHA3-256): pycryptodome's keccak module
    # with digest_bits=256 matches the EVM's hashing of event signatures.
    keccak_hash = keccak.new(digest_bits=256)
    keccak_hash.update(event_name.encode('utf-8'))
    return '0x' + keccak_hash.hexdigest()
$$;
{% endmacro %}

View File

@ -0,0 +1,31 @@
{% macro create_udf_simple_event_names(schema) %}
-- Creates a Snowpark Python UDF that canonicalizes a single event ABI entry
-- (a VARIANT like {"name": ..., "inputs": [...], "type": "event"}) into its
-- signature string, e.g. 'Transfer(address,address,uint256)'. Nested tuple
-- components are rendered recursively as '(...)' or '(...)[]' for arrays,
-- and non-standard 'enum ' / ' payable' markers are stripped from types.
-- The result is intended to be fed to udf_keccak to derive topics[0].
create or replace function {{ schema }}.udf_simple_event_name(abi VARIANT)
RETURNS STRING
LANGUAGE PYTHON
RUNTIME_VERSION = '3.8'
HANDLER = 'get_simplified_signature'
AS
$$
def get_simplified_signature(abi):
    def generate_signature(inputs):
        signature_parts = []
        for input_data in inputs:
            if 'components' in input_data:
                component_signature_parts = []
                components = input_data['components']
                component_signature_parts.extend(generate_signature(components))
                component_signature_parts[-1] = component_signature_parts[-1].rstrip(",")
                if input_data['type'].endswith('[]'):
                    signature_parts.append("(" + "".join(component_signature_parts) + ")[],")
                else:
                    signature_parts.append("(" + "".join(component_signature_parts) + "),")
            else:
                signature_parts.append(input_data['type'].replace('enum ', '').replace(' payable', '') + ",")
        return signature_parts
    signature_parts = [abi['name'] + "("]
    signature_parts.extend(generate_signature(abi['inputs']))
    signature_parts[-1] = signature_parts[-1].rstrip(",") + ")"
    return "".join(signature_parts)
$$;
{% endmacro %}

View File

@ -15,14 +15,23 @@
SELECT
l.block_number,
l._log_id,
abi.data AS abi,
l.data
A.abi AS abi,
OBJECT_CONSTRUCT(
'topics',
l.topics,
'data',
l.data,
'address',
l.contract_address
) AS DATA
FROM
{{ ref("streamline__decode_logs") }}
{{ ref("silver__logs") }}
l
INNER JOIN {{ ref("silver__abis") }}
abi
ON l.abi_address = abi.contract_address
INNER JOIN {{ ref("silver__complete_event_abis") }} A
ON A.parent_contract_address = l.contract_address
AND A.event_signature = l.topics [0] :: STRING
AND l.block_number BETWEEN A.start_block
AND A.end_block
WHERE
(
l.block_number BETWEEN {{ start }}

View File

@ -0,0 +1,177 @@
-- silver__complete_event_abis
-- One row per (parent_contract_address, event_signature, start_block):
-- the event ABI to use when decoding a log emitted by that contract in that
-- block range. Proxy relationships are resolved so logs emitted by a proxy
-- can be decoded with its implementation contract's ABI.
{{ config (
materialized = 'table'
) }}
-- all verified contract ABIs on file
WITH abi_base AS (
SELECT
contract_address,
DATA
FROM
{{ ref('silver__abis') }}
),
-- flatten each ABI array to one row per entry; keep only event definitions
flat_abi AS (
SELECT
contract_address,
DATA,
VALUE :inputs AS inputs,
VALUE :payable :: BOOLEAN AS payable,
VALUE :stateMutability :: STRING AS stateMutability,
VALUE :type :: STRING AS TYPE,
VALUE :anonymous :: BOOLEAN AS anonymous,
VALUE :name :: STRING AS NAME
FROM
abi_base,
LATERAL FLATTEN (
input => DATA
)
WHERE
TYPE = 'event'
),
-- collect each event's ordered input types into an array (used later to
-- distinguish same-named events with different parameter lists)
event_types AS (
SELECT
contract_address,
inputs,
anonymous,
NAME,
ARRAY_AGG(
VALUE :type :: STRING
) AS event_type
FROM
flat_abi,
LATERAL FLATTEN (
input => inputs
)
GROUP BY
contract_address,
inputs,
anonymous,
NAME
),
-- proxy -> implementation links, with the block the proxy contract was
-- created at; the zero address is excluded as a non-proxy sentinel
proxy_base AS (
SELECT
C.created_contract_address AS contract_address,
p.proxy_address,
p.start_block,
C.block_number AS created_block
FROM
{{ ref('silver__created_contracts') }} C
INNER JOIN {{ ref('silver__proxies') }}
p
ON C.created_contract_address = p.contract_address
AND p.proxy_address <> '0x0000000000000000000000000000000000000000'
),
-- stack three ABI sources, lowest priority number wins later:
--   1. implementation ABI attributed to the proxy contract (from proxy start)
--   2. the proxy contract's own ABI (from its creation block)
--   3. standalone (non-proxy) contracts' own ABIs (from block 0)
stacked AS (
SELECT
ea.contract_address,
ea.inputs,
ea.anonymous,
ea.name,
ea.event_type,
pb.start_block,
pb.contract_address AS base_contract_address,
1 AS priority
FROM
event_types ea
INNER JOIN proxy_base pb
ON ea.contract_address = pb.proxy_address
UNION ALL
SELECT
eab.contract_address,
eab.inputs,
eab.anonymous,
eab.name,
eab.event_type,
pbb.created_block AS start_block,
pbb.contract_address AS base_contract_address,
2 AS priority
FROM
event_types eab
INNER JOIN (
SELECT
DISTINCT contract_address,
created_block
FROM
proxy_base
) pbb
ON eab.contract_address = pbb.contract_address
UNION ALL
SELECT
eac.contract_address,
eac.inputs,
eac.anonymous,
eac.name,
eac.event_type,
0 AS start_block,
eac.contract_address AS base_contract_address,
3 AS priority
FROM
event_types eac
WHERE
contract_address NOT IN (
SELECT
DISTINCT contract_address
FROM
proxy_base
)
),
-- rebuild a minimal event ABI object, derive its canonical signature and
-- keccak hash (topics[0]). NOTE: later expressions reference earlier column
-- aliases in the same SELECT (abi -> simple_event_name -> event_signature),
-- which relies on Snowflake lateral column aliasing.
apply_udfs AS (
SELECT
contract_address AS source_contract_address,
base_contract_address AS parent_contract_address,
NAME AS event_name,
PARSE_JSON(
OBJECT_CONSTRUCT(
'anonymous',
anonymous,
'inputs',
inputs,
'name',
NAME,
'type',
'event'
) :: STRING
) AS abi,
start_block,
silver.udf_simple_event_name(abi) AS simple_event_name,
silver.udf_keccak(simple_event_name) AS event_signature,
priority,
NAME,
inputs,
event_type
FROM
stacked
),
-- keep the best-priority ABI per contract/event/params/start_block
FINAL AS (
SELECT
parent_contract_address,
event_name,
abi,
start_block,
simple_event_name,
event_signature,
NAME,
inputs,
event_type
FROM
apply_udfs qualify ROW_NUMBER() over (
PARTITION BY parent_contract_address,
NAME,
event_type,
start_block
ORDER BY
priority ASC
) = 1
)
-- close each ABI's validity window at the next version's start_block - 1;
-- 1e18 is the open-ended sentinel for the current version
SELECT
parent_contract_address,
event_name,
abi,
start_block,
simple_event_name,
event_signature,
IFNULL(LEAD(start_block) over (PARTITION BY parent_contract_address, event_signature
ORDER BY
start_block) -1, 1e18) AS end_block
FROM
FINAL

View File

@ -0,0 +1,9 @@
version: 2
models:
  - name: silver__complete_event_abis
    tests:
      # Grain of the model: one ABI row per parent contract / event
      # signature / start block (matches the end_block windowing logic).
      - dbt_utils.unique_combination_of_columns:
          combination_of_columns:
            - PARENT_CONTRACT_ADDRESS
            - EVENT_SIGNATURE
            - START_BLOCK

View File

@ -7,14 +7,15 @@ WITH base AS (
SELECT
from_address,
to_address,
MIN(block_number) AS start_block
MIN(block_number) AS start_block,
MAX(_inserted_timestamp) AS _inserted_timestamp
FROM
{{ ref('silver__traces') }}
WHERE
TYPE = 'DELEGATECALL'
AND trace_status = 'SUCCESS'
AND tx_status = 'SUCCESS'
AND from_address != to_address -- exclude self-calls
AND output :: STRING = '0x'
GROUP BY
from_address,
to_address
@ -23,16 +24,11 @@ SELECT
from_address AS contract_address,
to_address AS proxy_address,
start_block,
COALESCE(
(LAG(start_block) over(PARTITION BY from_address
ORDER BY
start_block DESC)) - 1,
100000000000
) AS end_block,
CONCAT(
from_address,
'-',
to_address
) AS _id
) AS _id,
_inserted_timestamp
FROM
base
base

View File

@ -44,35 +44,112 @@ ORDER BY
_inserted_timestamp ASC
LIMIT
10
), max_blocks AS (
), contracts AS (
SELECT
abi_address,
abi,
MAX(block_number) AS max_block,
max_block - 100000 AS min_block
contract_address
FROM
{{ ref('streamline__decode_logs') }}
l
JOIN base C
ON C.contract_address = l.abi_address
{{ ref('silver__proxies') }}
WHERE
contract_address IN (
SELECT
contract_address
FROM
base
)
),
proxies AS (
SELECT
proxy_address,
contract_address
FROM
{{ ref('silver__proxies') }}
WHERE
proxy_address IN (
SELECT
contract_address
FROM
base
)
),
final_groupings AS (
SELECT
b.contract_address AS address,
C.contract_address,
proxy_address,
CASE
WHEN C.contract_address IS NOT NULL
AND proxy_address IS NOT NULL THEN 'contract'
WHEN C.contract_address IS NOT NULL THEN 'contract'
WHEN proxy_address IS NOT NULL THEN 'proxy'
WHEN C.contract_address IS NULL
AND proxy_address IS NULL THEN 'contract'
END AS TYPE,
p.contract_address AS proxy_parent,
CASE
WHEN TYPE = 'contract' THEN address
ELSE proxy_parent
END AS final_address
FROM
base b
LEFT JOIN (
SELECT
DISTINCT contract_address
FROM
contracts
) C
ON b.contract_address = C.contract_address
LEFT JOIN (
SELECT
DISTINCT proxy_address,
contract_address
FROM
proxies
) p
ON b.contract_address = proxy_address
),
identified_addresses AS (
SELECT
DISTINCT address AS base_address,
final_address AS contract_address
FROM
final_groupings
),
ranges AS (
SELECT
contract_address,
base_address,
MIN(block_number) AS min_block,
min_block + 100000 AS max_block
FROM
{{ ref('silver__logs') }}
JOIN identified_addresses USING (contract_address)
GROUP BY
1,
2
contract_address,
base_address
),
logs AS (
SELECT
l.block_number,
l.contract_address,
l.data AS logs_data,
C.abi,
l.abi_address
OBJECT_CONSTRUCT(
'topics',
l.topics,
'data',
l.data,
'address',
l.contract_address
) AS logs_data,
b.abi,
base_address AS abi_address
FROM
{{ ref('streamline__decode_logs') }}
{{ ref('silver__logs') }}
l
JOIN max_blocks C
ON C.abi_address = l.abi_address
JOIN ranges C
ON C.contract_address = l.contract_address
AND l.block_number BETWEEN C.min_block
AND C.max_block
JOIN base b
ON b.contract_address = C.base_address
),
recent_logs AS (
SELECT

View File

@ -1,40 +0,0 @@
-- (Removed by this commit.) Incremental model keeping the latest fetched ABI
-- per contract address, sourced from the bronze API-pull table.
{{ config (
materialized = "incremental",
unique_key = "contract_address",
cluster_by = "_inserted_timestamp::date",
merge_update_columns = ["contract_address"]
) }}
-- raw API responses; :data:result holds the ABI payload (or an error string)
WITH base AS (
SELECT
contract_address,
abi_data AS full_data,
abi_data :data :result AS abi,
_inserted_timestamp
FROM
{{ ref('bronze_api__contract_abis') }}
{% if is_incremental() %}
WHERE
_inserted_timestamp >= (
SELECT
MAX(
_inserted_timestamp
)
FROM
{{ this }}
)
{% endif %}
)
-- drop unverified contracts and keep only the newest row per contract
SELECT
contract_address,
full_data,
abi,
_inserted_timestamp
FROM
base
WHERE
abi :: STRING <> 'Contract source code not verified' qualify(ROW_NUMBER() over(PARTITION BY contract_address
ORDER BY
_inserted_timestamp DESC)) = 1

View File

@ -1,7 +0,0 @@
version: 2
models:
  - name: silver__contract_abis
    tests:
      # contract_address is the model's unique key
      - dbt_utils.unique_combination_of_columns:
          combination_of_columns:
            - CONTRACT_ADDRESS

View File

@ -6,10 +6,20 @@
)
) }}
{% for item in range(40) %}
{% for item in range(45) %}
(
WITH blocks AS (
WITH last_3_days AS (
SELECT
block_number
FROM
{{ ref("_max_block_by_date") }}
qualify ROW_NUMBER() over (
ORDER BY
block_number DESC
) = 3
),
blocks AS (
SELECT
block_number
FROM
@ -19,6 +29,12 @@
AND {{(
item + 1
) * 1000000 }}
AND block_number <= (
SELECT
block_number
FROM
last_3_days
)
EXCEPT
SELECT
block_number
@ -29,6 +45,12 @@
AND {{(
item + 1
) * 1000000 }}
AND block_number <= (
SELECT
block_number
FROM
last_3_days
)
)
SELECT
PARSE_JSON(

View File

@ -6,10 +6,20 @@
)
) }}
{% for item in range(40) %}
{% for item in range(45) %}
(
WITH blocks AS (
WITH last_3_days AS (
SELECT
block_number
FROM
{{ ref("_max_block_by_date") }}
qualify ROW_NUMBER() over (
ORDER BY
block_number DESC
) = 3
),
blocks AS (
SELECT
block_number
FROM
@ -19,6 +29,12 @@
AND {{(
item + 1
) * 1000000 }}
AND block_number <= (
SELECT
block_number
FROM
last_3_days
)
EXCEPT
SELECT
block_number
@ -29,6 +45,12 @@
AND {{(
item + 1
) * 1000000 }}
AND block_number <= (
SELECT
block_number
FROM
last_3_days
)
)
SELECT
PARSE_JSON(

View File

@ -1,60 +0,0 @@
-- (Removed by this commit.) Incremental model that paired each log with the
-- address whose ABI should decode it (the proxy implementation when one was
-- active at that block, else the emitting contract) and pre-packed the log
-- payload for the decoder.
{{ config (
materialized = "incremental",
unique_key = "_log_id",
cluster_by = "round(block_number,-3)",
merge_update_columns = ["_log_id"],
post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION on equality(abi_address, _log_id)"
) }}
-- logs left-joined to any proxy mapping active at the log's block
WITH base AS (
SELECT
l.block_number,
l.tx_hash,
l.event_index,
l.contract_address,
l.topics,
l.data,
l._log_id,
p.proxy_address,
l._INSERTED_TIMESTAMP
FROM
{{ ref('silver__logs') }}
l
LEFT JOIN {{ ref('silver__proxies') }}
p
ON p.contract_address = l.contract_address
AND l.block_number BETWEEN p.start_block
AND p.end_block
WHERE
1 = 1
{% if is_incremental() %}
AND l._inserted_timestamp >= (
SELECT
MAX(_inserted_timestamp)
FROM
{{ this }}
)
{% endif %}
)
-- abi_address: proxy implementation when present, else the contract itself;
-- DATA packs topics/data/address into one object for the decoding UDF
SELECT
b.block_number,
b._log_id,
contract_address,
proxy_address,
COALESCE(
proxy_address,
contract_address
) AS abi_address,
OBJECT_CONSTRUCT(
'topics',
b.topics,
'data',
b.data,
'address',
b.contract_address
) AS DATA,
_inserted_timestamp
FROM
base b

View File

@ -17,13 +17,23 @@ WITH look_back AS (
SELECT
l.block_number,
l._log_id,
A.data AS abi,
l.data
A.abi AS abi,
OBJECT_CONSTRUCT(
'topics',
l.topics,
'data',
l.data,
'address',
l.contract_address
) AS DATA
FROM
{{ ref("streamline__decode_logs") }}
{{ ref("silver__logs") }}
l
INNER JOIN {{ ref("silver__abis") }} A
ON l.abi_address = A.contract_address
INNER JOIN {{ ref("silver__complete_event_abis") }} A
ON A.parent_contract_address = l.contract_address
AND A.event_signature = l.topics[0]:: STRING
AND l.block_number BETWEEN A.start_block
AND A.end_block
WHERE
(
l.block_number >= (