An 2605/decoded data (#327)

* decoded data

* rename

* update udf to check if value is a list

* add try/except in udf_transform_logs

* add schema parameter when calling macro for decoding logs

* fixed transform udf

* removed extraneous list for transformed events

* removed unused variables in transform_logs macro

* fixed duplicate transformed events

* removed nesting of components

* updates

* remove array check

* docs

* docs

Co-authored-by: Julius Remigio <14811322+juls858@users.noreply.github.com>
Austin 2022-12-21 16:16:11 -05:00 committed by GitHub
parent 853c40a4eb
commit 57c1e166e6
8 changed files with 267 additions and 2 deletions

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long


@@ -6,6 +6,9 @@
{{ create_udf_hex_to_int(
schema = "public"
) }}
{{ create_udf_transform_logs(
schema = 'silver'
) }}
{{ create_udf_hex_to_int_with_inputs(
schema = "public"
) }}


@@ -0,0 +1,34 @@
{% macro create_udf_transform_logs(schema) %}
create or replace function {{ schema }}.udf_transform_logs(decoded variant)
returns variant
language python
runtime_version = '3.8'
handler = 'transform' as $$
from copy import deepcopy


def transform_event(event: dict):
    # Tuple-typed parameters carry a "components" list describing each tuple
    # field; zip those names onto the raw values so every row in "value"
    # becomes a name -> value mapping.
    new_event = deepcopy(event)
    if new_event.get("components"):
        components = new_event.get("components")
        for iy, y in enumerate(new_event["value"]):
            for i, c in enumerate(components):
                y[i] = {"value": y[i], **c}
            new_event["value"][iy] = {z["name"]: z["value"] for z in y}
        return new_event
    else:
        return event


def transform(events: dict):
    # Snowflake passes the VARIANT argument as a dict: {"data": [event, ...]}.
    try:
        results = [
            transform_event(event) if event["decoded"] else event
            for event in events["data"]
        ]
        events["data"] = results
        return events
    except Exception:
        # Fall back to the untouched payload on any malformed input.
        return events
$$;
{% endmacro %}
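
For a quick feel for the transformation, here is a hypothetical one-off query (not part of this commit, and assuming the UDF has been created in the silver schema as wired up above). A tuple-typed event's components metadata is zipped onto each row of its value array:

SELECT
    silver.udf_transform_logs(
        PARSE_JSON('{"data": [{"name": "offer", "decoded": true, "components": [{"name": "token"}, {"name": "amount"}], "value": [["0xabc", 1], ["0xdef", 2]]}]}')
    ) AS transformed;
-- transformed :data [0] :value ->
-- [{ "token": "0xabc", "amount": 1 }, { "token": "0xdef", "amount": 2 }]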


@@ -0,0 +1,17 @@
{{ config(
materialized = 'view',
persist_docs ={ "relation": true,
"columns": true }
) }}
SELECT
block_number,
tx_hash,
event_index,
event_name,
contract_address,
decoded_flat AS decoded_log,
decoded_data AS full_decoded_log
FROM
{{ ref('silver__decoded_logs') }}


@@ -0,0 +1,38 @@
version: 2
models:
- name: beta__fact_decoded_logs
description: >
'BETA TABLE!
THIS TABLE IS SUBJECT TO CHANGE WITHOUT NOTICE AND MAY CONTAIN BUGS. DO NOT USE FOR DASHBOARDS OR IMPORTANT ANALYSIS.
This table is a beta version of our new ABI decoder for EVM events. Please report bugs as you find them, but note that any issues you find will not be fixed immediately.
This table only includes blocks 14,000,000 - 16,000,000 and does not update.
We are aware of some popular contracts (e.g. Blur, some of Univ3) that are not included in this table, and we are working on adding them.
Any feedback on this table is welcome, but please note that this table is not intended for production use in any way.'
columns:
- name: BLOCK_NUMBER
description: 'The block number of the block containing the transaction that emitted the event.'
- name: TX_HASH
description: 'The hash of the transaction that emitted the event.'
- name: EVENT_INDEX
description: 'The index of the event in the transaction.'
- name: EVENT_NAME
description: 'The name of the event.'
- name: CONTRACT_ADDRESS
description: 'The address of the contract that emitted the event.'
- name: DECODED_LOG
description: 'The ABI-decoded event log, with values mapped to their names. This column is most similar to the "event_inputs" column within "fact_event_logs".'
- name: FULL_DECODED_LOG
description: 'The ABI-decoded event log, including all fields.'


@@ -0,0 +1,120 @@
{{ config (
materialized = "incremental",
unique_key = "_log_id",
cluster_by = "ROUND(block_number, -3)",
merge_update_columns = ["_log_id"],
post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION on equality(_log_id)"
) }}
WITH meta AS (
SELECT
registered_on,
last_modified,
file_name
FROM
TABLE(
information_schema.external_table_files(
table_name => '{{ source( "bronze_streamline", "decoded_logs") }}'
)
) A
{% if is_incremental() %}
WHERE
LEAST(
registered_on,
last_modified
) >= (
SELECT
COALESCE(MAX(_INSERTED_TIMESTAMP), '1970-01-01' :: DATE) max_INSERTED_TIMESTAMP
FROM
{{ this }})
)
{% else %}
)
{% endif %},
decoded_logs AS (
SELECT
block_number :: INTEGER AS block_number,
SPLIT(
id,
'-'
) [0] :: STRING AS tx_hash,
SPLIT(
id,
'-'
) [1] :: INTEGER AS event_index,
DATA :name :: STRING AS event_name,
LOWER(
DATA :address :: STRING
) :: STRING AS contract_address,
DATA AS decoded_data,
id :: STRING AS _log_id,
registered_on :: TIMESTAMP AS _inserted_timestamp
FROM
{{ source(
"bronze_streamline",
"decoded_logs"
) }} AS s
JOIN meta b
ON b.file_name = metadata$filename
WHERE
block_number > 14000000 qualify(ROW_NUMBER() over (PARTITION BY _log_id
ORDER BY
_inserted_timestamp DESC)) = 1
),
transformed_logs AS (
SELECT
block_number,
tx_hash,
event_index,
contract_address,
event_name,
decoded_data,
_inserted_timestamp,
_log_id,
silver.udf_transform_logs(decoded_data) AS transformed
FROM
decoded_logs
),
FINAL AS (
SELECT
b.tx_hash,
b.block_number,
b.event_index,
b.event_name,
b.contract_address,
b.decoded_data,
transformed,
b._log_id,
b._inserted_timestamp,
OBJECT_AGG(
DISTINCT CASE
WHEN v.value :name = '' THEN CONCAT(
'anonymous_',
v.index
)
ELSE v.value :name
END,
v.value :value
) AS decoded_flat
FROM
transformed_logs b,
LATERAL FLATTEN(
input => transformed :data
) v
GROUP BY
b.tx_hash,
b.block_number,
b.event_index,
b.event_name,
b.contract_address,
b.decoded_data,
transformed,
b._log_id,
b._inserted_timestamp
)
SELECT
*
FROM
FINAL
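
A hypothetical sanity check (not part of this commit) of the UDF plus the flattening logic in the FINAL CTE: assuming the UDF exists in the silver schema as created above, a hand-built payload can be run through the same OBJECT_AGG expression, including the anonymous_ fallback for unnamed parameters.

WITH sample AS (
    SELECT
        silver.udf_transform_logs(
            PARSE_JSON('{"data": [{"name": "from", "value": "0xabc", "decoded": true}, {"name": "", "value": 42, "decoded": true}]}')
        ) AS transformed
)
SELECT
    OBJECT_AGG(
        DISTINCT CASE
            WHEN v.value :name = '' THEN CONCAT(
                'anonymous_',
                v.index
            )
            ELSE v.value :name
        END,
        v.value :value
    ) AS decoded_flat
FROM
    sample,
    LATERAL FLATTEN(
        input => transformed :data
    ) v;
-- decoded_flat -> { "anonymous_1": 42, "from": "0xabc" }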


@@ -0,0 +1,53 @@
version: 2
models:
- name: silver__decoded_logs
tests:
- dbt_utils.unique_combination_of_columns:
combination_of_columns:
- _LOG_ID
- dbt_utils.recency:
datepart: day
field: _INSERTED_TIMESTAMP
interval: 1
enabled: false
columns:
- name: BLOCK_NUMBER
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- NUMBER
- FLOAT
- name: TX_HASH
tests:
- not_null
- dbt_expectations.expect_column_values_to_match_regex:
regex: 0[xX][0-9a-fA-F]+
- name: EVENT_INDEX
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- NUMBER
- FLOAT
- name: CONTRACT_ADDRESS
tests:
- not_null
- dbt_expectations.expect_column_values_to_match_regex:
regex: 0[xX][0-9a-fA-F]+
- name: _INSERTED_TIMESTAMP
tests:
- not_null
- name: EVENT_NAME
tests:
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- STRING
- VARCHAR