check in, gte in progress

This commit is contained in:
Eric Laurello 2025-02-03 17:33:17 -05:00
parent 7c3d26700b
commit acdad04ce9
10 changed files with 135 additions and 127 deletions

View File

@ -3,8 +3,8 @@
) }}
{{ streamline_external_table_query_v2(
model = "accounts",
partition_function = " TO_DATE( SPLIT_PART(SPLIT_PART(file_name, '/', 3), '=', 2)|| '01', 'YYYYMMDD')",
partition_name = "partition_id",
partition_function = "TRY_TO_DATE(left(split_part(split_part(file_name, '=', -1), '/', -1),8), 'YYYYMMDD')",
partition_name = "partition_gte_id",
unique_key = "ACCOUNT_ID",
other_cols = "BALANCE, BUYING_LIABILITIES, SELLING_LIABILITIES, SEQUENCE_NUMBER, NUM_SUBENTRIES, INFLATION_DESTINATION, FLAGS, HOME_DOMAIN, MASTER_WEIGHT, THRESHOLD_LOW, THRESHOLD_MEDIUM, THRESHOLD_HIGH, LAST_MODIFIED_LEDGER, LEDGER_ENTRY_CHANGE, DELETED, BATCH_ID, BATCH_RUN_DATE, BATCH_INSERT_TS, SPONSOR, NUM_SPONSORED, NUM_SPONSORING, SEQUENCE_LEDGER, SEQUENCE_TIME, CLOSED_AT, LEDGER_SEQUENCE"
other_cols = "partition_id, BALANCE, BUYING_LIABILITIES, SELLING_LIABILITIES, SEQUENCE_NUMBER, NUM_SUBENTRIES, INFLATION_DESTINATION, FLAGS, HOME_DOMAIN, MASTER_WEIGHT, THRESHOLD_LOW, THRESHOLD_MEDIUM, THRESHOLD_HIGH, LAST_MODIFIED_LEDGER, LEDGER_ENTRY_CHANGE, DELETED, BATCH_ID, BATCH_RUN_DATE, BATCH_INSERT_TS, SPONSOR, NUM_SPONSORED, NUM_SPONSORING, SEQUENCE_LEDGER, SEQUENCE_TIME, CLOSED_AT, LEDGER_SEQUENCE"
) }}

View File

@ -3,8 +3,8 @@
) }}
{{ streamline_external_table_FR_query_v2(
model = "accounts",
partition_function = " TO_DATE( SPLIT_PART(SPLIT_PART(file_name, '/', 3), '=', 2)|| '01', 'YYYYMMDD')",
partition_name = "partition_id",
partition_function = "TRY_TO_DATE(left(split_part(split_part(file_name, '=', -1), '/', -1),8), 'YYYYMMDD')",
partition_name = "partition_gte_id",
unique_key = "ACCOUNT_ID",
other_cols = "BALANCE, BUYING_LIABILITIES, SELLING_LIABILITIES, SEQUENCE_NUMBER, NUM_SUBENTRIES, INFLATION_DESTINATION, FLAGS, HOME_DOMAIN, MASTER_WEIGHT, THRESHOLD_LOW, THRESHOLD_MEDIUM, THRESHOLD_HIGH, LAST_MODIFIED_LEDGER, LEDGER_ENTRY_CHANGE, DELETED, BATCH_ID, BATCH_RUN_DATE, BATCH_INSERT_TS, SPONSOR, NUM_SPONSORED, NUM_SPONSORING, SEQUENCE_LEDGER, SEQUENCE_TIME, CLOSED_AT, LEDGER_SEQUENCE"
other_cols = "partition_id, BALANCE, BUYING_LIABILITIES, SELLING_LIABILITIES, SEQUENCE_NUMBER, NUM_SUBENTRIES, INFLATION_DESTINATION, FLAGS, HOME_DOMAIN, MASTER_WEIGHT, THRESHOLD_LOW, THRESHOLD_MEDIUM, THRESHOLD_HIGH, LAST_MODIFIED_LEDGER, LEDGER_ENTRY_CHANGE, DELETED, BATCH_ID, BATCH_RUN_DATE, BATCH_INSERT_TS, SPONSOR, NUM_SPONSORED, NUM_SPONSORING, SEQUENCE_LEDGER, SEQUENCE_TIME, CLOSED_AT, LEDGER_SEQUENCE"
) }}

View File

