diff --git a/macros/dbt/incremental_predicates.sql b/macros/dbt/incremental_predicates.sql new file mode 100644 index 0000000..ba1bb2e --- /dev/null +++ b/macros/dbt/incremental_predicates.sql @@ -0,0 +1,22 @@ +{% macro standard_predicate( + input_column = 'block_number' + ) -%} + {%- set database_name = target.database -%} + {%- set schema_name = generate_schema_name( + node = model + ) -%} + {%- set table_name = generate_alias_name( + node = model + ) -%} + {%- set tmp_table_name = table_name ~ '__dbt_tmp' -%} + {%- set full_table_name = database_name ~ '.' ~ schema_name ~ '.' ~ table_name -%} + {%- set full_tmp_table_name = database_name ~ '.' ~ schema_name ~ '.' ~ tmp_table_name -%} + {{ full_table_name }}.{{ input_column }} >= ( + SELECT + MIN( + {{ input_column }} + ) + FROM + {{ full_tmp_table_name }} + ) +{%- endmacro %} diff --git a/models/gold/core/core__ez_decoded_event_logs.sql b/models/gold/core/core__ez_decoded_event_logs.sql index 91ae01a..07e7d30 100644 --- a/models/gold/core/core__ez_decoded_event_logs.sql +++ b/models/gold/core/core__ez_decoded_event_logs.sql @@ -1,7 +1,11 @@ -{{ config( - materialized = 'view', - persist_docs ={ "relation": true, - "columns": true } +{{ config ( + materialized = "incremental", + unique_key = "ez_decoded_event_logs_id", + incremental_strategy = 'delete+insert', + cluster_by = "block_timestamp::date", + incremental_predicates = [standard_predicate()], + post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION ON EQUALITY(ez_decoded_event_logs_id, contract_name, contract_address)", + tags = ['decoded_logs'] ) }} SELECT @@ -39,10 +43,25 @@ SELECT ['tx_hash', 'event_index'] ) }} ) AS ez_decoded_event_logs_id, - GREATEST(COALESCE(l.inserted_timestamp, '2000-01-01'), COALESCE(C.inserted_timestamp, '2000-01-01')) AS inserted_timestamp, - GREATEST(COALESCE(l.modified_timestamp, '2000-01-01'), COALESCE(C.modified_timestamp, '2000-01-01')) AS modified_timestamp, - tx_status -- deprecate + +{% if is_incremental() %} +SYSDATE() AS inserted_timestamp, +SYSDATE() AS modified_timestamp, +{% else %} + GREATEST(block_timestamp, DATEADD('day', -10, SYSDATE())) AS inserted_timestamp, + GREATEST(block_timestamp, DATEADD('day', -10, SYSDATE())) AS modified_timestamp, +{% endif %} + +tx_status -- deprecate FROM {{ ref('silver__decoded_logs') }} l LEFT JOIN {{ ref('silver__contracts') }} C USING (contract_address) + +{% if is_incremental() %} +AND l.modified_timestamp > ( + SELECT + COALESCE(MAX(modified_timestamp), '2000-01-01' :: TIMESTAMP) + FROM + {{ this }}) + {% endif %}