From 6ce17a7219e21b771eeb81f26bdaaa2565610fb0 Mon Sep 17 00:00:00 2001 From: WHYTEWYLL <49686519+WHYTEWYLL@users.noreply.github.com> Date: Wed, 30 Aug 2023 16:38:01 +0200 Subject: [PATCH] Streamline Realtime and Backfill Models (#156) * add: model to backfill * fix: node's string format * add tags * upd collection model * mainnet21 hardcoded collections history model * del tag * mainnet 21 getblock * tag for 21 * realtime models * alias num as height * realtime tags * add missing tag and newlines * backfiller * backfiller * move script to renamed python folder, upd test accordingly w dir name change * upd script to accept model input, update jobs per method call * fix error w use_dev arg * add: silver models * limit backfill job in python script * rename silver dbt models to streamline_ and move into silver/core * explicit casting to silver streamline models * add documentation to silver streamline models * run only current mainnet and history mainnet 22 first * activate schedule for gha * del hardcoded mainnet models * move history models out of subdirs into history dir * fix GHA vars * del upstream 1+ from history step * del tag --------- Co-authored-by: Jack Forgash --- ...tory.yml => dbt_run_streamline_blocks.yml} | 25 +++-- ...yml => dbt_run_streamline_collections.yml} | 26 +++-- .../workflows/dbt_run_streamline_hourly.yml | 36 +++++++ ...bt_run_streamline_transaction_results.yml} | 26 +++-- ...ml => dbt_run_streamline_transactions.yml} | 26 +++-- .github/workflows/dbt_test.yml | 2 +- data/seeds__network_version.csv | 2 +- dbt_project.yml | 1 + models/descriptions/_partition_by_block_id.md | 6 ++ models/descriptions/arguments.md | 5 + models/descriptions/block_id.md | 5 + models/descriptions/block_number.md | 6 ++ models/descriptions/block_seals.md | 7 ++ models/descriptions/collection_count.md | 5 + models/descriptions/collection_guarantees.md | 7 ++ models/descriptions/collection_id.md | 7 ++ models/descriptions/envelope_signatures.md | 7 ++ models/descriptions/error_message.md | 5 + models/descriptions/events.md | 5 + models/descriptions/payload_signatures.md | 8 ++ models/descriptions/proposal_key.md | 9 ++ models/descriptions/script.md | 7 ++ models/descriptions/signatures.md | 5 + models/descriptions/status.md | 7 ++ models/descriptions/status_code.md | 7 ++ models/descriptions/transaction_ids.md | 5 + .../chainwalkers}/silver__blocks.sql | 0 .../chainwalkers}/silver__blocks.yml | 0 .../silver__event_attributes.sql | 0 .../silver__event_attributes_https.sql | 0 .../silver__event_attributes_https.yml | 0 .../chainwalkers}/silver__events.sql | 0 .../chainwalkers}/silver__events.yml | 0 .../chainwalkers}/silver__events_final.sql | 0 .../chainwalkers}/silver__events_final.yml | 0 .../chainwalkers}/silver__transactions.sql | 0 .../chainwalkers}/silver__transactions.yml | 0 .../streamline/silver__streamline_blocks.sql | 40 ++++++++ .../streamline/silver__streamline_blocks.yml | 40 ++++++++ .../silver__streamline_collections.sql | 35 +++++++ .../silver__streamline_collections.yml | 25 +++++ ...silver__streamline_transaction_results.sql | 35 +++++++ ...silver__streamline_transaction_results.yml | 31 ++++++ .../silver__streamline_transactions.sql | 40 ++++++++ .../silver__streamline_transactions.yml | 46 +++++++++ .../streamline__complete_get_blocks.sql | 5 +- .../streamline__complete_get_collections.sql | 5 +- ...line__complete_get_transaction_results.sql | 5 +- .../streamline__complete_get_transactions.sql | 5 +- ...ine__get_collections_history_mainnet22.sql | 47 --------- ...sql => 
streamline__get_blocks_history.sql} | 10 +- .../streamline__get_collections_history.sql | 76 ++++++++++++++ ...mline__get_transaction_results_history.sql | 43 ++++++++ .../streamline__get_transactions_history.sql | 43 ++++++++ ..._transaction_results_history_mainnet22.sql | 44 --------- ...ne__get_transactions_history_mainnet22.sql | 45 --------- .../streamline__get_blocks_realtime.sql | 64 ++++++++++++ .../streamline__get_collections_realtime.sql | 98 +++++++++++++++++++ ...line__get_transaction_results_realtime.sql | 96 ++++++++++++++++++ .../streamline__get_transactions_realtime.sql | 96 ++++++++++++++++++ python/dbt_execute_flow_streamline.py | 57 +++++++++++ .../test_alert => python}/dbt_test_alert.py | 0 62 files changed, 1097 insertions(+), 191 deletions(-) rename .github/workflows/{dbt_run_streamline_blocks_history.yml => dbt_run_streamline_blocks.yml} (52%) rename .github/workflows/{dbt_run_streamline_collections_history.yml => dbt_run_streamline_collections.yml} (51%) create mode 100644 .github/workflows/dbt_run_streamline_hourly.yml rename .github/workflows/{dbt_run_streamline_transactions_history copy.yml => dbt_run_streamline_transaction_results.yml} (50%) rename .github/workflows/{dbt_run_streamline_transaction_results_history.yml => dbt_run_streamline_transactions.yml} (51%) create mode 100644 models/descriptions/_partition_by_block_id.md create mode 100644 models/descriptions/arguments.md create mode 100644 models/descriptions/block_id.md create mode 100644 models/descriptions/block_number.md create mode 100644 models/descriptions/block_seals.md create mode 100644 models/descriptions/collection_count.md create mode 100644 models/descriptions/collection_guarantees.md create mode 100644 models/descriptions/collection_id.md create mode 100644 models/descriptions/envelope_signatures.md create mode 100644 models/descriptions/error_message.md create mode 100644 models/descriptions/events.md create mode 100644 models/descriptions/payload_signatures.md create mode 100644 models/descriptions/proposal_key.md create mode 100644 models/descriptions/script.md create mode 100644 models/descriptions/signatures.md create mode 100644 models/descriptions/status.md create mode 100644 models/descriptions/status_code.md create mode 100644 models/descriptions/transaction_ids.md rename models/silver/{base => core/chainwalkers}/silver__blocks.sql (100%) rename models/silver/{base => core/chainwalkers}/silver__blocks.yml (100%) rename models/silver/{base => core/chainwalkers}/silver__event_attributes.sql (100%) rename models/silver/{base => core/chainwalkers}/silver__event_attributes_https.sql (100%) rename models/silver/{base => core/chainwalkers}/silver__event_attributes_https.yml (100%) rename models/silver/{base => core/chainwalkers}/silver__events.sql (100%) rename models/silver/{base => core/chainwalkers}/silver__events.yml (100%) rename models/silver/{base => core/chainwalkers}/silver__events_final.sql (100%) rename models/silver/{base => core/chainwalkers}/silver__events_final.yml (100%) rename models/silver/{base => core/chainwalkers}/silver__transactions.sql (100%) rename models/silver/{base => core/chainwalkers}/silver__transactions.yml (100%) create mode 100644 models/silver/core/streamline/silver__streamline_blocks.sql create mode 100644 models/silver/core/streamline/silver__streamline_blocks.yml create mode 100644 models/silver/core/streamline/silver__streamline_collections.sql create mode 100644 models/silver/core/streamline/silver__streamline_collections.yml create mode 100644 
models/silver/core/streamline/silver__streamline_transaction_results.sql create mode 100644 models/silver/core/streamline/silver__streamline_transaction_results.yml create mode 100644 models/silver/core/streamline/silver__streamline_transactions.sql create mode 100644 models/silver/core/streamline/silver__streamline_transactions.yml delete mode 100644 models/silver/streamline/core/history/collections/streamline__get_collections_history_mainnet22.sql rename models/silver/streamline/core/history/{blocks/streamline__get_blocks_history_mainnet22.sql => streamline__get_blocks_history.sql} (59%) create mode 100644 models/silver/streamline/core/history/streamline__get_collections_history.sql create mode 100644 models/silver/streamline/core/history/streamline__get_transaction_results_history.sql create mode 100644 models/silver/streamline/core/history/streamline__get_transactions_history.sql delete mode 100644 models/silver/streamline/core/history/transaction_results/streamline__get_transaction_results_history_mainnet22.sql delete mode 100644 models/silver/streamline/core/history/transactions/streamline__get_transactions_history_mainnet22.sql create mode 100644 models/silver/streamline/core/realtime/streamline__get_blocks_realtime.sql create mode 100644 models/silver/streamline/core/realtime/streamline__get_collections_realtime.sql create mode 100644 models/silver/streamline/core/realtime/streamline__get_transaction_results_realtime.sql create mode 100644 models/silver/streamline/core/realtime/streamline__get_transactions_realtime.sql create mode 100644 python/dbt_execute_flow_streamline.py rename {python_scripts/test_alert => python}/dbt_test_alert.py (100%) diff --git a/.github/workflows/dbt_run_streamline_blocks_history.yml b/.github/workflows/dbt_run_streamline_blocks.yml similarity index 52% rename from .github/workflows/dbt_run_streamline_blocks_history.yml rename to .github/workflows/dbt_run_streamline_blocks.yml index 348afaa..183dcff 100644 --- a/.github/workflows/dbt_run_streamline_blocks_history.yml +++ b/.github/workflows/dbt_run_streamline_blocks.yml @@ -1,15 +1,15 @@ -name: dbt_run_streamline_blocks_history -run-name: dbt_run_streamline_blocks_history +name: dbt_run_streamline_blocks +run-name: dbt_run_streamline_blocks on: workflow_dispatch: schedule: - # Runs "at 10:00 UTC AM" (see https://crontab.guru) - - cron: '0 10 * * *' - -env: - DBT_PROFILES_DIR: ./ + # Runs on minute 15 (see https://crontab.guru) + - cron: "15 * * * *" +env: + USE_VARS: "${{ vars.USE_VARS }}" + DBT_PROFILES_DIR: "${{ vars.DBT_PROFILES_DIR }}" ACCOUNT: "${{ vars.ACCOUNT }}" ROLE: "${{ vars.ROLE }}" USER: "${{ vars.USER }}" @@ -25,7 +25,7 @@ concurrency: jobs: run_dbt_jobs: runs-on: ubuntu-latest - environment: + environment: name: workflow_prod steps: @@ -40,6 +40,11 @@ jobs: run: | pip install -r requirements.txt dbt deps - - name: Run DBT Jobs + + - name: Run DBT Realtime run: | - dbt run --threads 8 --vars '{"STREAMLINE_INVOKE_STREAMS":True}' -m 1+models/silver/streamline/core/history/blocks --profile flow --target prod \ No newline at end of file + dbt run -s 1+streamline__get_blocks_realtime --vars '{"STREAMLINE_INVOKE_STREAMS": True}' + + - name: Run DBT History Jobs - Mainnet22 + run: | + dbt run -s 1+streamline__get_blocks_history --vars '{"STREAMLINE_INVOKE_STREAMS": True, "node_url": "access-001.mainnet22.nodes.onflow.org:9000", "start_block": 47169687, "end_block": 55114466}' diff --git a/.github/workflows/dbt_run_streamline_collections_history.yml 
b/.github/workflows/dbt_run_streamline_collections.yml similarity index 51% rename from .github/workflows/dbt_run_streamline_collections_history.yml rename to .github/workflows/dbt_run_streamline_collections.yml index 5c5b50c..3bd21b3 100644 --- a/.github/workflows/dbt_run_streamline_collections_history.yml +++ b/.github/workflows/dbt_run_streamline_collections.yml @@ -1,15 +1,15 @@ -name: dbt_run_streamline_collections_history -run-name: dbt_run_streamline_collections_history +name: dbt_run_streamline_collections +run-name: dbt_run_streamline_collections on: workflow_dispatch: schedule: - # Runs "at 10:00 UTC AM" (see https://crontab.guru) - - cron: '0 10 * * *' - -env: - DBT_PROFILES_DIR: ./ + # Runs on minute 30 (see https://crontab.guru) + - cron: "30 * * * *" +env: + USE_VARS: "${{ vars.USE_VARS }}" + DBT_PROFILES_DIR: "${{ vars.DBT_PROFILES_DIR }}" ACCOUNT: "${{ vars.ACCOUNT }}" ROLE: "${{ vars.ROLE }}" USER: "${{ vars.USER }}" @@ -25,7 +25,7 @@ concurrency: jobs: run_dbt_jobs: runs-on: ubuntu-latest - environment: + environment: name: workflow_prod steps: @@ -40,6 +40,12 @@ jobs: run: | pip install -r requirements.txt dbt deps - - name: Run DBT Jobs + + - name: Run DBT Realtime run: | - dbt run --threads 8 --vars '{"STREAMLINE_INVOKE_STREAMS":True}' -m 1+models/silver/streamline/core/history/collections --profile flow --target prod \ No newline at end of file + dbt run -s 1+streamline__get_collections_realtime --vars '{"STREAMLINE_INVOKE_STREAMS": True}' + + - name: Run DBT History Jobs - Mainnet22 + run: | + dbt run -s 1+streamline__get_collections_history --vars '{"STREAMLINE_INVOKE_STREAMS": True, "node_url": "access-001.mainnet22.nodes.onflow.org:9000", "start_block": 47169687, "end_block": 55114466}' + diff --git a/.github/workflows/dbt_run_streamline_hourly.yml b/.github/workflows/dbt_run_streamline_hourly.yml new file mode 100644 index 0000000..fb7d643 --- /dev/null +++ b/.github/workflows/dbt_run_streamline_hourly.yml @@ -0,0 +1,36 @@ +name: dbt_run_scheduled +run-name: dbt_run_scheduled + +on: + workflow_dispatch: + schedule: + # Runs "every hour" (see https://crontab.guru) + - cron: '0 */1 * * *' + +env: + USE_VARS: "${{ vars.USE_VARS }}" + DBT_PROFILES_DIR: "${{ vars.DBT_PROFILES_DIR }}" + DBT_VERSION: "${{ vars.DBT_VERSION }}" + ACCOUNT: "${{ vars.ACCOUNT }}" + ROLE: "${{ vars.ROLE }}" + USER: "${{ vars.USER }}" + PASSWORD: "${{ secrets.PASSWORD }}" + REGION: "${{ vars.REGION }}" + DATABASE: "${{ vars.DATABASE }}" + WAREHOUSE: "${{ vars.WAREHOUSE }}" + SCHEMA: "${{ vars.SCHEMA }}" + +concurrency: + group: ${{ github.workflow }} + +jobs: + called_workflow_template: + uses: FlipsideCrypto/analytics-workflow-templates/.github/workflows/dbt_run_template.yml@main + with: + dbt_command: > + dbt run-operation stage_external_sources --vars "ext_full_refresh: true"; + dbt seed; + dbt run -s tag:streamline_load tag:streamline_complete + environment: workflow_prod + warehouse: ${{ vars.WAREHOUSE }} + secrets: inherit diff --git a/.github/workflows/dbt_run_streamline_transactions_history copy.yml b/.github/workflows/dbt_run_streamline_transaction_results.yml similarity index 50% rename from .github/workflows/dbt_run_streamline_transactions_history copy.yml rename to .github/workflows/dbt_run_streamline_transaction_results.yml index 6940313..5df486d 100644 --- a/.github/workflows/dbt_run_streamline_transactions_history copy.yml +++ b/.github/workflows/dbt_run_streamline_transaction_results.yml @@ -1,15 +1,15 @@ -name: dbt_run_streamline_transaction_results_history -run-name: 
dbt_run_streamline_transaction_results_history +name: dbt_run_streamline_transaction_results +run-name: dbt_run_streamline_transaction_results on: workflow_dispatch: schedule: - # Runs "at 10:00 UTC AM" (see https://crontab.guru) - - cron: '0 10 * * *' - -env: - DBT_PROFILES_DIR: ./ + # Runs on minute 45 (see https://crontab.guru) + - cron: "45 * * * *" +env: + USE_VARS: "${{ vars.USE_VARS }}" + DBT_PROFILES_DIR: "${{ vars.DBT_PROFILES_DIR }}" ACCOUNT: "${{ vars.ACCOUNT }}" ROLE: "${{ vars.ROLE }}" USER: "${{ vars.USER }}" @@ -25,7 +25,7 @@ concurrency: jobs: run_dbt_jobs: runs-on: ubuntu-latest - environment: + environment: name: workflow_prod steps: @@ -40,6 +40,12 @@ jobs: run: | pip install -r requirements.txt dbt deps - - name: Run DBT Jobs + + - name: Run DBT Realtime run: | - dbt run --threads 8 --vars '{"STREAMLINE_INVOKE_STREAMS":True}' -m 1+models/silver/streamline/core/history/transaction_results --profile flow --target prod \ No newline at end of file + dbt run -s 1+streamline__get_transaction_results_realtime --vars '{"STREAMLINE_INVOKE_STREAMS": True}' + + - name: Run DBT History Jobs - Mainnet22 + run: | + dbt run -s 1+streamline__get_transaction_results_history --vars '{"STREAMLINE_INVOKE_STREAMS": True, "node_url": "access-001.mainnet22.nodes.onflow.org:9000", "start_block": 47169687, "end_block": 55114466}' + diff --git a/.github/workflows/dbt_run_streamline_transaction_results_history.yml b/.github/workflows/dbt_run_streamline_transactions.yml similarity index 51% rename from .github/workflows/dbt_run_streamline_transaction_results_history.yml rename to .github/workflows/dbt_run_streamline_transactions.yml index 2c2f01d..d09904a 100644 --- a/.github/workflows/dbt_run_streamline_transaction_results_history.yml +++ b/.github/workflows/dbt_run_streamline_transactions.yml @@ -1,15 +1,15 @@ -name: dbt_run_streamline_transactions_history -run-name: dbt_run_streamline_transactions_history +name: dbt_run_streamline_transactions +run-name: dbt_run_streamline_transactions on: workflow_dispatch: schedule: - # Runs "at 10:00 UTC AM" (see https://crontab.guru) - - cron: '0 10 * * *' - -env: - DBT_PROFILES_DIR: ./ + # Runs on minute 45 (see https://crontab.guru) + - cron: "45 * * * *" +env: + USE_VARS: "${{ vars.USE_VARS }}" + DBT_PROFILES_DIR: "${{ vars.DBT_PROFILES_DIR }}" ACCOUNT: "${{ vars.ACCOUNT }}" ROLE: "${{ vars.ROLE }}" USER: "${{ vars.USER }}" @@ -25,7 +25,7 @@ concurrency: jobs: run_dbt_jobs: runs-on: ubuntu-latest - environment: + environment: name: workflow_prod steps: @@ -40,6 +40,12 @@ jobs: run: | pip install -r requirements.txt dbt deps - - name: Run DBT Jobs + + - name: Run DBT Realtime run: | - dbt run --threads 8 --vars '{"STREAMLINE_INVOKE_STREAMS":True}' -m 1+models/silver/streamline/core/history/transactions --profile flow --target prod \ No newline at end of file + dbt run -s 1+streamline__get_transactions_realtime --vars '{"STREAMLINE_INVOKE_STREAMS": True}' + + - name: Run DBT History Jobs - Mainnet22 + run: | + dbt run -s 1+streamline__get_transactions_history --vars '{"STREAMLINE_INVOKE_STREAMS": True, "node_url": "access-001.mainnet22.nodes.onflow.org:9000", "start_block": 47169687, "end_block": 55114466}' + diff --git a/.github/workflows/dbt_test.yml b/.github/workflows/dbt_test.yml index 495b86a..a66ad5d 100644 --- a/.github/workflows/dbt_test.yml +++ b/.github/workflows/dbt_test.yml @@ -49,4 +49,4 @@ jobs: - name: Log test results run: | - python python_scripts/test_alert/dbt_test_alert.py + python python/dbt_test_alert.py diff --git 
a/data/seeds__network_version.csv b/data/seeds__network_version.csv index e22107a..3c82d0a 100644 --- a/data/seeds__network_version.csv +++ b/data/seeds__network_version.csv @@ -27,4 +27,4 @@ root_height,network_version,node_url,end_height 40171634,mainnet-20,access-001.mainnet20.nodes.onflow.org:9000,44950206 44950207,mainnet-21,access-001.mainnet21.nodes.onflow.org:9000,47169686 47169687,mainnet-22,access-001.mainnet22.nodes.onflow.org:9000,55114466 -55114467,mainnet-23,access-001.mainnet23.nodes.onflow.org:9000,55114467 +55114467,mainnet-23,access-001.mainnet23.nodes.onflow.org:9000,100000000 diff --git a/dbt_project.yml b/dbt_project.yml index 02d1744..635a4a8 100644 --- a/dbt_project.yml +++ b/dbt_project.yml @@ -59,3 +59,4 @@ vars: OBSERV_FULL_TEST: False STREAMLINE_USE_DEV_FOR_EXTERNAL_TABLES: False STREAMLINE_INVOKE_STREAMS: False + STREAMLINE_RUN_HISTORY: False diff --git a/models/descriptions/_partition_by_block_id.md b/models/descriptions/_partition_by_block_id.md new file mode 100644 index 0000000..025ba87 --- /dev/null +++ b/models/descriptions/_partition_by_block_id.md @@ -0,0 +1,6 @@ +{% docs _partition_by_block_id %} + +Partition column used for efficient querying. +`ROUND(block_number, -5)` + +{% enddocs %} \ No newline at end of file diff --git a/models/descriptions/arguments.md b/models/descriptions/arguments.md new file mode 100644 index 0000000..7ec1652 --- /dev/null +++ b/models/descriptions/arguments.md @@ -0,0 +1,5 @@ +{% docs arguments %} + +The arguments included in the transaction body. + +{% enddocs %} diff --git a/models/descriptions/block_id.md b/models/descriptions/block_id.md new file mode 100644 index 0000000..27e389a --- /dev/null +++ b/models/descriptions/block_id.md @@ -0,0 +1,5 @@ +{% docs block_id %} + +The id, or hash, of the block. + +{% enddocs %} diff --git a/models/descriptions/block_number.md b/models/descriptions/block_number.md new file mode 100644 index 0000000..553ffad --- /dev/null +++ b/models/descriptions/block_number.md @@ -0,0 +1,6 @@ + +{% docs block_number %} + +The block number; corresponds with the block height. + +{% enddocs %} diff --git a/models/descriptions/block_seals.md b/models/descriptions/block_seals.md new file mode 100644 index 0000000..f611ef1 --- /dev/null +++ b/models/descriptions/block_seals.md @@ -0,0 +1,7 @@ +{% docs block_seals %} + +A block seal is an attestation that the execution result of a specific block has been verified and approved by a quorum of verification nodes. + +https://developers.flow.com/concepts/nodes/access-api#block-seal + +{% enddocs %} \ No newline at end of file diff --git a/models/descriptions/collection_count.md b/models/descriptions/collection_count.md new file mode 100644 index 0000000..c6fb7ff --- /dev/null +++ b/models/descriptions/collection_count.md @@ -0,0 +1,5 @@ +{% docs collection_count %} + +The number of collections included in a block. + +{% enddocs %} diff --git a/models/descriptions/collection_guarantees.md b/models/descriptions/collection_guarantees.md new file mode 100644 index 0000000..a2e1905 --- /dev/null +++ b/models/descriptions/collection_guarantees.md @@ -0,0 +1,7 @@ +{% docs collection_guarantees %} + +A signed attestation that specifies the collection nodes that have guaranteed to store and respond to queries about a collection. 
+ +https://developers.flow.com/concepts/nodes/access-api#collection-guarantee + +{% enddocs %} \ No newline at end of file diff --git a/models/descriptions/collection_id.md b/models/descriptions/collection_id.md new file mode 100644 index 0000000..b7c7bf6 --- /dev/null +++ b/models/descriptions/collection_id.md @@ -0,0 +1,7 @@ +{% docs collection_id %} + +SHA3-256 hash of the collection contents. + +https://developers.flow.com/concepts/nodes/access-api#collections + +{% enddocs %} diff --git a/models/descriptions/envelope_signatures.md b/models/descriptions/envelope_signatures.md new file mode 100644 index 0000000..8e6739e --- /dev/null +++ b/models/descriptions/envelope_signatures.md @@ -0,0 +1,7 @@ +{% docs envelope_signatures %} + +A list of signatures generated by the payer role. + +https://developers.flow.com/concepts/start-here/transaction-signing#anatomy-of-a-transaction + +{% enddocs %} diff --git a/models/descriptions/error_message.md b/models/descriptions/error_message.md new file mode 100644 index 0000000..511a35f --- /dev/null +++ b/models/descriptions/error_message.md @@ -0,0 +1,5 @@ +{% docs error_message %} + +The error message, if a transaction has failed. + +{% enddocs %} diff --git a/models/descriptions/events.md b/models/descriptions/events.md new file mode 100644 index 0000000..05f6365 --- /dev/null +++ b/models/descriptions/events.md @@ -0,0 +1,5 @@ +{% docs events %} + +An array containing all events that occurred in the transaction. + +{% enddocs %} diff --git a/models/descriptions/payload_signatures.md b/models/descriptions/payload_signatures.md new file mode 100644 index 0000000..81f62a3 --- /dev/null +++ b/models/descriptions/payload_signatures.md @@ -0,0 +1,8 @@ +{% docs payload_signatures %} + +The authorizing signatures included with the payload in the transaction body. + +https://developers.flow.com/concepts/start-here/transaction-signing#payload + + +{% enddocs %} diff --git a/models/descriptions/proposal_key.md b/models/descriptions/proposal_key.md new file mode 100644 index 0000000..af43436 --- /dev/null +++ b/models/descriptions/proposal_key.md @@ -0,0 +1,9 @@ +{% docs proposal_key %} + +Each transaction must declare a proposal key, which can be an account key from any Flow account. The account that owns the proposal key is referred to as the proposer. + +A proposal key definition declares the address, key ID, and up-to-date sequence number for the account key. + +https://developers.flow.com/concepts/start-here/transaction-signing#proposal-key + +{% enddocs %} diff --git a/models/descriptions/script.md b/models/descriptions/script.md new file mode 100644 index 0000000..e65f01e --- /dev/null +++ b/models/descriptions/script.md @@ -0,0 +1,7 @@ +{% docs script %} + +The Cadence script executed in this transaction. + +https://developers.flow.com/tooling/fcl-js/scripts + +{% enddocs %} \ No newline at end of file diff --git a/models/descriptions/signatures.md b/models/descriptions/signatures.md new file mode 100644 index 0000000..469c0eb --- /dev/null +++ b/models/descriptions/signatures.md @@ -0,0 +1,5 @@ +{% docs signatures %} + +An array of signatures contained in the block header. + +{% enddocs %} diff --git a/models/descriptions/status.md b/models/descriptions/status.md new file mode 100644 index 0000000..07c0ac6 --- /dev/null +++ b/models/descriptions/status.md @@ -0,0 +1,7 @@ +{% docs status %} + +The transaction status. 
+ +https://developers.flow.com/tooling/fcl-js/api#transaction-statuses + +{% enddocs %} diff --git a/models/descriptions/status_code.md b/models/descriptions/status_code.md new file mode 100644 index 0000000..827cb35 --- /dev/null +++ b/models/descriptions/status_code.md @@ -0,0 +1,7 @@ +{% docs status_code %} + +The transaction status code. + +https://developers.flow.com/tooling/fcl-js/api#transaction-statuses + +{% enddocs %} diff --git a/models/descriptions/transaction_ids.md b/models/descriptions/transaction_ids.md new file mode 100644 index 0000000..f26a7ab --- /dev/null +++ b/models/descriptions/transaction_ids.md @@ -0,0 +1,5 @@ +{% docs transaction_ids %} + +An array containing all transaction IDs that were included in the collection. + +{% enddocs %} diff --git a/models/silver/base/silver__blocks.sql b/models/silver/core/chainwalkers/silver__blocks.sql similarity index 100% rename from models/silver/base/silver__blocks.sql rename to models/silver/core/chainwalkers/silver__blocks.sql diff --git a/models/silver/base/silver__blocks.yml b/models/silver/core/chainwalkers/silver__blocks.yml similarity index 100% rename from models/silver/base/silver__blocks.yml rename to models/silver/core/chainwalkers/silver__blocks.yml diff --git a/models/silver/base/silver__event_attributes.sql b/models/silver/core/chainwalkers/silver__event_attributes.sql similarity index 100% rename from models/silver/base/silver__event_attributes.sql rename to models/silver/core/chainwalkers/silver__event_attributes.sql diff --git a/models/silver/base/silver__event_attributes_https.sql b/models/silver/core/chainwalkers/silver__event_attributes_https.sql similarity index 100% rename from models/silver/base/silver__event_attributes_https.sql rename to models/silver/core/chainwalkers/silver__event_attributes_https.sql diff --git a/models/silver/base/silver__event_attributes_https.yml b/models/silver/core/chainwalkers/silver__event_attributes_https.yml similarity index 100% rename from models/silver/base/silver__event_attributes_https.yml rename to models/silver/core/chainwalkers/silver__event_attributes_https.yml diff --git a/models/silver/base/silver__events.sql b/models/silver/core/chainwalkers/silver__events.sql similarity index 100% rename from models/silver/base/silver__events.sql rename to models/silver/core/chainwalkers/silver__events.sql diff --git a/models/silver/base/silver__events.yml b/models/silver/core/chainwalkers/silver__events.yml similarity index 100% rename from models/silver/base/silver__events.yml rename to models/silver/core/chainwalkers/silver__events.yml diff --git a/models/silver/base/silver__events_final.sql b/models/silver/core/chainwalkers/silver__events_final.sql similarity index 100% rename from models/silver/base/silver__events_final.sql rename to models/silver/core/chainwalkers/silver__events_final.sql diff --git a/models/silver/base/silver__events_final.yml b/models/silver/core/chainwalkers/silver__events_final.yml similarity index 100% rename from models/silver/base/silver__events_final.yml rename to models/silver/core/chainwalkers/silver__events_final.yml diff --git a/models/silver/base/silver__transactions.sql b/models/silver/core/chainwalkers/silver__transactions.sql similarity index 100% rename from models/silver/base/silver__transactions.sql rename to models/silver/core/chainwalkers/silver__transactions.sql diff --git a/models/silver/base/silver__transactions.yml b/models/silver/core/chainwalkers/silver__transactions.yml similarity index 100% rename from 
models/silver/base/silver__transactions.yml rename to models/silver/core/chainwalkers/silver__transactions.yml diff --git a/models/silver/core/streamline/silver__streamline_blocks.sql b/models/silver/core/streamline/silver__streamline_blocks.sql new file mode 100644 index 0000000..d3cdbc8 --- /dev/null +++ b/models/silver/core/streamline/silver__streamline_blocks.sql @@ -0,0 +1,40 @@ +-- depends_on: {{ ref('bronze__streamline_blocks') }} +{{ config( + materialized = 'incremental', + unique_key = "block_number", + cluster_by = "block_timestamp::date", + tags = ['streamline_load', 'core'] +) }} + +SELECT + block_number, + DATA: height :: STRING AS block_height, + DATA: id :: STRING AS block_id, + DATA :timestamp :: timestamp_ntz AS block_timestamp, + ARRAY_SIZE( + DATA :collection_guarantees :: ARRAY + ) AS collection_count, + DATA: parent_id :: STRING AS parent_id, + DATA: signatures :: ARRAY AS signatures, + DATA: collection_guarantees :: ARRAY AS collection_guarantees, + DATA: block_seals :: ARRAY AS block_seals, + _partition_by_block_id, + _inserted_timestamp +FROM + +{% if is_incremental() %} +{{ ref('bronze__streamline_blocks') }} +WHERE + _inserted_timestamp >= ( + SELECT + MAX(_inserted_timestamp) _inserted_timestamp + FROM + {{ this }} + ) +{% else %} + {{ ref('bronze__streamline_fr_blocks') }} +{% endif %} + +qualify(ROW_NUMBER() over (PARTITION BY block_number +ORDER BY + _inserted_timestamp DESC)) = 1 diff --git a/models/silver/core/streamline/silver__streamline_blocks.yml b/models/silver/core/streamline/silver__streamline_blocks.yml new file mode 100644 index 0000000..1d61ca2 --- /dev/null +++ b/models/silver/core/streamline/silver__streamline_blocks.yml @@ -0,0 +1,40 @@ +version: 2 + +models: + - name: silver__streamline_blocks + description: | + Initial table for the gRPC blocks response, loading data into Snowflake from the external AWS table. 
+ + columns: + - name: BLOCK_NUMBER + description: "{{ doc('block_number') }}" + + - name: BLOCK_HEIGHT + description: "{{ doc('block_height') }}" + + - name: BLOCK_ID + description: "{{ doc('block_id') }}" + + - name: BLOCK_TIMESTAMP + description: "{{ doc('block_timestamp') }}" + + - name: COLLECTION_COUNT + description: "{{ doc('collection_count') }}" + + - name: PARENT_ID + description: "{{ doc('parent_id') }}" + + - name: SIGNATURES + description: "{{ doc('signatures') }}" + + - name: COLLECTION_GUARANTEES + description: "{{ doc('collection_guarantees') }}" + + - name: BLOCK_SEALS + description: "{{ doc('block_seals') }}" + + - name: _PARTITION_BY_BLOCK_ID + description: "{{ doc('_partition_by_block_id') }}" + + - name: _INSERTED_TIMESTAMP + description: "{{ doc('_inserted_timestamp') }}" diff --git a/models/silver/core/streamline/silver__streamline_collections.sql b/models/silver/core/streamline/silver__streamline_collections.sql new file mode 100644 index 0000000..0594d88 --- /dev/null +++ b/models/silver/core/streamline/silver__streamline_collections.sql @@ -0,0 +1,35 @@ +-- depends_on: {{ ref('bronze__streamline_collections') }} +{{ config( + materialized = 'incremental', + unique_key = "collection_id", + cluster_by = "block_number", + tags = ['streamline_load', 'core'] +) }} + +SELECT + block_number, + DATA: id :: STRING AS collection_id, + ARRAY_SIZE( + DATA :transaction_ids :: ARRAY + ) AS tx_count, + DATA: transaction_ids :: ARRAY AS transaction_ids, + _partition_by_block_id, + _inserted_timestamp +FROM + +{% if is_incremental() %} +{{ ref('bronze__streamline_collections') }} +WHERE + _inserted_timestamp >= ( + SELECT + MAX(_inserted_timestamp) _inserted_timestamp + FROM + {{ this }} + ) +{% else %} + {{ ref('bronze__streamline_fr_collections') }} +{% endif %} + +qualify(ROW_NUMBER() over (PARTITION BY collection_id +ORDER BY + _inserted_timestamp DESC)) = 1 diff --git a/models/silver/core/streamline/silver__streamline_collections.yml b/models/silver/core/streamline/silver__streamline_collections.yml new file mode 100644 index 0000000..d0f9eb2 --- /dev/null +++ b/models/silver/core/streamline/silver__streamline_collections.yml @@ -0,0 +1,25 @@ +version: 2 + +models: + - name: silver__streamline_collections + description: | + Initial table for the gRPC collections response, loading data into Snowflake from the external AWS table. 
+ + columns: + - name: BLOCK_NUMBER + description: "{{ doc('block_number') }}" + + - name: COLLECTION_ID + description: "{{ doc('collection_id') }}" + + - name: TX_COUNT + description: "{{ doc('tx_count') }}" + + - name: TRANSACTION_IDS + description: "{{ doc('transaction_ids') }}" + + - name: _PARTITION_BY_BLOCK_ID + description: "{{ doc('_partition_by_block_id') }}" + + - name: _INSERTED_TIMESTAMP + description: "{{ doc('_inserted_timestamp') }}" diff --git a/models/silver/core/streamline/silver__streamline_transaction_results.sql b/models/silver/core/streamline/silver__streamline_transaction_results.sql new file mode 100644 index 0000000..529d889 --- /dev/null +++ b/models/silver/core/streamline/silver__streamline_transaction_results.sql @@ -0,0 +1,35 @@ +-- depends_on: {{ ref('bronze__streamline_transaction_results') }} +{{ config( + materialized = 'incremental', + unique_key = "tx_id", + cluster_by = "_inserted_timestamp::date", + tags = ['streamline_load', 'core'] +) }} + +SELECT + block_number, + id AS tx_id, + DATA: error_message :: STRING AS error_message, + DATA: events :: ARRAY AS events, + DATA :status :: INT AS status, + DATA :status_code :: INT AS status_code, + _partition_by_block_id, + _inserted_timestamp +FROM + +{% if is_incremental() %} +{{ ref('bronze__streamline_transaction_results') }} AS t +WHERE + _inserted_timestamp >= ( + SELECT + MAX(_inserted_timestamp) _inserted_timestamp + FROM + {{ this }} + ) +{% else %} + {{ ref('bronze__streamline_fr_transaction_results') }} AS t +{% endif %} + +qualify(ROW_NUMBER() over (PARTITION BY t.id +ORDER BY + t._inserted_timestamp DESC)) = 1 diff --git a/models/silver/core/streamline/silver__streamline_transaction_results.yml b/models/silver/core/streamline/silver__streamline_transaction_results.yml new file mode 100644 index 0000000..50d69ca --- /dev/null +++ b/models/silver/core/streamline/silver__streamline_transaction_results.yml @@ -0,0 +1,31 @@ +version: 2 + +models: + - name: silver__streamline_transaction_results + description: | + Initial table for the gRPC transaction results response, loading data into Snowflake from the external AWS table. 
+ + columns: + - name: BLOCK_NUMBER + description: "{{ doc('block_number') }}" + + - name: TX_ID + description: "{{ doc('tx_id') }}" + + - name: ERROR_MESSAGE + description: "{{ doc('error_message') }}" + + - name: EVENTS + description: "{{ doc('events') }}" + + - name: STATUS + description: "{{ doc('status') }}" + + - name: STATUS_CODE + description: "{{ doc('status_code') }}" + + - name: _PARTITION_BY_BLOCK_ID + description: "{{ doc('_partition_by_block_id') }}" + + - name: _INSERTED_TIMESTAMP + description: "{{ doc('_inserted_timestamp') }}" diff --git a/models/silver/core/streamline/silver__streamline_transactions.sql b/models/silver/core/streamline/silver__streamline_transactions.sql new file mode 100644 index 0000000..ee0242b --- /dev/null +++ b/models/silver/core/streamline/silver__streamline_transactions.sql @@ -0,0 +1,40 @@ +-- depends_on: {{ ref('bronze__streamline_transactions') }} +{{ config( + materialized = 'incremental', + unique_key = "tx_id", + cluster_by = "_inserted_timestamp::date", + tags = ['streamline_load', 'core'] +) }} + +SELECT + block_number, + DATA: reference_block_id :: STRING AS block_id, + id AS tx_id, + DATA: gas_limit :: NUMBER AS gas_limit, + DATA: payer :: STRING AS payer, + DATA: arguments :: ARRAY AS arguments, + DATA: authorizers :: ARRAY AS authorizers, + DATA: envelope_signatures :: ARRAY AS envelope_signatures, + DATA: payload_signatures :: ARRAY AS payload_signatures, + DATA: proposal_key :: variant AS proposal_key, + DATA: script :: STRING AS script, + _partition_by_block_id, + _inserted_timestamp +FROM + +{% if is_incremental() %} +{{ ref('bronze__streamline_transactions') }} +WHERE + _inserted_timestamp >= ( + SELECT + MAX(_inserted_timestamp) _inserted_timestamp + FROM + {{ this }} + ) +{% else %} + {{ ref('bronze__streamline_fr_transactions') }} +{% endif %} + +qualify(ROW_NUMBER() over (PARTITION BY id +ORDER BY + _inserted_timestamp DESC)) = 1 diff --git a/models/silver/core/streamline/silver__streamline_transactions.yml b/models/silver/core/streamline/silver__streamline_transactions.yml new file mode 100644 index 0000000..ec21981 --- /dev/null +++ b/models/silver/core/streamline/silver__streamline_transactions.yml @@ -0,0 +1,46 @@ +version: 2 + +models: + - name: silver__streamline_transactions + description: | + Initial table for the gRPC transactions response, loading data into Snowflake from the external AWS table. 
+ + columns: + - name: BLOCK_NUMBER + description: "{{ doc('block_number') }}" + + - name: BLOCK_ID + description: "{{ doc('block_id') }}" + + - name: TX_ID + description: "{{ doc('tx_id') }}" + + - name: GAS_LIMIT + description: "{{ doc('gas_limit') }}" + + - name: PAYER + description: "{{ doc('payer') }}" + + - name: ARGUMENTS + description: "{{ doc('arguments') }}" + + - name: AUTHORIZERS + description: "{{ doc('authorizers') }}" + + - name: ENVELOPE_SIGNATURES + description: "{{ doc('envelope_signatures') }}" + + - name: PAYLOAD_SIGNATURES + description: "{{ doc('payload_signatures') }}" + + - name: PROPOSAL_KEY + description: "{{ doc('proposal_key') }}" + + - name: SCRIPT + description: "{{ doc('script') }}" + + - name: _PARTITION_BY_BLOCK_ID + description: "{{ doc('_partition_by_block_id') }}" + + - name: _INSERTED_TIMESTAMP + description: "{{ doc('_inserted_timestamp') }}" diff --git a/models/silver/streamline/core/complete/streamline__complete_get_blocks.sql b/models/silver/streamline/core/complete/streamline__complete_get_blocks.sql index 4c57a37..5515009 100644 --- a/models/silver/streamline/core/complete/streamline__complete_get_blocks.sql +++ b/models/silver/streamline/core/complete/streamline__complete_get_blocks.sql @@ -5,7 +5,8 @@ unique_key = "block_number", cluster_by = "ROUND(block_number, -3)", merge_update_columns = ["block_number"], - post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION on equality(block_number)" + post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION on equality(block_number)", + tags = ['streamline_complete'] ) }} SELECT @@ -25,7 +26,7 @@ WHERE FROM {{ this }} ), - '1900-01-01'::timestamp + '1900-01-01'::timestamp_ntz ) {% else %} {{ ref('bronze__streamline_fr_blocks') }} diff --git a/models/silver/streamline/core/complete/streamline__complete_get_collections.sql b/models/silver/streamline/core/complete/streamline__complete_get_collections.sql index 4fc55c2..69b3b67 100644 --- a/models/silver/streamline/core/complete/streamline__complete_get_collections.sql +++ b/models/silver/streamline/core/complete/streamline__complete_get_collections.sql @@ -4,7 +4,8 @@ unique_key = "id", cluster_by = "ROUND(block_number, -3)", merge_update_columns = ["id"], - post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION on equality(id)" + post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION on equality(id)", + tags = ['streamline_complete'] ) }} SELECT @@ -25,7 +26,7 @@ WHERE FROM {{ this }} ), - '1900-01-01'::timestamp + '1900-01-01'::timestamp_ntz ) {% else %} {{ ref('bronze__streamline_fr_collections') }} diff --git a/models/silver/streamline/core/complete/streamline__complete_get_transaction_results.sql b/models/silver/streamline/core/complete/streamline__complete_get_transaction_results.sql index b6fd819..a5b6ee9 100644 --- a/models/silver/streamline/core/complete/streamline__complete_get_transaction_results.sql +++ b/models/silver/streamline/core/complete/streamline__complete_get_transaction_results.sql @@ -4,7 +4,8 @@ unique_key = "id", cluster_by = "ROUND(block_number, -3)", merge_update_columns = ["id"], - post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION on equality(id)" + post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION on equality(id)", + tags = ['streamline_complete'] ) }} SELECT @@ -25,7 +26,7 @@ WHERE FROM {{ this }} ), - '1900-01-01'::timestamp + '1900-01-01'::timestamp_ntz ) {% else %} {{ ref('bronze__streamline_fr_transaction_results') }} diff --git 
a/models/silver/streamline/core/complete/streamline__complete_get_transactions.sql b/models/silver/streamline/core/complete/streamline__complete_get_transactions.sql index 381c7e9..5b9b625 100644 --- a/models/silver/streamline/core/complete/streamline__complete_get_transactions.sql +++ b/models/silver/streamline/core/complete/streamline__complete_get_transactions.sql @@ -4,7 +4,8 @@ unique_key = "id", cluster_by = "ROUND(block_number, -3)", merge_update_columns = ["id"], - post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION on equality(id)" + post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION on equality(id)", + tags = ['streamline_complete'] ) }} SELECT @@ -25,7 +26,7 @@ WHERE FROM {{ this }} ), - '1900-01-01'::timestamp + '1900-01-01'::timestamp_ntz ) {% else %} {{ ref('bronze__streamline_fr_transactions') }} diff --git a/models/silver/streamline/core/history/collections/streamline__get_collections_history_mainnet22.sql b/models/silver/streamline/core/history/collections/streamline__get_collections_history_mainnet22.sql deleted file mode 100644 index 65244a0..0000000 --- a/models/silver/streamline/core/history/collections/streamline__get_collections_history_mainnet22.sql +++ /dev/null @@ -1,47 +0,0 @@ -{{ config ( - materialized = "view", - post_hook = if_data_call_function( - func = "{{this.schema}}.udf_bulk_grpc(object_construct('sql_source', '{{this.identifier}}','node_url','access-001.mainnet22.nodes.onflow.org:9000','external_table', 'collections', 'sql_limit', {{var('sql_limit','500000')}}, 'producer_batch_size', {{var('producer_batch_size','10000')}}, 'worker_batch_size', {{var('worker_batch_size','1000')}}, 'batch_call_limit', {{var('batch_call_limit','1')}}))", - target = "{{this.schema}}.{{this.identifier}}" - ) -) }} - --- CTE to get all block_heights and their associated collection_ids from the complete_get_blocks table -WITH block_collections AS ( - SELECT - cb.block_number AS block_height, - collection_guarantee.value:collection_id AS collection_id - FROM - {{ ref("streamline__complete_get_blocks") }} cb, - LATERAL FLATTEN(input => cb.data:collection_guarantees) AS collection_guarantee -), - --- CTE to identify collections that haven't been ingested yet -collections_to_ingest AS ( - SELECT - bc.block_height, - bc.collection_id - FROM - block_collections bc - LEFT JOIN - {{ ref("streamline__complete_get_collections") }} c - ON bc.block_height = c.block_number - AND bc.collection_id = c.id - WHERE - c.id IS NULL -) - --- Generate the requests based on the missing collections -SELECT - OBJECT_CONSTRUCT( - 'grpc', 'proto3', - 'method', 'get_collection_by_i_d', - 'block_height', block_height::INTEGER, - 'method_params', OBJECT_CONSTRUCT('id', collection_id) - ) AS request -FROM - collections_to_ingest -WHERE - block_height BETWEEN 47169687 AND 55114466 -- Mainnet22 block range -ORDER BY - block_height ASC \ No newline at end of file diff --git a/models/silver/streamline/core/history/blocks/streamline__get_blocks_history_mainnet22.sql b/models/silver/streamline/core/history/streamline__get_blocks_history.sql similarity index 59% rename from models/silver/streamline/core/history/blocks/streamline__get_blocks_history_mainnet22.sql rename to models/silver/streamline/core/history/streamline__get_blocks_history.sql index ac51a9c..d5d840b 100644 --- a/models/silver/streamline/core/history/blocks/streamline__get_blocks_history_mainnet22.sql +++ b/models/silver/streamline/core/history/streamline__get_blocks_history.sql @@ -1,13 +1,13 @@ -{{ config ( +{{ config( 
materialized = "view", post_hook = if_data_call_function( - func = "{{this.schema}}.udf_bulk_grpc(object_construct('sql_source', '{{this.identifier}}','node_url','access-001.mainnet22.nodes.onflow.org:9000','external_table', 'blocks', 'sql_limit', {{var('sql_limit','500000')}}, 'producer_batch_size', {{var('producer_batch_size','10000')}}, 'worker_batch_size', {{var('worker_batch_size','1000')}}, 'batch_call_limit', {{var('batch_call_limit','1')}}))", + func = "{{this.schema}}.udf_bulk_grpc(object_construct('sql_source', '{{this.identifier}}','node_url', '{{ var('node_url', Null) }}','external_table', 'blocks', 'sql_limit', {{var('sql_limit','500000')}}, 'producer_batch_size', {{var('producer_batch_size','10000')}}, 'worker_batch_size', {{var('worker_batch_size','1000')}}, 'batch_call_limit', {{var('batch_call_limit','1')}}))", target = "{{this.schema}}.{{this.identifier}}" - ) + ), + tags = ['streamline_history'] ) }} WITH blocks AS ( - SELECT block_height FROM @@ -28,6 +28,6 @@ SELECT FROM blocks WHERE - block_height BETWEEN 47169687 AND 55114466 -- Mainnet22 block range + block_height BETWEEN {{ var('start_block', Null) }} AND {{ var('end_block', Null) }} ORDER BY block_height ASC diff --git a/models/silver/streamline/core/history/streamline__get_collections_history.sql b/models/silver/streamline/core/history/streamline__get_collections_history.sql new file mode 100644 index 0000000..53a4747 --- /dev/null +++ b/models/silver/streamline/core/history/streamline__get_collections_history.sql @@ -0,0 +1,76 @@ +{{ config ( + materialized = "view", + post_hook = if_data_call_function( + func = "{{this.schema}}.udf_bulk_grpc(object_construct('sql_source', '{{this.identifier}}','node_url','{{ var('node_url', Null) }}','external_table', 'collections', 'sql_limit', {{var('sql_limit','500000')}}, 'producer_batch_size', {{var('producer_batch_size','10000')}}, 'worker_batch_size', {{var('worker_batch_size','1000')}}, 'batch_call_limit', {{var('batch_call_limit','1')}}))", + target = "{{this.schema}}.{{this.identifier}}" + ), + tags = ['streamline_history'] +) }} + +WITH blocks AS ( + + SELECT + block_height + FROM + {{ ref("streamline__blocks") }} + EXCEPT + SELECT + block_number AS block_height + FROM + {{ ref("streamline__complete_get_collections") }} +), +collections AS ( + SELECT + block_number AS block_height, + DATA + FROM + {{ ref('streamline__complete_get_blocks') }} + JOIN blocks + ON blocks.block_height = block_number +), +-- CTE to get all block_heights and their associated collection_ids from the complete_get_blocks table +block_collections AS ( + SELECT + cb.block_number AS block_height, + collection_guarantee.value :collection_id AS collection_id + FROM + {{ ref("streamline__complete_get_blocks") }} + cb, + LATERAL FLATTEN( + input => cb.data :collection_guarantees + ) AS collection_guarantee +), +-- CTE to identify collections that haven't been ingested yet +collections_to_ingest AS ( + SELECT + bc.block_height, + bc.collection_id + FROM + block_collections bc + LEFT JOIN {{ ref("streamline__complete_get_collections") }} C + ON bc.block_height = C.block_number + AND bc.collection_id = C.id + WHERE + C.id IS NULL +) -- Generate the requests based on the missing collections +SELECT + OBJECT_CONSTRUCT( + 'grpc', + 'proto3', + 'method', + 'get_collection_by_i_d', + 'block_height', + block_height :: INTEGER, + 'method_params', + OBJECT_CONSTRUCT( + 'id', + collection_id + ) + ) AS request +FROM + collections_to_ingest +WHERE + block_height BETWEEN {{ var('start_block', Null) }} + AND {{ 
var('end_block', Null) }} +ORDER BY + block_height ASC diff --git a/models/silver/streamline/core/history/streamline__get_transaction_results_history.sql b/models/silver/streamline/core/history/streamline__get_transaction_results_history.sql new file mode 100644 index 0000000..eb9e40c --- /dev/null +++ b/models/silver/streamline/core/history/streamline__get_transaction_results_history.sql @@ -0,0 +1,43 @@ +{{ config ( + materialized = "view", + post_hook = if_data_call_function( + func = "{{this.schema}}.udf_bulk_grpc(object_construct('sql_source', '{{this.identifier}}', 'node_url', '{{ var('node_url', Null) }}', 'external_table', 'transaction_results', 'sql_limit', {{var('sql_limit','500000')}}, 'producer_batch_size', {{var('producer_batch_size','30000')}}, 'worker_batch_size', {{var('worker_batch_size','3000')}}, 'batch_call_limit', {{var('batch_call_limit','1')}}))", + target = "{{this.schema}}.{{this.identifier}}" + ), + tags = ['streamline_history'] +) }} + +WITH blocks AS ( + SELECT + block_height + FROM + {{ ref("streamline__blocks") }} + EXCEPT + SELECT + block_number as block_height + FROM + {{ ref("streamline__complete_get_transaction_results") }} +), +tx AS ( + SELECT + block_number as block_height, + data + FROM + {{ ref('streamline__complete_get_collections') }} + JOIN blocks ON blocks.block_height = block_number +) +SELECT + OBJECT_CONSTRUCT( + 'grpc', 'proto3', + 'method', 'get_transaction_result', + 'block_height', block_height::INTEGER, + 'transaction_id', transaction_id.value::string, + 'method_params', OBJECT_CONSTRUCT('id', transaction_id.value::string) + ) AS request +FROM + tx, + LATERAL FLATTEN(input => TRY_PARSE_JSON(data):transaction_ids) AS transaction_id +WHERE + block_height BETWEEN {{ var('start_block', Null) }} AND {{ var('end_block', Null) }} +ORDER BY + block_height ASC diff --git a/models/silver/streamline/core/history/streamline__get_transactions_history.sql b/models/silver/streamline/core/history/streamline__get_transactions_history.sql new file mode 100644 index 0000000..30bd59d --- /dev/null +++ b/models/silver/streamline/core/history/streamline__get_transactions_history.sql @@ -0,0 +1,43 @@ +{{ config ( + materialized = "view", + post_hook = if_data_call_function( + func = "{{this.schema}}.udf_bulk_grpc(object_construct('sql_source', '{{this.identifier}}', 'node_url', '{{ var('node_url', Null) }}', 'external_table', 'transactions', 'sql_limit', {{var('sql_limit','500000')}}, 'producer_batch_size', {{var('producer_batch_size','30000')}}, 'worker_batch_size', {{var('worker_batch_size','3000')}}, 'batch_call_limit', {{var('batch_call_limit','1')}}))", + target = "{{this.schema}}.{{this.identifier}}" + ), + tags = ['streamline_history'] +) }} + +WITH blocks AS ( + SELECT + block_height + FROM + {{ ref("streamline__blocks") }} + EXCEPT + SELECT + block_number as block_height + FROM + {{ ref("streamline__complete_get_transactions") }} +), +tx AS ( + SELECT + block_number as block_height, + data + FROM + {{ ref('streamline__complete_get_collections') }} + JOIN blocks ON blocks.block_height = block_number +) +SELECT + OBJECT_CONSTRUCT( + 'grpc', 'proto3', + 'method', 'get_transaction', + 'block_height', block_height::INTEGER, + 'transaction_id', transaction_id.value::string, + 'method_params', OBJECT_CONSTRUCT('id', transaction_id.value::string) + ) AS request +FROM + tx, + LATERAL FLATTEN(input => TRY_PARSE_JSON(data):transaction_ids) AS transaction_id +WHERE + block_height BETWEEN {{ var('start_block', Null) }} AND {{ var('end_block', Null) }} +ORDER BY + 
block_height ASC diff --git a/models/silver/streamline/core/history/transaction_results/streamline__get_transaction_results_history_mainnet22.sql b/models/silver/streamline/core/history/transaction_results/streamline__get_transaction_results_history_mainnet22.sql deleted file mode 100644 index 5bfcabc..0000000 --- a/models/silver/streamline/core/history/transaction_results/streamline__get_transaction_results_history_mainnet22.sql +++ /dev/null @@ -1,44 +0,0 @@ -{{ config ( - materialized = "view", - post_hook = if_data_call_function( - func = "{{this.schema}}.udf_bulk_grpc(object_construct('sql_source', '{{this.identifier}}','node_url','access-001.mainnet22.nodes.onflow.org:9000','external_table', 'transaction_results', 'sql_limit', {{var('sql_limit','500000')}}, 'producer_batch_size', {{var('producer_batch_size','30000')}}, 'worker_batch_size', {{var('worker_batch_size','3000')}}, 'batch_call_limit', {{var('batch_call_limit','1')}}))", - target = "{{this.schema}}.{{this.identifier}}" - ) -) }} - --- CTE to get all transaction_ids from the complete_get_collections table -WITH collection_transactions AS ( - SELECT - block_number AS block_height, - transaction.value::STRING AS transaction_id - FROM - {{ ref("streamline__complete_get_collections") }} cc, - LATERAL FLATTEN(input => cc.data:transaction_ids) AS transaction -), - --- CTE to identify transaction_results that haven't been ingested yet -transaction_results_to_ingest AS ( - SELECT - ct.block_height, - ct.transaction_id - FROM - collection_transactions ct - LEFT JOIN - {{ ref("streamline__complete_get_transaction_results") }} tr ON ct.transaction_id = tr.id - WHERE - tr.id IS NULL -) - --- Generate the requests column based on the missing transactions -SELECT - OBJECT_CONSTRUCT( - 'grpc', 'proto3', - 'method', 'get_transaction_result', - 'block_height', block_height::INTEGER, - 'transaction_id', transaction_id::STRING, - 'method_params', OBJECT_CONSTRUCT('id', transaction_id::STRING) - ) AS request -FROM - transaction_results_to_ingest -ORDER BY - block_height ASC \ No newline at end of file diff --git a/models/silver/streamline/core/history/transactions/streamline__get_transactions_history_mainnet22.sql b/models/silver/streamline/core/history/transactions/streamline__get_transactions_history_mainnet22.sql deleted file mode 100644 index b215bae..0000000 --- a/models/silver/streamline/core/history/transactions/streamline__get_transactions_history_mainnet22.sql +++ /dev/null @@ -1,45 +0,0 @@ -{{ config ( - materialized = "view", - post_hook = if_data_call_function( - func = "{{this.schema}}.udf_bulk_grpc(object_construct('sql_source', '{{this.identifier}}','node_url','access-001.mainnet22.nodes.onflow.org:9000','external_table', 'transactions', 'sql_limit', {{var('sql_limit','500000')}}, 'producer_batch_size', {{var('producer_batch_size','30000')}}, 'worker_batch_size', {{var('worker_batch_size','3000')}}, 'batch_call_limit', {{var('batch_call_limit','1')}}))", - target = "{{this.schema}}.{{this.identifier}}" - ) -) }} - --- CTE to get all transaction_ids from the complete_get_collections table -WITH collection_transactions AS ( - SELECT - block_number AS block_height, - transaction.value::STRING AS transaction_id - FROM - {{ ref("streamline__complete_get_collections") }} cc, - LATERAL FLATTEN(input => cc.data:transaction_ids) AS transaction -), - --- CTE to identify transactions that haven't been ingested yet -transactions_to_ingest AS ( - SELECT - ct.block_height, - ct.transaction_id - FROM - collection_transactions ct - LEFT JOIN - 
{{ ref("streamline__complete_get_transactions") }} t ON ct.transaction_id = t.id - WHERE - t.id IS NULL -) - --- Generate the requests based on the missing transactions -SELECT - OBJECT_CONSTRUCT( - 'grpc', 'proto3', - 'method', 'get_transaction', - 'block_height', block_height::INTEGER, - 'transaction_id', transaction_id::STRING, - 'method_params', OBJECT_CONSTRUCT('id', transaction_id::STRING) - ) AS request -FROM - transactions_to_ingest -ORDER BY - block_height ASC - diff --git a/models/silver/streamline/core/realtime/streamline__get_blocks_realtime.sql b/models/silver/streamline/core/realtime/streamline__get_blocks_realtime.sql new file mode 100644 index 0000000..5bb5064 --- /dev/null +++ b/models/silver/streamline/core/realtime/streamline__get_blocks_realtime.sql @@ -0,0 +1,64 @@ +{{ config ( + materialized = "view", + post_hook = if_data_call_function( + func = "{{this.schema}}.udf_bulk_grpc(object_construct('sql_source', '{{this.identifier}}','node_url','access.mainnet.nodes.onflow.org:9000','external_table', 'blocks', 'sql_limit', {{var('sql_limit','500000')}}, 'producer_batch_size', {{var('producer_batch_size','10000')}}, 'worker_batch_size', {{var('worker_batch_size','1000')}}, 'batch_call_limit', {{var('batch_call_limit','1')}}))", + target = "{{this.schema}}.{{this.identifier}}" + ), + tags = ['streamline_realtime'] +) }} + +WITH last_3_days AS ({% if var('STREAMLINE_RUN_HISTORY') %} + {# TODO - this can't be 0, has to be block height of current spork #} + 0 AS block_height +{% else %} + + SELECT + MAX(block_height) - 210000 AS block_height + FROM + {{ ref('streamline__blocks') }} + {% endif %}), + tbl AS ( + SELECT + block_height + FROM + {{ ref('streamline__blocks') }} + WHERE + ( + block_height >= ( + SELECT + block_height + FROM + last_3_days + ) + ) + AND block_height IS NOT NULL + EXCEPT + SELECT + block_number AS block_height + FROM + {{ ref('streamline__complete_get_blocks') }} + WHERE + ( + block_height >= ( + SELECT + block_height + FROM + last_3_days + ) + ) + AND block_height IS NOT NULL + ) +SELECT + OBJECT_CONSTRUCT( + 'grpc', 'proto3', + 'method', 'get_block_by_height', + 'block_height', block_height :: INTEGER, + 'method_params', OBJECT_CONSTRUCT( + 'height', + block_height + ) + ) +FROM + tbl +ORDER BY + block_height ASC diff --git a/models/silver/streamline/core/realtime/streamline__get_collections_realtime.sql b/models/silver/streamline/core/realtime/streamline__get_collections_realtime.sql new file mode 100644 index 0000000..3553855 --- /dev/null +++ b/models/silver/streamline/core/realtime/streamline__get_collections_realtime.sql @@ -0,0 +1,98 @@ +{{ config ( + materialized = "view", + post_hook = if_data_call_function( + func = "{{this.schema}}.udf_bulk_grpc(object_construct('sql_source', '{{this.identifier}}','node_url','access.mainnet.nodes.onflow.org:9000','external_table', 'collections', 'sql_limit', {{var('sql_limit','500000')}}, 'producer_batch_size', {{var('producer_batch_size','10000')}}, 'worker_batch_size', {{var('worker_batch_size','1000')}}, 'batch_call_limit', {{var('batch_call_limit','1')}}))", + target = "{{this.schema}}.{{this.identifier}}" + ), + tags = ['streamline_realtime'] +) }} + +WITH last_3_days AS ({% if var('STREAMLINE_RUN_HISTORY') %} + {# TODO - this can't be 0, has to be block height of current spork #} + 0 AS block_height +{% else %} + + SELECT + MAX(block_height) - 210000 AS block_height + FROM + {{ ref('streamline__blocks') }} + {% endif %}), + tbl AS ( + SELECT + block_height + FROM + {{ ref('streamline__blocks') }} + 
+        SELECT
+            block_height
+        FROM
+            {{ ref('streamline__blocks') }}
+        WHERE
+            (
+                block_height >= (
+                    SELECT
+                        block_height
+                    FROM
+                        last_3_days
+                )
+            )
+            AND block_height IS NOT NULL
+        EXCEPT
+        SELECT
+            block_number AS block_height
+        FROM
+            {{ ref('streamline__complete_get_collections') }}
+        WHERE
+            (
+                block_height >= (
+                    SELECT
+                        block_height
+                    FROM
+                        last_3_days
+                )
+            )
+            AND block_height IS NOT NULL
+    ),
+    collections AS (
+        SELECT
+            block_number AS block_height,
+            DATA
+        FROM
+            {{ ref('streamline__complete_get_blocks') }}
+            JOIN tbl
+            ON tbl.block_height = block_number
+    ),
+-- CTE to get all block_heights and their associated collection_ids from the blocks in the lookback window
+block_collections AS (
+    SELECT
+        cb.block_height,
+        collection_guarantee.value :collection_id AS collection_id
+    FROM
+        collections
+        cb,
+        LATERAL FLATTEN(
+            input => cb.data :collection_guarantees
+        ) AS collection_guarantee
+),
+-- CTE to identify collections that haven't been ingested yet
+collections_to_ingest AS (
+    SELECT
+        bc.block_height,
+        bc.collection_id
+    FROM
+        block_collections bc
+        LEFT JOIN {{ ref("streamline__complete_get_collections") }} C
+        ON bc.block_height = C.block_number
+        AND bc.collection_id = C.id
+    WHERE
+        C.id IS NULL
+)
+SELECT
+    OBJECT_CONSTRUCT(
+        'grpc', 'proto3',
+        'method', 'get_collection_by_i_d',
+        'block_height', block_height :: INTEGER,
+        'method_params', OBJECT_CONSTRUCT(
+            'id',
+            collection_id
+        )
+    ) AS request
+FROM
+    collections_to_ingest
+ORDER BY
+    block_height ASC
diff --git a/models/silver/streamline/core/realtime/streamline__get_transaction_results_realtime.sql b/models/silver/streamline/core/realtime/streamline__get_transaction_results_realtime.sql
new file mode 100644
index 0000000..954eb6f
--- /dev/null
+++ b/models/silver/streamline/core/realtime/streamline__get_transaction_results_realtime.sql
@@ -0,0 +1,98 @@
+{{ config (
+    materialized = "view",
+    post_hook = if_data_call_function(
+        func = "{{this.schema}}.udf_bulk_grpc(object_construct('sql_source', '{{this.identifier}}','node_url','access.mainnet.nodes.onflow.org:9000','external_table', 'transaction_results', 'sql_limit', {{var('sql_limit','500000')}}, 'producer_batch_size', {{var('producer_batch_size','10000')}}, 'worker_batch_size', {{var('worker_batch_size','1000')}}, 'batch_call_limit', {{var('batch_call_limit','1')}}))",
+        target = "{{this.schema}}.{{this.identifier}}"
+    ),
+    tags = ['streamline_realtime']
+) }}
+
+WITH last_3_days AS ({% if var('STREAMLINE_RUN_HISTORY') %}
+    {# TODO - this can't be 0, has to be block height of current spork #}
+    SELECT 0 AS block_height
+{% else %}
+
+    SELECT
+        MAX(block_height) - 210000 AS block_height
+    FROM
+        {{ ref('streamline__blocks') }}
+    {% endif %}),
+    tbl AS (
+        SELECT
+            block_height
+        FROM
+            {{ ref('streamline__blocks') }}
+        WHERE
+            (
+                block_height >= (
+                    SELECT
+                        block_height
+                    FROM
+                        last_3_days
+                )
+            )
+            AND block_height IS NOT NULL
+        EXCEPT
+        SELECT
+            block_number AS block_height
+        FROM
+            {{ ref('streamline__complete_get_transaction_results') }}
+        WHERE
+            (
+                block_height >= (
+                    SELECT
+                        block_height
+                    FROM
+                        last_3_days
+                )
+            )
+            AND block_height IS NOT NULL
+    ),
+    collection_transactions AS (
+        SELECT
+            block_number AS block_height,
+            TRANSACTION.value :: STRING AS transaction_id
+        FROM
+            {{ ref("streamline__complete_get_collections") }}
+            cc,
+            LATERAL FLATTEN(
+                input => cc.data :transaction_ids
+            ) AS TRANSACTION
+        WHERE
+            block_height IN (
+                SELECT
+                    block_height
+                FROM
+                    tbl
+            )
+    ),
+    -- CTE to identify transaction results that haven't been ingested yet
+    transactions_to_ingest AS (
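+        -- anti-join: keep only transaction ids whose results have not yet been
+        -- ingested into complete_get_transaction_results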
+        SELECT
+            ct.block_height,
+            ct.transaction_id
+        FROM
+            collection_transactions ct
+            LEFT JOIN {{ ref("streamline__complete_get_transaction_results") }}
+            t
+            ON ct.transaction_id = t.id
+        WHERE
+            t.id IS NULL
+    ) -- Generate the requests based on the missing transaction results
+SELECT
+    OBJECT_CONSTRUCT(
+        'grpc', 'proto3',
+        'method', 'get_transaction_result',
+        'block_height', block_height :: INTEGER,
+        'transaction_id', transaction_id :: STRING,
+        'method_params', OBJECT_CONSTRUCT(
+            'id',
+            transaction_id :: STRING
+        )
+    ) AS request
+FROM
+    transactions_to_ingest
+ORDER BY
+    block_height ASC
diff --git a/models/silver/streamline/core/realtime/streamline__get_transactions_realtime.sql b/models/silver/streamline/core/realtime/streamline__get_transactions_realtime.sql
new file mode 100644
index 0000000..14349ab
--- /dev/null
+++ b/models/silver/streamline/core/realtime/streamline__get_transactions_realtime.sql
@@ -0,0 +1,96 @@
+{{ config (
+    materialized = "view",
+    post_hook = if_data_call_function(
+        func = "{{this.schema}}.udf_bulk_grpc(object_construct('sql_source', '{{this.identifier}}','node_url','access.mainnet.nodes.onflow.org:9000','external_table', 'transactions', 'sql_limit', {{var('sql_limit','500000')}}, 'producer_batch_size', {{var('producer_batch_size','10000')}}, 'worker_batch_size', {{var('worker_batch_size','1000')}}, 'batch_call_limit', {{var('batch_call_limit','1')}}))",
+        target = "{{this.schema}}.{{this.identifier}}"
+    ),
+    tags = ['streamline_realtime']
+) }}
+
+WITH last_3_days AS ({% if var('STREAMLINE_RUN_HISTORY') %}
+    {# TODO - this can't be 0, has to be block height of current spork #}
+    SELECT 0 AS block_height
+{% else %}
+
+    SELECT
+        MAX(block_height) - 210000 AS block_height
+    FROM
+        {{ ref('streamline__blocks') }}
+    {% endif %}),
+    tbl AS (
+        SELECT
+            block_height
+        FROM
+            {{ ref('streamline__blocks') }}
+        WHERE
+            (
+                block_height >= (
+                    SELECT
+                        block_height
+                    FROM
+                        last_3_days
+                )
+            )
+            AND block_height IS NOT NULL
+        EXCEPT
+        SELECT
+            block_number AS block_height
+        FROM
+            {{ ref('streamline__complete_get_transactions') }}
+        WHERE
+            (
+                block_height >= (
+                    SELECT
+                        block_height
+                    FROM
+                        last_3_days
+                )
+            )
+            AND block_height IS NOT NULL
+    ),
+    collection_transactions AS (
+        SELECT
+            block_number AS block_height,
+            TRANSACTION.value :: STRING AS transaction_id
+        FROM
+            {{ ref("streamline__complete_get_collections") }}
+            cc,
+            LATERAL FLATTEN(
+                input => cc.data :transaction_ids
+            ) AS TRANSACTION
+        WHERE
+            block_height IN (
+                SELECT
+                    block_height
+                FROM
+                    tbl
+            )
+    ),
+    -- CTE to identify transactions that haven't been ingested yet
+    transactions_to_ingest AS (
+        SELECT
+            ct.block_height,
+            ct.transaction_id
+        FROM
+            collection_transactions ct
+            LEFT JOIN {{ ref("streamline__complete_get_transactions") }}
+            t
+            ON ct.transaction_id = t.id
+        WHERE
+            t.id IS NULL
+    ) -- Generate the requests based on the missing transactions
+SELECT
+    OBJECT_CONSTRUCT(
+        'grpc', 'proto3',
+        'method', 'get_transaction',
+        'block_height', block_height :: INTEGER,
+        'transaction_id', transaction_id :: STRING,
+        'method_params', OBJECT_CONSTRUCT(
+            'id',
+            transaction_id :: STRING
+        )
+    ) AS request
+FROM
+    transactions_to_ingest
+ORDER BY
+    block_height ASC
diff --git a/python/dbt_execute_flow_streamline.py b/python/dbt_execute_flow_streamline.py
new file mode 100644
index 0000000..6fb1846
--- /dev/null
+++ b/python/dbt_execute_flow_streamline.py
@@ -0,0 +1,57 @@
+# dbt_execute_flow_streamline.py
+import csv
+import subprocess
+import sys
+
+
+def run_dbt_for_model(model_name, node_url, root_height, end_height, use_dev=False):
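+    # one dbt run per network: the spork's node_url and block range are passed as
+    # vars, and the "1+" selector runs the history model plus its direct parents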
+    cmd = [
+        "dbt",
+        "run",
+        "--threads",
+        "8",
+        "--vars",
+        f'{{"node_url":"{node_url}", "start_block":{root_height}, "end_block":{end_height},"STREAMLINE_INVOKE_STREAMS":True, "STREAMLINE_USE_DEV_FOR_EXTERNAL_TABLES":{use_dev}}}',
+        "-s",
+        f"1+streamline__get_{model_name}_history"
+    ]
+
+    # fail fast if the dbt invocation errors
+    subprocess.run(cmd, check=True)
+
+
+def main(model_name, use_dev=False):
+    seed_file = "./data/seeds__network_version.csv"
+
+    with open(seed_file, "r") as file:
+        reader = csv.DictReader(file)
+
+        for i, row in enumerate(reader):
+            root_height = row["root_height"]
+            node_url = row["node_url"]
+            end_height = row["end_height"]
+
+            # segment the backfill into batches of networks, starting with the most recent
+            # source CSV contains 29 networks, but the first 4 (candidates 3-6) are inaccessible,
+            # so the valid rows are 5-29, or 25 rows
+            if i >= 25:
+                run_dbt_for_model(model_name, node_url, root_height, end_height, use_dev)
+
+
+if __name__ == "__main__":
+    # accept the model name as a cli argument and pass it to main
+    model_name = sys.argv[1]
+    # acceptable model names: blocks, collections, transactions, transaction_results
+    if model_name not in ["blocks", "collections", "transactions", "transaction_results"]:
+        raise ValueError("model_name must be one of the following: blocks, collections, transactions, transaction_results")
+
+    # use_dev is an optional cli argument that accepts only True or False
+    use_dev = False
+    if len(sys.argv) > 2:
+        if sys.argv[2] not in ["True", "False"]:
+            raise ValueError("use_dev must be True or False")
+        use_dev = sys.argv[2] == "True"  # convert the flag to a real boolean
+
+    main(model_name, use_dev)
diff --git a/python_scripts/test_alert/dbt_test_alert.py b/python/dbt_test_alert.py
similarity index 100%
rename from python_scripts/test_alert/dbt_test_alert.py
rename to python/dbt_test_alert.py