@ -3,8 +3,9 @@
) }}
{{ streamline_external_table_query_v2(
model = "history_ledgers",
partition_function = " TO_DATE( SPLIT_PART(SPLIT_PART(file_name, '/', 3), '=', 2)|| '01', 'YYYYMMDD')",
partition_name = "partition_id",
partition_function = "TRY_TO_DATE(left(split_part(split_part(file_name, '=', -1), '/', -1),8), 'YYYYMMDD')",
partition_name = "partition_gte_id",
unique_key = "SEQUENCE",
other_cols = "LEDGER_HASH, PREVIOUS_LEDGER_HASH, TRANSACTION_COUNT, OPERATION_COUNT, CLOSED_AT, ID, TOTAL_COINS, FEE_POOL, BASE_FEE, BASE_RESERVE, MAX_TX_SET_SIZE, PROTOCOL_VERSION, LEDGER_HEADER, SUCCESSFUL_TRANSACTION_COUNT, FAILED_TRANSACTION_COUNT, TX_SET_OPERATION_COUNT, BATCH_ID, BATCH_RUN_DATE, BATCH_INSERT_TS, SOROBAN_FEE_WRITE_1KB, NODE_ID, SIGNATURE, TOTAL_BYTE_SIZE_OF_BUCKET_LIST"
other_cols = "partition_id, LEDGER_HASH, PREVIOUS_LEDGER_HASH, TRANSACTION_COUNT, OPERATION_COUNT, CLOSED_AT, ID, TOTAL_COINS, FEE_POOL, BASE_FEE, BASE_RESERVE, MAX_TX_SET_SIZE, PROTOCOL_VERSION, SUCCESSFUL_TRANSACTION_COUNT, FAILED_TRANSACTION_COUNT, TX_SET_OPERATION_COUNT, BATCH_ID, BATCH_RUN_DATE, BATCH_INSERT_TS, SOROBAN_FEE_WRITE_1KB, NODE_ID, SIGNATURE, TOTAL_BYTE_SIZE_OF_BUCKET_LIST"
) }}
--LEDGER_HEADER,

View File

@ -3,8 +3,9 @@
) }}
{{ streamline_external_table_FR_query_v2(
model = "history_ledgers",
partition_function = " TO_DATE( SPLIT_PART(SPLIT_PART(file_name, '/', 3), '=', 2)|| '01', 'YYYYMMDD')",
partition_name = "partition_id",
partition_function = "TRY_TO_DATE(left(split_part(split_part(file_name, '=', -1), '/', -1),8), 'YYYYMMDD')",
partition_name = "partition_gte_id",
unique_key = "SEQUENCE",
other_cols = "LEDGER_HASH, PREVIOUS_LEDGER_HASH, TRANSACTION_COUNT, OPERATION_COUNT, CLOSED_AT, ID, TOTAL_COINS, FEE_POOL, BASE_FEE, BASE_RESERVE, MAX_TX_SET_SIZE, PROTOCOL_VERSION, LEDGER_HEADER, SUCCESSFUL_TRANSACTION_COUNT, FAILED_TRANSACTION_COUNT, TX_SET_OPERATION_COUNT, BATCH_ID, BATCH_RUN_DATE, BATCH_INSERT_TS, SOROBAN_FEE_WRITE_1KB, NODE_ID, SIGNATURE, TOTAL_BYTE_SIZE_OF_BUCKET_LIST"
other_cols = "partition_id, LEDGER_HASH, PREVIOUS_LEDGER_HASH, TRANSACTION_COUNT, OPERATION_COUNT, CLOSED_AT, ID, TOTAL_COINS, FEE_POOL, BASE_FEE, BASE_RESERVE, MAX_TX_SET_SIZE, PROTOCOL_VERSION, SUCCESSFUL_TRANSACTION_COUNT, FAILED_TRANSACTION_COUNT, TX_SET_OPERATION_COUNT, BATCH_ID, BATCH_RUN_DATE, BATCH_INSERT_TS, SOROBAN_FEE_WRITE_1KB, NODE_ID, SIGNATURE, TOTAL_BYTE_SIZE_OF_BUCKET_LIST"
) }}
--LEDGER_HEADER,

View File

@ -3,8 +3,8 @@
) }}
{{ streamline_external_table_query_v2(
model = "history_operations",
partition_function = "TO_DATE( SPLIT_PART(SPLIT_PART(file_name, '/', 3), '=', 2)|| '01', 'YYYYMMDD')",
partition_name = "partition_id",
partition_function = "TRY_TO_DATE(left(split_part(split_part(file_name, '=', -1), '/', -1),8), 'YYYYMMDD')",
partition_name = "partition_gte_id",
unique_key = "ID",
other_cols = 'VALUE'
other_cols = 'partition_id,VALUE'
) }}

