bronze & silver

This commit is contained in:
Eric Laurello 2025-02-04 14:01:55 -05:00
parent acdad04ce9
commit 53fcb53a57
20 changed files with 308 additions and 251 deletions

View File

@ -22,8 +22,7 @@ SELECT
{{ other_cols }},
value,
_inserted_timestamp,
s.{{ partition_name }},
s.value AS VALUE
s.{{ partition_name }}
FROM
{{ source(
"bronze_streamline",
@ -63,8 +62,7 @@ SELECT
{{ other_cols }},
value,
_inserted_timestamp,
s.{{ partition_name }},
s.value AS VALUE
s.{{ partition_name }}
FROM
{{ source(
"bronze_streamline",

View File

@ -6,5 +6,5 @@
partition_function = "TRY_TO_DATE(left(split_part(split_part(file_name, '=', -1), '/', -1),8), 'YYYYMMDD')",
partition_name = "partition_gte_id",
unique_key = "ACCOUNT_ID",
other_cols = "partition_id, BALANCE, BUYING_LIABILITIES, SELLING_LIABILITIES, SEQUENCE_NUMBER, NUM_SUBENTRIES, INFLATION_DESTINATION, FLAGS, HOME_DOMAIN, MASTER_WEIGHT, THRESHOLD_LOW, THRESHOLD_MEDIUM, THRESHOLD_HIGH, LAST_MODIFIED_LEDGER, LEDGER_ENTRY_CHANGE, DELETED, BATCH_ID, BATCH_RUN_DATE, BATCH_INSERT_TS, SPONSOR, NUM_SPONSORED, NUM_SPONSORING, SEQUENCE_LEDGER, SEQUENCE_TIME, CLOSED_AT, LEDGER_SEQUENCE"
other_cols = "partition_id"
) }}

View File

@ -6,5 +6,5 @@
partition_function = "TRY_TO_DATE(left(split_part(split_part(file_name, '=', -1), '/', -1),8), 'YYYYMMDD')",
partition_name = "partition_gte_id",
unique_key = "ACCOUNT_ID",
other_cols = "partition_id, BALANCE, BUYING_LIABILITIES, SELLING_LIABILITIES, SEQUENCE_NUMBER, NUM_SUBENTRIES, INFLATION_DESTINATION, FLAGS, HOME_DOMAIN, MASTER_WEIGHT, THRESHOLD_LOW, THRESHOLD_MEDIUM, THRESHOLD_HIGH, LAST_MODIFIED_LEDGER, LEDGER_ENTRY_CHANGE, DELETED, BATCH_ID, BATCH_RUN_DATE, BATCH_INSERT_TS, SPONSOR, NUM_SPONSORED, NUM_SPONSORING, SEQUENCE_LEDGER, SEQUENCE_TIME, CLOSED_AT, LEDGER_SEQUENCE"
other_cols = "partition_id"
) }}

View File

@ -3,8 +3,8 @@
) }}
{{ streamline_external_table_query_v2(
model = "history_assets",
partition_function = " TO_DATE( SPLIT_PART(SPLIT_PART(file_name, '/', 3), '=', 2)|| '01', 'YYYYMMDD')",
partition_name = "partition_id",
unique_key = "ID",
other_cols = "ASSET_TYPE, ASSET_CODE, ASSET_ISSUER, BATCH_RUN_DATE, BATCH_ID, BATCH_INSERT_TS, ASSET_ID"
partition_function = "TRY_TO_DATE(left(split_part(split_part(file_name, '=', -1), '/', -1),8), 'YYYYMMDD')",
partition_name = "partition_gte_id",
unique_key = "asset_id",
other_cols = "partition_id"
) }}

View File

@ -3,8 +3,8 @@
) }}
{{ streamline_external_table_FR_query_v2(
model = "history_assets",
partition_function = " TO_DATE( SPLIT_PART(SPLIT_PART(file_name, '/', 3), '=', 2)|| '01', 'YYYYMMDD')",
partition_name = "partition_id",
unique_key = "ID",
other_cols = " ASSET_TYPE, ASSET_CODE, ASSET_ISSUER, BATCH_RUN_DATE, BATCH_ID, BATCH_INSERT_TS, ASSET_ID"
partition_function = "TRY_TO_DATE(left(split_part(split_part(file_name, '=', -1), '/', -1),8), 'YYYYMMDD')",
partition_name = "partition_gte_id",
unique_key = "asset_id",
other_cols = "partition_id"
) }}

View File

@ -3,8 +3,8 @@
) }}
{{ streamline_external_table_query_v2(
model = "liquidity_pools",
partition_function = " TO_DATE( SPLIT_PART(SPLIT_PART(file_name, '/', 3), '=', 2)|| '01', 'YYYYMMDD')",
partition_name = "partition_id",
partition_function = "TRY_TO_DATE(left(split_part(split_part(file_name, '=', -1), '/', -1),8), 'YYYYMMDD')",
partition_name = "partition_gte_id",
unique_key = "LIQUIDITY_POOL_ID",
other_cols = "TYPE, FEE, TRUSTLINE_COUNT, POOL_SHARE_COUNT, ASSET_A_TYPE, ASSET_A_CODE, ASSET_A_ISSUER, ASSET_A_ID, ASSET_A_AMOUNT, ASSET_B_TYPE, ASSET_B_CODE, ASSET_B_ISSUER, ASSET_B_ID, ASSET_B_AMOUNT, LAST_MODIFIED_LEDGER, LEDGER_ENTRY_CHANGE, DELETED, BATCH_ID, BATCH_RUN_DATE, BATCH_INSERT_TS, CLOSED_AT, LEDGER_SEQUENCE"
other_cols = "partition_id"
) }}

View File

@ -3,8 +3,8 @@
) }}
{{ streamline_external_table_FR_query_v2(
model = "liquidity_pools",
partition_function = " TO_DATE( SPLIT_PART(SPLIT_PART(file_name, '/', 3), '=', 2)|| '01', 'YYYYMMDD')",
partition_name = "partition_id",
partition_function = "TRY_TO_DATE(left(split_part(split_part(file_name, '=', -1), '/', -1),8), 'YYYYMMDD')",
partition_name = "partition_gte_id",
unique_key = "LIQUIDITY_POOL_ID",
other_cols = "TYPE, FEE, TRUSTLINE_COUNT, POOL_SHARE_COUNT, ASSET_A_TYPE, ASSET_A_CODE, ASSET_A_ISSUER, ASSET_A_ID, ASSET_A_AMOUNT, ASSET_B_TYPE, ASSET_B_CODE, ASSET_B_ISSUER, ASSET_B_ID, ASSET_B_AMOUNT, LAST_MODIFIED_LEDGER, LEDGER_ENTRY_CHANGE, DELETED, BATCH_ID, BATCH_RUN_DATE, BATCH_INSERT_TS, CLOSED_AT, LEDGER_SEQUENCE"
other_cols = "partition_id"
) }}

View File

@ -6,5 +6,5 @@
partition_function = "TRY_TO_DATE(left(split_part(split_part(file_name, '=', -1), '/', -1),8), 'YYYYMMDD')",
partition_name = "partition_gte_id",
unique_key = "ID",
other_cols = 'partition_id,VALUE'
other_cols = 'partition_id'
) }}

View File

@ -6,5 +6,5 @@
partition_function = "TRY_TO_DATE(left(split_part(split_part(file_name, '=', -1), '/', -1),8), 'YYYYMMDD')",
partition_name = "partition_gte_id",
unique_key = "ID",
other_cols = 'partition_id,VALUE'
other_cols = 'partition_id'
) }}

View File

@ -3,8 +3,8 @@
) }}
{{ streamline_external_table_query_v2(
model = "history_trades",
partition_function = " TO_DATE( SPLIT_PART(SPLIT_PART(file_name, '/', 3), '=', 2)|| '01', 'YYYYMMDD')",
partition_name = "partition_id",
partition_function = "TRY_TO_DATE(left(split_part(split_part(file_name, '=', -1), '/', -1),8), 'YYYYMMDD')",
partition_name = "partition_gte_id",
unique_key = "HISTORY_OPERATION_ID",
other_cols = '"order", LEDGER_CLOSED_AT, SELLING_ACCOUNT_ADDRESS, SELLING_ASSET_CODE, SELLING_ASSET_ISSUER, SELLING_ASSET_TYPE, SELLING_ASSET_ID, SELLING_AMOUNT, BUYING_ACCOUNT_ADDRESS, BUYING_ASSET_CODE, BUYING_ASSET_ISSUER, BUYING_ASSET_TYPE, BUYING_ASSET_ID, BUYING_AMOUNT, PRICE_N, PRICE_D, SELLING_OFFER_ID, BUYING_OFFER_ID, BATCH_ID, BATCH_RUN_DATE, BATCH_INSERT_TS, SELLING_LIQUIDITY_POOL_ID, LIQUIDITY_POOL_FEE, TRADE_TYPE, ROUNDING_SLIPPAGE, SELLER_IS_EXACT'
other_cols = 'partition_id'
) }}

View File

@ -3,8 +3,8 @@
) }}
{{ streamline_external_table_FR_query_v2(
model = "history_trades",
partition_function = " TO_DATE( SPLIT_PART(SPLIT_PART(file_name, '/', 3), '=', 2)|| '01', 'YYYYMMDD')",
partition_name = "partition_id",
partition_function = "TRY_TO_DATE(left(split_part(split_part(file_name, '=', -1), '/', -1),8), 'YYYYMMDD')",
partition_name = "partition_gte_id",
unique_key = "HISTORY_OPERATION_ID",
other_cols = '"order", LEDGER_CLOSED_AT, SELLING_ACCOUNT_ADDRESS, SELLING_ASSET_CODE, SELLING_ASSET_ISSUER, SELLING_ASSET_TYPE, SELLING_ASSET_ID, SELLING_AMOUNT, BUYING_ACCOUNT_ADDRESS, BUYING_ASSET_CODE, BUYING_ASSET_ISSUER, BUYING_ASSET_TYPE, BUYING_ASSET_ID, BUYING_AMOUNT, PRICE_N, PRICE_D, SELLING_OFFER_ID, BUYING_OFFER_ID, BATCH_ID, BATCH_RUN_DATE, BATCH_INSERT_TS, SELLING_LIQUIDITY_POOL_ID, LIQUIDITY_POOL_FEE, TRADE_TYPE, ROUNDING_SLIPPAGE, SELLER_IS_EXACT'
other_cols = 'partition_id'
) }}

View File

@ -3,8 +3,8 @@
) }}
{{ streamline_external_table_query_v2(
model = "history_transactions",
partition_function = " TO_DATE( SPLIT_PART(SPLIT_PART(file_name, '/', 3), '=', 2)|| '01', 'YYYYMMDD')",
partition_name = "partition_id",
partition_function = "TRY_TO_DATE(left(split_part(split_part(file_name, '=', -1), '/', -1),8), 'YYYYMMDD')",
partition_name = "partition_gte_id",
unique_key = "ID",
other_cols = "TRANSACTION_HASH, LEDGER_SEQUENCE, ACCOUNT, ACCOUNT_SEQUENCE, MAX_FEE, OPERATION_COUNT, CREATED_AT, MEMO_TYPE, MEMO, TIME_BOUNDS, SUCCESSFUL, FEE_CHARGED, INNER_TRANSACTION_HASH, FEE_ACCOUNT, NEW_MAX_FEE, ACCOUNT_MUXED, FEE_ACCOUNT_MUXED, BATCH_ID, BATCH_RUN_DATE, BATCH_INSERT_TS, LEDGER_BOUNDS, MIN_ACCOUNT_SEQUENCE, MIN_ACCOUNT_SEQUENCE_AGE, MIN_ACCOUNT_SEQUENCE_LEDGER_GAP, TX_ENVELOPE, TX_RESULT, TX_META, TX_FEE_META, EXTRA_SIGNERS, RESOURCE_FEE, SOROBAN_RESOURCES_INSTRUCTIONS, SOROBAN_RESOURCES_READ_BYTES, SOROBAN_RESOURCES_WRITE_BYTES, CLOSED_AT, TRANSACTION_RESULT_CODE, INCLUSION_FEE_BID, INCLUSION_FEE_CHARGED, RESOURCE_FEE_REFUND, NON_REFUNDABLE_RESOURCE_FEE_CHARGED, REFUNDABLE_RESOURCE_FEE_CHARGED, RENT_FEE_CHARGED, TX_SIGNERS, REFUNDABLE_FEE"
other_cols = "partition_id"
) }}

View File

@ -3,8 +3,8 @@
) }}
{{ streamline_external_table_FR_query_v2(
model = "history_transactions",
partition_function = " TO_DATE( SPLIT_PART(SPLIT_PART(file_name, '/', 3), '=', 2)|| '01', 'YYYYMMDD')",
partition_name = "partition_id",
partition_function = "TRY_TO_DATE(left(split_part(split_part(file_name, '=', -1), '/', -1),8), 'YYYYMMDD')",
partition_name = "partition_gte_id",
unique_key = "ID",
other_cols = "TRANSACTION_HASH, LEDGER_SEQUENCE, ACCOUNT, ACCOUNT_SEQUENCE, MAX_FEE, OPERATION_COUNT, CREATED_AT, MEMO_TYPE, MEMO, TIME_BOUNDS, SUCCESSFUL, FEE_CHARGED, INNER_TRANSACTION_HASH, FEE_ACCOUNT, NEW_MAX_FEE, ACCOUNT_MUXED, FEE_ACCOUNT_MUXED, BATCH_ID, BATCH_RUN_DATE, BATCH_INSERT_TS, LEDGER_BOUNDS, MIN_ACCOUNT_SEQUENCE, MIN_ACCOUNT_SEQUENCE_AGE, MIN_ACCOUNT_SEQUENCE_LEDGER_GAP, TX_ENVELOPE, TX_RESULT, TX_META, TX_FEE_META, EXTRA_SIGNERS, RESOURCE_FEE, SOROBAN_RESOURCES_INSTRUCTIONS, SOROBAN_RESOURCES_READ_BYTES, SOROBAN_RESOURCES_WRITE_BYTES, CLOSED_AT, TRANSACTION_RESULT_CODE, INCLUSION_FEE_BID, INCLUSION_FEE_CHARGED, RESOURCE_FEE_REFUND, NON_REFUNDABLE_RESOURCE_FEE_CHARGED, REFUNDABLE_RESOURCE_FEE_CHARGED, RENT_FEE_CHARGED, TX_SIGNERS, REFUNDABLE_FEE"
other_cols = "partition_id"
) }}

View File

@ -5,7 +5,6 @@
incremental_predicates = ["dynamic_range_predicate", "partition_id::date"],
merge_exclude_columns = ["inserted_timestamp"],
cluster_by = ['closed_at::DATE','partition_id','modified_timestamp::DATE'],
full_refresh = false,
tags = ['scheduled_core'],
) }}
@ -15,20 +14,15 @@
{% set max_is_query %}
SELECT
MAX(_inserted_timestamp) AS _inserted_timestamp
FROM
{{ this }}
{% endset %}
{% set max_is = run_query(max_is_query) [0] [0] %}
{% set max_part_query %}
SELECT
MAX(_inserted_timestamp) AS _inserted_timestamp,
MAX(partition_gte_id) AS partition__gte_id
FROM
{{ this }}
{% endset %}
{% set max_part = run_query(max_part_query) [0] [0] %}
{% set result = run_query(max_is_query) %}
{% set max_is = result [0] [0] %}
{% set max_part = result [0] [1] %}
{% endif %}
{% endif %}
@ -36,32 +30,53 @@ WITH pre_final AS (
SELECT
partition_id,
partition_gte_id,
account_id :: STRING AS account_id,
balance :: FLOAT AS balance,
buying_liabilities :: FLOAT AS buying_liabilities,
selling_liabilities :: FLOAT AS selling_liabilities,
sequence_number :: INTEGER AS sequence_number,
num_subentries :: INTEGER AS num_subentries,
inflation_destination :: STRING AS inflation_destination,
flags :: INTEGER AS flags,
home_domain :: STRING AS home_domain,
master_weight :: INTEGER AS master_weight,
threshold_low :: INTEGER AS threshold_low,
threshold_medium :: INTEGER AS threshold_medium,
threshold_high :: INTEGER AS threshold_high,
last_modified_ledger :: INTEGER AS last_modified_ledger,
ledger_entry_change :: INTEGER AS ledger_entry_change,
deleted :: BOOLEAN AS deleted,
batch_id :: STRING AS batch_id,
batch_run_date :: TIMESTAMP AS batch_run_date,
batch_insert_ts :: TIMESTAMP AS batch_insert_ts,
sponsor :: STRING AS sponsor,
num_sponsored :: INTEGER AS num_sponsored,
num_sponsoring :: INTEGER AS num_sponsoring,
sequence_ledger :: INTEGER AS sequence_ledger,
sequence_time :: TIMESTAMP AS sequence_time,
closed_at :: TIMESTAMP AS closed_at,
ledger_sequence :: INTEGER AS ledger_sequence,
VALUE :account_id :: STRING AS account_id,
TRY_CAST(
VALUE :balance :: STRING AS FLOAT
) AS balance,
TRY_CAST(
VALUE :buying_liabilities :: STRING AS FLOAT
) AS buying_liabilities,
TRY_CAST(
VALUE :selling_liabilities :: STRING AS FLOAT
) AS selling_liabilities,
VALUE :sequence_number :: INTEGER AS sequence_number,
VALUE :num_subentries :: INTEGER AS num_subentries,
VALUE :inflation_destination :: STRING AS inflation_destination,
VALUE :flags :: INTEGER AS flags,
VALUE :home_domain :: STRING AS home_domain,
VALUE :master_weight :: INTEGER AS master_weight,
VALUE :threshold_low :: INTEGER AS threshold_low,
VALUE :threshold_medium :: INTEGER AS threshold_medium,
VALUE :threshold_high :: INTEGER AS threshold_high,
VALUE :last_modified_ledger :: INTEGER AS last_modified_ledger,
VALUE :ledger_entry_change :: INTEGER AS ledger_entry_change,
VALUE :deleted :: BOOLEAN AS deleted,
TO_TIMESTAMP(
VALUE :batch_run_date :: INT,
6
) AS batch_run_date,
VALUE: batch_id :: STRING AS batch_id,
TO_TIMESTAMP(
VALUE :batch_insert_ts :: INT,
6
) AS batch_insert_ts,
VALUE :sponsor :: STRING AS sponsor,
VALUE :num_sponsored :: INTEGER AS num_sponsored,
VALUE :num_sponsoring :: INTEGER AS num_sponsoring,
VALUE :sequence_ledger :: INTEGER AS sequence_ledger,
TO_TIMESTAMP(
VALUE :sequence_time :: INT,
6
) AS sequence_time,
TO_TIMESTAMP(
VALUE :closed_at :: INT,
6
) AS closed_at,
TO_TIMESTAMP(
VALUE :ledger_sequence :: INT,
6
) AS ledger_sequence,
_inserted_timestamp
FROM

View File

@ -11,67 +11,64 @@
{% if execute %}
{% if is_incremental() %}
{% set max_bat_query %}
{% set max_is_query %}
SELECT
MAX(batch_insert_ts) AS batch_insert_ts
MAX(_inserted_timestamp) AS _inserted_timestamp,
MAX(partition_gte_id) AS partition__gte_id
FROM
{{ this }}
{% endset %}
{% set max_batch = run_query(max_bat_query) [0] [0] %}
{% endif %}
{% if is_incremental() %}
{% set max_part_query %}
SELECT
MAX(partition_id) AS partition_id
FROM
{{ this }}
{% endset %}
{% set max_part = run_query(max_part_query) [0] [0] %}
{% set result = run_query(max_is_query) %}
{% set max_is = result [0] [0] %}
{% set max_part = result [0] [1] %}
{% endif %}
{% endif %}
WITH pre_final AS (
SELECT
partition_id,
id :: FLOAT AS id,
asset_type :: STRING AS asset_type,
asset_code :: STRING AS asset_code,
asset_issuer :: STRING AS asset_issuer,
batch_run_date :: datetime AS batch_run_date,
batch_id :: STRING AS batch_id,
batch_insert_ts :: datetime AS batch_insert_ts,
asset_id :: INT AS asset_id
partition_gte_id,
VALUE :id :: FLOAT AS id,
VALUE :asset_type :: STRING AS asset_type,
VALUE :asset_code :: STRING AS asset_code,
VALUE :asset_issuer :: STRING AS asset_issuer,
TO_TIMESTAMP(
VALUE :batch_run_date :: INT,
6
) AS batch_run_date,
VALUE: batch_id :: STRING AS batch_id,
TO_TIMESTAMP(
VALUE :batch_insert_ts :: INT,
6
) AS batch_insert_ts,
VALUE :asset_id :: INT AS asset_id,
_inserted_timestamp
FROM
{# {% if is_incremental() %}
{% if is_incremental() %}
{{ ref('bronze__assets') }}
{% else %}
{{ ref('bronze__assets_FR') }}
{% endif %}
#}
{{ source(
'bronze_streamline',
'history_assets'
) }}
{% if is_incremental() %}
WHERE
partition_id >= '{{ max_part }}'
AND batch_insert_ts > '{{ max_batch }}'
partition_gte_id >= '{{ max_part }}'
AND _inserted_timestamp > '{{ max_is }}'
{% endif %}
qualify ROW_NUMBER() over (
PARTITION BY asset_id
ORDER BY
batch_insert_ts DESC
batch_insert_ts DESC,
_inserted_timestamp DESC
) = 1
)
SELECT
partition_id,
partition_gte_id,
id,
asset_type,
asset_code,
@ -80,6 +77,7 @@ SELECT
batch_id,
batch_insert_ts,
asset_id,
_inserted_timestamp,
{{ dbt_utils.generate_surrogate_key(
['asset_id']
) }} AS assets_id,

View File

@ -14,20 +14,15 @@
{% set max_is_query %}
SELECT
MAX(_inserted_timestamp) AS _inserted_timestamp
FROM
{{ this }}
{% endset %}
{% set max_is = run_query(max_is_query) [0] [0] %}
{% set max_part_query %}
SELECT
MAX(_inserted_timestamp) AS _inserted_timestamp,
MAX(partition_gte_id) AS partition__gte_id
FROM
{{ this }}
{% endset %}
{% set max_part = run_query(max_part_query) [0] [0] %}
{% set result = run_query(max_is_query) %}
{% set max_is = result [0] [0] %}
{% set max_part = result [0] [1] %}
{% endif %}
{% endif %}

View File

@ -11,22 +11,84 @@
{% if execute %}
{% if is_incremental() %}
{% set max_inserted_query %}
{% set max_is_query %}
SELECT
MAX(_inserted_timestamp) AS _inserted_timestamp
MAX(_inserted_timestamp) AS _inserted_timestamp,
MAX(partition_gte_id) AS partition__gte_id
FROM
{{ this }}
{% endset %}
{% set max_inserted_timestamp = run_query(max_inserted_query) [0] [0] %}
{% set result = run_query(max_is_query) %}
{% set max_is = result [0] [0] %}
{% set max_part = result [0] [1] %}
{% endif %}
{% endif %}
WITH pre_final AS (
SELECT
partition_id,
partition_gte_id,
liquidity_pool_id,
VALUE :TYPE :: STRING AS TYPE,
VALUE :fee :: INTEGER AS fee,
VALUE :trustline_count :: INTEGER AS trustline_count,
VALUE :pool_share_count :: FLOAT AS pool_share_count,
VALUE :asset_a_type :: STRING AS asset_a_type,
VALUE :asset_a_code :: STRING AS asset_a_code,
VALUE :asset_a_issuer :: STRING AS asset_a_issuer,
VALUE :asset_a_id :: INTEGER AS asset_a_id,
VALUE :asset_a_amount :: FLOAT AS asset_a_amount,
VALUE :asset_b_type :: STRING AS asset_b_type,
VALUE :asset_b_code :: STRING AS asset_b_code,
VALUE :asset_b_issuer :: STRING AS asset_b_issuer,
VALUE :asset_b_id :: INTEGER AS asset_b_id,
VALUE :asset_b_amount :: FLOAT AS asset_b_amount,
VALUE :last_modified_ledger :: INTEGER AS last_modified_ledger,
VALUE :ledger_entry_change :: INTEGER AS ledger_entry_change,
VALUE :deleted :: BOOLEAN AS deleted,
TO_TIMESTAMP(
VALUE :batch_run_date :: INT,
6
) AS batch_run_date,
VALUE: batch_id :: STRING AS batch_id,
TO_TIMESTAMP(
VALUE :batch_insert_ts :: INT,
6
) AS batch_insert_ts,
TO_TIMESTAMP(
VALUE :closed_at :: INT,
6
) AS closed_at,
VALUE :ledger_sequence :: INTEGER AS ledger_sequence,
_inserted_timestamp
FROM
{% if is_incremental() %}
{{ ref('bronze__liquidity_pools') }}
{% else %}
{{ ref('bronze__liquidity_pools_FR') }}
{% endif %}
{% if is_incremental() %}
WHERE
partition_gte_id >= '{{ max_part }}'
AND _inserted_timestamp > '{{ max_is }}'
{% endif %}
qualify ROW_NUMBER() over (
PARTITION BY liquidity_pool_id,
closed_at
ORDER BY
batch_insert_ts DESC,
_inserted_timestamp DESC
) = 1
)
SELECT
partition_id,
partition_gte_id,
liquidity_pool_id :: STRING AS liquidity_pool_id,
TYPE,
fee,
trustline_count,
@ -49,53 +111,6 @@ WITH pre_final AS (
batch_insert_ts,
closed_at,
ledger_sequence,
_inserted_timestamp
FROM
{% if is_incremental() %}
{{ ref('bronze__liquidity_pools') }}
{% else %}
{{ ref('bronze__liquidity_pools_FR') }}
{% endif %}
{% if is_incremental() %}
WHERE
partition_id >= '{{ max_part }}'
AND _inserted_timestamp > '{{ max_inserted_timestamp }}'
{% endif %}
qualify ROW_NUMBER() over (
PARTITION BY liquidity_pool_id,
closed_at
ORDER BY
_inserted_timestamp DESC
) = 1
)
SELECT
partition_id,
liquidity_pool_id :: STRING AS liquidity_pool_id,
TYPE :: STRING AS TYPE,
fee :: INTEGER AS fee,
trustline_count :: INTEGER AS trustline_count,
pool_share_count :: FLOAT AS pool_share_count,
asset_a_type :: STRING AS asset_a_type,
asset_a_code :: STRING AS asset_a_code,
asset_a_issuer :: STRING AS asset_a_issuer,
asset_a_id :: INTEGER AS asset_a_id,
asset_a_amount :: FLOAT AS asset_a_amount,
asset_b_type :: STRING AS asset_b_type,
asset_b_code :: STRING AS asset_b_code,
asset_b_issuer :: STRING AS asset_b_issuer,
asset_b_id :: INTEGER AS asset_b_id,
asset_b_amount :: FLOAT AS asset_b_amount,
last_modified_ledger :: INTEGER AS last_modified_ledger,
ledger_entry_change :: INTEGER AS ledger_entry_change,
deleted :: BOOLEAN AS deleted,
batch_id :: STRING AS batch_id,
batch_run_date :: TIMESTAMP AS batch_run_date,
batch_insert_ts :: TIMESTAMP AS batch_insert_ts,
closed_at :: TIMESTAMP AS closed_at,
ledger_sequence :: INTEGER AS ledger_sequence,
_inserted_timestamp,
{{ dbt_utils.generate_surrogate_key(
['liquidity_pool_id','closed_at']

View File

@ -14,20 +14,15 @@
{% set max_is_query %}
SELECT
MAX(_inserted_timestamp) AS _inserted_timestamp
FROM
{{ this }}
{% endset %}
{% set max_is = run_query(max_is_query) [0] [0] %}
{% set max_part_query %}
SELECT
MAX(_inserted_timestamp) AS _inserted_timestamp,
MAX(partition_gte_id) AS partition__gte_id
FROM
{{ this }}
{% endset %}
{% set max_part = run_query(max_part_query) [0] [0] %}
{% set result = run_query(max_is_query) %}
{% set max_is = result [0] [0] %}
{% set max_part = result [0] [1] %}
{% endif %}
{% endif %}
@ -167,8 +162,14 @@ WITH pre_final AS (
6
) AS closed_at,
VALUE :batch_id :: STRING AS batch_id,
VALUE :batch_run_date :: TIMESTAMP AS batch_run_date,
VALUE :batch_insert_ts :: TIMESTAMP AS batch_insert_ts,
TO_TIMESTAMP(
VALUE :batch_run_date :: INT,
6
) AS batch_run_date,
TO_TIMESTAMP(
VALUE :batch_insert_ts :: INT,
6
) AS batch_insert_ts,
_inserted_timestamp
FROM

View File

@ -11,48 +11,61 @@
{% if execute %}
{% if is_incremental() %}
{% set max_inserted_query %}
{% set max_is_query %}
SELECT
MAX(_inserted_timestamp) AS _inserted_timestamp
MAX(_inserted_timestamp) AS _inserted_timestamp,
MAX(partition_gte_id) AS partition__gte_id
FROM
{{ this }}
{% endset %}
{% set max_inserted_timestamp = run_query(max_inserted_query) [0] [0] %}
{% set result = run_query(max_is_query) %}
{% set max_is = result [0] [0] %}
{% set max_part = result [0] [1] %}
{% endif %}
{% endif %}
WITH pre_final AS (
SELECT
partition_id,
partition_gte_id,
history_operation_id :: INTEGER AS history_operation_id,
"order" :: INTEGER AS "order",
ledger_closed_at :: TIMESTAMP AS ledger_closed_at,
selling_account_address :: STRING AS selling_account_address,
selling_asset_code :: STRING AS selling_asset_code,
selling_asset_issuer :: STRING AS selling_asset_issuer,
selling_asset_type :: STRING AS selling_asset_type,
selling_asset_id :: INTEGER AS selling_asset_id,
selling_amount :: FLOAT AS selling_amount,
buying_account_address :: STRING AS buying_account_address,
buying_asset_code :: STRING AS buying_asset_code,
buying_asset_issuer :: STRING AS buying_asset_issuer,
buying_asset_type :: STRING AS buying_asset_type,
buying_asset_id :: INTEGER AS buying_asset_id,
buying_amount :: FLOAT AS buying_amount,
price_n :: INTEGER AS price_n,
price_d :: INTEGER AS price_d,
selling_offer_id :: INTEGER AS selling_offer_id,
buying_offer_id :: INTEGER AS buying_offer_id,
batch_id :: STRING AS batch_id,
batch_run_date :: TIMESTAMP AS batch_run_date,
batch_insert_ts :: TIMESTAMP AS batch_insert_ts,
selling_liquidity_pool_id :: STRING AS selling_liquidity_pool_id,
liquidity_pool_fee :: INTEGER AS liquidity_pool_fee,
trade_type :: INTEGER AS trade_type,
rounding_slippage :: INTEGER AS rounding_slippage,
seller_is_exact :: BOOLEAN AS seller_is_exact,
VALUE: "order" :: INTEGER AS "order",
TO_TIMESTAMP(
VALUE :ledger_closed_at :: INT,
6
) AS ledger_closed_at,
VALUE: selling_account_address :: STRING AS selling_account_address,
VALUE: selling_asset_code :: STRING AS selling_asset_code,
VALUE: selling_asset_issuer :: STRING AS selling_asset_issuer,
VALUE: selling_asset_type :: STRING AS selling_asset_type,
VALUE: selling_asset_id :: INTEGER AS selling_asset_id,
VALUE: selling_amount :: FLOAT AS selling_amount,
VALUE: buying_account_address :: STRING AS buying_account_address,
VALUE: buying_asset_code :: STRING AS buying_asset_code,
VALUE: buying_asset_issuer :: STRING AS buying_asset_issuer,
VALUE: buying_asset_type :: STRING AS buying_asset_type,
VALUE: buying_asset_id :: INTEGER AS buying_asset_id,
VALUE: buying_amount :: FLOAT AS buying_amount,
VALUE: price_n :: INTEGER AS price_n,
VALUE: price_d :: INTEGER AS price_d,
VALUE: selling_offer_id :: INTEGER AS selling_offer_id,
VALUE: buying_offer_id :: INTEGER AS buying_offer_id,
VALUE :batch_id :: STRING AS batch_id,
TO_TIMESTAMP(
VALUE :batch_run_date :: INT,
6
) AS batch_run_date,
TO_TIMESTAMP(
VALUE :batch_insert_ts :: INT,
6
) AS batch_insert_ts,
VALUE: selling_liquidity_pool_id :: STRING AS selling_liquidity_pool_id,
VALUE: liquidity_pool_fee :: INTEGER AS liquidity_pool_fee,
VALUE: trade_type :: INTEGER AS trade_type,
VALUE: rounding_slippage :: INTEGER AS rounding_slippage,
VALUE: seller_is_exact :: BOOLEAN AS seller_is_exact,
_inserted_timestamp
FROM
@ -64,18 +77,21 @@ WITH pre_final AS (
{% if is_incremental() %}
WHERE
_inserted_timestamp >= '{{ max_inserted_timestamp }}'
partition_gte_id >= '{{ max_part }}'
AND _inserted_timestamp > '{{ max_is }}'
{% endif %}
qualify ROW_NUMBER() over (
PARTITION BY history_operation_id,
"order"
ORDER BY
batch_insert_ts DESC,
_inserted_timestamp DESC
) = 1
)
SELECT
partition_id,
partition_gte_id,
history_operation_id,
"order",
ledger_closed_at,

View File

@ -11,65 +11,81 @@
{% if execute %}
{% if is_incremental() %}
{% set max_inserted_query %}
{% set max_is_query %}
SELECT
MAX(_inserted_timestamp) AS _inserted_timestamp
MAX(_inserted_timestamp) AS _inserted_timestamp,
MAX(partition_gte_id) AS partition__gte_id
FROM
{{ this }}
{% endset %}
{% set max_inserted_timestamp = run_query(max_inserted_query) [0] [0] %}
{% set result = run_query(max_is_query) %}
{% set max_is = result [0] [0] %}
{% set max_part = result [0] [1] %}
{% endif %}
{% endif %}
WITH pre_final AS (
SELECT
partition_id,
partition_gte_id,
id :: INTEGER AS id,
transaction_hash :: STRING AS transaction_hash,
ledger_sequence :: INTEGER AS ledger_sequence,
account :: STRING AS account,
account_sequence :: INTEGER AS account_sequence,
max_fee :: INTEGER AS max_fee,
operation_count :: INTEGER AS operation_count,
created_at :: TIMESTAMP AS created_at,
memo_type :: STRING AS memo_type,
memo :: STRING AS memo,
time_bounds :: STRING AS time_bounds,
SUCCESSFUL :: BOOLEAN AS SUCCESSFUL,
fee_charged :: INTEGER AS fee_charged,
inner_transaction_hash :: STRING AS inner_transaction_hash,
fee_account :: STRING AS fee_account,
new_max_fee :: INTEGER AS new_max_fee,
account_muxed :: STRING AS account_muxed,
fee_account_muxed :: STRING AS fee_account_muxed,
batch_id :: STRING AS batch_id,
batch_run_date :: TIMESTAMP AS batch_run_date,
batch_insert_ts :: TIMESTAMP AS batch_insert_ts,
ledger_bounds :: STRING AS ledger_bounds,
min_account_sequence :: INTEGER AS min_account_sequence,
min_account_sequence_age :: INTEGER AS min_account_sequence_age,
min_account_sequence_ledger_gap :: INTEGER AS min_account_sequence_ledger_gap,
tx_envelope :: STRING AS tx_envelope,
tx_result :: STRING AS tx_result,
tx_meta :: STRING AS tx_meta,
tx_fee_meta :: STRING AS tx_fee_meta,
extra_signers :: ARRAY AS extra_signers,
resource_fee :: INTEGER AS resource_fee,
soroban_resources_instructions :: INTEGER AS soroban_resources_instructions,
soroban_resources_read_bytes :: INTEGER AS soroban_resources_read_bytes,
soroban_resources_write_bytes :: INTEGER AS soroban_resources_write_bytes,
closed_at :: TIMESTAMP AS closed_at,
transaction_result_code :: STRING AS transaction_result_code,
inclusion_fee_bid :: INTEGER AS inclusion_fee_bid,
inclusion_fee_charged :: INTEGER AS inclusion_fee_charged,
resource_fee_refund :: INTEGER AS resource_fee_refund,
non_refundable_resource_fee_charged :: INTEGER AS non_refundable_resource_fee_charged,
refundable_resource_fee_charged :: INTEGER AS refundable_resource_fee_charged,
rent_fee_charged :: INTEGER AS rent_fee_charged,
tx_signers :: STRING AS tx_signers,
refundable_fee :: INTEGER AS refundable_fee,
VALUE :transaction_hash :: STRING AS transaction_hash,
VALUE :ledger_sequence :: INTEGER AS ledger_sequence,
VALUE :account :: STRING AS account,
VALUE :account_sequence :: INTEGER AS account_sequence,
VALUE :max_fee :: INTEGER AS max_fee,
VALUE :operation_count :: INTEGER AS operation_count,
TO_TIMESTAMP(
VALUE :created_at :: INT,
6
) AS created_at,
VALUE :memo_type :: STRING AS memo_type,
VALUE :memo :: STRING AS memo,
VALUE :time_bounds :: STRING AS time_bounds,
VALUE :SUCCESSFUL :: BOOLEAN AS SUCCESSFUL,
VALUE :fee_charged :: INTEGER AS fee_charged,
VALUE :inner_transaction_hash :: STRING AS inner_transaction_hash,
VALUE :fee_account :: STRING AS fee_account,
VALUE :new_max_fee :: INTEGER AS new_max_fee,
VALUE :account_muxed :: STRING AS account_muxed,
VALUE :fee_account_muxed :: STRING AS fee_account_muxed,
VALUE :batch_id :: STRING AS batch_id,
TO_TIMESTAMP(
VALUE :batch_run_date :: INT,
6
) AS batch_run_date,
TO_TIMESTAMP(
VALUE :batch_insert_ts :: INT,
6
) AS batch_insert_ts,
VALUE :ledger_bounds :: STRING AS ledger_bounds,
VALUE :min_account_sequence :: INTEGER AS min_account_sequence,
VALUE :min_account_sequence_age :: INTEGER AS min_account_sequence_age,
VALUE :min_account_sequence_ledger_gap :: INTEGER AS min_account_sequence_ledger_gap,
VALUE :tx_envelope :: STRING AS tx_envelope,
VALUE :tx_result :: STRING AS tx_result,
VALUE :tx_meta :: STRING AS tx_meta,
VALUE :tx_fee_meta :: STRING AS tx_fee_meta,
VALUE :extra_signers :: ARRAY AS extra_signers,
VALUE :resource_fee :: INTEGER AS resource_fee,
VALUE :soroban_resources_instructions :: INTEGER AS soroban_resources_instructions,
VALUE :soroban_resources_read_bytes :: INTEGER AS soroban_resources_read_bytes,
VALUE :soroban_resources_write_bytes :: INTEGER AS soroban_resources_write_bytes,
TO_TIMESTAMP(
VALUE :closed_at :: INT,
6
) AS closed_at,
VALUE :transaction_result_code :: STRING AS transaction_result_code,
VALUE :inclusion_fee_bid :: INTEGER AS inclusion_fee_bid,
VALUE :inclusion_fee_charged :: INTEGER AS inclusion_fee_charged,
VALUE :resource_fee_refund :: INTEGER AS resource_fee_refund,
VALUE :non_refundable_resource_fee_charged :: INTEGER AS non_refundable_resource_fee_charged,
VALUE :refundable_resource_fee_charged :: INTEGER AS refundable_resource_fee_charged,
VALUE :rent_fee_charged :: INTEGER AS rent_fee_charged,
VALUE :tx_signers :: STRING AS tx_signers,
VALUE :refundable_fee :: INTEGER AS refundable_fee,
_inserted_timestamp
FROM
@ -81,17 +97,20 @@ WITH pre_final AS (
{% if is_incremental() %}
WHERE
_inserted_timestamp >= '{{ max_inserted_timestamp }}'
partition_gte_id >= '{{ max_part }}'
AND _inserted_timestamp > '{{ max_is }}'
{% endif %}
qualify ROW_NUMBER() over (
PARTITION BY id
ORDER BY
batch_insert_ts DESC,
_inserted_timestamp DESC
) = 1
)
SELECT
partition_id,
partition_gte_id,
id,
transaction_hash,
ledger_sequence,