* stash

* decoder

* remove

* merge predicate
This commit is contained in:
Austin 2024-06-20 17:24:48 -04:00 committed by GitHub
parent 01d5b3ded9
commit 17dca0db11
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
42 changed files with 1596 additions and 7 deletions

View File

@ -43,4 +43,4 @@ jobs:
dbt deps
- name: Run DBT Jobs
run: |
dbt run -m models/bronze/core "sei_models,tag:core"
dbt run --vars '{"STREAMLINE_INVOKE_STREAMS":True}' -m models/bronze/core "sei_models,tag:core" "sei_models,tag:streamline_decoded_logs_realtime" "sei_models,tag:streamline_decoded_logs_complete"

View File

@ -53,6 +53,7 @@ vars:
API_INTEGRATION: '{{ var("config")[target.name]["API_INTEGRATION"] if var("config")[target.name] else var("config")["dev"]["API_INTEGRATION"] }}'
EXTERNAL_FUNCTION_URI: '{{ var("config")[target.name]["EXTERNAL_FUNCTION_URI"] if var("config")[target.name] else var("config")["dev"]["EXTERNAL_FUNCTION_URI"] }}'
API_AWS_ROLE_ARN: '{{ var("config")[target.name]["API_AWS_ROLE_ARN"] if var("config")[target.name] else var("config")["dev"]["API_AWS_ROLE_ARN"] }}'
ROLES: '{{ var("config")[target.name]["ROLES"] }}'
config:
@ -60,12 +61,14 @@ vars:
dev:
API_INTEGRATION: aws_sei_api_dev
EXTERNAL_FUNCTION_URI: ibj933oi6f.execute-api.us-east-1.amazonaws.com/stg/
API_AWS_ROLE_ARN: arn:aws:iam::704693948482:role/sei-api-stg-rolesnowflakeudfsAF733095-YX1gTAavoOYe
ROLES:
- AWS_LAMBDA_SEI_API
- INTERNAL_DEV
prod:
API_INTEGRATION: aws_sei_api
EXTERNAL_FUNCTION_URI: dbtc9lfp0k.execute-api.us-east-1.amazonaws.com/prod/
API_AWS_ROLE_ARN: arn:aws:iam::924682671219:role/sei-api-prod-rolesnowflakeudfsAF733095-iooKxz0RazMg
ROLES:
- AWS_LAMBDA_AXELAR_API
- DBT_CLOUD_AXELAR
@ -73,6 +76,7 @@ vars:
prod-2xl:
API_INTEGRATION: aws_sei_api
EXTERNAL_FUNCTION_URI: dbtc9lfp0k.execute-api.us-east-1.amazonaws.com/prod/
API_AWS_ROLE_ARN: arn:aws:iam::924682671219:role/sei-api-prod-rolesnowflakeudfsAF733095-iooKxz0RazMg
ROLES:
- AWS_LAMBDA_AXELAR_API
- DBT_CLOUD_AXELAR
@ -80,6 +84,7 @@ vars:
dev-2xl:
API_INTEGRATION: aws_sei_api_dev
EXTERNAL_FUNCTION_URI: ibj933oi6f.execute-api.us-east-1.amazonaws.com/stg/
API_AWS_ROLE_ARN: arn:aws:iam::704693948482:role/sei-api-stg-rolesnowflakeudfsAF733095-YX1gTAavoOYe
ROLES:
- AWS_LAMBDA_SEI_API
- INTERNAL_DEV

View File

@ -6,7 +6,6 @@
schema = "streamline"
) }}
{{ create_udf_bulk_rest_api_v2() }}
{% endset %}
{% do run_query(sql) %}
{% endif %}

View File

