bronze table and silver cluster

This commit is contained in:
mattromano 2024-06-25 14:20:38 -07:00
parent a98be9edc3
commit 87a5c21f99
4 changed files with 92 additions and 17 deletions

View File

@ -1,8 +1,8 @@
-- depends_on: {{ source('klaytn_bronze','decoded_logs') }}
-- depends_on: {{ ref('bronze__decoded_logs') }}
{{ config (
materialized = "incremental",
unique_key = ['block_number', 'event_index'],
cluster_by = "block_timestamp::date",
cluster_by = ["modified_timestamp::date","block_timestamp::date"],
incremental_predicates = ["dynamic_range", "block_number"],
post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION",
merge_exclude_columns = ["inserted_timestamp"],
@ -32,10 +32,7 @@ WITH base_data AS (
FROM
{% if is_incremental() %}
{{ source(
'klaytn_bronze',
'decoded_logs'
) }}
{{ ref('bronze__decoded_logs') }}
WHERE
TO_TIMESTAMP_NTZ(_inserted_timestamp) >= (
SELECT
@ -45,10 +42,7 @@ WHERE
)
AND DATA NOT ILIKE '%Event topic is not present in given ABI%'
{% else %}
{{ source(
'klaytn_bronze',
'fr_decoded_logs'
) }}
{{ ref('bronze__fr_decoded_logs') }}
WHERE
DATA NOT ILIKE '%Event topic is not present in given ABI%'
{% endif %}

View File

@ -11,12 +11,6 @@ sources:
- name: receipts
- name: traces
- name: transactions
- name: klaytn_bronze
database: klaytn
schema: bronze
tables:
- name: decoded_logs
- name: fr_decoded_logs
- name: github_actions
database: kaia
schema: github_actions
@ -36,4 +30,10 @@ sources:
database: crosschain
schema: bronze_public
tables:
- name: user_abis
- name: user_abis
- name: bronze_streamline
database: streamline
schema: |
{{ "KLAYTN_DEV" if var("STREAMLINE_USE_DEV_FOR_EXTERNAL_TABLES", False) else "KLAYTN" }}
tables:
- name: decoded_logs

View File

@ -0,0 +1,41 @@
{{ config (
materialized = 'view'
) }}
WITH meta AS (
SELECT
last_modified AS _inserted_timestamp,
file_name,
CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 6), '_', 1) AS INTEGER) AS _partition_by_block_number,
TO_DATE(
concat_ws('-', SPLIT_PART(file_name, '/', 3), SPLIT_PART(file_name, '/', 4), SPLIT_PART(file_name, '/', 5))
) AS _partition_by_created_date
FROM
TABLE(
information_schema.external_table_file_registration_history(
start_time => DATEADD('day', -3, CURRENT_TIMESTAMP()),
table_name => '{{ source( "bronze_streamline", "decoded_logs") }}')
) A
)
SELECT
block_number,
id :: STRING AS id,
DATA,
_inserted_timestamp,
s._partition_by_block_number AS _partition_by_block_number,
s._partition_by_created_date AS _partition_by_created_date
FROM
{{ source(
"bronze_streamline",
"decoded_logs"
) }}
s
JOIN meta b
ON b.file_name = metadata$filename
AND b._partition_by_block_number = s._partition_by_block_number
AND b._partition_by_created_date = s._partition_by_created_date
WHERE
b._partition_by_block_number = s._partition_by_block_number
AND b._partition_by_created_date = s._partition_by_created_date
AND s._partition_by_created_date >= DATEADD('day', -2, CURRENT_TIMESTAMP())

View File

@ -0,0 +1,40 @@
{{ config (
materialized = 'view'
) }}
WITH meta AS (
SELECT
registered_on AS _inserted_timestamp,
file_name,
CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 6), '_', 1) AS INTEGER) AS _partition_by_block_number,
TO_DATE(
concat_ws('-', SPLIT_PART(file_name, '/', 3), SPLIT_PART(file_name, '/', 4), SPLIT_PART(file_name, '/', 5))
) AS _partition_by_created_date
FROM
TABLE(
information_schema.external_table_files(
table_name => '{{ source( "bronze_streamline", "decoded_logs") }}'
)
) A
)
SELECT
block_number,
id :: STRING AS id,
DATA,
_inserted_timestamp,
s._partition_by_block_number AS _partition_by_block_number,
s._partition_by_created_date AS _partition_by_created_date
FROM
{{ source(
"bronze_streamline",
"decoded_logs"
) }}
s
JOIN meta b
ON b.file_name = metadata$filename
AND b._partition_by_block_number = s._partition_by_block_number
AND b._partition_by_created_date = s._partition_by_created_date
WHERE
b._partition_by_block_number = s._partition_by_block_number
AND b._partition_by_created_date = s._partition_by_created_date