diff --git a/.github/workflows/dbt_run_streamline.yml b/.github/workflows/dbt_run_streamline_blocks.yml similarity index 80% rename from .github/workflows/dbt_run_streamline.yml rename to .github/workflows/dbt_run_streamline_blocks.yml index d8cb188..fd42678 100644 --- a/.github/workflows/dbt_run_streamline.yml +++ b/.github/workflows/dbt_run_streamline_blocks.yml @@ -1,12 +1,8 @@ -name: dbt_run_streamline -run-name: dbt_run_streamline +name: dbt_run_streamline_blocks +run-name: dbt_run_streamline_blocks on: workflow_dispatch: - push: - branches: - # - main - - turn-off-dev-turn-on-prod # schedule: # # Runs "every hour at the 0 minute" (see https://crontab.guru) # - cron: '0,30 * * * *' @@ -32,7 +28,7 @@ jobs: uses: FlipsideCrypto/analytics-workflow-templates/.github/workflows/dbt_run_template.yml@main with: dbt_command: | - dbt run --vars '{"STREAMLINE_INVOKE_STREAMS":True}' -m ./models/streamline/* --exclude models/streamline/streamline__validators* + dbt run --vars '{"STREAMLINE_INVOKE_STREAMS":True}' -m 1+models/streamline/streamline__blocks_realtime.sql environment: workflow_prod warehouse: ${{ vars.WAREHOUSE }} secrets: inherit \ No newline at end of file diff --git a/.github/workflows/dbt_run_streamline_transactions.yml b/.github/workflows/dbt_run_streamline_transactions.yml new file mode 100644 index 0000000..b57a4fd --- /dev/null +++ b/.github/workflows/dbt_run_streamline_transactions.yml @@ -0,0 +1,34 @@ +name: dbt_run_streamline_transactions +run-name: dbt_run_streamline_transactions + +on: + workflow_dispatch: + # schedule: + # # Runs at minutes 0 and 30 of every hour (see https://crontab.guru) + # - cron: '0,30 * * * *' + +env: + USE_VARS: "${{ vars.USE_VARS }}" + DBT_PROFILES_DIR: "${{ vars.DBT_PROFILES_DIR }}" + DBT_VERSION: "${{ vars.DBT_VERSION }}" + ACCOUNT: "${{ vars.ACCOUNT }}" + ROLE: "${{ vars.ROLE }}" + USER: "${{ vars.USER }}" + PASSWORD: "${{ secrets.PASSWORD }}" + REGION: "${{ vars.REGION }}" + DATABASE: "${{ vars.DATABASE }}" + WAREHOUSE: "${{ 
vars.WAREHOUSE }}" + SCHEMA: "${{ vars.SCHEMA }}" + +concurrency: + group: ${{ github.workflow }} + +jobs: + called_workflow_template: + uses: FlipsideCrypto/analytics-workflow-templates/.github/workflows/dbt_run_template.yml@main + with: + dbt_command: | + dbt run --vars '{"STREAMLINE_INVOKE_STREAMS":True}' -m 1+models/streamline/streamline__txs_realtime.sql + environment: workflow_prod + warehouse: ${{ vars.WAREHOUSE }} + secrets: inherit \ No newline at end of file diff --git a/data/github_actions__workflows.csv b/data/github_actions__workflows.csv index 97ed6f3..ec7dbfc 100644 --- a/data/github_actions__workflows.csv +++ b/data/github_actions__workflows.csv @@ -1,4 +1,5 @@ workflow_name,workflow_schedule dbt_run_incremental,"25,55 * * * *" -dbt_run_streamline,"0,30 * * * *" +dbt_run_streamline_blocks,"0,30 * * * *" +dbt_run_streamline_transactions,"5,35 * * * *" dbt_test_tasks,"0,30 * * * *" \ No newline at end of file diff --git a/models/streamline/streamline__blocks_history.sql b/models/streamline/streamline__blocks_history.sql index 3cb3db9..d888d6f 100644 --- a/models/streamline/streamline__blocks_history.sql +++ b/models/streamline/streamline__blocks_history.sql @@ -5,49 +5,28 @@ merge_update_columns = ["id"], post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION on equality(id)" ) }} +-- depends_on: {{ ref('bronze__streamline_blocks') }} -WITH meta AS ( - - SELECT - last_modified, - file_name - FROM - TABLE( - information_schema.external_table_files( - table_name => '{{ source( "bronze", "blocks") }}' - ) - ) A -) - -{% if is_incremental() %}, -max_date AS ( - SELECT - COALESCE(MAX(_INSERTED_TIMESTAMP), '1970-01-01' :: DATE) max_INSERTED_TIMESTAMP - FROM - {{ this }}) - {% endif %} - SELECT - {{ dbt_utils.generate_surrogate_key( - ['block_number'] - ) }} AS id, - block_number, - last_modified AS _inserted_timestamp - FROM - {{ source( - "bronze", - "blocks" - ) }} - JOIN meta b - ON b.file_name = metadata$filename +SELECT + id, + block_number, + 
ARRAY_SIZE( + DATA :result :block :data :txs + ) AS tx_count, + _inserted_timestamp +FROM {% if is_incremental() %} +{{ ref('bronze__streamline_blocks') }} WHERE - b.last_modified > ( + _inserted_timestamp >= ( SELECT - max_INSERTED_TIMESTAMP + MAX(_inserted_timestamp) _inserted_timestamp FROM - max_date + {{ this }} ) +{% else %} + {{ ref('bronze__streamline_FR_blocks') }} {% endif %} qualify(ROW_NUMBER() over (PARTITION BY id diff --git a/models/streamline/streamline__txs_history.sql b/models/streamline/streamline__txs_history.sql index 7175af9..dbe3761 100644 --- a/models/streamline/streamline__txs_history.sql +++ b/models/streamline/streamline__txs_history.sql @@ -9,28 +9,24 @@ -- depends_on: {{ ref('bronze__streamline_FR_transactions') }} SELECT - DISTINCT {{ dbt_utils.generate_surrogate_key( - ['block_number'] - ) }} AS id, + id, block_number, _inserted_timestamp FROM {% if is_incremental() %} {{ ref('bronze__streamline_transactions') }} +WHERE + _inserted_timestamp >= ( + SELECT + MAX(_inserted_timestamp) _inserted_timestamp + FROM + {{ this }} + ) {% else %} {{ ref('bronze__streamline_FR_transactions') }} {% endif %} -{% if is_incremental() %} -WHERE - _inserted_timestamp > ( - SELECT - COALESCE(MAX(_INSERTED_TIMESTAMP), '1970-01-01' :: DATE) max_INSERTED_TIMESTAMP - FROM - {{ this }}) - {% endif %} - - qualify(ROW_NUMBER() over (PARTITION BY id - ORDER BY - _inserted_timestamp DESC)) = 1 +qualify(ROW_NUMBER() over (PARTITION BY id +ORDER BY + _inserted_timestamp DESC)) = 1 diff --git a/models/streamline/streamline__txs_realtime.sql b/models/streamline/streamline__txs_realtime.sql index 050b6f7..ab78299 100644 --- a/models/streamline/streamline__txs_realtime.sql +++ b/models/streamline/streamline__txs_realtime.sql @@ -7,27 +7,16 @@ ) }} SELECT - block_number, - ARRAY_SIZE( - DATA :result :block :data :txs - ) AS tx_count + A.block_number, + A.tx_count FROM - -{% if is_incremental() %} -{{ ref('bronze__streamline_blocks') }} -{% else %} - {{ 
ref('bronze__streamline_FR_blocks') }} -{% endif %} + {{ ref("streamline__blocks_history") }} A + LEFT JOIN {{ ref("streamline__txs_history") }} + b + ON A.block_number = b.block_number WHERE - tx_count IS NOT NULL - AND tx_count > 0 - AND block_number NOT IN ( - SELECT - block_number - FROM - {{ ref( - "streamline__txs_history" - ) }} - ) + A.tx_count IS NOT NULL + AND A.tx_count > 0 + AND b.block_number IS NULL ORDER BY 1 ASC