@ -0,0 +1,216 @@
{{ config (
materialized = 'incremental',
unique_key = ['parent_contract_address','event_signature','start_block'],
merge_exclude_columns = ["inserted_timestamp"],
post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION",
tags = ['abis']
) }}
WITH new_abis AS (
SELECT
DISTINCT contract_address
FROM
{{ ref('silver_evm__flat_event_abis') }}
{% if is_incremental() %}
WHERE
_inserted_timestamp >= (
SELECT
MAX(_inserted_timestamp) - INTERVAL '12 hours'
FROM
{{ this }}
)
{% endif %}
),
proxies AS (
SELECT
p0.created_block,
p0.proxy_created_block,
p0.contract_address,
p0.proxy_address,
p0.start_block,
p0._id,
p0._inserted_timestamp
FROM
{{ ref('silver_evm__proxies') }}
p0
JOIN new_abis na0
ON p0.contract_address = na0.contract_address
UNION
SELECT
p1.created_block,
p1.proxy_created_block,
p1.contract_address,
p1.proxy_address,
p1.start_block,
p1._id,
p1._inserted_timestamp
FROM
{{ ref('silver_evm__proxies') }}
p1
JOIN new_abis na1
ON p1.proxy_address = na1.contract_address
),
all_relevant_contracts AS (
SELECT
DISTINCT contract_address
FROM
proxies
UNION
SELECT
DISTINCT proxy_address AS contract_address
FROM
proxies
UNION
SELECT
contract_address
FROM
new_abis
),
flat_abis AS (
SELECT
contract_address,
event_name,
abi,
simple_event_name,
event_signature,
NAME,
inputs,
event_type,
_inserted_timestamp
FROM
{{ ref('silver_evm__flat_event_abis') }}
JOIN all_relevant_contracts USING (contract_address)
),
base AS (
SELECT
ea.contract_address,
event_name,
abi,
simple_event_name,
event_signature,
NAME,
inputs,
event_type,
ea._inserted_timestamp,
pb._inserted_timestamp AS proxy_inserted_timestamp,
pb.start_block,
pb.proxy_created_block,
pb.contract_address AS base_contract_address,
1 AS priority
FROM
flat_abis ea
JOIN proxies pb
ON ea.contract_address = pb.proxy_address
UNION ALL
SELECT
eab.contract_address,
event_name,
abi,
simple_event_name,
event_signature,
NAME,
inputs,
event_type,
eab._inserted_timestamp,
pbb._inserted_timestamp AS proxy_inserted_timestamp,
pbb.created_block AS start_block,
pbb.proxy_created_block,
pbb.contract_address AS base_contract_address,
2 AS priority
FROM
flat_abis eab
JOIN (
SELECT
DISTINCT contract_address,
created_block,
proxy_created_block,
_inserted_timestamp
FROM
proxies
) pbb
ON eab.contract_address = pbb.contract_address
UNION ALL
SELECT
contract_address,
event_name,
abi,
simple_event_name,
event_signature,
NAME,
inputs,
event_type,
_inserted_timestamp,
NULL AS proxy_inserted_timestamp,
0 AS start_block,
NULL AS proxy_created_block,
contract_address AS base_contract_address,
3 AS priority
FROM
flat_abis eac
WHERE
contract_address NOT IN (
SELECT
DISTINCT contract_address
FROM
proxies
)
),
new_records AS (
SELECT
base_contract_address AS parent_contract_address,
event_name,
abi,
start_block,
proxy_created_block,
simple_event_name,
event_signature,
NAME,
inputs,
event_type,
_inserted_timestamp,
proxy_inserted_timestamp
FROM
base qualify ROW_NUMBER() over (
PARTITION BY parent_contract_address,
NAME,
event_type,
event_signature,
start_block
ORDER BY
priority ASC,
_inserted_timestamp DESC,
proxy_created_block DESC nulls last,
proxy_inserted_timestamp DESC nulls last
) = 1
)
SELECT
parent_contract_address,
event_name,
abi,
start_block,
proxy_created_block,
simple_event_name,
event_signature,
IFNULL(LEAD(start_block) over (PARTITION BY parent_contract_address, event_signature
ORDER BY
start_block) -1, 1e18) AS end_block,
_inserted_timestamp,
proxy_inserted_timestamp,
SYSDATE() AS _updated_timestamp,
{{ dbt_utils.generate_surrogate_key(
['parent_contract_address','event_signature','start_block']
) }} AS complete_event_abis_id,
SYSDATE() AS inserted_timestamp,
SYSDATE() AS modified_timestamp,
'{{ invocation_id }}' AS _invocation_id
FROM
new_records qualify ROW_NUMBER() over (
PARTITION BY parent_contract_address,
event_name,
event_signature,
start_block
ORDER BY
_inserted_timestamp DESC
) = 1

View File

@ -0,0 +1,9 @@
version: 2
models:
- name: silver_evm__complete_event_abis
tests:
- dbt_utils.unique_combination_of_columns:
combination_of_columns:
- PARENT_CONTRACT_ADDRESS
- EVENT_SIGNATURE
- START_BLOCK

View File

@ -0,0 +1,112 @@
{{ config (
materialized = 'incremental',
incremental_strategy = 'delete+insert',
unique_key = 'contract_address',
cluster_by = '_inserted_timestamp::date',
tags = ['abis'],
post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION ON EQUALITY (contract_address)"
) }}
WITH abi_base AS (
SELECT
contract_address,
DATA,
_inserted_timestamp
FROM
{{ ref('silver_evm__abis') }}
{% if is_incremental() %}
WHERE
_inserted_timestamp >= (
SELECT
MAX(_inserted_timestamp) - INTERVAL '24 hours'
FROM
{{ this }}
)
{% endif %}
),
flat_abi AS (
SELECT
contract_address,
_inserted_timestamp,
DATA,
VALUE :inputs AS inputs,
VALUE :payable :: BOOLEAN AS payable,
VALUE :stateMutability :: STRING AS stateMutability,
VALUE :type :: STRING AS TYPE,
VALUE :anonymous :: BOOLEAN AS anonymous,
VALUE :name :: STRING AS NAME
FROM
abi_base,
LATERAL FLATTEN (
input => DATA
)
WHERE
TYPE = 'event' qualify ROW_NUMBER() over (
PARTITION BY contract_address,
NAME,
inputs
ORDER BY
LENGTH(inputs)
) = 1
),
event_types AS (
SELECT
contract_address,
_inserted_timestamp,
inputs,
anonymous,
NAME,
ARRAY_AGG(
VALUE :type :: STRING
) AS event_type
FROM
flat_abi,
LATERAL FLATTEN (
input => inputs
)
GROUP BY
contract_address,
_inserted_timestamp,
inputs,
anonymous,
NAME
),
apply_udfs AS (
SELECT
contract_address,
NAME AS event_name,
PARSE_JSON(
OBJECT_CONSTRUCT(
'anonymous',
anonymous,
'inputs',
inputs,
'name',
NAME,
'type',
'event'
) :: STRING
) AS abi,
utils.udf_evm_text_signature(abi) AS simple_event_name,
utils.udf_keccak256(simple_event_name) AS event_signature,
NAME,
inputs,
event_type,
_inserted_timestamp
FROM
event_types
)
SELECT
contract_address,
event_name,
abi,
simple_event_name,
event_signature,
NAME,
inputs,
event_type,
_inserted_timestamp
FROM
apply_udfs

