AN-4241/abi-decoded-logs2 (#175)

* abi pipeline + decodedlogs2

* package version

* typo

* export columns
drethereum 2023-12-11 12:01:50 -07:00 committed by GitHub
parent 145c12942a
commit 8c4efdb628
15 changed files with 812 additions and 189 deletions

View File

@@ -0,0 +1,45 @@
name: dbt_run_streamline_temp_decoder2
run-name: dbt_run_streamline_temp_decoder2
on:
workflow_dispatch:
schedule:
# Runs at minute 5 of every hour (see https://crontab.guru)
- cron: '5 */1 * * *'
env:
DBT_PROFILES_DIR: ./
ACCOUNT: "${{ vars.ACCOUNT }}"
ROLE: "${{ vars.ROLE }}"
USER: "${{ vars.USER }}"
PASSWORD: "${{ secrets.PASSWORD }}"
REGION: "${{ vars.REGION }}"
DATABASE: "${{ vars.DATABASE }}"
WAREHOUSE: "${{ vars.WAREHOUSE }}"
SCHEMA: "${{ vars.SCHEMA }}"
concurrency:
group: ${{ github.workflow }}
jobs:
run_dbt_jobs:
runs-on: ubuntu-latest
environment:
name: workflow_prod_backfill
steps:
- uses: actions/checkout@v3
- uses: actions/setup-python@v4
with:
python-version: "3.10"
cache: "pip"
- name: install dependencies
run: |
pip install -r requirements.txt
dbt deps
- name: Run DBT Jobs
run: |
dbt run -m "arbitrum_models,tag:decoded_logs2"
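
For reference, the -m "arbitrum_models,tag:decoded_logs2" selector is an intersection: only models in the arbitrum_models package that also carry the decoded_logs2 tag run on this hourly schedule. A model opts in through its config block; a minimal sketch, matching the tag used by silver__decoded_logs later in this PR:

{{ config(
    materialized = 'incremental',
    tags = ['decoded_logs2']
) }}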

View File

@@ -0,0 +1,16 @@
{{ config(
materialized = 'view',
persist_docs = { "relation": true,
"columns": true }
) }}
SELECT
contract_address,
DATA AS abi,
abi_source,
bytecode,
abis_id AS dim_contract_abis_id,
inserted_timestamp,
modified_timestamp
FROM
{{ ref('silver__abis') }}
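
This view simply re-exposes silver__abis under published column names. A hedged usage sketch; the fully-qualified name assumes this project's standard database/schema layout, and the abi_source values are those documented in the schema file below:

SELECT contract_address, abi_source, abi
FROM arbitrum.core.dim_contract_abis
WHERE abi_source = 'bytecode_matched';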

View File

@@ -0,0 +1,19 @@
version: 2
models:
- name: core__dim_contract_abis
description: >
This table contains the contract ABIs we have sourced from Arbiscan, the community, or bytecode matching. It is the source of ABIs used in the `core__ez_decoded_event_logs` and `core__fact_decoded_event_logs` tables.
We first try to source ABIs from Arbiscan. If we cannot find an ABI on Arbiscan, we rely on user submissions. To add a contract to this table, please visit [here](https://science.flipsidecrypto.xyz/abi-requestor/).
If we are unable to locate an ABI for a contract from Arbiscan or the community, we try to match the contract's deployed bytecode to a known bytecode for which we do have an ABI.
columns:
- name: CONTRACT_ADDRESS
description: 'The address of the contract.'
- name: ABI
description: 'The JSON ABI for the contract.'
- name: ABI_SOURCE
description: 'The source of the ABI. This can be `Arbiscan`, `user_submitted`, or `bytecode_matched`.'
- name: BYTECODE
description: 'The deployed bytecode of the contract.'

View File

@@ -1,6 +1,7 @@
{{ config (
materialized = "incremental",
unique_key = "contract_address",
merge_exclude_columns = ["inserted_timestamp"],
post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION on equality(contract_address)",
tags = ['abis']
) }}
@@ -17,6 +18,8 @@ WITH override_abis AS (
1 AS priority
FROM
{{ ref('silver__override_abis') }}
WHERE
contract_address IS NOT NULL
),
verified_abis AS (
SELECT
@@ -62,12 +65,15 @@ user_abis AS (
{% if is_incremental() %}
AND _inserted_timestamp >= (
SELECT
COALESCE(MAX(_inserted_timestamp), '1970-01-01' :: TIMESTAMP)
MAX(
_inserted_timestamp
)
FROM
{{ this }}
WHERE
abi_source = 'user')
{% endif %}
abi_source = 'user'
)
{% endif %}
),
bytecode_abis AS (
SELECT
@@ -81,7 +87,7 @@ bytecode_abis AS (
FROM
{{ ref('silver__bytecode_abis') }}
WHERE
NOT bytecode_dupe
1 = 1
{% if is_incremental() %}
AND _inserted_timestamp >= (
@@ -162,10 +168,14 @@ SELECT
p.abi_source,
p.discord_username,
p.abi_hash,
created_contract_input AS bytecode
created_contract_input AS bytecode,
{{ dbt_utils.generate_surrogate_key(
['contract_address']
) }} AS abis_id,
SYSDATE() AS inserted_timestamp,
SYSDATE() AS modified_timestamp,
'{{ invocation_id }}' AS _invocation_id
FROM
priority_abis p
LEFT JOIN {{ ref('silver__created_contracts') }}
ON p.contract_address = created_contract_address
WHERE
p.contract_address IS NOT NULL
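
A note on the new key column: dbt_utils.generate_surrogate_key(['contract_address']) compiles to an MD5 hash over the null-coalesced, string-cast column. In dbt_utils 1.x the generated SQL is roughly:

md5(cast(coalesce(cast(contract_address as varchar), '_dbt_utils_surrogate_key_null_') as varchar)) AS abis_id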

View File

@@ -1,20 +1,33 @@
{{ config (
materialized = "incremental",
unique_key = "abi_id",
unique_key = "contract_address",
tags = ['abis']
) }}
WITH bytecodes AS (
WITH contracts_with_abis AS (
-- Identifying contracts with verified ABIs
SELECT
created_contract_address AS contract_address,
A.data AS abi,
created_contract_input AS bytecode,
abi_hash
created_contract_address AS contract_address
FROM
{{ ref('silver__created_contracts') }}
LEFT JOIN {{ ref('silver__verified_abis') }} A
JOIN {{ ref('silver__verified_abis') }} A
ON A.contract_address = created_contract_address
),
contracts_without_abis AS (
-- Contracts that are missing ABIs
SELECT
created_contract_address AS contract_address,
created_contract_input AS bytecode
FROM
{{ ref('silver__created_contracts') }}
WHERE
created_contract_address NOT IN (
SELECT
contract_address
FROM
contracts_with_abis
)
{% if is_incremental() %}
AND created_contract_address NOT IN (
@@ -25,63 +38,39 @@ AND created_contract_address NOT IN (
)
{% endif %}
),
contracts_missing_abis AS (
unique_bytecode_abis AS (
-- Bytecodes from created_contracts with a unique ABI
SELECT
contract_address,
bytecode
cc.created_contract_input AS bytecode,
va.data AS abi,
va.abi_hash
FROM
bytecodes
WHERE
abi_hash IS NULL
),
bytecode_abis AS (
SELECT
*,
ROW_NUMBER() over(
PARTITION BY bytecode
ORDER BY
abi_length DESC
) AS abi_row_no
FROM
(
SELECT
DISTINCT bytecode,
abi,
abi_hash,
LENGTH(abi) AS abi_length
FROM
bytecodes
WHERE
abi_hash IS NOT NULL
)
),
dupe_check AS (
SELECT
bytecode,
COUNT(*) AS num_abis
FROM
bytecode_abis
{{ ref('silver__created_contracts') }}
cc
JOIN {{ ref('silver__verified_abis') }}
va
ON cc.created_contract_address = va.contract_address
GROUP BY
bytecode
cc.created_contract_input,
va.data,
va.abi_hash
HAVING
COUNT(*) > 1
)
COUNT(
DISTINCT va.data
) = 1 -- Ensuring there's only one ABI per bytecode
) -- Final matching
SELECT
contract_address,
abi,
abi_hash,
SYSDATE() AS _inserted_timestamp,
abi_row_no,
CONCAT(
contract_address,
'-',
abi_row_no
) AS abi_id,
CASE
WHEN num_abis > 1 THEN TRUE
ELSE FALSE
END AS bytecode_dupe
{% if is_incremental() %}
SYSDATE()
{% else %}
TO_TIMESTAMP_NTZ('2000-01-01 00:00:00')
{% endif %}
AS _inserted_timestamp
FROM
contracts_missing_abis
JOIN bytecode_abis USING (bytecode)
LEFT JOIN dupe_check USING (bytecode)
contracts_without_abis
JOIN unique_bytecode_abis USING (bytecode)
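
The HAVING COUNT(DISTINCT va.data) = 1 guard is what makes bytecode matching safe: a contract without an ABI only inherits one when every verified contract sharing its bytecode agrees on a single distinct ABI. A hedged inspection query for the ambiguous bytecodes this filter drops (the schema-qualified table names are assumptions):

SELECT
    cc.created_contract_input AS bytecode,
    COUNT(DISTINCT va.data) AS num_distinct_abis
FROM silver.created_contracts cc
JOIN silver.verified_abis va
    ON cc.created_contract_address = va.contract_address
GROUP BY 1
HAVING COUNT(DISTINCT va.data) > 1;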

View File

@@ -4,4 +4,5 @@ models:
tests:
- dbt_utils.unique_combination_of_columns:
combination_of_columns:
- ABI_ID
- CONTRACT_ADDRESS
- ABI_HASH

View File

@@ -1,167 +1,159 @@
{{ config (
materialized = 'table',
materialized = 'incremental',
unique_key = ['parent_contract_address','event_signature','start_block'],
merge_exclude_columns = ["inserted_timestamp"],
post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION",
tags = ['abis']
) }}
WITH abi_base AS (
WITH proxies AS (
SELECT
created_block,
proxy_created_block,
contract_address,
DATA
proxy_address,
start_block,
_id,
_inserted_timestamp
FROM
{{ ref('silver__abis') }}
{{ ref('silver__proxies') }}
),
flat_abi AS (
flat_abis AS (
SELECT
contract_address,
DATA,
VALUE :inputs AS inputs,
VALUE :payable :: BOOLEAN AS payable,
VALUE :stateMutability :: STRING AS stateMutability,
VALUE :type :: STRING AS TYPE,
VALUE :anonymous :: BOOLEAN AS anonymous,
VALUE :name :: STRING AS NAME
FROM
abi_base,
LATERAL FLATTEN (
input => DATA
)
WHERE
TYPE = 'event'
),
event_types AS (
SELECT
contract_address,
inputs,
anonymous,
event_name,
abi,
simple_event_name,
event_signature,
NAME,
ARRAY_AGG(
VALUE :type :: STRING
) AS event_type
FROM
flat_abi,
LATERAL FLATTEN (
input => inputs
)
GROUP BY
contract_address,
inputs,
anonymous,
NAME
),
proxy_base AS (
SELECT
C.created_contract_address AS contract_address,
p.proxy_address,
p.start_block,
C.block_number AS created_block
event_type,
_inserted_timestamp
FROM
{{ ref('silver__created_contracts') }} C
INNER JOIN {{ ref('silver__proxies') }}
p
ON C.created_contract_address = p.contract_address
AND p.proxy_address <> '0x0000000000000000000000000000000000000000'
{{ ref('silver__flat_event_abis') }}
{% if is_incremental() %}
WHERE
_inserted_timestamp >= (
SELECT
MAX(_inserted_timestamp) - INTERVAL '24 hours'
FROM
{{ this }}
)
OR contract_address IN (
SELECT
DISTINCT contract_address AS contract_address
FROM
proxies
UNION ALL
SELECT
DISTINCT proxy_address AS contract_address
FROM
proxies
)
{% endif %}
),
stacked AS (
base AS (
SELECT
ea.contract_address,
ea.inputs,
ea.anonymous,
ea.name,
ea.event_type,
event_name,
abi,
simple_event_name,
event_signature,
NAME,
inputs,
event_type,
ea._inserted_timestamp,
pb._inserted_timestamp AS proxy_inserted_timestamp,
pb.start_block,
pb.proxy_created_block,
pb.contract_address AS base_contract_address,
1 AS priority
FROM
event_types ea
INNER JOIN proxy_base pb
flat_abis ea
JOIN proxies pb
ON ea.contract_address = pb.proxy_address
UNION ALL
SELECT
eab.contract_address,
eab.inputs,
eab.anonymous,
eab.name,
eab.event_type,
event_name,
abi,
simple_event_name,
event_signature,
NAME,
inputs,
event_type,
eab._inserted_timestamp,
pbb._inserted_timestamp AS proxy_inserted_timestamp,
pbb.created_block AS start_block,
pbb.proxy_created_block,
pbb.contract_address AS base_contract_address,
2 AS priority
FROM
event_types eab
INNER JOIN (
flat_abis eab
JOIN (
SELECT
DISTINCT contract_address,
created_block
created_block,
proxy_created_block,
_inserted_timestamp
FROM
proxy_base
proxies
) pbb
ON eab.contract_address = pbb.contract_address
UNION ALL
SELECT
eac.contract_address,
eac.inputs,
eac.anonymous,
eac.name,
eac.event_type,
contract_address,
event_name,
abi,
simple_event_name,
event_signature,
NAME,
inputs,
event_type,
_inserted_timestamp,
NULL AS proxy_inserted_timestamp,
0 AS start_block,
eac.contract_address AS base_contract_address,
NULL AS proxy_created_block,
contract_address AS base_contract_address,
3 AS priority
FROM
event_types eac
flat_abis eac
WHERE
contract_address NOT IN (
SELECT
DISTINCT contract_address
FROM
proxy_base
proxies
)
),
apply_udfs AS (
new_records AS (
SELECT
contract_address AS source_contract_address,
base_contract_address AS parent_contract_address,
NAME AS event_name,
PARSE_JSON(
OBJECT_CONSTRUCT(
'anonymous',
anonymous,
'inputs',
inputs,
'name',
NAME,
'type',
'event'
) :: STRING
) AS abi,
start_block,
utils.udf_evm_text_signature(abi) AS simple_event_name,
utils.udf_keccak256(simple_event_name) AS event_signature,
priority,
NAME,
inputs,
event_type
FROM
stacked
),
FINAL AS (
SELECT
parent_contract_address,
event_name,
abi,
start_block,
proxy_created_block,
simple_event_name,
event_signature,
NAME,
inputs,
event_type
event_type,
_inserted_timestamp,
proxy_inserted_timestamp
FROM
apply_udfs qualify ROW_NUMBER() over (
base qualify ROW_NUMBER() over (
PARTITION BY parent_contract_address,
NAME,
event_type,
event_signature,
start_block
ORDER BY
priority ASC
priority ASC,
_inserted_timestamp DESC,
proxy_created_block DESC NULLS LAST,
proxy_inserted_timestamp DESC NULLS LAST
) = 1
)
SELECT
@@ -169,10 +161,27 @@ SELECT
event_name,
abi,
start_block,
proxy_created_block,
simple_event_name,
event_signature,
IFNULL(LEAD(start_block) over (PARTITION BY parent_contract_address, event_signature
ORDER BY
start_block) -1, 1e18) AS end_block
start_block) -1, 1e18) AS end_block,
_inserted_timestamp,
proxy_inserted_timestamp,
SYSDATE() AS _updated_timestamp,
{{ dbt_utils.generate_surrogate_key(
['parent_contract_address','event_signature','start_block']
) }} AS complete_event_abis_id,
SYSDATE() AS inserted_timestamp,
SYSDATE() AS modified_timestamp,
'{{ invocation_id }}' AS _invocation_id
FROM
FINAL
new_records qualify ROW_NUMBER() over (
PARTITION BY parent_contract_address,
event_name,
event_signature,
start_block
ORDER BY
_inserted_timestamp DESC
) = 1
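
The LEAD(start_block) - 1 expression converts consecutive start_block values for a given parent contract and event signature into contiguous block ranges, with 1e18 acting as an effectively-infinite sentinel for the currently active ABI. A minimal, runnable illustration:

-- start_blocks 100 and 500 for one signature become the ranges
-- (100, 499) and (500, 1e18):
SELECT
    start_block,
    IFNULL(LEAD(start_block) OVER (ORDER BY start_block) - 1, 1e18) AS end_block
FROM (SELECT 100 AS start_block UNION ALL SELECT 500);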

View File

@@ -0,0 +1,112 @@
{{ config (
materialized = 'incremental',
incremental_strategy = 'delete+insert',
unique_key = 'contract_address',
cluster_by = '_inserted_timestamp::date',
tags = ['abis'],
post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION ON EQUALITY (contract_address)"
) }}
WITH abi_base AS (
SELECT
contract_address,
DATA,
_inserted_timestamp
FROM
{{ ref('silver__abis') }}
{% if is_incremental() %}
WHERE
_inserted_timestamp >= (
SELECT
MAX(_inserted_timestamp) - INTERVAL '24 hours'
FROM
{{ this }}
)
{% endif %}
),
flat_abi AS (
SELECT
contract_address,
_inserted_timestamp,
DATA,
VALUE :inputs AS inputs,
VALUE :payable :: BOOLEAN AS payable,
VALUE :stateMutability :: STRING AS stateMutability,
VALUE :type :: STRING AS TYPE,
VALUE :anonymous :: BOOLEAN AS anonymous,
VALUE :name :: STRING AS NAME
FROM
abi_base,
LATERAL FLATTEN (
input => DATA
)
WHERE
TYPE = 'event' qualify ROW_NUMBER() over (
PARTITION BY contract_address,
NAME,
inputs
ORDER BY
LENGTH(inputs)
) = 1
),
event_types AS (
SELECT
contract_address,
_inserted_timestamp,
inputs,
anonymous,
NAME,
ARRAY_AGG(
VALUE :type :: STRING
) AS event_type
FROM
flat_abi,
LATERAL FLATTEN (
input => inputs
)
GROUP BY
contract_address,
_inserted_timestamp,
inputs,
anonymous,
NAME
),
apply_udfs AS (
SELECT
contract_address,
NAME AS event_name,
PARSE_JSON(
OBJECT_CONSTRUCT(
'anonymous',
anonymous,
'inputs',
inputs,
'name',
NAME,
'type',
'event'
) :: STRING
) AS abi,
utils.udf_evm_text_signature(abi) AS simple_event_name,
utils.udf_keccak256(simple_event_name) AS event_signature,
NAME,
inputs,
event_type,
_inserted_timestamp
FROM
event_types
)
SELECT
contract_address,
event_name,
abi,
simple_event_name,
event_signature,
NAME,
inputs,
event_type,
_inserted_timestamp
FROM
apply_udfs
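
Here utils.udf_evm_text_signature renders an event ABI as its canonical text signature (name plus comma-separated argument types), and utils.udf_keccak256 hashes that text into the topics[0] value logs carry on-chain. Assuming the hashing udf accepts a plain string, as its use on simple_event_name above suggests, the classic ERC-20 Transfer event works out to:

SELECT
    utils.udf_keccak256('Transfer(address,address,uint256)') AS event_signature;
-- expected: 0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef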

View File

@@ -1,5 +1,7 @@
{{ config (
materialized = "table",
materialized = 'incremental',
unique_key = ['contract_address','proxy_address'],
post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION",
tags = ['abis']
) }}
@@ -8,25 +10,94 @@ WITH base AS (
SELECT
from_address,
to_address,
MIN(block_number) AS start_block
MIN(block_number) AS start_block,
MAX(_inserted_timestamp) AS _inserted_timestamp
FROM
{{ ref('core__fact_traces') }}
{{ ref('silver__traces') }}
WHERE
TYPE = 'DELEGATECALL' -- AND trace_status = 'SUCCESS'
TYPE = 'DELEGATECALL'
AND trace_status = 'SUCCESS'
AND tx_status = 'SUCCESS'
AND from_address != to_address -- exclude self-calls
GROUP BY
from_address,
to_address
{% if is_incremental() %}
AND _inserted_timestamp >= (
SELECT
MAX(_inserted_timestamp) - INTERVAL '24 hours'
FROM
{{ this }}
)
{% endif %}
GROUP BY
from_address,
to_address
),
create_id AS (
SELECT
from_address AS contract_address,
to_address AS proxy_address,
start_block,
CONCAT(
from_address,
'-',
to_address
) AS _id,
_inserted_timestamp
FROM
base
),
heal AS (
SELECT
contract_address,
proxy_address,
start_block,
_id,
_inserted_timestamp
FROM
create_id
{% if is_incremental() %}
UNION ALL
SELECT
contract_address,
proxy_address,
start_block,
_id,
_inserted_timestamp
FROM
{{ this }}
JOIN create_id USING (
contract_address,
proxy_address
)
{% endif %}
),
FINAL AS (
SELECT
contract_address,
proxy_address,
start_block,
_id,
_inserted_timestamp
FROM
heal qualify ROW_NUMBER() over (
PARTITION BY contract_address,
proxy_address
ORDER BY
start_block ASC
) = 1
)
SELECT
from_address AS contract_address,
to_address AS proxy_address,
start_block,
CONCAT(
from_address,
'-',
to_address
) AS _id
f.contract_address,
f.proxy_address,
f.start_block,
f._id,
f._inserted_timestamp,
C.block_number AS created_block,
p.block_number AS proxy_created_block
FROM
base
FINAL f
JOIN {{ ref('silver__created_contracts') }} C
ON f.contract_address = C.created_contract_address
JOIN {{ ref('silver__created_contracts') }} p
ON f.proxy_address = p.created_contract_address
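
In a DELEGATECALL, the callee's code executes in the caller's storage context, so the caller (from_address) is the user-facing contract and the callee (to_address) is the implementation it borrows logic from, labeled proxy_address here. The heal/FINAL CTEs union incremental rows with existing ones and keep the earliest start_block per pair, so a re-observed delegation never moves a mapping's start forward. A hedged spot-check of the output (the schema name is an assumption):

SELECT contract_address, proxy_address, start_block, created_block, proxy_created_block
FROM silver.proxies
ORDER BY start_block DESC
LIMIT 10;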

View File

@@ -0,0 +1,253 @@
-- depends_on: {{ ref('bronze__decoded_logs') }}
{{ config (
materialized = "incremental",
unique_key = ['block_number', 'event_index'],
cluster_by = "block_timestamp::date",
incremental_predicates = ["dynamic_range", "block_number"],
post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION",
merge_exclude_columns = ["inserted_timestamp"],
tags = ['decoded_logs2']
) }}
-- revert after backfill
-- full_refresh = false,
-- tags = ['decoded_logs','reorg']
WITH base_data AS (
SELECT
block_number :: INTEGER AS block_number,
SPLIT(
id,
'-'
) [0] :: STRING AS tx_hash,
SPLIT(
id,
'-'
) [1] :: INTEGER AS event_index,
DATA :name :: STRING AS event_name,
LOWER(
DATA :address :: STRING
) :: STRING AS contract_address,
DATA AS decoded_data,
id :: STRING AS _log_id,
TO_TIMESTAMP_NTZ(_inserted_timestamp) AS _inserted_timestamp
FROM
{% if is_incremental() %}
{{ ref('bronze__fr_decoded_logs') }} --revert to bronze__decoded_logs after backfill
WHERE
--TO_TIMESTAMP_NTZ(_inserted_timestamp) >= (
--SELECT
--MAX(_inserted_timestamp)
--FROM
--{{ this }}
--)
_partition_by_block_number BETWEEN (
SELECT
ROUND(MAX(block_number), -4)
FROM
{{ this }}
)
AND (
SELECT
ROUND(MAX(block_number), -4) + 5000000
FROM
{{ this }})
AND DATA NOT ILIKE '%Event topic is not present in given ABI%'
{% else %}
{{ ref('bronze__fr_decoded_logs') }}
WHERE
_partition_by_block_number <= 10000000
AND DATA NOT ILIKE '%Event topic is not present in given ABI%'
{% endif %}
qualify(ROW_NUMBER() over (PARTITION BY block_number, event_index
ORDER BY
_inserted_timestamp DESC, _partition_by_created_date DESC)) = 1
),
transformed_logs AS (
SELECT
block_number,
tx_hash,
event_index,
contract_address,
event_name,
decoded_data,
_inserted_timestamp,
_log_id,
utils.udf_transform_logs(decoded_data) AS transformed
FROM
base_data
),
FINAL AS (
SELECT
b.tx_hash,
b.block_number,
b.event_index,
b.event_name,
b.contract_address,
b.decoded_data,
transformed,
b._log_id,
b._inserted_timestamp,
OBJECT_AGG(
DISTINCT CASE
WHEN v.value :name = '' THEN CONCAT(
'anonymous_',
v.index
)
ELSE v.value :name
END,
v.value :value
) AS decoded_flat
FROM
transformed_logs b,
LATERAL FLATTEN(
input => transformed :data
) v
GROUP BY
b.tx_hash,
b.block_number,
b.event_index,
b.event_name,
b.contract_address,
b.decoded_data,
transformed,
b._log_id,
b._inserted_timestamp
),
new_records AS (
SELECT
b.tx_hash,
b.block_number,
b.event_index,
b.event_name,
b.contract_address,
b.decoded_data,
b.transformed,
b._log_id,
b._inserted_timestamp,
b.decoded_flat,
block_timestamp,
origin_function_signature,
origin_from_address,
origin_to_address,
topics,
DATA,
event_removed :: STRING AS event_removed,
tx_status,
CASE
WHEN block_timestamp IS NULL THEN TRUE
ELSE FALSE
END AS is_pending
FROM
FINAL b
LEFT JOIN {{ ref('silver__logs') }} USING (
block_number,
_log_id
)
)
{% if is_incremental() %},
missing_data AS (
SELECT
t.tx_hash,
t.block_number,
t.event_index,
t.event_name,
t.contract_address,
t.decoded_data,
t.transformed,
t._log_id,
GREATEST(
TO_TIMESTAMP_NTZ(
t._inserted_timestamp
),
TO_TIMESTAMP_NTZ(
l._inserted_timestamp
)
) AS _inserted_timestamp,
t.decoded_flat,
l.block_timestamp,
l.origin_function_signature,
l.origin_from_address,
l.origin_to_address,
l.topics,
l.data,
l.event_removed :: STRING AS event_removed,
l.tx_status,
FALSE AS is_pending
FROM
{{ this }}
t
INNER JOIN {{ ref('silver__logs') }}
l USING (
block_number,
_log_id
)
WHERE
t.is_pending
AND l.block_timestamp IS NOT NULL
)
{% endif %}
SELECT
tx_hash,
block_number,
event_index,
event_name,
contract_address,
decoded_data,
transformed,
_log_id,
_inserted_timestamp,
decoded_flat,
block_timestamp,
origin_function_signature,
origin_from_address,
origin_to_address,
topics,
DATA,
event_removed,
tx_status,
is_pending,
{{ dbt_utils.generate_surrogate_key(
['tx_hash', 'event_index']
) }} AS decoded_logs_id,
SYSDATE() AS inserted_timestamp,
SYSDATE() AS modified_timestamp,
'{{ invocation_id }}' AS _invocation_id
FROM
new_records
{% if is_incremental() %}
UNION
SELECT
tx_hash,
block_number,
event_index,
event_name,
contract_address,
decoded_data,
transformed,
_log_id,
_inserted_timestamp,
decoded_flat,
block_timestamp,
origin_function_signature,
origin_from_address,
origin_to_address,
topics,
DATA,
event_removed,
tx_status,
is_pending,
{{ dbt_utils.generate_surrogate_key(
['tx_hash', 'event_index']
) }} AS decoded_logs_id,
SYSDATE() AS inserted_timestamp,
SYSDATE() AS modified_timestamp,
'{{ invocation_id }}' AS _invocation_id
FROM
missing_data
{% endif %}
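
The decoded_flat column comes from the OBJECT_AGG over the flattened transformed:data array: each decoded parameter becomes a key/value pair, and unnamed parameters are keyed anonymous_<index>. A minimal, runnable sketch of the same reshaping on a toy payload:

SELECT
    OBJECT_AGG(v.value:name :: STRING, v.value:value) AS decoded_flat
FROM
    TABLE(FLATTEN(input => PARSE_JSON(
        '[{"name":"from","value":"0xabc"},{"name":"to","value":"0xdef"},{"name":"value","value":"100"}]'
    ))) v;
-- returns {"from": "0xabc", "to": "0xdef", "value": "100"}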

View File

@@ -0,0 +1,9 @@
{{ config (
materialized = 'view',
tags = ['full_test']
) }}
SELECT
*
FROM
{{ ref('silver__decoded_logs') }}

View File

@@ -1,6 +1,6 @@
version: 2
models:
- name: silver__decoded_logs
- name: test_silver__decoded_logs_full
tests:
- dbt_utils.unique_combination_of_columns:
combination_of_columns:
@@ -44,3 +44,9 @@ models:
column_type_list:
- STRING
- VARCHAR

View File

@@ -0,0 +1,27 @@
{{ config (
materialized = 'view',
tags = ['recent_test']
) }}
WITH last_3_days AS (
SELECT
block_number
FROM
{{ ref("_max_block_by_date") }}
qualify ROW_NUMBER() over (
ORDER BY
block_number DESC
) = 3
)
SELECT
*
FROM
{{ ref('silver__decoded_logs') }}
WHERE
block_number >= (
SELECT
block_number
FROM
last_3_days
)
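
The qualify ROW_NUMBER() ... = 3 pattern selects the third-highest daily max block from _max_block_by_date, i.e. a cutoff roughly three days back, so this view limits the fast test suite to recent rows. A runnable toy illustration of the cutoff selection:

-- with daily max blocks 1000, 950, 900 the 3rd-highest is 900,
-- so the view would keep block_number >= 900
SELECT block_number
FROM (SELECT 1000 AS block_number UNION ALL SELECT 950 UNION ALL SELECT 900)
QUALIFY ROW_NUMBER() OVER (ORDER BY block_number DESC) = 3;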

View File

@@ -0,0 +1,56 @@
version: 2
models:
- name: test_silver__decoded_logs_recent
tests:
- dbt_utils.unique_combination_of_columns:
combination_of_columns:
- _LOG_ID
- dbt_utils.recency:
datepart: day
field: _INSERTED_TIMESTAMP
interval: 1
- fsc_utils.recent_decoded_logs_match:
config:
severity: error
error_if: ">0"
columns:
- name: BLOCK_NUMBER
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- NUMBER
- FLOAT
- name: TX_HASH
tests:
- not_null
- dbt_expectations.expect_column_values_to_match_regex:
regex: 0[xX][0-9a-fA-F]+
- name: EVENT_INDEX
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- NUMBER
- FLOAT
- name: CONTRACT_ADDRESS
tests:
- not_null
- dbt_expectations.expect_column_values_to_match_regex:
regex: 0[xX][0-9a-fA-F]+
- name: _INSERTED_TIMESTAMP
tests:
- not_null
- name: EVENT_NAME
tests:
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- STRING
- VARCHAR

View File

@@ -6,6 +6,6 @@ packages:
- package: dbt-labs/dbt_utils
version: 1.0.0
- git: https://github.com/FlipsideCrypto/fsc-utils.git
revision: v1.11.0
revision: v1.12.0
- package: get-select/dbt_snowflake_query_tags
version: [">=2.0.0", "<3.0.0"]