model and workflow (#686)

* model and workflow

* name, filter large partitions, max partition

* formatting
This commit is contained in:
tarikceric 2024-11-01 09:32:02 -07:00 committed by GitHub
parent 16224abec9
commit a4da0f567e
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 100 additions and 0 deletions

View File

@ -0,0 +1,45 @@
# Runs the dbt models tagged `units_consumed_backfill` inside the
# `workflow_prod` environment. Started through `workflow_dispatch` only.
name: dbt_run_transactions_units_consumed_backfill
run-name: dbt_run_transactions_units_consumed_backfill

on:
  # `workflow_dispatch` supports `inputs` only; branch filters belong to
  # push / pull_request events, so none is declared here.
  workflow_dispatch:

# Connection settings handed to dbt through the environment.
env:
  DBT_PROFILES_DIR: "${{ vars.DBT_PROFILES_DIR }}"
  ACCOUNT: "${{ vars.ACCOUNT }}"
  ROLE: "${{ vars.ROLE }}"
  USER: "${{ vars.USER }}"
  PASSWORD: "${{ secrets.PASSWORD }}"
  REGION: "${{ vars.REGION }}"
  DATABASE: "${{ vars.DATABASE }}"
  WAREHOUSE: "${{ vars.WAREHOUSE }}"
  SCHEMA: "${{ vars.SCHEMA }}"

# All runs of this workflow share one concurrency group.
concurrency:
  group: ${{ github.workflow }}

jobs:
  run_dbt_jobs:
    runs-on: ubuntu-latest
    environment:
      name: workflow_prod

    steps:
      - uses: actions/checkout@v3

      - uses: actions/setup-python@v4
        with:
          python-version: "${{ vars.PYTHON_VERSION }}"
          cache: "pip"

      - name: install dependencies
        run: |
          pip install -r requirements.txt
          dbt deps

      - name: Run DBT Jobs
        # NOTE(review): the same selection is invoked twice; presumably each
        # invocation advances the backfill by one batch - confirm before
        # collapsing these into a single call.
        run: |
          dbt run -s "solana_models,tag:units_consumed_backfill"
          dbt run -s "solana_models,tag:units_consumed_backfill"

View File

@ -9,3 +9,4 @@ dbt_run_decode_logs_orchestrator,"*/15 * * * *"
dbt_run_streamline_block_rewards,"*/15 * * * *"
dbt_run_streamline_blocks,"*/5 * * * *"
dbt_run_streamline_helius_cnft_metadata,"*/10 * * * *"
dbt_run_transactions_units_consumed_backfill,"2,7,12,17,22,27,32,37,42,47,52,57 * * * *"

1 workflow_name workflow_schedule
9 dbt_run_streamline_block_rewards */15 * * * *
10 dbt_run_streamline_blocks */5 * * * *
11 dbt_run_streamline_helius_cnft_metadata */10 * * * *
12 dbt_run_transactions_units_consumed_backfill 2,7,12,17,22,27,32,37,42,47,52,57 * * * *

View File

@ -0,0 +1,54 @@
{{
    config(
        materialized = "incremental",
        unique_key = ['tx_id'],
        tags = ['units_consumed_backfill']
    )
}}
{#
    Incremental backfill of computeUnitsConsumed per transaction.

    Each run loads a small window of _partition_id values that starts right
    after the highest partition already present in this table and never goes
    beyond max_partition. Partitions listed in excluded_partitions are
    filtered out of the load.
#}
{% if execute %}
    {% set max_partition = 120880 %}
    {# When computeUnitsConsumed first appears in node response #}
    {% set first_partition = 24239 %}
    {# list of partition IDs with >2m records to exclude #}
    {% set excluded_partitions = [
        116876, 116875, 116874, 116872, 116871, 116870, 116869, 116868, 116867,
        116863, 116862, 116850, 105987, 103411, 103410, 103409, 103406, 51215,
        45995, 31483, 31482, 27335, 27333, 27331, 27330, 27327, 27325
    ] %}
    {% set next_partition_query %}
        {% if is_incremental() %}
            /* COALESCE covers an empty target table, where MAX() is NULL */
            SELECT
                LEAST(COALESCE(MAX(_partition_id) + 1, {{ first_partition }}), {{ max_partition }}),
                LEAST(COALESCE(MAX(_partition_id) + 5, {{ first_partition }}), {{ max_partition }})
            FROM
                {{ this }}
        {% else %}
            SELECT {{ first_partition }}, {{ first_partition }}
        {% endif %}
    {% endset %}
    {# Run the lookup once and read both bounds from the same row. #}
    {% set partition_bounds = run_query(next_partition_query)[0] %}
    {% set win = namespace(
        lower = partition_bounds[0] | int,
        upper = partition_bounds[1] | int
    ) %}
    {#
        Slide the window past leading excluded partitions. Without this, a run
        of five or more consecutive excluded IDs (e.g. 116867-116872) yields a
        window that loads nothing, MAX(_partition_id) never moves, and every
        later run selects the same empty window. One pass per excluded ID is
        enough because each pass skips at most one partition.
    #}
    {% for _ in excluded_partitions %}
        {% if win.lower in excluded_partitions and win.lower < max_partition %}
            {% set win.lower = win.lower + 1 %}
            {% set win.upper = [win.upper + 1, max_partition] | min %}
        {% endif %}
    {% endfor %}
    {% set next_partition = win.lower %}
    {% set next_partition_2 = win.upper %}
{% endif %}
SELECT
    t.tx_id,
    t.data :meta :computeUnitsConsumed :: NUMBER AS units_consumed,
    t._partition_id,
    t._inserted_timestamp,
    {{ dbt_utils.generate_surrogate_key(['tx_id']) }} AS transactions_id,
    SYSDATE() AS inserted_timestamp,
    SYSDATE() AS modified_timestamp,
    '{{ invocation_id }}' AS _invocation_id
FROM
    {{ ref('bronze__transactions2') }} t
WHERE
    tx_id IS NOT NULL
    -- keep a row unless it is a single-instruction transaction whose first
    -- instruction targets the Vote program
    AND (
        COALESCE(t.data :transaction :message :instructions [0] :programId :: STRING, '') <> 'Vote111111111111111111111111111111111111111'
        OR (array_size(t.data :transaction :message :instructions) > 1)
    )
    AND _partition_id >= {{ next_partition }}
    AND _partition_id <= {{ next_partition_2 }}
    AND _partition_id NOT IN ({{ excluded_partitions | join(', ') }})