View File

@ -0,0 +1,181 @@
{{ config (
materialized = "incremental",
unique_key = "contract_address",
merge_exclude_columns = ["inserted_timestamp"],
post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION on equality(contract_address)",
tags = ['abis']
) }}
WITH override_abis AS (
SELECT
contract_address,
PARSE_JSON(DATA) AS abi,
TO_TIMESTAMP_LTZ(SYSDATE()) AS _inserted_timestamp,
'flipside' AS abi_source,
'flipside' AS discord_username,
SHA2(abi) AS abi_hash,
1 AS priority
FROM
{{ ref('silver_evm__override_abis') }}
WHERE
contract_address IS NOT NULL
),
verified_abis AS (
SELECT
contract_address,
DATA,
_inserted_timestamp,
abi_source,
discord_username,
abi_hash,
2 AS priority
FROM
{{ ref('silver_evm__verified_abis') }}
WHERE
abi_source = 'seitrace'
{% if is_incremental() %}
AND _inserted_timestamp >= (
SELECT
MAX(
_inserted_timestamp
)
FROM
{{ this }}
WHERE
abi_source = 'seitrace'
)
{% endif %}
),
user_abis AS (
SELECT
contract_address,
DATA,
_inserted_timestamp,
abi_source,
discord_username,
abi_hash,
2 AS priority
FROM
{{ ref('silver_evm__verified_abis') }}
WHERE
abi_source = 'user'
{% if is_incremental() %}
AND _inserted_timestamp >= (
SELECT
MAX(
_inserted_timestamp
)
FROM
{{ this }}
WHERE
abi_source = 'user'
)
{% endif %}
),
bytecode_abis AS (
SELECT
contract_address,
abi,
abi_hash,
'bytecode_matched' AS abi_source,
NULL AS discord_username,
_inserted_timestamp,
3 AS priority
FROM
{{ ref('silver_evm__bytecode_abis') }}
WHERE
1 = 1
{% if is_incremental() %}
AND _inserted_timestamp >= (
SELECT
MAX(
_inserted_timestamp
)
FROM
{{ this }}
WHERE
abi_source = 'bytecode_matched'
)
{% endif %}
),
all_abis AS (
SELECT
contract_address,
abi AS DATA,
_inserted_timestamp,
abi_source,
discord_username,
abi_hash,
priority
FROM
override_abis
UNION
SELECT
contract_address,
DATA,
_inserted_timestamp,
abi_source,
discord_username,
abi_hash,
priority
FROM
verified_abis
UNION
SELECT
contract_address,
DATA,
_inserted_timestamp,
abi_source,
discord_username,
abi_hash,
priority
FROM
user_abis
UNION
SELECT
contract_address,
abi AS DATA,
_inserted_timestamp,
abi_source,
discord_username,
abi_hash,
priority
FROM
bytecode_abis
),
priority_abis AS (
SELECT
contract_address,
DATA,
_inserted_timestamp,
abi_source,
discord_username,
abi_hash,
priority
FROM
all_abis qualify(ROW_NUMBER() over(PARTITION BY contract_address
ORDER BY
priority ASC)) = 1
)
SELECT
p.contract_address,
p.data,
p._inserted_timestamp,
p.abi_source,
p.discord_username,
p.abi_hash,
created_contract_input AS bytecode,
{{ dbt_utils.generate_surrogate_key(
['contract_address']
) }} AS abis_id,
SYSDATE() AS inserted_timestamp,
SYSDATE() AS modified_timestamp,
'{{ invocation_id }}' AS _invocation_id
FROM
priority_abis p
LEFT JOIN {{ ref('silver_evm__created_contracts') }}
ON p.contract_address = created_contract_address

View File

@ -0,0 +1,8 @@
version: 2
models:
- name: silver_evm__abis
tests:
- dbt_utils.unique_combination_of_columns:
combination_of_columns:
- CONTRACT_ADDRESS

View File

