upd atlas incr load

This commit is contained in:
forgash_ 2024-02-07 12:08:37 -07:00
parent 0c3d9fa55d
commit fe6ab2df04
6 changed files with 75 additions and 35 deletions

View File

@ -14,17 +14,20 @@ WITH txs AS (
block_id,
block_timestamp,
tx_hash,
COALESCE(
_load_timestamp,
_inserted_timestamp
) AS _inserted_timestamp
_partition_by_block_number,
_inserted_timestamp,
modified_timestamp AS _modified_timestamp
FROM
{{ ref('silver__streamline_transactions_final') }}
WHERE
{% if var("MANUAL_FIX") %}
{{ partition_load_manual('no_buffer') }}
{% else %}
{{ incremental_load_filter('_inserted_timestamp') }}
{% if var('IS_MIGRATION') %}
{{ incremental_load_filter('_inserted_timestamp') }}
{% else %}
{{ incremental_load_filter('_modified_timestamp') }}
{% endif %}
{% endif %}
),
FINAL AS (
@ -32,7 +35,9 @@ FINAL AS (
address,
MIN(block_timestamp) AS first_tx_timestamp,
MIN(block_id) AS first_tx_block_id,
MIN(_inserted_timestamp) AS _inserted_timestamp
MIN(_partition_by_block_number) AS _partition_by_block_number,
MIN(_inserted_timestamp) AS _inserted_timestamp,
MIN(_modified_timestamp) AS _modified_timestamp
FROM
txs
GROUP BY
@ -45,7 +50,9 @@ SELECT
address,
first_tx_timestamp,
first_tx_block_id,
_partition_by_block_number,
_inserted_timestamp,
_modified_timestamp,
SYSDATE() AS inserted_timestamp,
SYSDATE() AS modified_timestamp,
'{{ invocation_id }}' AS _invocation_id

View File

@ -28,12 +28,18 @@ models:
tests:
- not_null
- name: _partition_by_block_number
description: "{{ doc('_partition_by_block_number') }}"
- name: _inserted_timestamp
description: "{{ doc('_inserted_timestamp') }}"
tests:
- name: not_null_silver__atlas_address_first_action_INSERTED_TIMESTAMP_
test_name: not_null
- name: _modified_timestamp
description: "{{ doc('_modified_timestamp') }}"
- name: inserted_timestamp
description: "{{ doc('inserted_timestamp') }}"
tests:

View File

@ -18,17 +18,20 @@ WITH nft_mints AS (
signer_id,
owner_id AS owner,
token_id,
COALESCE(
_inserted_timestamp,
_load_timestamp
) AS _inserted_timestamp
_partition_by_block_number,
_inserted_timestamp,
modified_timestamp AS _modified_timestamp
FROM
{{ ref('silver__standard_nft_mint_s3') }}
WHERE
{% if var("MANUAL_FIX") %}
{{ partition_load_manual('no_buffer') }}
{% else %}
{{ incremental_load_filter('_inserted_timestamp') }}
{% if var('IS_MIGRATION') %}
{{ incremental_load_filter('_inserted_timestamp') }}
{% else %}
{{ incremental_load_filter('_modified_timestamp') }}
{% endif %}
{% endif %}
),
nft_transfers AS (
@ -44,10 +47,9 @@ nft_transfers AS (
signer_id,
args ['receiver_id'] AS owner,
args ['token_id'] AS token_id,
COALESCE(
_inserted_timestamp,
_load_timestamp
) AS _inserted_timestamp
_partition_by_block_number,
_inserted_timestamp,
modified_timestamp AS _modified_timestamp
FROM
{{ ref('silver__actions_events_function_call_s3') }}
WHERE
@ -55,7 +57,11 @@ nft_transfers AS (
AND {% if var("MANUAL_FIX") %}
{{ partition_load_manual('no_buffer') }}
{% else %}
{{ incremental_load_filter('_inserted_timestamp') }}
{% if var('IS_MIGRATION') %}
{{ incremental_load_filter('_inserted_timestamp') }}
{% else %}
{{ incremental_load_filter('_modified_timestamp') }}
{% endif %}
{% endif %}
),
unioned_nft_data AS (
@ -82,8 +88,10 @@ SELECT
token_id,
SYSDATE() AS inserted_timestamp,
SYSDATE() AS modified_timestamp,
'{{ invocation_id }}' AS _invocation_id,
_partition_by_block_number,
_inserted_timestamp,
'{{ invocation_id }}' AS _invocation_id
_modified_timestamp
FROM
unioned_nft_data
WHERE

View File

@ -52,11 +52,6 @@ models:
tests:
- not_null
- name: _inserted_timestamp
description: "{{doc('_inserted_timestamp')}}"
tests:
- not_null
- name: inserted_timestamp
description: "{{doc('inserted_timestamp')}}"
tests:
@ -69,3 +64,14 @@ models:
- name: _invocation_id
description: "{{doc('invocation_id')}}"
- name: _partition_by_block_number
description: "{{doc('_partition_by_block_number')}}"
- name: _inserted_timestamp
description: "{{doc('_inserted_timestamp')}}"
tests:
- not_null
- name: _modified_timestamp
description: "{{doc('_modified_timestamp')}}"

View File

@ -10,14 +10,27 @@
WITH receipts AS (
SELECT
*
receipt_object_id,
tx_hash,
block_timestamp,
receipt_actions,
receiver_id,
status_value,
logs,
_partition_by_block_number,
_inserted_timestamp,
modified_timestamp AS _modified_timestamp
FROM
{{ ref('silver__streamline_receipts_final') }}
WHERE
{% if var("MANUAL_FIX") %}
{{ partition_load_manual('no_buffer') }}
{% else %}
{{ incremental_load_filter('_inserted_timestamp') }}
{% if var('IS_MIGRATION') %}
{{ incremental_load_filter('_inserted_timestamp') }}
{% else %}
{{ incremental_load_filter('_modified_timestamp') }}
{% endif %}
{% endif %}
),
FINAL AS (
@ -32,15 +45,9 @@ FINAL AS (
status_value
) [0] :: STRING AS status,
logs,
COALESCE(
_inserted_timestamp,
_load_timestamp
) AS _inserted_timestamp,
_partition_by_block_number,
{{ dbt_utils.generate_surrogate_key(['receipt_object_id']) }} AS atlas_supply_lockup_receipts_id,
SYSDATE() AS inserted_timestamp,
SYSDATE() AS modified_timestamp,
'{{ invocation_id }}' AS _invocation_id
_inserted_timestamp,
_modified_timestamp
FROM
receipts
WHERE
@ -48,6 +55,10 @@ FINAL AS (
AND status != 'Failure'
)
SELECT
*
*,
{{ dbt_utils.generate_surrogate_key(['receipt_object_id']) }} AS atlas_supply_lockup_receipts_id,
SYSDATE() AS inserted_timestamp,
SYSDATE() AS modified_timestamp,
'{{ invocation_id }}' AS _invocation_id
FROM
FINAL

View File

@ -36,10 +36,12 @@ models:
- not_null
- name: logs
description: "{{ doc('logs') }}"
- name: _inserted_timestamp
description: "{{ doc('_inserted_timestamp') }}"
- name: _partition_by_block_number
description: "{{ doc('_partition_by_block_number') }}"
- name: _inserted_timestamp
description: "{{ doc('_inserted_timestamp') }}"
- name: _modified_timestamp
description: "{{ doc('_modified_timestamp') }}"
- name: atlas_supply_lockup_receipts_id
description: "{{ doc('id') }}"
tests: