From acdad04ce9e2c87a3b70ef60be20ade317bab05e Mon Sep 17 00:00:00 2001 From: Eric Laurello Date: Mon, 3 Feb 2025 17:33:17 -0500 Subject: [PATCH] check in, gte in progress --- models/bronze/core/bronze__accounts.sql | 6 +- models/bronze/core/bronze__accounts_FR.sql | 6 +- models/bronze/core/bronze__ledgers.sql | 7 +- models/bronze/core/bronze__ledgers_FR.sql | 7 +- models/bronze/core/bronze__operations.sql | 6 +- models/bronze/core/bronze__operations_FR.sql | 6 +- models/silver/silver__accounts.sql | 122 +++++++++---------- models/silver/silver__assets.sql | 32 ++--- models/silver/silver__ledgers.sql | 28 +++-- models/silver/silver__operations.sql | 42 +++---- 10 files changed, 135 insertions(+), 127 deletions(-) diff --git a/models/bronze/core/bronze__accounts.sql b/models/bronze/core/bronze__accounts.sql index a6dc716..4bc8b6b 100644 --- a/models/bronze/core/bronze__accounts.sql +++ b/models/bronze/core/bronze__accounts.sql @@ -3,8 +3,8 @@ ) }} {{ streamline_external_table_query_v2( model = "accounts", - partition_function = " TO_DATE( SPLIT_PART(SPLIT_PART(file_name, '/', 3), '=', 2)|| '01', 'YYYYMMDD')", - partition_name = "partition_id", + partition_function = "TRY_TO_DATE(left(split_part(split_part(file_name, '=', -1), '/', -1),8), 'YYYYMMDD')", + partition_name = "partition_gte_id", unique_key = "ACCOUNT_ID", - other_cols = "BALANCE, BUYING_LIABILITIES, SELLING_LIABILITIES, SEQUENCE_NUMBER, NUM_SUBENTRIES, INFLATION_DESTINATION, FLAGS, HOME_DOMAIN, MASTER_WEIGHT, THRESHOLD_LOW, THRESHOLD_MEDIUM, THRESHOLD_HIGH, LAST_MODIFIED_LEDGER, LEDGER_ENTRY_CHANGE, DELETED, BATCH_ID, BATCH_RUN_DATE, BATCH_INSERT_TS, SPONSOR, NUM_SPONSORED, NUM_SPONSORING, SEQUENCE_LEDGER, SEQUENCE_TIME, CLOSED_AT, LEDGER_SEQUENCE" + other_cols = "partition_id, BALANCE, BUYING_LIABILITIES, SELLING_LIABILITIES, SEQUENCE_NUMBER, NUM_SUBENTRIES, INFLATION_DESTINATION, FLAGS, HOME_DOMAIN, MASTER_WEIGHT, THRESHOLD_LOW, THRESHOLD_MEDIUM, THRESHOLD_HIGH, LAST_MODIFIED_LEDGER, LEDGER_ENTRY_CHANGE, DELETED, BATCH_ID, BATCH_RUN_DATE, BATCH_INSERT_TS, SPONSOR, NUM_SPONSORED, NUM_SPONSORING, SEQUENCE_LEDGER, SEQUENCE_TIME, CLOSED_AT, LEDGER_SEQUENCE" ) }} diff --git a/models/bronze/core/bronze__accounts_FR.sql b/models/bronze/core/bronze__accounts_FR.sql index d9129f3..b3bac40 100644 --- a/models/bronze/core/bronze__accounts_FR.sql +++ b/models/bronze/core/bronze__accounts_FR.sql @@ -3,8 +3,8 @@ ) }} {{ streamline_external_table_FR_query_v2( model = "accounts", - partition_function = " TO_DATE( SPLIT_PART(SPLIT_PART(file_name, '/', 3), '=', 2)|| '01', 'YYYYMMDD')", - partition_name = "partition_id", + partition_function = "TRY_TO_DATE(left(split_part(split_part(file_name, '=', -1), '/', -1),8), 'YYYYMMDD')", + partition_name = "partition_gte_id", unique_key = "ACCOUNT_ID", - other_cols = "BALANCE, BUYING_LIABILITIES, SELLING_LIABILITIES, SEQUENCE_NUMBER, NUM_SUBENTRIES, INFLATION_DESTINATION, FLAGS, HOME_DOMAIN, MASTER_WEIGHT, THRESHOLD_LOW, THRESHOLD_MEDIUM, THRESHOLD_HIGH, LAST_MODIFIED_LEDGER, LEDGER_ENTRY_CHANGE, DELETED, BATCH_ID, BATCH_RUN_DATE, BATCH_INSERT_TS, SPONSOR, NUM_SPONSORED, NUM_SPONSORING, SEQUENCE_LEDGER, SEQUENCE_TIME, CLOSED_AT, LEDGER_SEQUENCE" + other_cols = "partition_id, BALANCE, BUYING_LIABILITIES, SELLING_LIABILITIES, SEQUENCE_NUMBER, NUM_SUBENTRIES, INFLATION_DESTINATION, FLAGS, HOME_DOMAIN, MASTER_WEIGHT, THRESHOLD_LOW, THRESHOLD_MEDIUM, THRESHOLD_HIGH, LAST_MODIFIED_LEDGER, LEDGER_ENTRY_CHANGE, DELETED, BATCH_ID, BATCH_RUN_DATE, BATCH_INSERT_TS, SPONSOR, NUM_SPONSORED, NUM_SPONSORING, SEQUENCE_LEDGER, SEQUENCE_TIME, CLOSED_AT, LEDGER_SEQUENCE" ) }} diff --git a/models/bronze/core/bronze__ledgers.sql b/models/bronze/core/bronze__ledgers.sql index 7df8534..6822587 100644 --- a/models/bronze/core/bronze__ledgers.sql +++ b/models/bronze/core/bronze__ledgers.sql @@ -3,8 +3,9 @@ ) }} {{ streamline_external_table_query_v2( model = "history_ledgers", - partition_function = " TO_DATE( SPLIT_PART(SPLIT_PART(file_name, '/', 3), '=', 2)|| '01', 'YYYYMMDD')", - partition_name = "partition_id", + partition_function = "TRY_TO_DATE(left(split_part(split_part(file_name, '=', -1), '/', -1),8), 'YYYYMMDD')", + partition_name = "partition_gte_id", unique_key = "SEQUENCE", - other_cols = "LEDGER_HASH, PREVIOUS_LEDGER_HASH, TRANSACTION_COUNT, OPERATION_COUNT, CLOSED_AT, ID, TOTAL_COINS, FEE_POOL, BASE_FEE, BASE_RESERVE, MAX_TX_SET_SIZE, PROTOCOL_VERSION, LEDGER_HEADER, SUCCESSFUL_TRANSACTION_COUNT, FAILED_TRANSACTION_COUNT, TX_SET_OPERATION_COUNT, BATCH_ID, BATCH_RUN_DATE, BATCH_INSERT_TS, SOROBAN_FEE_WRITE_1KB, NODE_ID, SIGNATURE, TOTAL_BYTE_SIZE_OF_BUCKET_LIST" + other_cols = "partition_id, LEDGER_HASH, PREVIOUS_LEDGER_HASH, TRANSACTION_COUNT, OPERATION_COUNT, CLOSED_AT, ID, TOTAL_COINS, FEE_POOL, BASE_FEE, BASE_RESERVE, MAX_TX_SET_SIZE, PROTOCOL_VERSION, SUCCESSFUL_TRANSACTION_COUNT, FAILED_TRANSACTION_COUNT, TX_SET_OPERATION_COUNT, BATCH_ID, BATCH_RUN_DATE, BATCH_INSERT_TS, SOROBAN_FEE_WRITE_1KB, NODE_ID, SIGNATURE, TOTAL_BYTE_SIZE_OF_BUCKET_LIST" ) }} +--LEDGER_HEADER, diff --git a/models/bronze/core/bronze__ledgers_FR.sql b/models/bronze/core/bronze__ledgers_FR.sql index 74a23c8..076523c 100644 --- a/models/bronze/core/bronze__ledgers_FR.sql +++ b/models/bronze/core/bronze__ledgers_FR.sql @@ -3,8 +3,9 @@ ) }} {{ streamline_external_table_FR_query_v2( model = "history_ledgers", - partition_function = " TO_DATE( SPLIT_PART(SPLIT_PART(file_name, '/', 3), '=', 2)|| '01', 'YYYYMMDD')", - partition_name = "partition_id", + partition_function = "TRY_TO_DATE(left(split_part(split_part(file_name, '=', -1), '/', -1),8), 'YYYYMMDD')", + partition_name = "partition_gte_id", unique_key = "SEQUENCE", - other_cols = "LEDGER_HASH, PREVIOUS_LEDGER_HASH, TRANSACTION_COUNT, OPERATION_COUNT, CLOSED_AT, ID, TOTAL_COINS, FEE_POOL, BASE_FEE, BASE_RESERVE, MAX_TX_SET_SIZE, PROTOCOL_VERSION, LEDGER_HEADER, SUCCESSFUL_TRANSACTION_COUNT, FAILED_TRANSACTION_COUNT, TX_SET_OPERATION_COUNT, BATCH_ID, BATCH_RUN_DATE, BATCH_INSERT_TS, SOROBAN_FEE_WRITE_1KB, NODE_ID, SIGNATURE, TOTAL_BYTE_SIZE_OF_BUCKET_LIST" + other_cols = "partition_id, LEDGER_HASH, PREVIOUS_LEDGER_HASH, TRANSACTION_COUNT, OPERATION_COUNT, CLOSED_AT, ID, TOTAL_COINS, FEE_POOL, BASE_FEE, BASE_RESERVE, MAX_TX_SET_SIZE, PROTOCOL_VERSION, SUCCESSFUL_TRANSACTION_COUNT, FAILED_TRANSACTION_COUNT, TX_SET_OPERATION_COUNT, BATCH_ID, BATCH_RUN_DATE, BATCH_INSERT_TS, SOROBAN_FEE_WRITE_1KB, NODE_ID, SIGNATURE, TOTAL_BYTE_SIZE_OF_BUCKET_LIST" ) }} +--LEDGER_HEADER, diff --git a/models/bronze/core/bronze__operations.sql b/models/bronze/core/bronze__operations.sql index a711c4b..027dadd 100644 --- a/models/bronze/core/bronze__operations.sql +++ b/models/bronze/core/bronze__operations.sql @@ -3,8 +3,8 @@ ) }} {{ streamline_external_table_query_v2( model = "history_operations", - partition_function = "TO_DATE( SPLIT_PART(SPLIT_PART(file_name, '/', 3), '=', 2)|| '01', 'YYYYMMDD')", - partition_name = "partition_id", + partition_function = "TRY_TO_DATE(left(split_part(split_part(file_name, '=', -1), '/', -1),8), 'YYYYMMDD')", + partition_name = "partition_gte_id", unique_key = "ID", - other_cols = 'VALUE' + other_cols = 'partition_id,VALUE' ) }} diff --git a/models/bronze/core/bronze__operations_FR.sql b/models/bronze/core/bronze__operations_FR.sql index 9bddf10..488cb77 100644 --- a/models/bronze/core/bronze__operations_FR.sql +++ b/models/bronze/core/bronze__operations_FR.sql @@ -3,8 +3,8 @@ ) }} {{ streamline_external_table_FR_query_v2( model = "history_operations", - partition_function = "TO_DATE( SPLIT_PART(SPLIT_PART(file_name, '/', 3), '=', 2)|| '01', 'YYYYMMDD')", - partition_name = "partition_id", + partition_function = "TRY_TO_DATE(left(split_part(split_part(file_name, '=', -1), '/', -1),8), 'YYYYMMDD')", + partition_name = "partition_gte_id", unique_key = "ID", - other_cols = 'VALUE' + other_cols = 'partition_id,VALUE' ) }} diff --git a/models/silver/silver__accounts.sql b/models/silver/silver__accounts.sql index d3725d5..ecd88d1 100644 --- a/models/silver/silver__accounts.sql +++ b/models/silver/silver__accounts.sql @@ -12,7 +12,7 @@ {% if execute %} {% if is_incremental() %} -{% set max_inserted_query %} +{% set max_is_query %} SELECT MAX(_inserted_timestamp) AS _inserted_timestamp @@ -20,13 +20,10 @@ FROM {{ this }} {% endset %} - {% set max_inserted_timestamp = run_query(max_inserted_query) [0] [0] %} -{% endif %} - -{% if is_incremental() %} -{% set max_part_query %} + {% set max_is = run_query(max_is_query) [0] [0] %} + {% set max_part_query %} SELECT - MAX(partition_id) AS partition_id + MAX(partition_gte_id) AS partition__gte_id FROM {{ this }} @@ -38,32 +35,33 @@ FROM WITH pre_final AS ( SELECT partition_id, - account_id, - balance, - buying_liabilities, - selling_liabilities, - sequence_number, - num_subentries, - inflation_destination, - flags, - home_domain, - master_weight, - threshold_low, - threshold_medium, - threshold_high, - last_modified_ledger, - ledger_entry_change, - deleted, - batch_id, - batch_run_date, - batch_insert_ts, - sponsor, - num_sponsored, - num_sponsoring, - sequence_ledger, - sequence_time, - closed_at, - ledger_sequence, + partition_gte_id, + account_id :: STRING AS account_id, + balance :: FLOAT AS balance, + buying_liabilities :: FLOAT AS buying_liabilities, + selling_liabilities :: FLOAT AS selling_liabilities, + sequence_number :: INTEGER AS sequence_number, + num_subentries :: INTEGER AS num_subentries, + inflation_destination :: STRING AS inflation_destination, + flags :: INTEGER AS flags, + home_domain :: STRING AS home_domain, + master_weight :: INTEGER AS master_weight, + threshold_low :: INTEGER AS threshold_low, + threshold_medium :: INTEGER AS threshold_medium, + threshold_high :: INTEGER AS threshold_high, + last_modified_ledger :: INTEGER AS last_modified_ledger, + ledger_entry_change :: INTEGER AS ledger_entry_change, + deleted :: BOOLEAN AS deleted, + batch_id :: STRING AS batch_id, + batch_run_date :: TIMESTAMP AS batch_run_date, + batch_insert_ts :: TIMESTAMP AS batch_insert_ts, + sponsor :: STRING AS sponsor, + num_sponsored :: INTEGER AS num_sponsored, + num_sponsoring :: INTEGER AS num_sponsoring, + sequence_ledger :: INTEGER AS sequence_ledger, + sequence_time :: TIMESTAMP AS sequence_time, + closed_at :: TIMESTAMP AS closed_at, + ledger_sequence :: INTEGER AS ledger_sequence, _inserted_timestamp FROM @@ -75,45 +73,47 @@ WITH pre_final AS ( {% if is_incremental() %} WHERE - _inserted_timestamp >= '{{ max_part }}' - AND _inserted_timestamp > '{{ max_inserted_timestamp }}' + partition_gte_id >= '{{ max_part }}' + AND _inserted_timestamp > '{{ max_is }}' {% endif %} qualify ROW_NUMBER() over ( PARTITION BY account_id, closed_at ORDER BY + batch_insert_ts DESC, _inserted_timestamp DESC ) = 1 ) SELECT partition_id, - account_id :: STRING AS account_id, - balance :: FLOAT AS balance, - buying_liabilities :: FLOAT AS buying_liabilities, - selling_liabilities :: FLOAT AS selling_liabilities, - sequence_number :: INTEGER AS sequence_number, - num_subentries :: INTEGER AS num_subentries, - inflation_destination :: STRING AS inflation_destination, - flags :: INTEGER AS flags, - home_domain :: STRING AS home_domain, - master_weight :: INTEGER AS master_weight, - threshold_low :: INTEGER AS threshold_low, - threshold_medium :: INTEGER AS threshold_medium, - threshold_high :: INTEGER AS threshold_high, - last_modified_ledger :: INTEGER AS last_modified_ledger, - ledger_entry_change :: INTEGER AS ledger_entry_change, - deleted :: BOOLEAN AS deleted, - batch_id :: STRING AS batch_id, - batch_run_date :: TIMESTAMP AS batch_run_date, - batch_insert_ts :: TIMESTAMP AS batch_insert_ts, - sponsor :: STRING AS sponsor, - num_sponsored :: INTEGER AS num_sponsored, - num_sponsoring :: INTEGER AS num_sponsoring, - sequence_ledger :: INTEGER AS sequence_ledger, - sequence_time :: TIMESTAMP AS sequence_time, - closed_at :: TIMESTAMP AS closed_at, - ledger_sequence :: INTEGER AS ledger_sequence, + partition_gte_id, + account_id, + balance, + buying_liabilities, + selling_liabilities, + sequence_number, + num_subentries, + inflation_destination, + flags, + home_domain, + master_weight, + threshold_low, + threshold_medium, + threshold_high, + last_modified_ledger, + ledger_entry_change, + deleted, + batch_id, + batch_run_date, + batch_insert_ts, + sponsor, + num_sponsored, + num_sponsoring, + sequence_ledger, + sequence_time, + closed_at, + ledger_sequence, _inserted_timestamp, {{ dbt_utils.generate_surrogate_key( ['account_id','closed_at'] diff --git a/models/silver/silver__assets.sql b/models/silver/silver__assets.sql index ff78f8d..ceff93e 100644 --- a/models/silver/silver__assets.sql +++ b/models/silver/silver__assets.sql @@ -11,15 +11,15 @@ {% if execute %} {% if is_incremental() %} -{% set max_inserted_query %} +{% set max_bat_query %} SELECT - MAX(_inserted_timestamp) AS _inserted_timestamp + MAX(batch_insert_ts) AS batch_insert_ts FROM {{ this }} {% endset %} - {% set max_inserted_timestamp = run_query(max_inserted_query) [0] [0] %} + {% set max_batch = run_query(max_bat_query) [0] [0] %} {% endif %} {% if is_incremental() %} @@ -37,34 +37,37 @@ FROM WITH pre_final AS ( SELECT partition_id, - id :: flot AS id, + id :: FLOAT AS id, asset_type :: STRING AS asset_type, asset_code :: STRING AS asset_code, asset_issuer :: STRING AS asset_issuer, batch_run_date :: datetime AS batch_run_date, batch_id :: STRING AS batch_id, batch_insert_ts :: datetime AS batch_insert_ts, - asset_id :: INT AS asset_id, - _inserted_timestamp + asset_id :: INT AS asset_id FROM + {# {% if is_incremental() %} + {{ ref('bronze__assets') }} + {% else %} + {{ ref('bronze__assets_FR') }} + {% endif %} -{% if is_incremental() %} -{{ ref('bronze__assets') }} -{% else %} - {{ ref('bronze__assets_FR') }} -{% endif %} + #} + {{ source( + 'bronze_streamline', + 'history_assets' + ) }} {% if is_incremental() %} WHERE partition_id >= '{{ max_part }}' - AND _inserted_timestamp > '{{ max_inserted_timestamp }}' + AND batch_insert_ts > '{{ max_batch }}' {% endif %} qualify ROW_NUMBER() over ( PARTITION BY asset_id ORDER BY - batch_insert_ts DESC, - _inserted_timestamp DESC + batch_insert_ts DESC ) = 1 ) SELECT @@ -77,7 +80,6 @@ SELECT batch_id, batch_insert_ts, asset_id, - _inserted_timestamp, {{ dbt_utils.generate_surrogate_key( ['asset_id'] ) }} AS assets_id, diff --git a/models/silver/silver__ledgers.sql b/models/silver/silver__ledgers.sql index 9e99214..487a1f2 100644 --- a/models/silver/silver__ledgers.sql +++ b/models/silver/silver__ledgers.sql @@ -5,14 +5,13 @@ incremental_predicates = ["dynamic_range_predicate", "partition_id::date"], merge_exclude_columns = ["inserted_timestamp"], cluster_by = ['closed_at::DATE','partition_id','modified_timestamp::DATE'], - full_refresh = false, tags = ['scheduled_core'], ) }} {% if execute %} {% if is_incremental() %} -{% set max_inserted_query %} +{% set max_is_query %} SELECT MAX(_inserted_timestamp) AS _inserted_timestamp @@ -20,13 +19,22 @@ FROM {{ this }} {% endset %} - {% set max_inserted_timestamp = run_query(max_inserted_query) [0] [0] %} + {% set max_is = run_query(max_is_query) [0] [0] %} + {% set max_part_query %} +SELECT + MAX(partition_gte_id) AS partition__gte_id +FROM + {{ this }} + + {% endset %} + {% set max_part = run_query(max_part_query) [0] [0] %} {% endif %} {% endif %} WITH pre_final AS ( SELECT partition_id, + partition_gte_id, SEQUENCE :: INTEGER AS SEQUENCE, ledger_hash :: STRING AS ledger_hash, previous_ledger_hash :: STRING AS previous_ledger_hash, @@ -40,7 +48,7 @@ WITH pre_final AS ( base_reserve :: INTEGER AS base_reserve, max_tx_set_size :: INTEGER AS max_tx_set_size, protocol_version :: INTEGER AS protocol_version, - ledger_header :: BINARY AS ledger_header, + {# ledger_header :: STRING AS ledger_header, #} successful_transaction_count :: INTEGER AS successful_transaction_count, failed_transaction_count :: INTEGER AS failed_transaction_count, tx_set_operation_count :: INTEGER AS tx_set_operation_count, @@ -62,18 +70,20 @@ WITH pre_final AS ( {% if is_incremental() %} WHERE - partition_id >= '{{ max_part }}' - AND _inserted_timestamp > '{{ max_inserted_timestamp }}' + partition_gte_id >= '{{ max_part }}' + AND _inserted_timestamp > '{{ max_is }}' {% endif %} qualify ROW_NUMBER() over ( PARTITION BY SEQUENCE ORDER BY + batch_insert_ts DESC, _inserted_timestamp DESC ) = 1 ) SELECT partition_id, + partition_gte_id, SEQUENCE, ledger_hash, previous_ledger_hash, @@ -87,7 +97,7 @@ SELECT base_reserve, max_tx_set_size, protocol_version, - ledger_header, + {# ledger_header, #} successful_transaction_count, failed_transaction_count, tx_set_operation_count, @@ -98,12 +108,12 @@ SELECT node_id, signature, total_byte_size_of_bucket_list, - _inserted_timestamp, {{ dbt_utils.generate_surrogate_key( ['sequence'] ) }} AS ledgers_id, SYSDATE() AS inserted_timestamp, SYSDATE() AS modified_timestamp, - '{{ invocation_id }}' AS _invocation_id + '{{ invocation_id }}' AS _invocation_id, + _inserted_timestamp FROM pre_final diff --git a/models/silver/silver__operations.sql b/models/silver/silver__operations.sql index 5a59717..b42ff4f 100644 --- a/models/silver/silver__operations.sql +++ b/models/silver/silver__operations.sql @@ -11,21 +11,18 @@ {% if execute %} {% if is_incremental() %} -{% set max_inserted_query %} +{% set max_is_query %} SELECT - MAX(batch_insert_ts) AS batch_insert_ts + MAX(_inserted_timestamp) AS _inserted_timestamp FROM {{ this }} {% endset %} - {% set max_inserted_timestamp = run_query(max_inserted_query) [0] [0] %} -{% endif %} - -{% if is_incremental() %} -{% set max_part_query %} + {% set max_is = run_query(max_is_query) [0] [0] %} + {% set max_part_query %} SELECT - MAX(partition_id) AS partition_id + MAX(partition_gte_id) AS partition__gte_id FROM {{ this }} @@ -36,7 +33,8 @@ FROM WITH pre_final AS ( SELECT - partition_id AS partition_id, + partition_id, + partition_gte_id, id :: INTEGER AS id, VALUE :source_account :: STRING AS source_account, VALUE :source_account_muxed :: STRING AS op_source_account_muxed, @@ -170,31 +168,27 @@ WITH pre_final AS ( ) AS closed_at, VALUE :batch_id :: STRING AS batch_id, VALUE :batch_run_date :: TIMESTAMP AS batch_run_date, - VALUE :batch_insert_ts :: INT AS batch_insert_ts, - NULL AS _inserted_timestamp + VALUE :batch_insert_ts :: TIMESTAMP AS batch_insert_ts, + _inserted_timestamp FROM - {{ source( - 'bronze_streamline', - 'history_operations' - ) }} - {# {% if is_incremental() %} - {{ ref('bronze__operations') }} - {% else %} - {{ ref('bronze__operations_FR') }} - {% endif %} - #} +{% if is_incremental() %} +{{ ref('bronze__operations') }} +{% else %} + {{ ref('bronze__operations_FR') }} +{% endif %} {% if is_incremental() %} WHERE - partition_id >= '{{ max_part }}' - AND _inserted_timestamp > '{{ max_batch }}' + partition_gte_id >= '{{ max_part }}' + AND _inserted_timestamp > '{{ max_is }}' {% endif %} qualify ROW_NUMBER() over ( PARTITION BY id ORDER BY - batch_insert_ts DESC + batch_insert_ts DESC, + _inserted_timestamp DESC ) = 1 ) SELECT