@ -0,0 +1,76 @@
{{ config (
materialized = "incremental",
unique_key = "contract_address",
tags = ['abis']
) }}
WITH contracts_with_abis AS (
-- Identifying contracts with verified ABIs
SELECT
created_contract_address AS contract_address
FROM
{{ ref('silver_evm__created_contracts') }}
JOIN {{ ref('silver_evm__verified_abis') }} A
ON A.contract_address = created_contract_address
),
contracts_without_abis AS (
-- Contracts that are missing ABIs
SELECT
created_contract_address AS contract_address,
created_contract_input AS bytecode
FROM
{{ ref('silver_evm__created_contracts') }}
WHERE
created_contract_address NOT IN (
SELECT
contract_address
FROM
contracts_with_abis
)
{% if is_incremental() %}
AND created_contract_address NOT IN (
SELECT
contract_address
FROM
{{ this }}
)
{% endif %}
),
unique_bytecode_abis AS (
-- Bytecodes from created_contracts with a unique ABI
SELECT
cc.created_contract_input AS bytecode,
va.data AS abi,
va.abi_hash
FROM
{{ ref('silver_evm__created_contracts') }}
cc
JOIN {{ ref('silver_evm__verified_abis') }}
va
ON cc.created_contract_address = va.contract_address
GROUP BY
cc.created_contract_input,
va.data,
va.abi_hash
HAVING
COUNT(
DISTINCT va.data
) = 1 -- Ensuring there's only one ABI per bytecode
) -- Final matching
SELECT
contract_address,
abi,
abi_hash,
{% if is_incremental() %}
SYSDATE()
{% else %}
TO_TIMESTAMP_NTZ('2000-01-01 00:00:00')
{% endif %}
AS _inserted_timestamp
FROM
contracts_without_abis
JOIN unique_bytecode_abis USING (bytecode)

View File

@ -0,0 +1,8 @@
version: 2
models:
- name: silver_evm__bytecode_abis
tests:
- dbt_utils.unique_combination_of_columns:
combination_of_columns:
- CONTRACT_ADDRESS
- ABI_HASH

View File

@ -0,0 +1,8 @@
{{ config(
materialized = 'view',
tags = ['abis']
) }}
SELECT
NULL AS contract_address,
NULL AS DATA

View File

@ -0,0 +1,104 @@
{{ config (
materialized = 'incremental',
unique_key = ['contract_address','proxy_address'],
post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION",
tags = ['abis']
) }}
WITH base AS (
SELECT
from_address,
to_address,
MIN(block_number) AS start_block,
MAX(_inserted_timestamp) AS _inserted_timestamp
FROM
{{ ref('silver_evm__traces') }}
WHERE
TYPE = 'DELEGATECALL'
AND trace_status = 'SUCCESS'
AND tx_status = 'SUCCESS'
AND from_address != to_address -- exclude self-calls
{% if is_incremental() %}
AND _inserted_timestamp >= (
SELECT
MAX(_inserted_timestamp) - INTERVAL '24 hours'
FROM
{{ this }}
)
{% endif %}
GROUP BY
from_address,
to_address
),
create_id AS (
SELECT
from_address AS contract_address,
to_address AS proxy_address,
start_block,
CONCAT(
from_address,
'-',
to_address
) AS _id,
_inserted_timestamp
FROM
base
),
heal AS (
SELECT
contract_address,
proxy_address,
start_block,
_id,
_inserted_timestamp
FROM
create_id
{% if is_incremental() %}
UNION ALL
SELECT
contract_address,
proxy_address,
start_block,
_id,
_inserted_timestamp
FROM
{{ this }}
JOIN create_id USING (
contract_address,
proxy_address
)
{% endif %}
),
FINAL AS (
SELECT
contract_address,
proxy_address,
start_block,
_id,
_inserted_timestamp
FROM
heal qualify ROW_NUMBER() over (
PARTITION BY contract_address,
proxy_address
ORDER BY
start_block ASC
) = 1
)
SELECT
f.contract_address,
f.proxy_address,
f.start_block,
f._id,
f._inserted_timestamp,
C.block_number AS created_block,
p.block_number AS proxy_created_block
FROM
FINAL f
JOIN {{ ref('silver_evm__created_contracts') }} C
ON f.contract_address = C.created_contract_address
JOIN {{ ref('silver_evm__created_contracts') }}
p
ON f.proxy_address = p.created_contract_address

View File

@ -0,0 +1,7 @@
version: 2
models:
- name: silver_evm__proxies
tests:
- dbt_utils.unique_combination_of_columns:
combination_of_columns:
- _ID

View File

