diff --git a/models/__overview__.md b/models/__overview__.md index e886719e..62e93ad9 100644 --- a/models/__overview__.md +++ b/models/__overview__.md @@ -90,6 +90,8 @@ There is more information on how to use dbt docs in the last section of this doc - [fact_attestations](https://flipsidecrypto.github.io/ethereum-models/#!/model/model.ethereum_models.beacon_chain__fact_attestations) - [fact_deposits](https://flipsidecrypto.github.io/ethereum-models/#!/model/model.ethereum_models.beacon_chain__fact_deposits) - [fact_pending_deposits](https://flipsidecrypto.github.io/ethereum-models/#!/model/model.ethereum_models.beacon_chain__fact_pending_deposits) +- [fact_pending_partial_withdrawals](https://flipsidecrypto.github.io/ethereum-models/#!/model/model.ethereum_models.beacon_chain__fact_pending_partial_withdrawals) +- [fact_pending_consolidations](https://flipsidecrypto.github.io/ethereum-models/#!/model/model.ethereum_models.beacon_chain__fact_pending_consolidations) - [fact_withdrawals](https://flipsidecrypto.github.io/ethereum-models/#!/model/model.ethereum_models.beacon_chain__fact_withdrawals) - [fact_validators](https://flipsidecrypto.github.io/ethereum-models/#!/model/model.ethereum_models.beacon_chain__fact_validators) - [fact_validator_balances](https://flipsidecrypto.github.io/ethereum-models/#!/model/model.ethereum_models.beacon_chain__fact_validator_balances) diff --git a/models/beacon_chain/gold/beacon_chain__fact_pending_consolidations.sql b/models/beacon_chain/gold/beacon_chain__fact_pending_consolidations.sql new file mode 100644 index 00000000..b946bdd1 --- /dev/null +++ b/models/beacon_chain/gold/beacon_chain__fact_pending_consolidations.sql @@ -0,0 +1,18 @@ +{{ config( + materialized = 'view', + persist_docs ={ "relation": true, + "columns": true }, + meta ={ 'database_tags':{ 'table':{ 'PURPOSE': 'BEACON' } } }, + tags = ['gold','beacon'] +) }} + +SELECT + request_slot_number, + source_index, + target_index, + pending_consolidations_id as fact_pending_consolidations_id, + inserted_timestamp, + modified_timestamp +FROM + {{ ref('silver__pending_consolidations') }} + diff --git a/models/beacon_chain/gold/beacon_chain__fact_pending_consolidations.yml b/models/beacon_chain/gold/beacon_chain__fact_pending_consolidations.yml new file mode 100644 index 00000000..55f5338b --- /dev/null +++ b/models/beacon_chain/gold/beacon_chain__fact_pending_consolidations.yml @@ -0,0 +1,19 @@ +version: 2 +models: + - name: beacon_chain__fact_pending_consolidations + description: This model details the pending consolidations that are queued in the beacon chain of Ethereum. + + columns: + - name: REQUEST_SLOT_NUMBER + description: The slot number that the queue was read at. + - name: SOURCE_INDEX + description: The source validator index for the consolidation. + - name: TARGET_INDEX + description: The target validator index for the consolidation. + - name: FACT_PENDING_CONSOLIDATIONS_ID + description: The unique identifier for a row. + - name: INSERTED_TIMESTAMP + description: The timestamp when the pending consolidation was inserted. + - name: MODIFIED_TIMESTAMP + description: The timestamp when the pending consolidation was modified. + diff --git a/models/beacon_chain/gold/beacon_chain__fact_pending_partial_withdrawals.sql b/models/beacon_chain/gold/beacon_chain__fact_pending_partial_withdrawals.sql new file mode 100644 index 00000000..cd227064 --- /dev/null +++ b/models/beacon_chain/gold/beacon_chain__fact_pending_partial_withdrawals.sql @@ -0,0 +1,20 @@ +{{ config( + materialized = 'view', + persist_docs ={ "relation": true, + "columns": true }, + meta ={ 'database_tags':{ 'table':{ 'PURPOSE': 'BEACON' } } }, + tags = ['gold','beacon'] +) }} + +SELECT + request_slot_number, + validator_index, + withdrawable_epoch, + withdrawable_epoch_timestamp, + amount / pow(10, 9) AS amount, + pending_partial_withdrawals_id as fact_pending_partial_withdrawals_id, + inserted_timestamp, + modified_timestamp +FROM + {{ ref('silver__pending_partial_withdrawals') }} + diff --git a/models/beacon_chain/gold/beacon_chain__fact_pending_partial_withdrawals.yml b/models/beacon_chain/gold/beacon_chain__fact_pending_partial_withdrawals.yml new file mode 100644 index 00000000..6ce5a16f --- /dev/null +++ b/models/beacon_chain/gold/beacon_chain__fact_pending_partial_withdrawals.yml @@ -0,0 +1,23 @@ +version: 2 +models: + - name: beacon_chain__fact_pending_partial_withdrawals + description: This model details the pending partial withdrawals that are queued in the beacon chain of Ethereum. + + columns: + - name: REQUEST_SLOT_NUMBER + description: The slot number that the queue was read at. + - name: VALIDATOR_INDEX + description: The validator index for the pending partial withdrawal. + - name: WITHDRAWABLE_EPOCH + description: The epoch when the withdrawal becomes withdrawable. + - name: WITHDRAWABLE_EPOCH_TIMESTAMP + description: The estimated timestamp when the withdrawal becomes withdrawable (calculated from epoch number using genesis timestamp). + - name: AMOUNT + description: The amount of the pending partial withdrawal, adjusted for the 9 decimal places (in ETH). + - name: FACT_PENDING_PARTIAL_WITHDRAWALS_ID + description: The unique identifier for a row. + - name: INSERTED_TIMESTAMP + description: The timestamp when the pending partial withdrawal was inserted. + - name: MODIFIED_TIMESTAMP + description: The timestamp when the pending partial withdrawal was modified. + diff --git a/models/beacon_chain/silver/silver__pending_consolidations.sql b/models/beacon_chain/silver/silver__pending_consolidations.sql new file mode 100644 index 00000000..0f3ae543 --- /dev/null +++ b/models/beacon_chain/silver/silver__pending_consolidations.sql @@ -0,0 +1,59 @@ +-- depends on: {{ ref('bronze__streamline_beacon_pending_consolidations') }} +{{ config ( + materialized = "incremental", + unique_key = "pending_consolidations_id", + cluster_by = "ROUND(request_slot_number, -3)", + post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION on equality(pending_consolidations_id)", + incremental_predicates = ["dynamic_range", "request_slot_number"], + tags = ['silver','beacon'] +) }} + +SELECT + COALESCE( + VALUE :"SLOT_NUMBER" :: INT, + metadata :request :"slot_number" :: INT, + PARSE_JSON( + metadata :request :"slot_number" + ) :: INT + ) AS request_slot_number, + try_to_number(data:source_index::STRING) AS source_index, + try_to_number(data:target_index::STRING) AS target_index, + data, + {{ dbt_utils.generate_surrogate_key( + ['request_slot_number', 'source_index', 'target_index'] + ) }} AS pending_consolidations_id, + SYSDATE() AS inserted_timestamp, + SYSDATE() AS modified_timestamp, + _inserted_timestamp, + '{{ invocation_id }}' AS _invocation_id +FROM + +{% if is_incremental() %} +{{ ref('bronze__streamline_beacon_pending_consolidations') }} +WHERE + _inserted_timestamp >= ( + SELECT + COALESCE(MAX(_inserted_timestamp), '1970-01-01' :: TIMESTAMP) _inserted_timestamp + FROM + {{ this }}) + AND (LEFT( + DATA :error :: STRING, + 1 + ) <> 'F' + OR DATA :error IS NULL + ) + {% else %} + {{ ref('bronze__streamline_fr_beacon_pending_consolidations') }} + WHERE + (LEFT( + DATA :error :: STRING, + 1 + ) <> 'F' + OR DATA :error IS NULL + ) + {% endif %} + +qualify(ROW_NUMBER() over (PARTITION BY pending_consolidations_id +ORDER BY + _inserted_timestamp DESC)) = 1 + diff --git a/models/beacon_chain/silver/silver__pending_consolidations.yml b/models/beacon_chain/silver/silver__pending_consolidations.yml new file mode 100644 index 00000000..45fc25f8 --- /dev/null +++ b/models/beacon_chain/silver/silver__pending_consolidations.yml @@ -0,0 +1,15 @@ +version: 2 +models: + - name: silver__pending_consolidations + columns: + - name: INSERTED_TIMESTAMP + tests: + - not_null + - dbt_expectations.expect_row_values_to_have_recent_data: + datepart: day + interval: 1 + - dbt_expectations.expect_column_values_to_be_in_type_list: + column_type_list: + - TIMESTAMP_LTZ + - TIMESTAMP_NTZ + diff --git a/models/beacon_chain/silver/silver__pending_partial_withdrawals.sql b/models/beacon_chain/silver/silver__pending_partial_withdrawals.sql new file mode 100644 index 00000000..267ba8f8 --- /dev/null +++ b/models/beacon_chain/silver/silver__pending_partial_withdrawals.sql @@ -0,0 +1,65 @@ +-- depends on: {{ ref('bronze__streamline_beacon_pending_partial_withdrawals') }} +{{ config ( + materialized = "incremental", + unique_key = "pending_partial_withdrawals_id", + cluster_by = "ROUND(request_slot_number, -3)", + post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION on equality(pending_partial_withdrawals_id)", + incremental_predicates = ["dynamic_range", "request_slot_number"], + tags = ['silver','beacon'] +) }} + +SELECT + COALESCE( + VALUE :"SLOT_NUMBER" :: INT, + metadata :request :"slot_number" :: INT, + PARSE_JSON( + metadata :request :"slot_number" + ) :: INT + ) AS request_slot_number, + try_to_number(data:amount::STRING) AS amount, + try_to_number(data:validator_index::STRING) AS validator_index, + try_to_number(data:withdrawable_epoch::STRING) AS withdrawable_epoch, + DATEADD( + 'seconds', + try_to_number(data:withdrawable_epoch::STRING) * 32 * 12, + '2020-12-01T12:00:23Z' :: timestamp_ntz + ) AS withdrawable_epoch_timestamp, + data, + {{ dbt_utils.generate_surrogate_key( + ['request_slot_number', 'validator_index', 'withdrawable_epoch', 'amount'] + ) }} AS pending_partial_withdrawals_id, + SYSDATE() AS inserted_timestamp, + SYSDATE() AS modified_timestamp, + _inserted_timestamp, + '{{ invocation_id }}' AS _invocation_id +FROM + +{% if is_incremental() %} +{{ ref('bronze__streamline_beacon_pending_partial_withdrawals') }} +WHERE + _inserted_timestamp >= ( + SELECT + COALESCE(MAX(_inserted_timestamp), '1970-01-01' :: TIMESTAMP) _inserted_timestamp + FROM + {{ this }}) + AND (LEFT( + DATA :error :: STRING, + 1 + ) <> 'F' + OR DATA :error IS NULL + ) + {% else %} + {{ ref('bronze__streamline_fr_beacon_pending_partial_withdrawals') }} + WHERE + (LEFT( + DATA :error :: STRING, + 1 + ) <> 'F' + OR DATA :error IS NULL + ) + {% endif %} + +qualify(ROW_NUMBER() over (PARTITION BY pending_partial_withdrawals_id +ORDER BY + _inserted_timestamp DESC)) = 1 + diff --git a/models/beacon_chain/silver/silver__pending_partial_withdrawals.yml b/models/beacon_chain/silver/silver__pending_partial_withdrawals.yml new file mode 100644 index 00000000..af1a0943 --- /dev/null +++ b/models/beacon_chain/silver/silver__pending_partial_withdrawals.yml @@ -0,0 +1,15 @@ +version: 2 +models: + - name: silver__pending_partial_withdrawals + columns: + - name: INSERTED_TIMESTAMP + tests: + - not_null + - dbt_expectations.expect_row_values_to_have_recent_data: + datepart: day + interval: 1 + - dbt_expectations.expect_column_values_to_be_in_type_list: + column_type_list: + - TIMESTAMP_LTZ + - TIMESTAMP_NTZ + diff --git a/models/sources.yml b/models/sources.yml index 6d6bbfaf..47ea03bf 100644 --- a/models/sources.yml +++ b/models/sources.yml @@ -46,6 +46,8 @@ sources: - name: contract_abis_v3 - name: token_reads - name: pending_deposits + - name: pending_partial_withdrawals + - name: pending_consolidations - name: balances_erc20 - name: balances_native - name: crosschain_silver diff --git a/models/streamline/bronze/beacon/bronze__streamline_beacon_pending_consolidations.sql b/models/streamline/bronze/beacon/bronze__streamline_beacon_pending_consolidations.sql new file mode 100644 index 00000000..9665a8df --- /dev/null +++ b/models/streamline/bronze/beacon/bronze__streamline_beacon_pending_consolidations.sql @@ -0,0 +1,10 @@ +{{ config ( + materialized = 'view', + tags = ['bronze_beacon_pending_consolidations'] +) }} +{{ v0_streamline_external_table_query( + model = "pending_consolidations", + partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 4), '_', 1) AS INTEGER)", + block_number = false +) }} + diff --git a/models/streamline/bronze/beacon/bronze__streamline_beacon_pending_partial_withdrawals.sql b/models/streamline/bronze/beacon/bronze__streamline_beacon_pending_partial_withdrawals.sql new file mode 100644 index 00000000..860995b2 --- /dev/null +++ b/models/streamline/bronze/beacon/bronze__streamline_beacon_pending_partial_withdrawals.sql @@ -0,0 +1,9 @@ +{{ config ( + materialized = 'view', + tags = ['bronze_beacon_pending_partial_withdrawals'] +) }} +{{ v0_streamline_external_table_query( + model = "pending_partial_withdrawals", + partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 4), '_', 1) AS INTEGER)", + block_number = false +) }} diff --git a/models/streamline/bronze/beacon/bronze__streamline_fr_beacon_pending_consolidations.sql b/models/streamline/bronze/beacon/bronze__streamline_fr_beacon_pending_consolidations.sql new file mode 100644 index 00000000..8672d024 --- /dev/null +++ b/models/streamline/bronze/beacon/bronze__streamline_fr_beacon_pending_consolidations.sql @@ -0,0 +1,16 @@ +{{ config ( + materialized = 'view', + tags = ['bronze_beacon_pending_consolidations'] +) }} + +SELECT + partition_key, + VALUE :"SLOT_NUMBER" :: INT AS slot_number, + VALUE, + DATA, + metadata, + file_name, + _inserted_timestamp +FROM + {{ ref('bronze__streamline_fr_beacon_pending_consolidations_v2') }} + diff --git a/models/streamline/bronze/beacon/bronze__streamline_fr_beacon_pending_consolidations_v2.sql b/models/streamline/bronze/beacon/bronze__streamline_fr_beacon_pending_consolidations_v2.sql new file mode 100644 index 00000000..e875df1b --- /dev/null +++ b/models/streamline/bronze/beacon/bronze__streamline_fr_beacon_pending_consolidations_v2.sql @@ -0,0 +1,10 @@ +{{ config ( + materialized = 'view', + tags = ['bronze_beacon_pending_consolidations'] +) }} +{{ v0_streamline_external_table_fr_query( + model = "pending_consolidations", + partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 4), '_', 1) AS INTEGER )", + block_number = false +) }} + diff --git a/models/streamline/bronze/beacon/bronze__streamline_fr_beacon_pending_partial_withdrawals.sql b/models/streamline/bronze/beacon/bronze__streamline_fr_beacon_pending_partial_withdrawals.sql new file mode 100644 index 00000000..c86e2567 --- /dev/null +++ b/models/streamline/bronze/beacon/bronze__streamline_fr_beacon_pending_partial_withdrawals.sql @@ -0,0 +1,15 @@ +{{ config ( + materialized = 'view', + tags = ['bronze_beacon_pending_partial_withdrawals'] +) }} + +SELECT + partition_key, + VALUE :"SLOT_NUMBER" :: INT AS slot_number, + VALUE, + DATA, + metadata, + file_name, + _inserted_timestamp +FROM + {{ ref('bronze__streamline_fr_beacon_pending_partial_withdrawals_v2') }} diff --git a/models/streamline/bronze/beacon/bronze__streamline_fr_beacon_pending_partial_withdrawals_v2.sql b/models/streamline/bronze/beacon/bronze__streamline_fr_beacon_pending_partial_withdrawals_v2.sql new file mode 100644 index 00000000..e9bb160c --- /dev/null +++ b/models/streamline/bronze/beacon/bronze__streamline_fr_beacon_pending_partial_withdrawals_v2.sql @@ -0,0 +1,9 @@ +{{ config ( + materialized = 'view', + tags = ['bronze_beacon_pending_partial_withdrawals'] +) }} +{{ v0_streamline_external_table_fr_query( + model = "pending_partial_withdrawals", + partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 4), '_', 1) AS INTEGER )", + block_number = false +) }} diff --git a/models/streamline/silver/beacon/complete/streamline__complete_beacon_pending_consolidations.sql b/models/streamline/silver/beacon/complete/streamline__complete_beacon_pending_consolidations.sql new file mode 100644 index 00000000..43f04c72 --- /dev/null +++ b/models/streamline/silver/beacon/complete/streamline__complete_beacon_pending_consolidations.sql @@ -0,0 +1,59 @@ +-- depends on: {{ ref('bronze__streamline_beacon_pending_consolidations') }} +{{ config ( + materialized = "incremental", + unique_key = "slot_number", + cluster_by = "ROUND(slot_number, -3)", + merge_update_columns = ["slot_number"], + post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION on equality(slot_number)", + incremental_predicates = ["dynamic_range", "slot_number"], + tags = ['streamline_beacon_complete'] +) }} + +SELECT + COALESCE( + VALUE :"SLOT_NUMBER" :: INT, + metadata :request :"slot_number" :: INT, + PARSE_JSON( + metadata :request :"slot_number" + ) :: INT + ) AS slot_number, + file_name, + --parse slot_number from metadata for FR because it's not properly accessible in VALUE column from v1 requests + {{ dbt_utils.generate_surrogate_key( + ['slot_number'] + ) }} AS complete_beacon_pending_consolidations_id, + SYSDATE() AS inserted_timestamp, + SYSDATE() AS modified_timestamp, + _inserted_timestamp, + '{{ invocation_id }}' AS _invocation_id +FROM + +{% if is_incremental() %} +{{ ref('bronze__streamline_beacon_pending_consolidations') }} +WHERE + _inserted_timestamp >= ( + SELECT + COALESCE(MAX(_inserted_timestamp), '1970-01-01' :: TIMESTAMP) _inserted_timestamp + FROM + {{ this }}) + AND (LEFT( + DATA :error :: STRING, + 1 + ) <> 'F' + OR DATA :error IS NULL + ) + {% else %} + {{ ref('bronze__streamline_fr_beacon_pending_consolidations') }} + WHERE + (LEFT( + DATA :error :: STRING, + 1 + ) <> 'F' + OR DATA :error IS NULL + ) + {% endif %} + + qualify(ROW_NUMBER() over (PARTITION BY slot_number + ORDER BY + _inserted_timestamp DESC)) = 1 + diff --git a/models/streamline/silver/beacon/complete/streamline__complete_beacon_pending_partial_withdrawals.sql b/models/streamline/silver/beacon/complete/streamline__complete_beacon_pending_partial_withdrawals.sql new file mode 100644 index 00000000..a842e42b --- /dev/null +++ b/models/streamline/silver/beacon/complete/streamline__complete_beacon_pending_partial_withdrawals.sql @@ -0,0 +1,58 @@ +-- depends on: {{ ref('bronze__streamline_beacon_pending_partial_withdrawals') }} +{{ config ( + materialized = "incremental", + unique_key = "slot_number", + cluster_by = "ROUND(slot_number, -3)", + merge_update_columns = ["slot_number"], + post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION on equality(slot_number)", + incremental_predicates = ["dynamic_range", "slot_number"], + tags = ['streamline_beacon_complete'] +) }} + +SELECT + COALESCE( + VALUE :"SLOT_NUMBER" :: INT, + metadata :request :"slot_number" :: INT, + PARSE_JSON( + metadata :request :"slot_number" + ) :: INT + ) AS slot_number, + file_name, + --parse slot_number from metadata for FR because it's not properly accessible in VALUE column from v1 requests + {{ dbt_utils.generate_surrogate_key( + ['slot_number'] + ) }} AS complete_beacon_pending_partial_withdrawals_id, + SYSDATE() AS inserted_timestamp, + SYSDATE() AS modified_timestamp, + _inserted_timestamp, + '{{ invocation_id }}' AS _invocation_id +FROM + +{% if is_incremental() %} +{{ ref('bronze__streamline_beacon_pending_partial_withdrawals') }} +WHERE + _inserted_timestamp >= ( + SELECT + COALESCE(MAX(_inserted_timestamp), '1970-01-01' :: TIMESTAMP) _inserted_timestamp + FROM + {{ this }}) + AND (LEFT( + DATA :error :: STRING, + 1 + ) <> 'F' + OR DATA :error IS NULL + ) + {% else %} + {{ ref('bronze__streamline_fr_beacon_pending_partial_withdrawals') }} + WHERE + (LEFT( + DATA :error :: STRING, + 1 + ) <> 'F' + OR DATA :error IS NULL + ) + {% endif %} + + qualify(ROW_NUMBER() over (PARTITION BY slot_number + ORDER BY + _inserted_timestamp DESC)) = 1 \ No newline at end of file diff --git a/models/streamline/silver/beacon/history/streamline__beacon_pending_consolidations_history.sql b/models/streamline/silver/beacon/history/streamline__beacon_pending_consolidations_history.sql new file mode 100644 index 00000000..094ec838 --- /dev/null +++ b/models/streamline/silver/beacon/history/streamline__beacon_pending_consolidations_history.sql @@ -0,0 +1,65 @@ +{{ config ( + materialized = "view", + post_hook = fsc_utils.if_data_call_function_v2( + func = 'streamline.udf_bulk_rest_api_v2', + target = "{{this.schema}}.{{this.identifier}}", + params ={ "external_table" :"pending_consolidations", + "sql_limit" :"1000000", + "producer_batch_size" :"1000", + "worker_batch_size" :"100", + "async_concurrent_requests" :"10", + "sql_source" :"{{this.identifier}}", + "exploded_key": tojson(["data"]) } + ), + tags = ['streamline_beacon_history'] +) }} + +WITH to_do AS ( + SELECT + slot_number + FROM + {{ ref("streamline__beacon_blocks") }} + WHERE + slot_number <= ( + SELECT + MIN(slot_number) + FROM + {{ ref("beacon_chain__fact_blocks") }} + WHERE + slot_timestamp >= DATEADD(hour, -12, SYSDATE()) + ) + and slot_number >= 11649025 + EXCEPT + SELECT + slot_number + FROM + {{ ref("streamline__complete_beacon_pending_consolidations") }} + WHERE + slot_number <= ( + SELECT + MIN(slot_number) + FROM + {{ ref("beacon_chain__fact_blocks") }} + WHERE + slot_timestamp >= DATEADD(hour, -12, SYSDATE()) + ) +) +SELECT + slot_number, + ROUND(slot_number, -3) AS partition_key, + {{ target.database }}.live.udf_api( + 'GET', + '{service}/{Authentication}/eth/v1/beacon/states/' || slot_number || '/pending_consolidations', + OBJECT_CONSTRUCT( + 'accept', 'application/json', + 'fsc-quantum-state', 'streamline' + ), + {}, + 'vault/prod/ethereum/quicknode/mainnet' + ) AS request +FROM + to_do +ORDER BY + slot_number DESC +LIMIT 1000000 + diff --git a/models/streamline/silver/beacon/history/streamline__beacon_pending_partial_withdrawals_history.sql b/models/streamline/silver/beacon/history/streamline__beacon_pending_partial_withdrawals_history.sql new file mode 100644 index 00000000..1bbbe24d --- /dev/null +++ b/models/streamline/silver/beacon/history/streamline__beacon_pending_partial_withdrawals_history.sql @@ -0,0 +1,65 @@ +{{ config ( + materialized = "view", + post_hook = fsc_utils.if_data_call_function_v2( + func = 'streamline.udf_bulk_rest_api_v2', + target = "{{this.schema}}.{{this.identifier}}", + params ={ "external_table" :"pending_partial_withdrawals", + "sql_limit" :"1000000", + "producer_batch_size" :"1000", + "worker_batch_size" :"100", + "async_concurrent_requests" :"10", + "sql_source" :"{{this.identifier}}", + "exploded_key": tojson(["data"]) } + ), + tags = ['streamline_beacon_history'] +) }} + +WITH to_do AS ( + SELECT + slot_number + FROM + {{ ref("streamline__beacon_blocks") }} + WHERE + slot_number <= ( + SELECT + MIN(slot_number) + FROM + {{ ref("beacon_chain__fact_blocks") }} + WHERE + slot_timestamp >= DATEADD(hour, -12, SYSDATE()) + ) + and slot_number >= 11649025 + EXCEPT + SELECT + slot_number + FROM + {{ ref("streamline__complete_beacon_pending_partial_withdrawals") }} + WHERE + slot_number <= ( + SELECT + MIN(slot_number) + FROM + {{ ref("beacon_chain__fact_blocks") }} + WHERE + slot_timestamp >= DATEADD(hour, -12, SYSDATE()) + ) +) +SELECT + slot_number, + ROUND(slot_number, -3) AS partition_key, + {{ target.database }}.live.udf_api( + 'GET', + '{service}/{Authentication}/eth/v1/beacon/states/' || slot_number || '/pending_partial_withdrawals', + OBJECT_CONSTRUCT( + 'accept', 'application/json', + 'fsc-quantum-state', 'streamline' + ), + {}, + 'vault/prod/ethereum/quicknode/mainnet' + ) AS request +FROM + to_do +ORDER BY + slot_number DESC +LIMIT 1000000 + diff --git a/models/streamline/silver/beacon/realtime/streamline__beacon_pending_consolidations_realtime.sql b/models/streamline/silver/beacon/realtime/streamline__beacon_pending_consolidations_realtime.sql new file mode 100644 index 00000000..32fe6e29 --- /dev/null +++ b/models/streamline/silver/beacon/realtime/streamline__beacon_pending_consolidations_realtime.sql @@ -0,0 +1,63 @@ +{{ config ( + materialized = "view", + post_hook = fsc_utils.if_data_call_function_v2( + func = 'streamline.udf_bulk_rest_api_v2', + target = "{{this.schema}}.{{this.identifier}}", + params ={ "external_table" :"pending_consolidations", + "sql_limit" :"5000", + "producer_batch_size" :"2480", + "worker_batch_size" :"1240", + "async_concurrent_requests" :"10", + "sql_source" :"{{this.identifier}}", + "exploded_key": tojson(["data"]) } + ), + tags = ['streamline_beacon_realtime'] +) }} + +WITH to_do AS ( + SELECT + slot_number + FROM + {{ ref("streamline__beacon_blocks") }} + WHERE + slot_number > ( + SELECT + MIN(slot_number) + FROM + {{ ref("beacon_chain__fact_blocks") }} + WHERE + slot_timestamp >= DATEADD(day, -3, SYSDATE()) + ) + and slot_number >= 11649025 + EXCEPT + SELECT + slot_number + FROM + {{ ref("streamline__complete_beacon_pending_consolidations") }} + WHERE + slot_number > ( + SELECT + MIN(slot_number) + FROM + {{ ref("beacon_chain__fact_blocks") }} + WHERE + slot_timestamp >= DATEADD(day, -3, SYSDATE()) + ) +) +SELECT + slot_number, + ROUND(slot_number, -3) AS partition_key, + {{ target.database }}.live.udf_api( + 'GET', + '{service}/{Authentication}/eth/v1/beacon/states/' || slot_number || '/pending_consolidations', + OBJECT_CONSTRUCT( + 'accept', 'application/json', + 'fsc-quantum-state', 'streamline' + ), + {}, + 'vault/prod/ethereum/quicknode/mainnet' + ) AS request +FROM + to_do +ORDER BY + slot_number DESC \ No newline at end of file diff --git a/models/streamline/silver/beacon/realtime/streamline__beacon_pending_partial_withdrawals_realtime.sql b/models/streamline/silver/beacon/realtime/streamline__beacon_pending_partial_withdrawals_realtime.sql new file mode 100644 index 00000000..5a5e094a --- /dev/null +++ b/models/streamline/silver/beacon/realtime/streamline__beacon_pending_partial_withdrawals_realtime.sql @@ -0,0 +1,63 @@ +{{ config ( + materialized = "view", + post_hook = fsc_utils.if_data_call_function_v2( + func = 'streamline.udf_bulk_rest_api_v2', + target = "{{this.schema}}.{{this.identifier}}", + params ={ "external_table" :"pending_partial_withdrawals", + "sql_limit" :"5000", + "producer_batch_size" :"2480", + "worker_batch_size" :"1240", + "async_concurrent_requests" :"10", + "sql_source" :"{{this.identifier}}", + "exploded_key": tojson(["data"]) } + ), + tags = ['streamline_beacon_realtime'] +) }} + +WITH to_do AS ( + SELECT + slot_number + FROM + {{ ref("streamline__beacon_blocks") }} + WHERE + slot_number > ( + SELECT + MIN(slot_number) + FROM + {{ ref("beacon_chain__fact_blocks") }} + WHERE + slot_timestamp >= DATEADD(day, -3, SYSDATE()) + ) + and slot_number >= 11649025 + EXCEPT + SELECT + slot_number + FROM + {{ ref("streamline__complete_beacon_pending_partial_withdrawals") }} + WHERE + slot_number > ( + SELECT + MIN(slot_number) + FROM + {{ ref("beacon_chain__fact_blocks") }} + WHERE + slot_timestamp >= DATEADD(day, -3, SYSDATE()) + ) +) +SELECT + slot_number, + ROUND(slot_number, -3) AS partition_key, + {{ target.database }}.live.udf_api( + 'GET', + '{service}/{Authentication}/eth/v1/beacon/states/' || slot_number || '/pending_partial_withdrawals', + OBJECT_CONSTRUCT( + 'accept', 'application/json', + 'fsc-quantum-state', 'streamline' + ), + {}, + 'vault/prod/ethereum/quicknode/mainnet' + ) AS request +FROM + to_do +ORDER BY + slot_number DESC \ No newline at end of file