diff --git a/models/silver/core/silver__decoded_logs.sql b/models/silver/core/silver__decoded_logs.sql index b4b9158..28881bf 100644 --- a/models/silver/core/silver__decoded_logs.sql +++ b/models/silver/core/silver__decoded_logs.sql @@ -1,8 +1,8 @@ --- depends_on: {{ source('klaytn_bronze','decoded_logs') }} +-- depends_on: {{ ref('bronze__decoded_logs') }} {{ config ( materialized = "incremental", unique_key = ['block_number', 'event_index'], - cluster_by = "block_timestamp::date", + cluster_by = ["modified_timestamp::date","block_timestamp::date"], incremental_predicates = ["dynamic_range", "block_number"], post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION", merge_exclude_columns = ["inserted_timestamp"], @@ -32,10 +32,7 @@ WITH base_data AS ( FROM {% if is_incremental() %} - {{ source( - 'klaytn_bronze', - 'decoded_logs' - ) }} +{{ ref('bronze__decoded_logs') }} WHERE TO_TIMESTAMP_NTZ(_inserted_timestamp) >= ( SELECT @@ -45,10 +42,7 @@ WHERE ) AND DATA NOT ILIKE '%Event topic is not present in given ABI%' {% else %} - {{ source( - 'klaytn_bronze', - 'fr_decoded_logs' - ) }} + {{ ref('bronze__fr_decoded_logs') }} WHERE DATA NOT ILIKE '%Event topic is not present in given ABI%' {% endif %} diff --git a/models/sources.yml b/models/sources.yml index b38bae9..b635307 100644 --- a/models/sources.yml +++ b/models/sources.yml @@ -11,12 +11,6 @@ sources: - name: receipts - name: traces - name: transactions - - name: klaytn_bronze - database: klaytn - schema: bronze - tables: - - name: decoded_logs - - name: fr_decoded_logs - name: github_actions database: kaia schema: github_actions @@ -36,4 +30,10 @@ sources: database: crosschain schema: bronze_public tables: - - name: user_abis \ No newline at end of file + - name: user_abis + - name: bronze_streamline + database: streamline + schema: | + {{ "KLAYTN_DEV" if var("STREAMLINE_USE_DEV_FOR_EXTERNAL_TABLES", False) else "KLAYTN" }} + tables: + - name: decoded_logs diff --git a/models/streamline/bronze/decoder/bronze__decoded_logs.sql b/models/streamline/bronze/decoder/bronze__decoded_logs.sql new file mode 100644 index 0000000..bd43f6f --- /dev/null +++ b/models/streamline/bronze/decoder/bronze__decoded_logs.sql @@ -0,0 +1,41 @@ +{{ config ( + materialized = 'view' +) }} + +WITH meta AS ( + + SELECT + last_modified AS _inserted_timestamp, + file_name, + CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 6), '_', 1) AS INTEGER) AS _partition_by_block_number, + TO_DATE( + concat_ws('-', SPLIT_PART(file_name, '/', 3), SPLIT_PART(file_name, '/', 4), SPLIT_PART(file_name, '/', 5)) + ) AS _partition_by_created_date + FROM + TABLE( + information_schema.external_table_file_registration_history( + start_time => DATEADD('day', -3, CURRENT_TIMESTAMP()), + table_name => '{{ source( "bronze_streamline", "decoded_logs") }}') + ) A + ) + SELECT + block_number, + id :: STRING AS id, + DATA, + _inserted_timestamp, + s._partition_by_block_number AS _partition_by_block_number, + s._partition_by_created_date AS _partition_by_created_date + FROM + {{ source( + "bronze_streamline", + "decoded_logs" + ) }} + s + JOIN meta b + ON b.file_name = metadata$filename + AND b._partition_by_block_number = s._partition_by_block_number + AND b._partition_by_created_date = s._partition_by_created_date + WHERE + b._partition_by_block_number = s._partition_by_block_number + AND b._partition_by_created_date = s._partition_by_created_date + AND s._partition_by_created_date >= DATEADD('day', -2, CURRENT_TIMESTAMP()) diff --git a/models/streamline/bronze/decoder/bronze__fr_decoded_logs.sql b/models/streamline/bronze/decoder/bronze__fr_decoded_logs.sql new file mode 100644 index 0000000..4e4a1c8 --- /dev/null +++ b/models/streamline/bronze/decoder/bronze__fr_decoded_logs.sql @@ -0,0 +1,40 @@ +{{ config ( + materialized = 'view' +) }} + +WITH meta AS ( + + SELECT + registered_on AS _inserted_timestamp, + file_name, + CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 6), '_', 1) AS INTEGER) AS _partition_by_block_number, + TO_DATE( + concat_ws('-', SPLIT_PART(file_name, '/', 3), SPLIT_PART(file_name, '/', 4), SPLIT_PART(file_name, '/', 5)) + ) AS _partition_by_created_date + FROM + TABLE( + information_schema.external_table_files( + table_name => '{{ source( "bronze_streamline", "decoded_logs") }}' + ) + ) A +) +SELECT + block_number, + id :: STRING AS id, + DATA, + _inserted_timestamp, + s._partition_by_block_number AS _partition_by_block_number, + s._partition_by_created_date AS _partition_by_created_date +FROM + {{ source( + "bronze_streamline", + "decoded_logs" + ) }} + s + JOIN meta b + ON b.file_name = metadata$filename + AND b._partition_by_block_number = s._partition_by_block_number + AND b._partition_by_created_date = s._partition_by_created_date +WHERE + b._partition_by_block_number = s._partition_by_block_number + AND b._partition_by_created_date = s._partition_by_created_date