@ -0,0 +1,550 @@
{{ config (
materialized = "incremental",
unique_key = "id",
tags = ['abis']
) }}
WITH base AS (
SELECT
contract_address,
abi,
PARSE_JSON(abi) AS DATA,
SHA2(PARSE_JSON(abi)) AS abi_hash,
discord_username,
_inserted_timestamp
FROM
{{ source(
"crosschain_public",
"user_abis"
) }}
WHERE
blockchain = 'sei'
AND NOT duplicate_abi
{% if is_incremental() %}
AND contract_address NOT IN (
SELECT
contract_address
FROM
{{ this }}
)
AND _inserted_timestamp > (
SELECT
COALESCE(
MAX(
_inserted_timestamp
),
'1970-01-01'
)
FROM
{{ this }}
)
{% endif %}
ORDER BY
_inserted_timestamp ASC
LIMIT
10
), flat_event_abi AS (
SELECT
contract_address,
_inserted_timestamp,
DATA,
VALUE :inputs AS inputs,
VALUE :payable :: BOOLEAN AS payable,
VALUE :stateMutability :: STRING AS stateMutability,
VALUE :type :: STRING AS TYPE,
VALUE :anonymous :: BOOLEAN AS anonymous,
VALUE :name :: STRING AS NAME
FROM
base,
LATERAL FLATTEN (
input => DATA
)
WHERE
TYPE = 'event' qualify ROW_NUMBER() over (
PARTITION BY contract_address,
NAME,
inputs
ORDER BY
LENGTH(inputs)
) = 1
),
event_types AS (
SELECT
contract_address,
_inserted_timestamp,
inputs,
anonymous,
NAME,
ARRAY_AGG(
VALUE :type :: STRING
) AS event_type
FROM
flat_event_abi,
LATERAL FLATTEN (
input => inputs
)
GROUP BY
contract_address,
_inserted_timestamp,
inputs,
anonymous,
NAME
),
apply_event_udfs AS (
SELECT
contract_address,
NAME AS event_name,
PARSE_JSON(
OBJECT_CONSTRUCT(
'anonymous',
anonymous,
'inputs',
inputs,
'name',
NAME,
'type',
'event'
) :: STRING
) AS abi,
utils.udf_evm_text_signature(abi) AS simple_event_name,
utils.udf_keccak256(simple_event_name) AS event_signature,
NAME,
inputs,
event_type,
_inserted_timestamp
FROM
event_types
),
final_flat_event_abis AS (
SELECT
contract_address,
event_name,
abi,
simple_event_name,
event_signature,
NAME,
inputs,
event_type,
_inserted_timestamp
FROM
apply_event_udfs
),
flat_function_abis AS (
SELECT
contract_address,
DATA,
_inserted_timestamp,
VALUE :inputs AS inputs,
VALUE :outputs AS outputs,
VALUE :payable :: BOOLEAN AS payable,
VALUE :stateMutability :: STRING AS stateMutability,
VALUE :type :: STRING AS TYPE,
VALUE :name :: STRING AS NAME
FROM
base,
LATERAL FLATTEN (
input => DATA
)
WHERE
TYPE = 'function'
),
udf_function_abis AS (
SELECT
*,
PARSE_JSON(
object_construct_keep_null(
'inputs',
IFNULL(
inputs,
[]
),
'outputs',
IFNULL(
outputs,
[]
),
'name',
NAME,
'type',
'function'
) :: STRING
) AS abi,
utils.udf_evm_text_signature(abi) AS simple_function_name,
utils.udf_keccak256(simple_function_name) AS function_signature
FROM
flat_function_abis qualify ROW_NUMBER() over (
PARTITION BY contract_address,
function_signature
ORDER BY
_inserted_timestamp DESC
) = 1
),
flat_inputs AS (
SELECT
contract_address,
inputs,
NAME,
simple_function_name,
function_signature,
ARRAY_AGG(
VALUE :type :: STRING
) AS inputs_type
FROM
udf_function_abis,
LATERAL FLATTEN (
input => inputs
)
GROUP BY
ALL
),
fill_missing_input_names AS (
SELECT
contract_address,
NAME,
inputs_type,
simple_function_name,
function_signature,
VALUE :internalType :: STRING AS internalType,
VALUE :type :: STRING AS TYPE,
CASE
WHEN VALUE :name :: STRING = '' THEN CONCAT('input_', ROW_NUMBER() over (PARTITION BY contract_address, function_signature
ORDER BY
INDEX ASC) :: STRING)
ELSE VALUE :name :: STRING
END AS name_fixed,
inputs,
INDEX,
VALUE :components AS components
FROM
flat_inputs,
LATERAL FLATTEN (
input => inputs
)
),
final_flat_inputs AS (
SELECT
contract_address,
NAME,
inputs_type,
simple_function_name,
function_signature,
ARRAY_AGG(
OBJECT_CONSTRUCT(
'internalType',
internalType,
'name',
name_fixed,
'type',
TYPE,
'components',
components
)
) within GROUP (
ORDER BY
INDEX
) AS inputs
FROM
fill_missing_input_names
GROUP BY
ALL
),
flat_outputs AS (
SELECT
contract_address,
outputs,
simple_function_name,
function_signature,
NAME,
ARRAY_AGG(
VALUE :type :: STRING
) AS outputs_type
FROM
udf_function_abis,
LATERAL FLATTEN (
input => outputs
)
GROUP BY
ALL
),
fill_missing_output_names AS (
SELECT
contract_address,
NAME,
outputs_type,
simple_function_name,
function_signature,
VALUE :internalType :: STRING AS internalType,
VALUE :type :: STRING AS TYPE,
CASE
WHEN VALUE :name :: STRING = '' THEN CONCAT('output_', ROW_NUMBER() over (PARTITION BY contract_address, function_signature
ORDER BY
INDEX ASC) :: STRING)
ELSE VALUE :name :: STRING
END AS name_fixed,
outputs,
INDEX,
VALUE :components AS components
FROM
flat_outputs,
LATERAL FLATTEN (
input => outputs
)
),
final_flat_outputs AS (
SELECT
contract_address,
NAME,
outputs_type,
simple_function_name,
function_signature,
ARRAY_AGG(
OBJECT_CONSTRUCT(
'internalType',
internalType,
'name',
name_fixed,
'type',
TYPE,
'components',
components
)
) within GROUP (
ORDER BY
INDEX
) AS outputs
FROM
fill_missing_output_names
GROUP BY
ALL
),
all_contracts AS (
SELECT
A.contract_address,
A.name AS function_name,
i.inputs,
o.outputs,
i.inputs_type,
o.outputs_type,
A._inserted_timestamp,
A.function_signature,
A.simple_function_name
FROM
udf_function_abis A
LEFT JOIN final_flat_inputs i
ON A.contract_address = i.contract_address
AND A.function_signature = i.function_signature
LEFT JOIN final_flat_outputs o
ON A.contract_address = o.contract_address
AND A.function_signature = o.function_signature
),
apply_function_udfs AS (
SELECT
contract_address,
function_name,
PARSE_JSON(
object_construct_keep_null(
'inputs',
IFNULL(
inputs,
[]
),
'outputs',
IFNULL(
outputs,
[]
),
'name',
function_name,
'type',
'function'
) :: STRING
) AS abi,
simple_function_name,
function_signature,
inputs,
outputs,
inputs_type,
outputs_type,
_inserted_timestamp
FROM
all_contracts
),
final_function_abis AS (
SELECT
contract_address,
function_name,
abi,
simple_function_name,
function_signature,
inputs,
outputs,
inputs_type,
outputs_type,
_inserted_timestamp
FROM
apply_function_udfs
),
new_abis AS (
SELECT
DISTINCT contract_address
FROM
base
),
contracts AS (
SELECT
contract_address
FROM
{{ ref('silver_evm__proxies') }}
JOIN new_abis USING (contract_address)
),
proxies AS (
SELECT
p.proxy_address,
p.contract_address
FROM
{{ ref('silver_evm__proxies') }}
p
JOIN new_abis n
ON p.proxy_address = n.contract_address
),
final_groupings AS (
SELECT
b.contract_address AS address,
C.contract_address,
proxy_address,
CASE
WHEN C.contract_address IS NOT NULL
AND proxy_address IS NOT NULL THEN 'contract'
WHEN C.contract_address IS NOT NULL THEN 'contract'
WHEN proxy_address IS NOT NULL THEN 'proxy'
WHEN C.contract_address IS NULL
AND proxy_address IS NULL THEN 'contract'
END AS TYPE,
p.contract_address AS proxy_parent,
CASE
WHEN TYPE = 'contract' THEN address
ELSE proxy_parent
END AS final_address
FROM
base b
LEFT JOIN (
SELECT
DISTINCT contract_address
FROM
contracts
) C
ON b.contract_address = C.contract_address
LEFT JOIN (
SELECT
DISTINCT proxy_address,
contract_address
FROM
proxies
) p
ON b.contract_address = proxy_address
),
identified_addresses AS (
SELECT
DISTINCT address AS base_address,
final_address AS contract_address
FROM
final_groupings
),
function_mapping AS (
SELECT
ia.base_address,
ia.contract_address,
LEFT(
function_signature,
10
) AS function_sig
FROM
identified_addresses ia
JOIN final_function_abis ffa
ON ia.base_address = ffa.contract_address
),
valid_traces AS (
SELECT
DISTINCT base_address
FROM
(
SELECT
base_address
FROM
{{ ref('silver_evm__traces') }}
JOIN function_mapping
ON function_sig = LEFT(
input,
10
)
AND IFF(
TYPE = 'DELEGATECALL',
from_address,
to_address
) = contract_address
WHERE
block_timestamp > DATEADD('month', -12, SYSDATE())
LIMIT
50000)
), event_mapping AS (
SELECT
ia.base_address,
ia.contract_address,
event_signature
FROM
identified_addresses ia
JOIN final_flat_event_abis fea
ON ia.base_address = fea.contract_address
),
valid_logs AS (
SELECT
DISTINCT base_address
FROM
(
SELECT
base_address
FROM
{{ ref('silver_evm__logs') }}
l
JOIN event_mapping ia
ON ia.contract_address = l.contract_address
AND event_signature = topics [0] :: STRING
WHERE
block_timestamp > DATEADD('month', -12, SYSDATE())
LIMIT
50000)
), all_valid_addresses AS (
SELECT
base_address
FROM
valid_traces
UNION
SELECT
base_address
FROM
valid_logs
)
SELECT
contract_address,
abi,
discord_username,
_inserted_timestamp,
abi_hash,
CONCAT(
contract_address,
'-',
abi_hash
) AS id
FROM
base
WHERE
contract_address IN (
SELECT
base_address
FROM
all_valid_addresses
) qualify(ROW_NUMBER() over(PARTITION BY contract_address
ORDER BY
_inserted_timestamp DESC)) = 1

