diff --git a/models/beacon_chain/silver/silver__beacon_validators.yml b/models/beacon_chain/silver/silver__beacon_validators.yml index f266f7e4..135a7bfe 100644 --- a/models/beacon_chain/silver/silver__beacon_validators.yml +++ b/models/beacon_chain/silver/silver__beacon_validators.yml @@ -16,7 +16,6 @@ models: - FLOAT - name: STATE_ID tests: - - not_null - dbt_expectations.expect_column_values_to_be_in_type_list: column_type_list: - STRING diff --git a/models/sources.yml b/models/sources.yml index aa2d4c2b..2ccb2a32 100644 --- a/models/sources.yml +++ b/models/sources.yml @@ -45,6 +45,7 @@ sources: - name: confirm_blocks_v3 - name: contract_abis_v3 - name: token_reads + - name: pending_deposits - name: crosschain_silver database: >- {{ 'CROSSCHAIN_DEV' if '_DEV' in target.database.upper() else 'CROSSCHAIN' }} diff --git a/models/streamline/bronze/beacon/bronze__streamline_beacon_pending_deposits.sql b/models/streamline/bronze/beacon/bronze__streamline_beacon_pending_deposits.sql new file mode 100644 index 00000000..4f012961 --- /dev/null +++ b/models/streamline/bronze/beacon/bronze__streamline_beacon_pending_deposits.sql @@ -0,0 +1,9 @@ +{{ config ( + materialized = 'view', + tags = ['bronze_beacon_pending_deposits'] +) }} +{{ v0_streamline_external_table_query( + model = "pending_deposits", + partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 4), '_', 1) AS INTEGER)", + block_number = false +) }} diff --git a/models/streamline/bronze/beacon/bronze__streamline_fr_beacon_pending_deposits.sql b/models/streamline/bronze/beacon/bronze__streamline_fr_beacon_pending_deposits.sql new file mode 100644 index 00000000..6b82ee14 --- /dev/null +++ b/models/streamline/bronze/beacon/bronze__streamline_fr_beacon_pending_deposits.sql @@ -0,0 +1,15 @@ +{{ config ( + materialized = 'view', + tags = ['bronze_beacon_pending_deposits'] +) }} + +SELECT + partition_key, + VALUE :"SLOT_NUMBER" :: INT AS slot_number, + VALUE, + DATA, + metadata, + file_name, + _inserted_timestamp +FROM + {{ ref('bronze__streamline_fr_beacon_pending_deposits_v2') }} diff --git a/models/streamline/bronze/beacon/bronze__streamline_fr_beacon_pending_deposits_v2.sql b/models/streamline/bronze/beacon/bronze__streamline_fr_beacon_pending_deposits_v2.sql new file mode 100644 index 00000000..a0c3ed45 --- /dev/null +++ b/models/streamline/bronze/beacon/bronze__streamline_fr_beacon_pending_deposits_v2.sql @@ -0,0 +1,9 @@ +{{ config ( + materialized = 'view', + tags = ['bronze_beacon_pending_deposits'] +) }} +{{ v0_streamline_external_table_fr_query( + model = "pending_deposits", + partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 4), '_', 1) AS INTEGER )", + block_number = false +) }} diff --git a/models/streamline/silver/beacon/complete/streamline__complete_beacon_pending_deposits.sql b/models/streamline/silver/beacon/complete/streamline__complete_beacon_pending_deposits.sql new file mode 100644 index 00000000..518104de --- /dev/null +++ b/models/streamline/silver/beacon/complete/streamline__complete_beacon_pending_deposits.sql @@ -0,0 +1,58 @@ +-- depends on: {{ ref('bronze__streamline_beacon_pending_deposits') }} +{{ config ( + materialized = "incremental", + unique_key = "slot_number", + cluster_by = "ROUND(slot_number, -3)", + merge_update_columns = ["slot_number"], + post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION on equality(slot_number)", + incremental_predicates = ["dynamic_range", "slot_number"], + tags = ['streamline_beacon_complete'] +) }} + +SELECT + COALESCE( + VALUE :"SLOT_NUMBER" :: INT, + metadata :request :"slot_number" :: INT, + PARSE_JSON( + metadata :request :"slot_number" + ) :: INT + ) AS slot_number, + file_name, + --parse slot_number from metadata for FR because it's not properly accessible in VALUE column from v1 requests + {{ dbt_utils.generate_surrogate_key( + ['slot_number'] + ) }} AS complete_beacon_pending_deposits_id, + SYSDATE() AS inserted_timestamp, + SYSDATE() AS modified_timestamp, + _inserted_timestamp, + '{{ invocation_id }}' AS _invocation_id +FROM + +{% if is_incremental() %} +{{ ref('bronze__streamline_beacon_pending_deposits') }} +WHERE + _inserted_timestamp >= ( + SELECT + COALESCE(MAX(_inserted_timestamp), '1970-01-01' :: TIMESTAMP) _inserted_timestamp + FROM + {{ this }}) + AND (LEFT( + DATA :error :: STRING, + 1 + ) <> 'F' + OR DATA :error IS NULL + ) + {% else %} + {{ ref('bronze__streamline_fr_beacon_pending_deposits') }} + WHERE + (LEFT( + DATA :error :: STRING, + 1 + ) <> 'F' + OR DATA :error IS NULL + ) + {% endif %} + + qualify(ROW_NUMBER() over (PARTITION BY slot_number + ORDER BY + _inserted_timestamp DESC)) = 1 \ No newline at end of file diff --git a/models/streamline/silver/beacon/history/streamline__beacon_pending_deposits_history.sql b/models/streamline/silver/beacon/history/streamline__beacon_pending_deposits_history.sql new file mode 100644 index 00000000..658a8ddc --- /dev/null +++ b/models/streamline/silver/beacon/history/streamline__beacon_pending_deposits_history.sql @@ -0,0 +1,64 @@ +{{ config ( + materialized = "view", + post_hook = fsc_utils.if_data_call_function_v2( + func = 'streamline.udf_bulk_rest_api_v2', + target = "{{this.schema}}.{{this.identifier}}", + params ={ "external_table" :"pending_deposits", + "sql_limit" :"1000000", + "producer_batch_size" :"1000", + "worker_batch_size" :"100", + "async_concurrent_requests" :"10", + "sql_source" :"{{this.identifier}}", + "exploded_key": tojson(["data"]) } + ), + tags = ['streamline_beacon_history'] +) }} + +WITH to_do AS ( + SELECT + slot_number + FROM + {{ ref("streamline__beacon_blocks") }} + WHERE + slot_number <= ( + SELECT + MIN(slot_number) + FROM + {{ ref("beacon_chain__fact_blocks") }} + WHERE + slot_timestamp >= DATEADD(hour, -12, SYSDATE()) + ) + and slot_number >= 11649025 + EXCEPT + SELECT + slot_number + FROM + {{ ref("streamline__complete_beacon_pending_deposits") }} + WHERE + slot_number <= ( + SELECT + MIN(slot_number) + FROM + {{ ref("beacon_chain__fact_blocks") }} + WHERE + slot_timestamp >= DATEADD(hour, -12, SYSDATE()) + ) +) +SELECT + slot_number, + ROUND(slot_number, -3) AS partition_key, + {{ target.database }}.live.udf_api( + 'GET', + '{service}/{Authentication}/eth/v1/beacon/states/' || slot_number || '/pending_deposits', + OBJECT_CONSTRUCT( + 'accept', 'application/json', + 'fsc-quantum-state', 'streamline' + ), + {}, + 'vault/prod/ethereum/quicknode/mainnet' + ) AS request +FROM + to_do +ORDER BY + slot_number DESC +LIMIT 1000000 \ No newline at end of file diff --git a/models/streamline/silver/beacon/realtime/streamline__beacon_pending_deposits_realtime.sql b/models/streamline/silver/beacon/realtime/streamline__beacon_pending_deposits_realtime.sql new file mode 100644 index 00000000..11c737d3 --- /dev/null +++ b/models/streamline/silver/beacon/realtime/streamline__beacon_pending_deposits_realtime.sql @@ -0,0 +1,63 @@ +{{ config ( + materialized = "view", + post_hook = fsc_utils.if_data_call_function_v2( + func = 'streamline.udf_bulk_rest_api_v2', + target = "{{this.schema}}.{{this.identifier}}", + params ={ "external_table" :"pending_deposits", + "sql_limit" :"5000", + "producer_batch_size" :"2480", + "worker_batch_size" :"1240", + "async_concurrent_requests" :"10", + "sql_source" :"{{this.identifier}}", + "exploded_key": tojson(["data"]) } + ), + tags = ['streamline_beacon_realtime'] +) }} + +WITH to_do AS ( + SELECT + slot_number + FROM + {{ ref("streamline__beacon_blocks") }} + WHERE + slot_number > ( + SELECT + MIN(slot_number) + FROM + {{ ref("beacon_chain__fact_blocks") }} + WHERE + slot_timestamp >= DATEADD(day, -3, SYSDATE()) + ) + and slot_number >= 11649025 + EXCEPT + SELECT + slot_number + FROM + {{ ref("streamline__complete_beacon_pending_deposits") }} + WHERE + slot_number > ( + SELECT + MIN(slot_number) + FROM + {{ ref("beacon_chain__fact_blocks") }} + WHERE + slot_timestamp >= DATEADD(day, -3, SYSDATE()) + ) +) +SELECT + slot_number, + ROUND(slot_number, -3) AS partition_key, + {{ target.database }}.live.udf_api( + 'GET', + '{service}/{Authentication}/eth/v1/beacon/states/' || slot_number || '/pending_deposits', + OBJECT_CONSTRUCT( + 'accept', 'application/json', + 'fsc-quantum-state', 'streamline' + ), + {}, + 'vault/prod/ethereum/quicknode/mainnet' + ) AS request +FROM + to_do +ORDER BY + slot_number DESC \ No newline at end of file