diff --git a/.github/workflows/dbt_run_streamline_block_txs_realtime_2.yml b/.github/workflows/dbt_run_streamline_block_txs_realtime_2.yml new file mode 100644 index 0000000..2aa0950 --- /dev/null +++ b/.github/workflows/dbt_run_streamline_block_txs_realtime_2.yml @@ -0,0 +1,44 @@ +name: dbt_run_streamline_block_txs_realtime_2 +run-name: dbt_run_streamline_block_txs_realtime_2 + +on: + workflow_dispatch: + branches: + - "main" + +env: + DBT_PROFILES_DIR: "${{ vars.DBT_PROFILES_DIR }}" + + ACCOUNT: "${{ vars.ACCOUNT }}" + ROLE: "${{ vars.ROLE }}" + USER: "${{ vars.USER }}" + PASSWORD: "${{ secrets.PASSWORD }}" + REGION: "${{ vars.REGION }}" + DATABASE: "${{ vars.DATABASE }}" + WAREHOUSE: "${{ vars.WAREHOUSE }}" + SCHEMA: "${{ vars.SCHEMA }}" + +concurrency: + group: ${{ github.workflow }} + +jobs: + run_dbt_jobs: + runs-on: ubuntu-latest + environment: + name: workflow_prod + + steps: + - uses: actions/checkout@v3 + + - uses: actions/setup-python@v4 + with: + python-version: "${{ vars.PYTHON_VERSION }}" + cache: "pip" + + - name: install dependencies + run: | + pip install -r requirements.txt + dbt deps + - name: Run DBT Jobs + run: | + dbt run -s streamline__block_txs_complete_2 streamline__block_txs_realtime_2 --vars '{STREAMLINE_INVOKE_STREAMS: True}' \ No newline at end of file diff --git a/data/github_actions__workflows.csv b/data/github_actions__workflows.csv index 5e8bde3..5ae811d 100644 --- a/data/github_actions__workflows.csv +++ b/data/github_actions__workflows.csv @@ -1,5 +1,6 @@ workflow_name,workflow_schedule dbt_run_streamline_blocks_realtime,"2-59/5 * * * *" dbt_run_streamline_block_txs_realtime,"14-59/15 * * * *" +dbt_run_streamline_block_txs_realtime_2,"09-54/15 * * * *" dbt_run_incremental_core,"19,49 * * * *" dbt_run_incremental_non_core,"4 */3 * * *" diff --git a/macros/streamline/models.sql b/macros/streamline/models.sql index 4cdcbb4..1379395 100644 --- a/macros/streamline/models.sql +++ b/macros/streamline/models.sql @@ -19,7 +19,9 @@ WITH meta AS ( ) SELECT {{ unique_key }}, + {% if other_cols is not none and other_cols != "" %} {{ other_cols }}, + {% endif %} DATA, _inserted_timestamp, s.{{ partition_name }}, @@ -64,7 +66,9 @@ WITH meta AS ( ) SELECT {{ unique_key }}, + {% if other_cols is not none and other_cols != "" %} {{ other_cols }}, + {% endif %} DATA, _inserted_timestamp, s.{{ partition_name }}, diff --git a/models/bronze/streamline/core/bronze__FR_transactions_2.sql b/models/bronze/streamline/core/bronze__FR_transactions_2.sql new file mode 100644 index 0000000..bb58d8f --- /dev/null +++ b/models/bronze/streamline/core/bronze__FR_transactions_2.sql @@ -0,0 +1,12 @@ +{{ config ( + materialized = 'view' +) }} + +{% set model = "block_txs_2" %} +{{ streamline_external_table_FR_query_v2( + model, + partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 3), '_', 1) AS INTEGER)", + partition_name = "partition_key", + unique_key = "block_id", + other_cols="" +) }} \ No newline at end of file diff --git a/models/bronze/streamline/core/bronze__transactions_2.sql b/models/bronze/streamline/core/bronze__transactions_2.sql new file mode 100644 index 0000000..a50a15c --- /dev/null +++ b/models/bronze/streamline/core/bronze__transactions_2.sql @@ -0,0 +1,12 @@ +{{ config ( + materialized = 'view' +) }} + +{% set model = "block_txs_2" %} +{{ streamline_external_table_query_v2( + model, + partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 3), '_', 1) AS INTEGER)", + partition_name = "partition_key", + unique_key = "block_id", + other_cols="" +) }} \ No newline at end of file diff --git a/models/sources.yml b/models/sources.yml index 741a7ad..f1904b8 100644 --- a/models/sources.yml +++ b/models/sources.yml @@ -7,6 +7,7 @@ sources: tables: - name: blocks - name: block_txs + - name: block_txs_2 - name: crosschain database: "{{ 'crosschain' if target.database == 'ECLIPSE' else 'crosschain_dev' }}" schema: core diff --git a/models/streamline/core/complete/streamline__block_txs_complete_2.sql b/models/streamline/core/complete/streamline__block_txs_complete_2.sql new file mode 100644 index 0000000..c185d30 --- /dev/null +++ b/models/streamline/core/complete/streamline__block_txs_complete_2.sql @@ -0,0 +1,42 @@ +-- depends_on: {{ ref('bronze__transactions_2') }} +-- depends_on: {{ ref('bronze__FR_transactions_2') }} + + +{{ config ( + materialized = "incremental", + unique_key = 'block_id', + merge_exclude_columns = ["inserted_timestamp"], + cluster_by = "ROUND(block_id, -4)", +) }} + +SELECT + block_id, + partition_key, + _inserted_timestamp, + sysdate() AS inserted_timestamp, + sysdate() AS modified_timestamp, + '{{ invocation_id }}' AS _invocation_id, +FROM +{% if is_incremental() %} + {{ ref('bronze__transactions_2') }} +WHERE + _inserted_timestamp >= ( + SELECT + COALESCE(MAX(_INSERTED_TIMESTAMP), '1970-01-01' :: DATE) max_INSERTED_TIMESTAMP + FROM + {{ this }} + ) + AND partition_key >= ( + SELECT + COALESCE( + MAX(partition_key), + 0 + ) + FROM + {{ this }} + ) +{% else %} + {{ ref('bronze__FR_transactions_2') }} +{% endif %} +QUALIFY + row_number() OVER (PARTITION BY block_id ORDER BY _inserted_timestamp DESC) = 1 \ No newline at end of file diff --git a/models/streamline/core/realtime/streamline__block_txs_realtime_2.sql b/models/streamline/core/realtime/streamline__block_txs_realtime_2.sql new file mode 100644 index 0000000..7e18e54 --- /dev/null +++ b/models/streamline/core/realtime/streamline__block_txs_realtime_2.sql @@ -0,0 +1,89 @@ +-- depends_on: {{ ref('streamline__node_min_block_available') }} +{{ config ( + materialized = "view", + post_hook = fsc_utils.if_data_call_function_v2( + func = 'streamline.udf_bulk_rest_api_v2', + target = "{{this.schema}}.{{this.identifier}}", + params ={ "external_table" :"block_txs_2", + "sql_limit" :"20000", + "producer_batch_size" :"20000", + "worker_batch_size" :"500", + "async_concurrent_requests": "10", + "exploded_key": tojson(["result.transactions"]), + "include_top_level_json": tojson(["result.blockTime"]), + "sql_source" :"{{this.identifier}}", + "order_by_column": "block_id" } + ) +) }} + +{% if execute %} + {% set min_block_query %} + + SELECT + MIN(block_id) + FROM + {{ ref('streamline__node_min_block_available') }} + + {% endset %} + {% set min_block_id = run_query(min_block_query) [0] [0] %} + {% endif %} + +WITH blocks AS ( + SELECT + block_id + FROM + {{ ref("streamline__blocks") }} + WHERE + block_id >= 52500858 + EXCEPT + SELECT + block_id + FROM + {{ ref('streamline__block_txs_complete') }} + WHERE + block_id <= 52500858 + EXCEPT + SELECT + block_id + FROM + {{ ref('streamline__block_txs_complete_2') }} +) +SELECT + block_id, + ROUND( + block_id, + -4 + ) :: INT AS partition_key, + {{ target.database }}.live.udf_api( + 'POST', + '{Service}', + OBJECT_CONSTRUCT( + 'Content-Type', + 'application/json' + ), + OBJECT_CONSTRUCT( + 'id', + block_id, + 'jsonrpc', + '2.0', + 'method', + 'getBlock', + 'params', + ARRAY_CONSTRUCT( + block_id, + OBJECT_CONSTRUCT( + 'encoding', + 'jsonParsed', + 'rewards', + FALSE, + 'transactionDetails', + 'full', + 'maxSupportedTransactionVersion', + 0 + ) + ) + ), + 'Vault/prod/eclipse/mainnet' + ) AS request +FROM + blocks