View File

@ -0,0 +1,7 @@
version: 2
models:
- name: silver_evm__user_verified_abis
tests:
- dbt_utils.unique_combination_of_columns:
combination_of_columns:
- ID

View File

@ -0,0 +1,108 @@
{{ config(
materialized = 'incremental',
unique_key = "contract_address",
tags = ['abis']
) }}
WITH base AS (
SELECT
contract_address,
PARSE_JSON(
abi_data
) AS DATA,
_inserted_timestamp
FROM
{{ ref('bronze_evm_api__contract_abis') }}
WHERE
abi_data [0] :: STRING <> 'ABI unavailable'
{% if is_incremental() %}
AND _inserted_timestamp >= (
SELECT
COALESCE(
MAX(
_inserted_timestamp
),
'1970-01-01'
)
FROM
{{ this }}
)
{% endif %}
),
sei_trace_abis AS (
SELECT
contract_address,
DATA,
_inserted_timestamp,
'seitrace' AS abi_source
FROM
base
),
user_abis AS (
SELECT
contract_address,
abi,
discord_username,
_inserted_timestamp,
'user' AS abi_source,
abi_hash
FROM
{{ ref('silver_evm__user_verified_abis') }}
{% if is_incremental() %}
WHERE
_inserted_timestamp >= (
SELECT
COALESCE(
MAX(
_inserted_timestamp
),
'1970-01-01'
)
FROM
{{ this }}
WHERE
abi_source = 'user'
)
AND contract_address NOT IN (
SELECT
contract_address
FROM
{{ this }}
)
{% endif %}
),
all_abis AS (
SELECT
contract_address,
DATA,
_inserted_timestamp,
abi_source,
NULL AS discord_username,
SHA2(DATA) AS abi_hash
FROM
sei_trace_abis
UNION
SELECT
contract_address,
PARSE_JSON(abi) AS DATA,
_inserted_timestamp,
'user' AS abi_source,
discord_username,
abi_hash
FROM
user_abis
)
SELECT
contract_address,
DATA,
_inserted_timestamp,
abi_source,
discord_username,
abi_hash
FROM
all_abis qualify(ROW_NUMBER() over(PARTITION BY contract_address
ORDER BY
_INSERTED_TIMESTAMP DESC)) = 1

