diff --git a/dbt_project.yml b/dbt_project.yml index df20eec..a1ebbe8 100644 --- a/dbt_project.yml +++ b/dbt_project.yml @@ -62,6 +62,7 @@ vars: OBSERV_FULL_TEST: False DBT_FULL_TEST: False STREAMLINE_LOAD_LOOKBACK_HOURS: 3 + RECEIPT_MAP_LOOKBACK_HOURS: 6 IS_MIGRATION: False dispatch: @@ -74,4 +75,3 @@ dispatch: query-comment: comment: "{{ dbt_snowflake_query_tags.get_query_comment(node) }}" append: true # Snowflake removes prefixed comments. - diff --git a/models/descriptions/_modified_timestamp.md b/models/descriptions/_modified_timestamp.md new file mode 100644 index 0000000..6cb7c90 --- /dev/null +++ b/models/descriptions/_modified_timestamp.md @@ -0,0 +1,6 @@ +{% docs _modified_timestamp %} + +The timestamp at which the underlying record was last modified by an internal process. +This is used for incrementally loading based on when the source data was last modified. + +{% enddocs %} diff --git a/models/silver/streamline/helpers/_retry_range.sql b/models/silver/streamline/helpers/_retry_range.sql new file mode 100644 index 0000000..019d241 --- /dev/null +++ b/models/silver/streamline/helpers/_retry_range.sql @@ -0,0 +1,21 @@ +{{ config( + materialized = 'ephemeral', + tags = ['helper', 'receipt_map'] +) }} + +SELECT + receipt_object_id, + block_id, + _partition_by_block_number, + _inserted_timestamp + {% if not var('IS_MIGRATION') %} + , _modified_timestamp + {% endif %} +FROM + {{ target.database }}.silver.streamline_receipts_final +WHERE + {{ "_inserted_timestamp" if var('IS_MIGRATION') else "_modified_timestamp" }} >= SYSDATE() - INTERVAL '3 days' + AND ( + tx_hash IS NULL + OR block_timestamp IS NULL + ) diff --git a/models/silver/streamline/helpers/silver__flatten_receipts.sql b/models/silver/streamline/helpers/silver__flatten_receipts.sql index b19d093..183e7e1 100644 --- a/models/silver/streamline/helpers/silver__flatten_receipts.sql +++ b/models/silver/streamline/helpers/silver__flatten_receipts.sql @@ -26,16 +26,10 @@ WITH receipts AS ( WHERE _partition_by_block_number >= ( SELECT - MAX(_partition_by_block_number) + MIN(_partition_by_block_number) - (3000 * {{ var('RECEIPT_MAP_LOOKBACK_HOURS') }}) FROM - {{ target.database }}.silver.streamline_receipts_final - ) - 20000 - AND _partition_by_block_number <= ( - SELECT - MAX(_partition_by_block_number) - FROM - {{ target.database }}.silver.streamline_receipts_final - ) + 220000 + {{ ref('_retry_range')}} + ) {% endif %} ) SELECT diff --git a/models/silver/streamline/helpers/silver__receipt_tx_hash_mapping.sql b/models/silver/streamline/helpers/silver__receipt_tx_hash_mapping.sql index 946ba7c..efd238e 100644 --- a/models/silver/streamline/helpers/silver__receipt_tx_hash_mapping.sql +++ b/models/silver/streamline/helpers/silver__receipt_tx_hash_mapping.sql @@ -4,7 +4,8 @@ tags = ['helper', 'receipt_map'] ) }} -WITH recursive ancestrytree AS ( +WITH +recursive ancestrytree AS ( SELECT item, @@ -25,7 +26,9 @@ WITH recursive ancestrytree AS ( ), txs AS ( SELECT - * + tx_hash, + outcome_receipts, + _partition_by_block_number FROM {{ ref('silver__streamline_transactions') }} @@ -36,16 +39,10 @@ txs AS ( WHERE _partition_by_block_number >= ( SELECT - MAX(_partition_by_block_number) + MIN(_partition_by_block_number) - (3000 * {{ var('RECEIPT_MAP_LOOKBACK_HOURS') }}) FROM - {{ target.database }}.silver.streamline_receipts_final - ) - 20000 - AND _partition_by_block_number <= ( - SELECT - MAX(_partition_by_block_number) - FROM - {{ target.database }}.silver.streamline_receipts_final - ) + 220000 + {{ ref('_retry_range')}} + ) {% endif %} ), FINAL AS ( diff --git a/models/silver/streamline/silver__streamline_blocks.yml b/models/silver/streamline/silver__streamline_blocks.yml index 8125ce5..95461f6 100644 --- a/models/silver/streamline/silver__streamline_blocks.yml +++ b/models/silver/streamline/silver__streamline_blocks.yml @@ -57,16 +57,8 @@ models: - name: _partition_by_block_number description: "{{ doc('_partition_by_block_number')}}" - - name: _load_timestamp - description: "{{ doc('_load_timestamp')}}" - tests: - - not_null - - dbt_expectations.expect_column_values_to_be_in_type_list: - column_type_list: - - TIMESTAMP_NTZ - - dbt_expectations.expect_row_values_to_have_recent_data: - datepart: day - interval: 1 + - name: _MODIFIED_TIMESTAMP + description: "{{ doc('_modified_timestamp')}}" - name: _INSERTED_TIMESTAMP description: "{{ doc('_inserted_timestamp')}}" diff --git a/models/silver/streamline/silver__streamline_receipts.yml b/models/silver/streamline/silver__streamline_receipts.yml index 6ea67ca..4ffd6d4 100644 --- a/models/silver/streamline/silver__streamline_receipts.yml +++ b/models/silver/streamline/silver__streamline_receipts.yml @@ -79,8 +79,8 @@ models: tests: - not_null - - name: _LOAD_TIMESTAMP - description: "{{ doc('_load_timestamp')}}" + - name: _MODIFIED_TIMESTAMP + description: "{{ doc('_modified_timestamp')}}" - name: _PARTITION_BY_BLOCK_NUMBER description: "{{ doc('_partition_by_block_number')}}" diff --git a/models/silver/streamline/silver__streamline_receipts_final.sql b/models/silver/streamline/silver__streamline_receipts_final.sql index d599102..93b0ddb 100644 --- a/models/silver/streamline/silver__streamline_receipts_final.sql +++ b/models/silver/streamline/silver__streamline_receipts_final.sql @@ -7,36 +7,18 @@ tags = ['receipt_map'], full_refresh = False ) }} -{# TODO - check clustering. Add SO? #} +{# + TODO - check clustering. Gold view on this table, add block_ts? + Add SO? Probably +#} WITH retry_range AS ( SELECT - receipt_object_id, - block_id, - _partition_by_block_number, - _inserted_timestamp - {% if not var('IS_MIGRATION') %} - , _modified_timestamp - {% endif %} + * FROM - {{ this }} + {{ ref('_retry_range')}} - {% if var('IS_MIGRATION') %} - WHERE - _inserted_timestamp >= SYSDATE() - INTERVAL '1 day' - AND ( - tx_hash IS NULL - OR block_timestamp IS NULL - ) - {% else %} - WHERE - _modified_timestamp >= SYSDATE() - INTERVAL '1 day' - AND ( - tx_hash IS NULL - OR block_timestamp IS NULL - ) - {% endif %} ), base_receipts AS ( SELECT @@ -60,23 +42,36 @@ base_receipts AS ( _inserted_timestamp, modified_timestamp AS _modified_timestamp FROM - {{ ref('silver__streamline_receipts') }} - WHERE - _partition_by_block_number >= ( - SELECT - MIN(_partition_by_block_number) - FROM - retry_range - ) - AND ( - {{ incremental_load_filter('_inserted_timestamp') }} - OR receipt_id IN ( + {{ ref('silver__streamline_receipts') }} r + + {% if var('MANUAL_FIX') %} + + WHERE + {{ partition_load_manual('no_buffer') }} + + {% else %} + + WHERE + _partition_by_block_number >= ( SELECT - receipt_object_id + MIN(_partition_by_block_number) - (3000 * {{ var('RECEIPT_MAP_LOOKBACK_HOURS') }}) FROM retry_range ) - ) + AND ( + {% if var('IS_MIGRATION') %} + {{ incremental_load_filter('_inserted_timestamp') }} + {% else %} + {{ incremental_load_filter('_modified_timestamp') }} + {% endif %} + OR receipt_id IN ( + SELECT + DISTINCT receipt_object_id + FROM + retry_range + ) + ) + {% endif %} ), blocks AS ( SELECT @@ -84,15 +79,24 @@ blocks AS ( block_timestamp, _partition_by_block_number, _inserted_timestamp + {% if not var('IS_MIGRATION') %} + , modified_timestamp AS _modified_timestamp + {% endif %} FROM {{ ref('silver__streamline_blocks') }} - WHERE - _partition_by_block_number >= ( - SELECT - MIN(_partition_by_block_number) - FROM - retry_range - ) + + {% if var('MANUAL_FIX') %} + WHERE + {{ partition_load_manual('no_buffer') }} + {% else %} + WHERE + _partition_by_block_number >= ( + SELECT + MIN(_partition_by_block_number) - (3000 * {{ var('RECEIPT_MAP_LOOKBACK_HOURS') }}) + FROM + retry_range + ) + {% endif %} ), append_tx_hash AS ( SELECT diff --git a/models/silver/streamline/silver__streamline_receipts_final.yml b/models/silver/streamline/silver__streamline_receipts_final.yml index 7af3565..4e4356f 100644 --- a/models/silver/streamline/silver__streamline_receipts_final.yml +++ b/models/silver/streamline/silver__streamline_receipts_final.yml @@ -84,13 +84,8 @@ models: - name: ERROR_MESSAGE description: "{{ doc('error_message')}}" - - name: _LOAD_TIMESTAMP - description: "{{ doc('_load_timestamp')}}" - tests: - - not_null - - dbt_expectations.expect_row_values_to_have_recent_data: - datepart: day - interval: 1 + - name: _MODIFIED_TIMESTAMP + description: "{{ doc('_modified_timestamp')}}" - name: _PARTITION_BY_BLOCK_NUMBER description: "{{ doc('_partition_by_block_number')}}" diff --git a/models/silver/streamline/silver__streamline_shards.yml b/models/silver/streamline/silver__streamline_shards.yml index c5c59a3..7633887 100644 --- a/models/silver/streamline/silver__streamline_shards.yml +++ b/models/silver/streamline/silver__streamline_shards.yml @@ -53,16 +53,8 @@ models: - name: _PARTITION_BY_BLOCK_NUMBER description: "{{ doc('_partition_by_block_number')}}" - - name: _load_timestamp - description: "{{ doc('_load_timestamp')}}" - tests: - - not_null - - dbt_expectations.expect_column_values_to_be_in_type_list: - column_type_list: - - TIMESTAMP_NTZ - - dbt_expectations.expect_row_values_to_have_recent_data: - datepart: day - interval: 1 + - name: _MODIFIED_TIMESTAMP + description: "{{ doc('_modified_timestamp')}}" - name: _INSERTED_TIMESTAMP description: "{{ doc('_inserted_timestamp')}}" diff --git a/models/silver/streamline/silver__streamline_transactions.yml b/models/silver/streamline/silver__streamline_transactions.yml index ab58232..de07df0 100644 --- a/models/silver/streamline/silver__streamline_transactions.yml +++ b/models/silver/streamline/silver__streamline_transactions.yml @@ -31,10 +31,8 @@ models: tests: - not_null - - name: _LOAD_TIMESTAMP - description: "{{ doc('_load_timestamp')}}" - tests: - - not_null + - name: _MODIFIED_TIMESTAMP + description: "{{ doc('_modified_timestamp')}}" - name: CHUNK_HASH description: "{{ doc('chunk_hash')}}" diff --git a/models/silver/streamline/silver__streamline_transactions_final.sql b/models/silver/streamline/silver__streamline_transactions_final.sql index ab64bfa..f2cfb13 100644 --- a/models/silver/streamline/silver__streamline_transactions_final.sql +++ b/models/silver/streamline/silver__streamline_transactions_final.sql @@ -1,17 +1,36 @@ {{ config( materialized = 'incremental', + incremental_strategy = 'merge', + merge_exclude_columns = ['inserted_timestamp'], unique_key = 'tx_hash', - incremental_strategy = 'delete+insert', cluster_by = ['_inserted_timestamp::date', '_modified_timestamp::DATE', '_partition_by_block_number'], - tags = ['receipt_map'] + ) }} -{# TODO - upd * to col selection #} + {# TODO - add _modified_timestamp column #} {# TODO - check clustering. Add SO? #} +{# TODO - clean up the model and joins #} + WITH int_txs AS ( SELECT - * + block_id, + tx_hash, + shard_id, + transactions_index, + chunk_hash, + outcome_receipts, + _actions, + _hash, + _nonce, + _outcome, + _public_key, + _receiver_id, + _signature, + _signer_id, + _partition_by_block_number, + _inserted_timestamp, + modified_timestamp AS _modified_timestamp FROM {{ ref('silver__streamline_transactions') }} @@ -29,7 +48,15 @@ WITH int_txs AS ( ), int_receipts AS ( SELECT - * + block_id, + block_timestamp, + tx_hash, + execution_outcome, + receipt_succeeded, + gas_burnt, + _partition_by_block_number, + _inserted_timestamp, + modified_timestamp AS _modified_timestamp FROM {{ ref('silver__streamline_receipts_final') }} @@ -48,7 +75,12 @@ int_receipts AS ( ), int_blocks AS ( SELECT - * + block_id, + block_hash, + block_timestamp, + _partition_by_block_number, + _inserted_timestamp, + modified_timestamp AS _modified_timestamp FROM {{ ref('silver__streamline_blocks') }} {# TODO add WHERE #} @@ -95,7 +127,6 @@ base_transactions AS ( 'signer_id', _signer_id ) AS tx, - _load_timestamp, _partition_by_block_number, t._inserted_timestamp FROM @@ -108,7 +139,7 @@ actions AS ( SELECT tx_hash, SUM( - VALUE :FunctionCall :gas + VALUE :FunctionCall :gas :: NUMBER ) AS attached_gas FROM base_transactions, @@ -131,7 +162,6 @@ transactions AS ( tx, tx :outcome :outcome :gas_burnt :: NUMBER AS transaction_gas_burnt, tx :outcome :outcome :tokens_burnt :: NUMBER AS transaction_tokens_burnt, - _load_timestamp, _partition_by_block_number, _inserted_timestamp FROM @@ -141,7 +171,7 @@ receipts AS ( SELECT block_id, tx_hash, - receipt_succeeded AS success_or_fail, + receipt_succeeded, SUM( gas_burnt ) over ( @@ -160,7 +190,7 @@ receipts AS ( FROM int_receipts WHERE - tokens_burnt != '0' + tokens_burnt != 0 -- TODO is this a str? cast to INT ), FINAL AS ( SELECT @@ -175,7 +205,6 @@ FINAL AS ( t.tx, t.transaction_gas_burnt + r.receipt_gas_burnt AS gas_used, t.transaction_tokens_burnt + r.receipt_tokens_burnt AS transaction_fee, - _load_timestamp, _partition_by_block_number, _inserted_timestamp, COALESCE( @@ -183,7 +212,7 @@ FINAL AS ( gas_used ) AS attached_gas, LAST_VALUE( - r.success_or_fail + r.receipt_succeeded ) over ( PARTITION BY r.tx_hash ORDER BY @@ -213,12 +242,12 @@ SELECT tx, gas_used, transaction_fee, - _load_timestamp, _partition_by_block_number, attached_gas, tx_succeeded, tx_status, _inserted_timestamp, + {# TODO add _modified_timestamp #} {{ dbt_utils.generate_surrogate_key( ['tx_hash'] ) }} AS streamline_transactions_final_id, diff --git a/models/silver/streamline/silver__streamline_transactions_final.yml b/models/silver/streamline/silver__streamline_transactions_final.yml index 7e97514..79343de 100644 --- a/models/silver/streamline/silver__streamline_transactions_final.yml +++ b/models/silver/streamline/silver__streamline_transactions_final.yml @@ -27,14 +27,6 @@ models: - not_null: where: _inserted_timestamp <= current_timestamp - interval '1 hour' - - name: _LOAD_TIMESTAMP - description: "{{ doc('_load_timestamp')}}" - tests: - - not_null - - dbt_expectations.expect_row_values_to_have_recent_data: - datepart: day - interval: 1 - - name: _PARTITION_BY_BLOCK_NUMBER description: "{{ doc('_partition_by_block_number')}}" @@ -86,6 +78,9 @@ models: - not_null: where: _inserted_timestamp <= current_timestamp - interval '1 hour' + - name: _MODIFIED_TIMESTAMP + description: "{{ doc('_modified_timestamp')}}" + - name: _INSERTED_TIMESTAMP description: "{{ doc('_inserted_timestamp')}}" diff --git a/tests/tests__chunk_gaps.sql b/tests/tests__chunk_gaps.sql index 9cbfe14..c271c63 100644 --- a/tests/tests__chunk_gaps.sql +++ b/tests/tests__chunk_gaps.sql @@ -1,6 +1,6 @@ {{ config( - error_if = '>=25', - warn_if = 'BETWEEN 1 AND 24' + error_if = '>=10', + warn_if = 'BETWEEN 1 AND 9' ) }} WITH blocks AS ( @@ -73,6 +73,9 @@ FROM comp WHERE chunk_ct_expected > 0 - AND is_missing + AND is_missing + {# Filter out false positive from blocks at start of window #} + AND _inserted_timestamp_blocks > SYSDATE() - INTERVAL '7 days' + INTERVAL '1 hour' + AND _inserted_timestamp_shards > SYSDATE() - INTERVAL '7 days' + INTERVAL '1 hour' ORDER BY 1 diff --git a/tests/tests__streamline_block_gaps.sql b/tests/tests__streamline_block_gaps.sql index f98702c..b53985a 100644 --- a/tests/tests__streamline_block_gaps.sql +++ b/tests/tests__streamline_block_gaps.sql @@ -38,4 +38,4 @@ FROM WHERE prior_hash <> prev_hash {# Filter out false positive from blocks at start of window (whose parent hash was cut off) #} - AND (_inserted_timestamp > SYSDATE() - INTERVAL '7 days' + INTERVAL '1 hour') \ No newline at end of file + AND (_inserted_timestamp > SYSDATE() - INTERVAL '7 days' + INTERVAL '1 hour')