From 77dfdc5e6a000dea440263a966ad1d99bc797502 Mon Sep 17 00:00:00 2001 From: desmond-hui <97470747+desmond-hui@users.noreply.github.com> Date: Wed, 27 Sep 2023 08:15:12 -0700 Subject: [PATCH] An 3858/change historical workflow (#351) * use perm data models for streamline work * add process to clean the historical queue * modify parser historical workflow * fix query * make this insert-only, merge is very costly * add clean queue job * fix name on build queue workflow --- .github/workflows/dbt_run_idls_history.yml | 2 +- .../dbt_run_parser_program_historical.yml | 2 +- ..._parser_program_historical_build_queue.yml | 43 ++++++++++++ ..._parser_program_historical_clean_queue.yml | 43 ++++++++++++ ..._clean_program_parser_historical_queue.sql | 7 ++ ..._clean_program_parser_historical_queue.sql | 59 +++++++++++++++++ ...ne__all_undecoded_instructions_history.sql | 20 ++++++ ...coded_instructions_history_in_progress.sql | 12 ++++ ...l_undecoded_instructions_history_queue.sql | 66 +++++++++++++++++++ .../streamline__complete_decoded_history.sql | 36 ++++++++++ .../streamline__idls_history.sql | 0 ...ne__all_undecoded_instructions_history.sql | 25 ------- .../streamline__idls_history_pointer.sql | 46 ------------- 13 files changed, 288 insertions(+), 73 deletions(-) create mode 100644 .github/workflows/dbt_run_parser_program_historical_build_queue.yml create mode 100644 .github/workflows/dbt_run_parser_program_historical_clean_queue.yml create mode 100644 macros/streamline/bulk_program_parser/run_sp_clean_program_parser_historical_queue.sql create mode 100644 macros/streamline/bulk_program_parser/sp_create_clean_program_parser_historical_queue.sql create mode 100644 models/streamline/parser/history/streamline__all_undecoded_instructions_history.sql create mode 100644 models/streamline/parser/history/streamline__all_undecoded_instructions_history_in_progress.sql create mode 100644 models/streamline/parser/history/streamline__all_undecoded_instructions_history_queue.sql 
create mode 100644 models/streamline/parser/history/streamline__complete_decoded_history.sql rename models/streamline/parser/{ => history}/streamline__idls_history.sql (100%) delete mode 100644 models/streamline/parser/streamline__all_undecoded_instructions_history.sql delete mode 100644 models/streamline/parser/streamline__idls_history_pointer.sql diff --git a/.github/workflows/dbt_run_idls_history.yml b/.github/workflows/dbt_run_idls_history.yml index 6e28ce8f..446d0bde 100644 --- a/.github/workflows/dbt_run_idls_history.yml +++ b/.github/workflows/dbt_run_idls_history.yml @@ -40,4 +40,4 @@ jobs: dbt deps - name: Run DBT Jobs run: | - dbt run -s models/streamline/parser/streamline__idls_history.sql \ No newline at end of file + dbt run -s models/streamline/parser/streamline__idls_history.sql models/streamline/parser/history/streamline__complete_decoded_history.sql \ No newline at end of file diff --git a/.github/workflows/dbt_run_parser_program_historical.yml b/.github/workflows/dbt_run_parser_program_historical.yml index 9a95ad77..389dfa42 100644 --- a/.github/workflows/dbt_run_parser_program_historical.yml +++ b/.github/workflows/dbt_run_parser_program_historical.yml @@ -40,5 +40,5 @@ jobs: dbt deps - name: Run DBT Jobs run: | - dbt run -s models/streamline/parser/streamline__complete_decoded_instructions.sql models/streamline/parser/streamline__idls_history_pointer.sql + dbt run -s models/streamline/parser/streamline__complete_decoded_instructions.sql dbt run --vars '{"STREAMLINE_INVOKE_STREAMS":True}' -m ./models/streamline/parser/streamline__all_undecoded_instructions_history.sql \ No newline at end of file diff --git a/.github/workflows/dbt_run_parser_program_historical_build_queue.yml b/.github/workflows/dbt_run_parser_program_historical_build_queue.yml new file mode 100644 index 00000000..fc783260 --- /dev/null +++ b/.github/workflows/dbt_run_parser_program_historical_build_queue.yml @@ -0,0 +1,43 @@ +name: dbt_run_parser_program_historical_build_queue 
+run-name: dbt_run_parser_program_historical_build_queue + +on: + schedule: + # Runs every SUNDAY at 02:22 (see https://crontab.guru) + - cron: '22 2 * * SUN' + +env: + DBT_PROFILES_DIR: "${{ secrets.DBT_PROFILES_DIR }}" + + ACCOUNT: "${{ secrets.ACCOUNT }}" + ROLE: "${{ secrets.ROLE }}" + USER: "${{ secrets.USER }}" + PASSWORD: "${{ secrets.PASSWORD }}" + REGION: "${{ secrets.REGION }}" + DATABASE: "${{ secrets.DATABASE }}" + WAREHOUSE: "${{ secrets.WAREHOUSE }}" + SCHEMA: "${{ secrets.SCHEMA }}" + +concurrency: + group: ${{ github.workflow }} + +jobs: + run_dbt_jobs: + runs-on: ubuntu-latest + environment: + name: workflow_prod + + steps: + - uses: actions/checkout@v3 + + - uses: actions/setup-python@v1 + with: + python-version: "3.7.x" + + - name: install dependencies + run: | + pip3 install dbt-snowflake==${{ secrets.DBT_VERSION }} cli_passthrough requests click + dbt deps + - name: Run DBT Jobs + run: | + dbt run -s models/streamline/parser/history/streamline__all_undecoded_instructions_history_queue.sql \ No newline at end of file diff --git a/.github/workflows/dbt_run_parser_program_historical_clean_queue.yml b/.github/workflows/dbt_run_parser_program_historical_clean_queue.yml new file mode 100644 index 00000000..2b83b5ef --- /dev/null +++ b/.github/workflows/dbt_run_parser_program_historical_clean_queue.yml @@ -0,0 +1,43 @@ +name: dbt_run_parser_program_historical_clean_queue +run-name: dbt_run_parser_program_historical_clean_queue + +on: + schedule: + # Runs daily at 03:17 (see https://crontab.guru) + - cron: '17 3 * * *' + +env: + DBT_PROFILES_DIR: "${{ secrets.DBT_PROFILES_DIR }}" + + ACCOUNT: "${{ secrets.ACCOUNT }}" + ROLE: "${{ secrets.ROLE }}" + USER: "${{ secrets.USER }}" + PASSWORD: "${{ secrets.PASSWORD }}" + REGION: "${{ secrets.REGION }}" + DATABASE: "${{ secrets.DATABASE }}" + WAREHOUSE: "${{ secrets.WAREHOUSE }}" + SCHEMA: "${{ secrets.SCHEMA }}" + +concurrency: + group: ${{ github.workflow }} + +jobs: + run_dbt_jobs: + runs-on: ubuntu-latest
+ environment: + name: workflow_prod + + steps: + - uses: actions/checkout@v3 + + - uses: actions/setup-python@v1 + with: + python-version: "3.7.x" + + - name: install dependencies + run: | + pip3 install dbt-snowflake==${{ secrets.DBT_VERSION }} cli_passthrough requests click + dbt deps + - name: Run DBT Jobs + run: | + dbt run-operation run_sp_clean_program_parser_historical_queue \ No newline at end of file diff --git a/macros/streamline/bulk_program_parser/run_sp_clean_program_parser_historical_queue.sql b/macros/streamline/bulk_program_parser/run_sp_clean_program_parser_historical_queue.sql new file mode 100644 index 00000000..70d94c3f --- /dev/null +++ b/macros/streamline/bulk_program_parser/run_sp_clean_program_parser_historical_queue.sql @@ -0,0 +1,7 @@ +{% macro run_sp_clean_program_parser_historical_queue() %} +{% set sql %} +call streamline.sp_clean_program_parser_historical_queue(); +{% endset %} + +{% do run_query(sql) %} +{% endmacro %} \ No newline at end of file diff --git a/macros/streamline/bulk_program_parser/sp_create_clean_program_parser_historical_queue.sql b/macros/streamline/bulk_program_parser/sp_create_clean_program_parser_historical_queue.sql new file mode 100644 index 00000000..43ffc0c8 --- /dev/null +++ b/macros/streamline/bulk_program_parser/sp_create_clean_program_parser_historical_queue.sql @@ -0,0 +1,59 @@ +{% macro sp_create_clean_program_parser_historical_queue() %} + {% if var("UPDATE_UDFS_AND_SPS") %} + {% set sql %} + CREATE OR REPLACE PROCEDURE streamline.sp_clean_program_parser_historical_queue() + RETURNS BOOLEAN + LANGUAGE SQL + AS + $$ + DECLARE + RESULT VARCHAR; + query_id VARCHAR; + BEGIN + /* review progress of all items that have been worked on before the most recent hour of activity */ + SELECT + INDEX, + program_id, + instruction, + tx_id, + h.block_id, + c.id as id + from streamline.all_undecoded_instructions_history_in_progress h + left outer join streamline.complete_decoded_instructions c + on c.block_id = 
h.block_id + and c.id = concat_ws( + '-', + h.block_id, + tx_id, + program_id, + index + ) + where h._inserted_timestamp <= current_timestamp - INTERVAL '1 HOURS'; + + query_id := SQLID; + + /* insert items not completed back into queue */ + INSERT INTO streamline.all_undecoded_instructions_history_queue (index, program_id, instruction, tx_id, block_id) + SELECT + INDEX, + program_id, + instruction, + tx_id, + block_id + FROM TABLE(RESULT_SCAN(:query_id)) + WHERE id is null; + + /* remove all in_progress items that have been reviewed */ + DELETE FROM streamline.all_undecoded_instructions_history_in_progress s + USING (SELECT * FROM TABLE(RESULT_SCAN(:query_id))) d + WHERE s.block_id = d.block_id + AND s.tx_id = d.tx_id + AND s.index = d.index + AND s.program_id = d.program_id; + + RETURN TRUE; + END; + $${% endset %} + {% do run_query(sql) %} + {% endif %} +{% endmacro %} \ No newline at end of file diff --git a/models/streamline/parser/history/streamline__all_undecoded_instructions_history.sql b/models/streamline/parser/history/streamline__all_undecoded_instructions_history.sql new file mode 100644 index 00000000..90bb4e0e --- /dev/null +++ b/models/streamline/parser/history/streamline__all_undecoded_instructions_history.sql @@ -0,0 +1,20 @@ +{{ config ( + materialized = "view", + post_hook = if_data_call_function( + func = "{{this.schema}}.udf_bulk_program_parser(object_construct('realtime', 'False'))", + target = "{{this.schema}}.{{this.identifier}}" + ) +) }} + +with m as ( + select max(block_id) as max_block_id + from {{ ref('streamline__all_undecoded_instructions_history_queue') }} h +) +select + INDEX, + program_id, + instruction, + tx_id, + h.block_id +from {{ ref('streamline__all_undecoded_instructions_history_queue') }} h +where h.block_id between (select max_block_id-8000000 from m) and (select max_block_id from m) \ No newline at end of file diff --git a/models/streamline/parser/history/streamline__all_undecoded_instructions_history_in_progress.sql 
b/models/streamline/parser/history/streamline__all_undecoded_instructions_history_in_progress.sql new file mode 100644 index 00000000..b70fc10f --- /dev/null +++ b/models/streamline/parser/history/streamline__all_undecoded_instructions_history_in_progress.sql @@ -0,0 +1,12 @@ +{{ config ( + materialized = 'incremental', + full_refresh = false +) }} + +SELECT + *, + sysdate() as _inserted_timestamp +FROM + {{ ref('streamline__all_undecoded_instructions_history_queue') }} +LIMIT + 0 diff --git a/models/streamline/parser/history/streamline__all_undecoded_instructions_history_queue.sql b/models/streamline/parser/history/streamline__all_undecoded_instructions_history_queue.sql new file mode 100644 index 00000000..166f6c58 --- /dev/null +++ b/models/streamline/parser/history/streamline__all_undecoded_instructions_history_queue.sql @@ -0,0 +1,66 @@ +{{ config ( + materialized = 'incremental', +) }} + +WITH base AS ( + + SELECT + SPLIT_PART( + id, + '-', + 3 + ) :: STRING AS program_id, + MIN(block_id) AS min_decoded_block + FROM + {{ ref('streamline__complete_decoded_instructions') }} + GROUP BY + 1 +), +program_last_processed AS ( + SELECT + b.*, + bl.block_timestamp :: DATE AS min_decoded_block_timestamp_date + FROM + base b + JOIN {{ ref('silver__blocks') }} + bl + ON bl.block_id = b.min_decoded_block +), +pre_final as ( + SELECT + h.program_id, + COALESCE( + p.min_decoded_block_timestamp_date, + h.default_backfill_start_block_timestamp + ) :: DATE AS min_decoded_block_timestamp_date, + COALESCE( + p.min_decoded_block, + h.default_backfill_start_block_id + ) AS min_decoded_block_id, + first_block_id + FROM + {{ ref('streamline__idls_history') }} + h + LEFT JOIN program_last_processed p + ON p.program_id = h.program_id + WHERE + min_decoded_block_id > first_block_id +) +select + e.block_id, + e.block_timestamp, + e.tx_id, + e.index, + e.program_id, + e.instruction +from {{ ref('silver__events') }} e +join pre_final pf on + e.program_id = pf.program_id + and 
e.block_timestamp::date <= pf.min_decoded_block_timestamp_date + and e.block_id >= pf.first_block_id +where + pf.program_id not in (select distinct(program_id) from {{ ref('streamline__complete_decoded_history') }}) +{% if is_incremental() %} +and + pf.program_id not in (select distinct(program_id) from {{ this }}) +{% endif %} \ No newline at end of file diff --git a/models/streamline/parser/history/streamline__complete_decoded_history.sql b/models/streamline/parser/history/streamline__complete_decoded_history.sql new file mode 100644 index 00000000..ffde98f6 --- /dev/null +++ b/models/streamline/parser/history/streamline__complete_decoded_history.sql @@ -0,0 +1,36 @@ +{{ config ( + materialized = 'incremental', + unique_key = 'program_id' +) }} + +WITH min_decoded AS ( + + SELECT + SPLIT_PART( + id, + '-', + 3 + ) :: STRING AS program_id, + MIN(block_id) AS block_id + FROM + {{ ref('streamline__complete_decoded_instructions') }} + GROUP BY + 1 +) +SELECT + h.program_id +FROM + {{ ref('streamline__idls_history') }} + h + JOIN min_decoded + ON min_decoded.block_id = h.first_block_id + +{% if is_incremental() %} +WHERE + h.program_id NOT IN ( + SELECT + DISTINCT(program_id) + FROM + {{ this }} + ) +{% endif %} diff --git a/models/streamline/parser/streamline__idls_history.sql b/models/streamline/parser/history/streamline__idls_history.sql similarity index 100% rename from models/streamline/parser/streamline__idls_history.sql rename to models/streamline/parser/history/streamline__idls_history.sql diff --git a/models/streamline/parser/streamline__all_undecoded_instructions_history.sql b/models/streamline/parser/streamline__all_undecoded_instructions_history.sql deleted file mode 100644 index 98df6dce..00000000 --- a/models/streamline/parser/streamline__all_undecoded_instructions_history.sql +++ /dev/null @@ -1,25 +0,0 @@ -{{ config ( - materialized = "view", - post_hook = if_data_call_function( - func = 
"{{this.schema}}.udf_bulk_program_parser(object_construct('realtime', 'False'))", - target = "{{this.schema}}.{{this.identifier}}" - ) -) }} - -SELECT - e.program_id, - e.tx_id, - e.index, - e.instruction, - e.block_id, - e.block_timestamp -FROM - {{ ref('silver__events') }} - e - JOIN {{ ref('streamline__idls_history_pointer') }} - p - ON e.block_timestamp >= p.backfill_to_date - AND e.block_timestamp <= p.min_decoded_block_timestamp_date - AND e.program_id = p.program_id -WHERE - e.block_id <= p.min_decoded_block_id diff --git a/models/streamline/parser/streamline__idls_history_pointer.sql b/models/streamline/parser/streamline__idls_history_pointer.sql deleted file mode 100644 index 38de99c6..00000000 --- a/models/streamline/parser/streamline__idls_history_pointer.sql +++ /dev/null @@ -1,46 +0,0 @@ -{{ config ( - materialized = 'table' -) }} - -WITH base AS ( - - SELECT - SPLIT_PART( - id, - '-', - 3 - ) :: STRING AS program_id, - MIN(block_id) AS min_decoded_block - FROM - {{ ref('streamline__complete_decoded_instructions') }} - GROUP BY - 1 -), -program_last_processed AS ( - SELECT - b.*, - bl.block_timestamp :: DATE AS min_decoded_block_timestamp_date - FROM - base b - JOIN {{ ref('silver__blocks') }} - bl - ON bl.block_id = b.min_decoded_block -) -SELECT - h.program_id, - COALESCE( - p.min_decoded_block_timestamp_date, - h.default_backfill_start_block_timestamp - ) :: DATE AS min_decoded_block_timestamp_date, - COALESCE( - p.min_decoded_block, - h.default_backfill_start_block_id - ) AS min_decoded_block_id, - min_decoded_block_timestamp_date -2 AS backfill_to_date -FROM - {{ ref('streamline__idls_history') }} - h - LEFT JOIN program_last_processed p - ON p.program_id = h.program_id -WHERE - min_decoded_block_id > first_block_id