View File

@ -0,0 +1,7 @@
version: 2
models:
- name: silver_evm__verified_abis
tests:
- dbt_utils.unique_combination_of_columns:
combination_of_columns:
- CONTRACT_ADDRESS

View File

@ -0,0 +1,41 @@
{{ config (
materialized = 'view'
) }}
WITH meta AS (
SELECT
last_modified AS _inserted_timestamp,
file_name,
CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 6), '_', 1) AS INTEGER) AS _partition_by_block_number,
TO_DATE(
concat_ws('-', SPLIT_PART(file_name, '/', 3), SPLIT_PART(file_name, '/', 4), SPLIT_PART(file_name, '/', 5))
) AS _partition_by_created_date
FROM
TABLE(
information_schema.external_table_file_registration_history(
start_time => DATEADD('day', -3, CURRENT_TIMESTAMP()),
table_name => '{{ source( "bronze_streamline", "evm_decoded_logs") }}')
) A
)
SELECT
block_number,
id :: STRING AS id,
DATA,
_inserted_timestamp,
s._partition_by_block_number AS _partition_by_block_number,
s._partition_by_created_date AS _partition_by_created_date
FROM
{{ source(
"bronze_streamline",
"evm_decoded_logs"
) }}
s
JOIN meta b
ON b.file_name = metadata$filename
AND b._partition_by_block_number = s._partition_by_block_number
AND b._partition_by_created_date = s._partition_by_created_date
WHERE
b._partition_by_block_number = s._partition_by_block_number
AND b._partition_by_created_date = s._partition_by_created_date
AND s._partition_by_created_date >= DATEADD('day', -2, CURRENT_TIMESTAMP())

View File