View File

@ -3,8 +3,8 @@
) }}
{{ streamline_external_table_FR_query_v2(
model = "history_operations",
partition_function = "TO_DATE( SPLIT_PART(SPLIT_PART(file_name, '/', 3), '=', 2)|| '01', 'YYYYMMDD')",
partition_name = "partition_id",
partition_function = "TRY_TO_DATE(left(split_part(split_part(file_name, '=', -1), '/', -1),8), 'YYYYMMDD')",
partition_name = "partition_gte_id",
unique_key = "ID",
other_cols = 'VALUE'
other_cols = 'partition_id,VALUE'
) }}

View File

@ -12,7 +12,7 @@
{% if execute %}
{% if is_incremental() %}
{% set max_inserted_query %}
{% set max_is_query %}
SELECT
MAX(_inserted_timestamp) AS _inserted_timestamp
@ -20,13 +20,10 @@ FROM
{{ this }}
{% endset %}
{% set max_inserted_timestamp = run_query(max_inserted_query) [0] [0] %}
{% endif %}
{% if is_incremental() %}
{% set max_part_query %}
{% set max_is = run_query(max_is_query) [0] [0] %}
{% set max_part_query %}
SELECT
MAX(partition_id) AS partition_id
MAX(partition_gte_id) AS partition__gte_id
FROM
{{ this }}
@ -38,32 +35,33 @@ FROM
WITH pre_final AS (
SELECT
partition_id,
account_id,
balance,
buying_liabilities,
selling_liabilities,
sequence_number,
num_subentries,
inflation_destination,
flags,
home_domain,
master_weight,
threshold_low,
threshold_medium,
threshold_high,
last_modified_ledger,
ledger_entry_change,
deleted,
batch_id,
batch_run_date,
batch_insert_ts,
sponsor,
num_sponsored,
num_sponsoring,
sequence_ledger,
sequence_time,
closed_at,
ledger_sequence,
partition_gte_id,
account_id :: STRING AS account_id,
balance :: FLOAT AS balance,
buying_liabilities :: FLOAT AS buying_liabilities,
selling_liabilities :: FLOAT AS selling_liabilities,
sequence_number :: INTEGER AS sequence_number,
num_subentries :: INTEGER AS num_subentries,
inflation_destination :: STRING AS inflation_destination,
flags :: INTEGER AS flags,
home_domain :: STRING AS home_domain,
master_weight :: INTEGER AS master_weight,
threshold_low :: INTEGER AS threshold_low,
threshold_medium :: INTEGER AS threshold_medium,
threshold_high :: INTEGER AS threshold_high,
last_modified_ledger :: INTEGER AS last_modified_ledger,
ledger_entry_change :: INTEGER AS ledger_entry_change,
deleted :: BOOLEAN AS deleted,
batch_id :: STRING AS batch_id,
batch_run_date :: TIMESTAMP AS batch_run_date,
batch_insert_ts :: TIMESTAMP AS batch_insert_ts,
sponsor :: STRING AS sponsor,
num_sponsored :: INTEGER AS num_sponsored,
num_sponsoring :: INTEGER AS num_sponsoring,
sequence_ledger :: INTEGER AS sequence_ledger,
sequence_time :: TIMESTAMP AS sequence_time,
closed_at :: TIMESTAMP AS closed_at,
ledger_sequence :: INTEGER AS ledger_sequence,
_inserted_timestamp
FROM
@ -75,45 +73,47 @@ WITH pre_final AS (
{% if is_incremental() %}
WHERE
_inserted_timestamp >= '{{ max_part }}'
AND _inserted_timestamp > '{{ max_inserted_timestamp }}'
partition_gte_id >= '{{ max_part }}'
AND _inserted_timestamp > '{{ max_is }}'
{% endif %}
qualify ROW_NUMBER() over (
PARTITION BY account_id,
closed_at
ORDER BY
batch_insert_ts DESC,
_inserted_timestamp DESC
) = 1
)
SELECT
partition_id,
account_id :: STRING AS account_id,
balance :: FLOAT AS balance,
buying_liabilities :: FLOAT AS buying_liabilities,
selling_liabilities :: FLOAT AS selling_liabilities,
sequence_number :: INTEGER AS sequence_number,
num_subentries :: INTEGER AS num_subentries,
inflation_destination :: STRING AS inflation_destination,
flags :: INTEGER AS flags,
home_domain :: STRING AS home_domain,
master_weight :: INTEGER AS master_weight,
threshold_low :: INTEGER AS threshold_low,
threshold_medium :: INTEGER AS threshold_medium,
threshold_high :: INTEGER AS threshold_high,
last_modified_ledger :: INTEGER AS last_modified_ledger,
ledger_entry_change :: INTEGER AS ledger_entry_change,
deleted :: BOOLEAN AS deleted,
batch_id :: STRING AS batch_id,
batch_run_date :: TIMESTAMP AS batch_run_date,
batch_insert_ts :: TIMESTAMP AS batch_insert_ts,
sponsor :: STRING AS sponsor,
num_sponsored :: INTEGER AS num_sponsored,
num_sponsoring :: INTEGER AS num_sponsoring,
sequence_ledger :: INTEGER AS sequence_ledger,
sequence_time :: TIMESTAMP AS sequence_time,
closed_at :: TIMESTAMP AS closed_at,
ledger_sequence :: INTEGER AS ledger_sequence,
partition_gte_id,
account_id,
balance,
buying_liabilities,
selling_liabilities,
sequence_number,
num_subentries,
inflation_destination,
flags,
home_domain,
master_weight,
threshold_low,
threshold_medium,
threshold_high,
last_modified_ledger,
ledger_entry_change,
deleted,
batch_id,
batch_run_date,
batch_insert_ts,
sponsor,
num_sponsored,
num_sponsoring,
sequence_ledger,
sequence_time,
closed_at,
ledger_sequence,
_inserted_timestamp,
{{ dbt_utils.generate_surrogate_key(
['account_id','closed_at']

View File

@ -11,15 +11,15 @@
{% if execute %}
{% if is_incremental() %}
{% set max_inserted_query %}
{% set max_bat_query %}
SELECT
MAX(_inserted_timestamp) AS _inserted_timestamp
MAX(batch_insert_ts) AS batch_insert_ts
FROM
{{ this }}
{% endset %}
{% set max_inserted_timestamp = run_query(max_inserted_query) [0] [0] %}
{% set max_batch = run_query(max_bat_query) [0] [0] %}
{% endif %}
{% if is_incremental() %}
@ -37,34 +37,37 @@ FROM
WITH pre_final AS (
SELECT
partition_id,
id :: flot AS id,
id :: FLOAT AS id,
asset_type :: STRING AS asset_type,
asset_code :: STRING AS asset_code,
asset_issuer :: STRING AS asset_issuer,
batch_run_date :: datetime AS batch_run_date,
batch_id :: STRING AS batch_id,
batch_insert_ts :: datetime AS batch_insert_ts,
asset_id :: INT AS asset_id,
_inserted_timestamp
asset_id :: INT AS asset_id
FROM
{# {% if is_incremental() %}
{{ ref('bronze__assets') }}
{% else %}
{{ ref('bronze__assets_FR') }}
{% endif %}
{% if is_incremental() %}
{{ ref('bronze__assets') }}
{% else %}
{{ ref('bronze__assets_FR') }}
{% endif %}
#}
{{ source(
'bronze_streamline',
'history_assets'
) }}
{% if is_incremental() %}
WHERE
partition_id >= '{{ max_part }}'
AND _inserted_timestamp > '{{ max_inserted_timestamp }}'
AND batch_insert_ts > '{{ max_batch }}'
{% endif %}
qualify ROW_NUMBER() over (
PARTITION BY asset_id
ORDER BY
batch_insert_ts DESC,
_inserted_timestamp DESC
batch_insert_ts DESC
) = 1
)
SELECT
@ -77,7 +80,6 @@ SELECT
batch_id,
batch_insert_ts,
asset_id,
_inserted_timestamp,
{{ dbt_utils.generate_surrogate_key(
['asset_id']
) }} AS assets_id,

View File

@ -5,14 +5,13 @@
incremental_predicates = ["dynamic_range_predicate", "partition_id::date"],
merge_exclude_columns = ["inserted_timestamp"],
cluster_by = ['closed_at::DATE','partition_id','modified_timestamp::DATE'],
full_refresh = false,
tags = ['scheduled_core'],
) }}
{% if execute %}
{% if is_incremental() %}
{% set max_inserted_query %}
{% set max_is_query %}
SELECT
MAX(_inserted_timestamp) AS _inserted_timestamp
@ -20,13 +19,22 @@ FROM
{{ this }}
{% endset %}
{% set max_inserted_timestamp = run_query(max_inserted_query) [0] [0] %}
{% set max_is = run_query(max_is_query) [0] [0] %}
{% set max_part_query %}
SELECT
MAX(partition_gte_id) AS partition__gte_id
FROM
{{ this }}
{% endset %}
{% set max_part = run_query(max_part_query) [0] [0] %}
{% endif %}
{% endif %}
WITH pre_final AS (
SELECT
partition_id,
partition_gte_id,
SEQUENCE :: INTEGER AS SEQUENCE,
ledger_hash :: STRING AS ledger_hash,
previous_ledger_hash :: STRING AS previous_ledger_hash,
@ -40,7 +48,7 @@ WITH pre_final AS (
base_reserve :: INTEGER AS base_reserve,
max_tx_set_size :: INTEGER AS max_tx_set_size,
protocol_version :: INTEGER AS protocol_version,
ledger_header :: BINARY AS ledger_header,
{# ledger_header :: STRING AS ledger_header, #}
successful_transaction_count :: INTEGER AS successful_transaction_count,
failed_transaction_count :: INTEGER AS failed_transaction_count,
tx_set_operation_count :: INTEGER AS tx_set_operation_count,
@ -62,18 +70,20 @@ WITH pre_final AS (
{% if is_incremental() %}
WHERE
partition_id >= '{{ max_part }}'
AND _inserted_timestamp > '{{ max_inserted_timestamp }}'
partition_gte_id >= '{{ max_part }}'
AND _inserted_timestamp > '{{ max_is }}'
{% endif %}
qualify ROW_NUMBER() over (
PARTITION BY SEQUENCE
ORDER BY
batch_insert_ts DESC,
_inserted_timestamp DESC
) = 1
)
SELECT
partition_id,
partition_gte_id,
SEQUENCE,
ledger_hash,
previous_ledger_hash,
@ -87,7 +97,7 @@ SELECT
base_reserve,
max_tx_set_size,
protocol_version,
ledger_header,
{# ledger_header, #}
successful_transaction_count,
failed_transaction_count,
tx_set_operation_count,
@ -98,12 +108,12 @@ SELECT
node_id,
signature,
total_byte_size_of_bucket_list,
_inserted_timestamp,
{{ dbt_utils.generate_surrogate_key(
['sequence']
) }} AS ledgers_id,
SYSDATE() AS inserted_timestamp,
SYSDATE() AS modified_timestamp,
'{{ invocation_id }}' AS _invocation_id
'{{ invocation_id }}' AS _invocation_id,
_inserted_timestamp
FROM
pre_final

View File

@ -11,21 +11,18 @@
{% if execute %}
{% if is_incremental() %}
{% set max_inserted_query %}
{% set max_is_query %}
SELECT
MAX(batch_insert_ts) AS batch_insert_ts
MAX(_inserted_timestamp) AS _inserted_timestamp
FROM
{{ this }}
{% endset %}
{% set max_inserted_timestamp = run_query(max_inserted_query) [0] [0] %}
{% endif %}
{% if is_incremental() %}
{% set max_part_query %}
{% set max_is = run_query(max_is_query) [0] [0] %}
{% set max_part_query %}
SELECT
MAX(partition_id) AS partition_id
MAX(partition_gte_id) AS partition__gte_id
FROM
{{ this }}
@ -36,7 +33,8 @@ FROM
WITH pre_final AS (
SELECT
partition_id AS partition_id,
partition_id,
partition_gte_id,
id :: INTEGER AS id,
VALUE :source_account :: STRING AS source_account,
VALUE :source_account_muxed :: STRING AS op_source_account_muxed,
@ -170,31 +168,27 @@ WITH pre_final AS (
) AS closed_at,
VALUE :batch_id :: STRING AS batch_id,
VALUE :batch_run_date :: TIMESTAMP AS batch_run_date,
VALUE :batch_insert_ts :: INT AS batch_insert_ts,
NULL AS _inserted_timestamp
VALUE :batch_insert_ts :: TIMESTAMP AS batch_insert_ts,
_inserted_timestamp
FROM
{{ source(
'bronze_streamline',
'history_operations'
) }}
{# {% if is_incremental() %}
{{ ref('bronze__operations') }}
{% else %}
{{ ref('bronze__operations_FR') }}
{% endif %}
#}
{% if is_incremental() %}
{{ ref('bronze__operations') }}
{% else %}
{{ ref('bronze__operations_FR') }}
{% endif %}
{% if is_incremental() %}
WHERE
partition_id >= '{{ max_part }}'
AND _inserted_timestamp > '{{ max_batch }}'
partition_gte_id >= '{{ max_part }}'
AND _inserted_timestamp > '{{ max_is }}'
{% endif %}
qualify ROW_NUMBER() over (
PARTITION BY id
ORDER BY
batch_insert_ts DESC
batch_insert_ts DESC,
_inserted_timestamp DESC
) = 1
)
SELECT