An 3858/change historical workflow (#351)

* use perm data models for streamline work

* add process to clean the historical queue

* modify parser historical workflow

* fix query

* make this insert-only, merge is very costly (see the config sketch after this list)

* add clean queue job

* fix name on build queue workflow
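
A note on the insert-only bullet above: in dbt, an incremental model with no unique_key is loaded by simply INSERTing the new rows on each run, while adding a unique_key makes dbt-snowflake run a MERGE against the existing target table, which becomes costly as a history table grows. A minimal sketch of the insert-only style, with a hypothetical source model and column names that are not part of this commit:

{{ config (
    materialized = 'incremental'
) }}

SELECT
    id,
    payload,
    _inserted_timestamp
FROM
    {{ ref('some_source_model') }}  -- hypothetical source model
{% if is_incremental() %}
-- only pull rows newer than what is already in the table
WHERE _inserted_timestamp > (SELECT MAX(_inserted_timestamp) FROM {{ this }})
{% endif %}

For contrast, adding unique_key = 'id' to that config would switch the load to a MERGE keyed on id, rescanning the existing table on every run, which is the costly path this commit avoids.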
desmond-hui 2023-09-27 08:15:12 -07:00 committed by GitHub
parent a6d57153cd
commit 77dfdc5e6a
13 changed files with 288 additions and 73 deletions

View File

@ -40,4 +40,4 @@ jobs:
           dbt deps
       - name: Run DBT Jobs
         run: |
-          dbt run -s models/streamline/parser/streamline__idls_history.sql
+          dbt run -s models/streamline/parser/streamline__idls_history.sql models/streamline/parser/history/streamline__complete_decoded_history.sql

View File

@ -40,5 +40,5 @@ jobs:
           dbt deps
       - name: Run DBT Jobs
         run: |
-          dbt run -s models/streamline/parser/streamline__complete_decoded_instructions.sql models/streamline/parser/streamline__idls_history_pointer.sql
+          dbt run -s models/streamline/parser/streamline__complete_decoded_instructions.sql
           dbt run --vars '{"STREAMLINE_INVOKE_STREAMS":True}' -m ./models/streamline/parser/streamline__all_undecoded_instructions_history.sql

View File

@ -0,0 +1,43 @@
name: dbt_run_parser_program_historical_build_queue
run-name: dbt_run_parser_program_historical_build_queue

on:
  schedule:
    # Runs every Sunday at 02:22 (see https://crontab.guru)
    - cron: '22 2 * * SUN'

env:
  DBT_PROFILES_DIR: "${{ secrets.DBT_PROFILES_DIR }}"
  ACCOUNT: "${{ secrets.ACCOUNT }}"
  ROLE: "${{ secrets.ROLE }}"
  USER: "${{ secrets.USER }}"
  PASSWORD: "${{ secrets.PASSWORD }}"
  REGION: "${{ secrets.REGION }}"
  DATABASE: "${{ secrets.DATABASE }}"
  WAREHOUSE: "${{ secrets.WAREHOUSE }}"
  SCHEMA: "${{ secrets.SCHEMA }}"

concurrency:
  group: ${{ github.workflow }}

jobs:
  run_dbt_jobs:
    runs-on: ubuntu-latest
    environment:
      name: workflow_prod
    steps:
      - uses: actions/checkout@v3
      - uses: actions/setup-python@v1
        with:
          python-version: "3.7.x"
      - name: install dependencies
        run: |
          pip3 install dbt-snowflake==${{ secrets.DBT_VERSION }} cli_passthrough requests click
          dbt deps
      - name: Run DBT Jobs
        run: |
          dbt run -s models/streamline/parser/history/streamline__all_undecoded_instructions_history_queue.sql

View File

@ -0,0 +1,43 @@
name: dbt_run_parser_program_historical_clean_queue
run-name: dbt_run_parser_program_historical_clean_queue

on:
  schedule:
    # Runs daily at 03:17 (see https://crontab.guru)
    - cron: '17 3 * * *'

env:
  DBT_PROFILES_DIR: "${{ secrets.DBT_PROFILES_DIR }}"
  ACCOUNT: "${{ secrets.ACCOUNT }}"
  ROLE: "${{ secrets.ROLE }}"
  USER: "${{ secrets.USER }}"
  PASSWORD: "${{ secrets.PASSWORD }}"
  REGION: "${{ secrets.REGION }}"
  DATABASE: "${{ secrets.DATABASE }}"
  WAREHOUSE: "${{ secrets.WAREHOUSE }}"
  SCHEMA: "${{ secrets.SCHEMA }}"

concurrency:
  group: ${{ github.workflow }}

jobs:
  run_dbt_jobs:
    runs-on: ubuntu-latest
    environment:
      name: workflow_prod
    steps:
      - uses: actions/checkout@v3
      - uses: actions/setup-python@v1
        with:
          python-version: "3.7.x"
      - name: install dependencies
        run: |
          pip3 install dbt-snowflake==${{ secrets.DBT_VERSION }} cli_passthrough requests click
          dbt deps
      - name: Run DBT Jobs
        run: |
          dbt run-operation run_sp_clean_program_parser_historical_queue

View File

@ -0,0 +1,7 @@
{% macro run_sp_clean_program_parser_historical_queue() %}
{% set sql %}
call streamline.sp_clean_program_parser_historical_queue();
{% endset %}
{% do run_query(sql) %}
{% endmacro %}

View File

@ -0,0 +1,59 @@
{% macro sp_create_clean_program_parser_historical_queue() %}
{% if var("UPDATE_UDFS_AND_SPS") %}
{% set sql %}
CREATE OR REPLACE PROCEDURE streamline.sp_clean_program_parser_historical_queue()
RETURNS BOOLEAN
LANGUAGE SQL
AS
$$
DECLARE
RESULT VARCHAR;
query_id VARCHAR;
BEGIN
/* review progress of all items that have been worked on before the most recent hour of activity */
SELECT
INDEX,
program_id,
instruction,
tx_id,
h.block_id,
c.id as id
from streamline.all_undecoded_instructions_history_in_progress h
left outer join streamline.complete_decoded_instructions c
on c.block_id = h.block_id
and c.id = concat_ws(
'-',
h.block_id,
tx_id,
program_id,
index
)
where h._inserted_timestamp <= current_timestamp - INTERVAL '1 HOURS';
query_id := SQLID;
/* insert items not completed back into queue */
INSERT INTO streamline.all_undecoded_instructions_history_queue (index, program_id, instruction, tx_id, block_id)
SELECT
INDEX,
program_id,
instruction,
tx_id,
block_id
FROM TABLE(RESULT_SCAN(:query_id))
WHERE id is null;
/* remove all in_progress items that have been reviewed */
DELETE FROM streamline.all_undecoded_instructions_history_in_progress s
USING (SELECT * FROM TABLE(RESULT_SCAN(:query_id))) d
WHERE s.block_id = d.block_id
AND s.tx_id = d.tx_id
AND s.index = d.index
AND s.program_id = d.program_id;
RETURN TRUE;
END;
$${% endset %}
{% do run_query(sql) %}
{% endif %}
{% endmacro %}
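
A brief aside on the pattern used in the procedure above: it runs one SELECT that left-joins the in-progress rows to the completed ones, captures that statement's query id via SQLID, and then reads the cached result twice through RESULT_SCAN, first to re-queue the rows that never completed and then to delete every row that was reviewed, so the expensive join only runs once. A stripped-down sketch of the same idea, with made-up table and column names rather than anything from this repo, runnable as an anonymous Snowflake Scripting block (e.g. inside EXECUTE IMMEDIATE $$ ... $$):

DECLARE
    query_id VARCHAR;
BEGIN
    -- single pass: flag which in-progress rows already have a completed match
    SELECT w.id, c.id AS done_id
    FROM my_schema.work_in_progress w
    LEFT OUTER JOIN my_schema.work_completed c
        ON c.id = w.id;
    query_id := SQLID;  -- query id of the SELECT above

    -- rows with no completed match go back onto the queue
    INSERT INTO my_schema.work_queue (id)
    SELECT id FROM TABLE(RESULT_SCAN(:query_id)) WHERE done_id IS NULL;

    -- everything that was reviewed is dropped from the in-progress table
    DELETE FROM my_schema.work_in_progress w
    USING (SELECT * FROM TABLE(RESULT_SCAN(:query_id))) d
    WHERE w.id = d.id;

    RETURN TRUE;
END;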

View File

@ -0,0 +1,20 @@
{{ config (
materialized = "view",
post_hook = if_data_call_function(
func = "{{this.schema}}.udf_bulk_program_parser(object_construct('realtime', 'False'))",
target = "{{this.schema}}.{{this.identifier}}"
)
) }}
with m as (
select max(block_id) as max_block_id
from {{ ref('streamline__all_undecoded_instructions_history_queue') }} h
)
select
INDEX,
program_id,
instruction,
tx_id,
h.block_id
from {{ ref('streamline__all_undecoded_instructions_history_queue') }} h
where h.block_id between (select max_block_id-8000000 from m) and (select max_block_id from m)

View File

@ -0,0 +1,12 @@
{{ config (
materialized = 'incremental',
full_refresh = false
) }}
SELECT
*,
sysdate() as _inserted_timestamp
FROM
{{ ref('streamline__all_undecoded_instructions_history_queue') }}
LIMIT
0

View File

@ -0,0 +1,66 @@
{{ config (
materialized = 'incremental',
) }}
WITH base AS (
SELECT
SPLIT_PART(
id,
'-',
3
) :: STRING AS program_id,
MIN(block_id) AS min_decoded_block
FROM
{{ ref('streamline__complete_decoded_instructions') }}
GROUP BY
1
),
program_last_processed AS (
SELECT
b.*,
bl.block_timestamp :: DATE AS min_decoded_block_timestamp_date
FROM
base b
JOIN {{ ref('silver__blocks') }}
bl
ON bl.block_id = b.min_decoded_block
),
pre_final as (
SELECT
h.program_id,
COALESCE(
p.min_decoded_block_timestamp_date,
h.default_backfill_start_block_timestamp
) :: DATE AS min_decoded_block_timestamp_date,
COALESCE(
p.min_decoded_block,
h.default_backfill_start_block_id
) AS min_decoded_block_id,
first_block_id
FROM
{{ ref('streamline__idls_history') }}
h
LEFT JOIN program_last_processed p
ON p.program_id = h.program_id
WHERE
min_decoded_block_id > first_block_id
)
select
e.block_id,
e.block_timestamp,
e.tx_id,
e.index,
e.program_id,
e.instruction
from {{ ref('silver__events') }} e
join pre_final pf on
e.program_id = pf.program_id
and e.block_timestamp::date <= pf.min_decoded_block_timestamp_date
and e.block_id >= pf.first_block_id
where
pf.program_id not in (select distinct(program_id) from {{ ref('streamline__complete_decoded_history') }})
{% if is_incremental() %}
and
pf.program_id not in (select distinct(program_id) from {{ this }})
{% endif %}

View File

@ -0,0 +1,36 @@
{{ config (
materialized = 'incremental',
unique_key = 'program_id'
) }}
WITH min_decoded AS (
SELECT
SPLIT_PART(
id,
'-',
3
) :: STRING AS program_id,
MIN(block_id) AS block_id
FROM
{{ ref('streamline__complete_decoded_instructions') }}
GROUP BY
1
)
SELECT
h.program_id
FROM
{{ ref('streamline__idls_history') }}
h
JOIN min_decoded
ON min_decoded.block_id = h.first_block_id
{% if is_incremental() %}
WHERE
h.program_id NOT IN (
SELECT
DISTINCT(program_id)
FROM
{{ this }}
)
{% endif %}

View File

@ -1,25 +0,0 @@
{{ config (
materialized = "view",
post_hook = if_data_call_function(
func = "{{this.schema}}.udf_bulk_program_parser(object_construct('realtime', 'False'))",
target = "{{this.schema}}.{{this.identifier}}"
)
) }}
SELECT
e.program_id,
e.tx_id,
e.index,
e.instruction,
e.block_id,
e.block_timestamp
FROM
{{ ref('silver__events') }}
e
JOIN {{ ref('streamline__idls_history_pointer') }}
p
ON e.block_timestamp >= p.backfill_to_date
AND e.block_timestamp <= p.min_decoded_block_timestamp_date
AND e.program_id = p.program_id
WHERE
e.block_id <= p.min_decoded_block_id

View File

@ -1,46 +0,0 @@
{{ config (
materialized = 'table'
) }}
WITH base AS (
SELECT
SPLIT_PART(
id,
'-',
3
) :: STRING AS program_id,
MIN(block_id) AS min_decoded_block
FROM
{{ ref('streamline__complete_decoded_instructions') }}
GROUP BY
1
),
program_last_processed AS (
SELECT
b.*,
bl.block_timestamp :: DATE AS min_decoded_block_timestamp_date
FROM
base b
JOIN {{ ref('silver__blocks') }}
bl
ON bl.block_id = b.min_decoded_block
)
SELECT
h.program_id,
COALESCE(
p.min_decoded_block_timestamp_date,
h.default_backfill_start_block_timestamp
) :: DATE AS min_decoded_block_timestamp_date,
COALESCE(
p.min_decoded_block,
h.default_backfill_start_block_id
) AS min_decoded_block_id,
min_decoded_block_timestamp_date -2 AS backfill_to_date
FROM
{{ ref('streamline__idls_history') }}
h
LEFT JOIN program_last_processed p
ON p.program_id = h.program_id
WHERE
min_decoded_block_id > first_block_id