@ -0,0 +1,40 @@
{{ config (
materialized = 'view'
) }}
WITH meta AS (
SELECT
registered_on AS _inserted_timestamp,
file_name,
CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 6), '_', 1) AS INTEGER) AS _partition_by_block_number,
TO_DATE(
concat_ws('-', SPLIT_PART(file_name, '/', 3), SPLIT_PART(file_name, '/', 4), SPLIT_PART(file_name, '/', 5))
) AS _partition_by_created_date
FROM
TABLE(
information_schema.external_table_files(
table_name => '{{ source( "bronze_streamline", "evm_decoded_logs") }}'
)
) A
)
SELECT
block_number,
id :: STRING AS id,
DATA,
_inserted_timestamp,
s._partition_by_block_number AS _partition_by_block_number,
s._partition_by_created_date AS _partition_by_created_date
FROM
{{ source(
"bronze_streamline",
"evm_decoded_logs"
) }}
s
JOIN meta b
ON b.file_name = metadata$filename
AND b._partition_by_block_number = s._partition_by_block_number
AND b._partition_by_created_date = s._partition_by_created_date
WHERE
b._partition_by_block_number = s._partition_by_block_number
AND b._partition_by_created_date = s._partition_by_created_date

View File

@ -0,0 +1,31 @@
-- depends_on: {{ ref('bronze_evm__decoded_logs') }}
{{ config (
materialized = "incremental",
unique_key = "_log_id",
cluster_by = "ROUND(block_number, -3)",
merge_update_columns = ["_log_id"],
post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION on equality(_log_id)",
tags = ['streamline_decoded_logs_complete']
) }}
SELECT
block_number,
id AS _log_id,
_inserted_timestamp
FROM
{% if is_incremental() %}
{{ ref('bronze_evm__decoded_logs') }}
WHERE
TO_TIMESTAMP_NTZ(_inserted_timestamp) >= (
SELECT
COALESCE(MAX(TO_TIMESTAMP_NTZ(_inserted_timestamp)), '1970-01-01 00:00:00') _inserted_timestamp
FROM
{{ this }})
{% else %}
{{ ref('bronze_evm__fr_decoded_logs') }}
{% endif %}
qualify(ROW_NUMBER() over (PARTITION BY id
ORDER BY
_inserted_timestamp DESC)) = 1

View File

@ -0,0 +1,66 @@
{{ config (
materialized = "view",
post_hook = fsc_utils.if_data_call_function_v2(
func = 'streamline.udf_bulk_decode_logs_v2',
target = "{{this.schema}}.{{this.identifier}}",
params ={ "external_table" :"decoded_logs",
"sql_limit" :"1500000",
"producer_batch_size" :"400000",
"worker_batch_size" :"200000",
"sql_source" :"{{this.identifier}}" }
),
tags = ['streamline_decoded_logs_realtime']
) }}
WITH look_back AS (
SELECT
block_number
FROM
{{ ref("_evm_block_lookback") }}
)
SELECT
l.block_number,
l._log_id,
A.abi AS abi,
OBJECT_CONSTRUCT(
'topics',
l.topics,
'data',
l.data,
'address',
l.contract_address
) AS DATA
FROM
{{ ref("silver_evm__logs") }}
l
INNER JOIN {{ ref("silver_evm__complete_event_abis") }} A
ON A.parent_contract_address = l.contract_address
AND A.event_signature = l.topics [0] :: STRING
AND l.block_number BETWEEN A.start_block
AND A.end_block
WHERE
(
l.block_number >= (
SELECT
block_number
FROM
look_back
)
)
AND l.block_number IS NOT NULL
AND l.block_timestamp >= DATEADD('day', -2, CURRENT_DATE())
AND _log_id NOT IN (
SELECT
_log_id
FROM
{{ ref("streamline__complete_decode_logs") }}
WHERE
block_number >= (
SELECT
block_number
FROM
look_back
)
AND _inserted_timestamp >= DATEADD('day', -2, CURRENT_DATE())
)

View File

@ -47,6 +47,7 @@ sources:
- name: blocks_v2
- name: transactions_v2
- name: txcount_v2
- name: evm_decoded_logs
- name: bronze
schema: bronze
tables:
@ -66,4 +67,9 @@ sources:
database: sei
schema: github_actions
tables:
- name: workflows
- name: workflows
- name: crosschain_public
database: crosschain
schema: bronze_public
tables:
- name: user_abis

View File

@ -4,11 +4,11 @@ packages:
- package: dbt-labs/dbt_utils
version: 1.0.0
- git: https://github.com/FlipsideCrypto/fsc-utils.git
revision: 484e9db07d2060286768bb745e1b0e879178d43b
revision: 12aed56282bf1d03542f28986e6cf3e7a656b8e5
- package: get-select/dbt_snowflake_query_tags
version: 2.3.3
version: 2.5.0
- package: calogica/dbt_date
version: 0.7.2
- git: https://github.com/FlipsideCrypto/livequery-models.git
revision: b024188be4e9c6bc00ed77797ebdc92d351d620e
sha1_hash: c5a90d3c4a3f4e5450031099b59d32e30c7d759e
sha1_hash: 6899ebf470799bc86decf08e82314d6968e678f1

View File

@ -4,6 +4,6 @@ packages:
- package: dbt-labs/dbt_utils
version: 1.0.0
- git: https://github.com/FlipsideCrypto/fsc-utils.git
revision: v1.23.0
revision: v1.25.1
- package: get-select/dbt_snowflake_query_tags
version: [">=2.0.0", "<3.0.0"]