Streamline Realtime and Backfill Models (#156)

* add: model to backfill

* fix: node's string format

* add tags

* upd collection model

* mainnet21 hardcoded collections history model

* del tag

* mainnet 21 getblock

* tag for 21

* realtime models

* alias num as height

* realtime tags

* add missing tag and newlines

* backfiller

* backfiller

* move script into renamed `python` folder; upd test accordingly with dir name change

* upd script to accept model input, update jobs per method call

* error w use_dev arg

* add: silver models

* limit backfill job in python script

* rename silver dbt models to streamline_ and move into silver/core

* explicit casting to silver streamline models

* add documentation to silver streamline models

* run only current mainnet and history mainnet 22 first

* activate schedule for gha

* del hardcoded mainnet models

* move history models out of subdirs into history dir

* fix GHA vars

* del upstream 1+ from history step

* del tag

---------

Co-authored-by: Jack Forgash <jmfxyz@pm.me>
WHYTEWYLL 2023-08-30 16:38:01 +02:00 committed by GitHub
parent 920d811c90
commit 6ce17a7219
62 changed files with 1097 additions and 191 deletions

View File

@ -1,15 +1,15 @@
name: dbt_run_streamline_blocks_history
run-name: dbt_run_streamline_blocks_history
name: dbt_run_streamline_blocks
run-name: dbt_run_streamline_blocks
on:
workflow_dispatch:
schedule:
# Runs "at 10:00 UTC AM" (see https://crontab.guru)
- cron: '0 10 * * *'
env:
DBT_PROFILES_DIR: ./
# Runs on minute 15 (see https://crontab.guru)
- cron: "15 * * * *"
env:
USE_VARS: "${{ vars.USE_VARS }}"
DBT_PROFILES_DIR: "${{ vars.DBT_PROFILES_DIR }}"
ACCOUNT: "${{ vars.ACCOUNT }}"
ROLE: "${{ vars.ROLE }}"
USER: "${{ vars.USER }}"
@ -25,7 +25,7 @@ concurrency:
jobs:
run_dbt_jobs:
runs-on: ubuntu-latest
environment:
environment:
name: workflow_prod
steps:
@ -40,6 +40,11 @@ jobs:
run: |
pip install -r requirements.txt
dbt deps
- name: Run DBT Jobs
- name: Run DBT Realtime
run: |
dbt run --threads 8 --vars '{"STREAMLINE_INVOKE_STREAMS":True}' -m 1+models/silver/streamline/core/history/blocks --profile flow --target prod
dbt run -s 1+streamline__get_blocks_realtime --vars '{"STREAMLINE_INVOKE_STREAMS": True}'
- name: Run DBT History Jobs - Mainnet22
run: |
dbt run -s 1+streamline__get_blocks_history --vars '{"STREAMLINE_INVOKE_STREAMS": True, "node_url": "access-001.mainnet22.nodes.onflow.org:9000", "start_block": 47169687, "end_block": 55114466}'
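
For reference, the parameterized history step can be reproduced outside GHA; a minimal Python sketch (assumes dbt is installed and a profile is configured; `run_history_job` is a hypothetical helper, and the spork values mirror the Mainnet22 example above):

```python
# Minimal sketch of the history invocation above: one spork's node URL and block
# range are passed to dbt as --vars, selecting the 1+ history model.
import json
import subprocess

def run_history_job(model, node_url, start_block, end_block):
    dbt_vars = {
        "STREAMLINE_INVOKE_STREAMS": True,
        "node_url": node_url,
        "start_block": start_block,
        "end_block": end_block,
    }
    cmd = ["dbt", "run", "-s", f"1+streamline__get_{model}_history", "--vars", json.dumps(dbt_vars)]
    subprocess.run(cmd, check=True)

# Example: the Mainnet22 range used in the workflow step above.
run_history_job("blocks", "access-001.mainnet22.nodes.onflow.org:9000", 47169687, 55114466)
```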

View File

@ -1,15 +1,15 @@
name: dbt_run_streamline_collections_history
run-name: dbt_run_streamline_collections_history
name: dbt_run_streamline_collections
run-name: dbt_run_streamline_collections
on:
workflow_dispatch:
schedule:
# Runs "at 10:00 UTC AM" (see https://crontab.guru)
- cron: '0 10 * * *'
env:
DBT_PROFILES_DIR: ./
# Runs on minute 30 (see https://crontab.guru)
- cron: "30 * * * *"
env:
USE_VARS: "${{ vars.USE_VARS }}"
DBT_PROFILES_DIR: "${{ vars.DBT_PROFILES_DIR }}"
ACCOUNT: "${{ vars.ACCOUNT }}"
ROLE: "${{ vars.ROLE }}"
USER: "${{ vars.USER }}"
@ -25,7 +25,7 @@ concurrency:
jobs:
run_dbt_jobs:
runs-on: ubuntu-latest
environment:
environment:
name: workflow_prod
steps:
@ -40,6 +40,12 @@ jobs:
run: |
pip install -r requirements.txt
dbt deps
- name: Run DBT Jobs
- name: Run DBT Realtime
run: |
dbt run --threads 8 --vars '{"STREAMLINE_INVOKE_STREAMS":True}' -m 1+models/silver/streamline/core/history/collections --profile flow --target prod
dbt run -s 1+streamline__get_collections_realtime --vars '{"STREAMLINE_INVOKE_STREAMS": True}'
- name: Run DBT History Jobs - Mainnet22
run: |
dbt run -s 1+streamline__get_collections_history --vars '{"STREAMLINE_INVOKE_STREAMS": True, "node_url": "access-001.mainnet22.nodes.onflow.org:9000", "start_block": 47169687, "end_block": 55114466}'

View File

@ -0,0 +1,36 @@
name: dbt_run_scheduled
run-name: dbt_run_scheduled
on:
workflow_dispatch:
schedule:
# Runs "every hour" (see https://crontab.guru)
- cron: '0 */1 * * *'
env:
USE_VARS: "${{ vars.USE_VARS }}"
DBT_PROFILES_DIR: "${{ vars.DBT_PROFILES_DIR }}"
DBT_VERSION: "${{ vars.DBT_VERSION }}"
ACCOUNT: "${{ vars.ACCOUNT }}"
ROLE: "${{ vars.ROLE }}"
USER: "${{ vars.USER }}"
PASSWORD: "${{ secrets.PASSWORD }}"
REGION: "${{ vars.REGION }}"
DATABASE: "${{ vars.DATABASE }}"
WAREHOUSE: "${{ vars.WAREHOUSE }}"
SCHEMA: "${{ vars.SCHEMA }}"
concurrency:
group: ${{ github.workflow }}
jobs:
called_workflow_template:
uses: FlipsideCrypto/analytics-workflow-templates/.github/workflows/dbt_run_template.yml@main
with:
dbt_command: >
dbt run-operation stage_external_sources --vars "ext_full_refresh: true";
dbt seed;
dbt run -s tag:streamline_load tag:streamline_complete
environment: workflow_prod
warehouse: ${{ vars.WAREHOUSE }}
secrets: inherit

View File

@ -1,15 +1,15 @@
name: dbt_run_streamline_transaction_results_history
run-name: dbt_run_streamline_transaction_results_history
name: dbt_run_streamline_transaction_results
run-name: dbt_run_streamline_transaction_results
on:
workflow_dispatch:
schedule:
# Runs "at 10:00 UTC AM" (see https://crontab.guru)
- cron: '0 10 * * *'
env:
DBT_PROFILES_DIR: ./
# Runs on minute 45 (see https://crontab.guru)
- cron: "45 * * * *"
env:
USE_VARS: "${{ vars.USE_VARS }}"
DBT_PROFILES_DIR: "${{ vars.DBT_PROFILES_DIR }}"
ACCOUNT: "${{ vars.ACCOUNT }}"
ROLE: "${{ vars.ROLE }}"
USER: "${{ vars.USER }}"
@ -25,7 +25,7 @@ concurrency:
jobs:
run_dbt_jobs:
runs-on: ubuntu-latest
environment:
environment:
name: workflow_prod
steps:
@ -40,6 +40,12 @@ jobs:
run: |
pip install -r requirements.txt
dbt deps
- name: Run DBT Jobs
- name: Run DBT Realtime
run: |
dbt run --threads 8 --vars '{"STREAMLINE_INVOKE_STREAMS":True}' -m 1+models/silver/streamline/core/history/transaction_results --profile flow --target prod
dbt run -s 1+streamline__get_transaction_results_realtime --vars '{"STREAMLINE_INVOKE_STREAMS": True}'
- name: Run DBT History Jobs - Mainnet22
run: |
dbt run -s 1+streamline__get_transaction_results_history --vars '{"STREAMLINE_INVOKE_STREAMS": True, "node_url": "access-001.mainnet22.nodes.onflow.org:9000", "start_block": 47169687, "end_block": 55114466}'

View File

@ -1,15 +1,15 @@
name: dbt_run_streamline_transactions_history
run-name: dbt_run_streamline_transactions_history
name: dbt_run_streamline_transactions
run-name: dbt_run_streamline_transactions
on:
workflow_dispatch:
schedule:
# Runs "at 10:00 UTC AM" (see https://crontab.guru)
- cron: '0 10 * * *'
env:
DBT_PROFILES_DIR: ./
# Runs on minute 45 (see https://crontab.guru)
- cron: "45 * * * *"
env:
USE_VARS: "${{ vars.USE_VARS }}"
DBT_PROFILES_DIR: "${{ vars.DBT_PROFILES_DIR }}"
ACCOUNT: "${{ vars.ACCOUNT }}"
ROLE: "${{ vars.ROLE }}"
USER: "${{ vars.USER }}"
@ -25,7 +25,7 @@ concurrency:
jobs:
run_dbt_jobs:
runs-on: ubuntu-latest
environment:
environment:
name: workflow_prod
steps:
@ -40,6 +40,12 @@ jobs:
run: |
pip install -r requirements.txt
dbt deps
- name: Run DBT Jobs
- name: Run DBT Realtime
run: |
dbt run --threads 8 --vars '{"STREAMLINE_INVOKE_STREAMS":True}' -m 1+models/silver/streamline/core/history/transactions --profile flow --target prod
dbt run -s 1+streamline__get_transactions_realtime --vars '{"STREAMLINE_INVOKE_STREAMS": True}'
- name: Run DBT History Jobs - Mainnet22
run: |
dbt run -s 1+streamline__get_transactions_history --vars '{"STREAMLINE_INVOKE_STREAMS": True, "node_url": "access-001.mainnet22.nodes.onflow.org:9000", "start_block": 47169687, "end_block": 55114466}'

View File

@ -49,4 +49,4 @@ jobs:
- name: Log test results
run: |
python python_scripts/test_alert/dbt_test_alert.py
python python/dbt_test_alert.py

View File

@ -27,4 +27,4 @@ root_height,network_version,node_url,end_height
40171634,mainnet-20,access-001.mainnet20.nodes.onflow.org:9000,44950206
44950207,mainnet-21,access-001.mainnet21.nodes.onflow.org:9000,47169686
47169687,mainnet-22,access-001.mainnet22.nodes.onflow.org:9000,55114466
55114467,mainnet-23,access-001.mainnet23.nodes.onflow.org:9000,55114467
55114467,mainnet-23,access-001.mainnet23.nodes.onflow.org:9000,100000000
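
For illustration, a spork's access node for a given block height can be looked up from this seed; a small sketch assuming the seed path used by the backfill script in this PR (`node_for_height` is a hypothetical helper, not part of the repo):

```python
# Hypothetical helper: find the access node serving a given height from the
# spork seed above (column names match the CSV header shown in the diff).
import csv

def node_for_height(height, seed_path="./data/seeds__network_version.csv"):
    with open(seed_path) as f:
        for row in csv.DictReader(f):
            if int(row["root_height"]) <= height <= int(row["end_height"]):
                return row["node_url"]
    raise ValueError(f"no spork covers height {height}")

# e.g. node_for_height(47169687) -> "access-001.mainnet22.nodes.onflow.org:9000"
```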


View File

@ -59,3 +59,4 @@ vars:
OBSERV_FULL_TEST: False
STREAMLINE_USE_DEV_FOR_EXTERNAL_TABLES: False
STREAMLINE_INVOKE_STREAMS: False
STREAMLINE_RUN_HISTORY: False

View File

@ -0,0 +1,6 @@
{% docs _partition_by_block_id %}
Partition column used for efficient querying.
`ROUND(block_number, -5)`
{% enddocs %}
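
A rough worked example of that bucketing (half-away-from-zero rounding to mirror Snowflake's `ROUND`; the helper and block numbers are illustrative):

```python
# Illustrative: reproduce ROUND(block_number, -5) for positive inputs —
# round to the nearest 100,000, halves away from zero.
def partition_by_block_id(block_number: int) -> int:
    return (block_number + 50_000) // 100_000 * 100_000

assert partition_by_block_id(47_169_687) == 47_200_000
assert partition_by_block_id(47_149_999) == 47_100_000
```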

View File

@ -0,0 +1,5 @@
{% docs arguments %}
The arguments included in the transaction body.
{% enddocs %}

View File

@ -0,0 +1,5 @@
{% docs block_id %}
The id, or hash, of the block.
{% enddocs %}

View File

@ -0,0 +1,6 @@
{% docs block_number %}
The block number; it corresponds to the block height.
{% enddocs %}

View File

@ -0,0 +1,7 @@
{% docs block_seals %}
A block seal is an attestation that the execution result of a specific block has been verified and approved by a quorum of verification nodes.
https://developers.flow.com/concepts/nodes/access-api#block-seal
{% enddocs %}

View File

@ -0,0 +1,5 @@
{% docs collection_count %}
The number of collections included in a block.
{% enddocs %}

View File

@ -0,0 +1,7 @@
{% docs collection_guarantees %}
A signed attestation that specifies the collection nodes that have guaranteed to store and respond to queries about a collection.
https://developers.flow.com/concepts/nodes/access-api#collection-guarantee
{% enddocs %}

View File

@ -0,0 +1,7 @@
{% docs collection_id %}
SHA3-256 hash of the collection contents.
https://developers.flow.com/concepts/nodes/access-api#collections
{% enddocs %}
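
Illustrative only: what a SHA3-256 hex digest looks like in Python. The exact byte serialization Flow hashes for a collection is not specified here, so the payload is a placeholder:

```python
# Placeholder input — not a real collection encoding.
import hashlib

payload = b"example collection contents"
print(hashlib.sha3_256(payload).hexdigest())  # 64 hex chars, like collection_id values
```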

View File

@ -0,0 +1,7 @@
{% docs envelope_signatures %}
A list of signatures generated by the payer role.
https://developers.flow.com/concepts/start-here/transaction-signing#anatomy-of-a-transaction
{% enddocs %}

View File

@ -0,0 +1,5 @@
{% docs error_message %}
The error message, if a transaction has failed.
{% enddocs %}

View File

@ -0,0 +1,5 @@
{% docs events %}
An array containing all events that occurred in the transaction.
{% enddocs %}

View File

@ -0,0 +1,8 @@
{% docs payload_signatures %}
The authorizing signatures included with the payload in the transaction body.
https://developers.flow.com/concepts/start-here/transaction-signing#payload
{% enddocs %}

View File

@ -0,0 +1,9 @@
{% docs proposal_key %}
Each transaction must declare a proposal key, which can be an account key from any Flow account. The account that owns the proposal key is referred to as the proposer.
A proposal key definition declares the address, key ID, and up-to-date sequence number for the account key.
https://developers.flow.com/concepts/start-here/transaction-signing#proposal-key
{% enddocs %}
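
An illustrative sketch of that shape (field names are assumptions for illustration, not taken from the gRPC schema):

```python
# Hypothetical shape of a proposal key, per the description above: the proposer's
# account address, the key ID on that account, and its current sequence number.
proposal_key = {
    "address": "1654653399040a61",  # example 8-byte Flow address (hex)
    "key_id": 0,
    "sequence_number": 42,
}
```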

View File

@ -0,0 +1,7 @@
{% docs script %}
The Cadence script executed in this transaction.
https://developers.flow.com/tooling/fcl-js/scripts
{% enddocs %}

View File

@ -0,0 +1,5 @@
{% docs signatures %}
An array of signatures contained in the block header.
{% enddocs %}

View File

@ -0,0 +1,7 @@
{% docs status %}
The transaction status.
https://developers.flow.com/tooling/fcl-js/api#transaction-statuses
{% enddocs %}

View File

@ -0,0 +1,7 @@
{% docs status_code %}
The transaction status code.
https://developers.flow.com/tooling/fcl-js/api#transaction-statuses
{% enddocs %}

View File

@ -0,0 +1,5 @@
{% docs transaction_ids %}
An array containing all transaction IDs that were included in the collection.
{% enddocs %}

View File

@ -0,0 +1,40 @@
-- depends_on: {{ ref('bronze__streamline_blocks') }}
{{ config(
materialized = 'incremental',
unique_key = "block_number",
cluster_by = "block_timestamp::date",
tags = ['streamline_load', 'core']
) }}
SELECT
block_number,
DATA: height :: STRING AS block_height,
DATA: id :: STRING AS block_id,
DATA :timestamp :: timestamp_ntz AS block_timestamp,
ARRAY_SIZE(
DATA :collection_guarantees :: ARRAY
) AS collection_count,
DATA: parent_id :: STRING AS parent_id,
DATA: signatures :: ARRAY AS signatures,
DATA: collection_guarantees :: ARRAY AS collection_guarantees,
DATA: block_seals :: ARRAY AS block_seals,
_partition_by_block_id,
_inserted_timestamp
FROM
{% if is_incremental() %}
{{ ref('bronze__streamline_blocks') }}
WHERE
_inserted_timestamp >= (
SELECT
MAX(_inserted_timestamp) _inserted_timestamp
FROM
{{ this }}
)
{% else %}
{{ ref('bronze__streamline_fr_blocks') }}
{% endif %}
qualify(ROW_NUMBER() over (PARTITION BY block_number
ORDER BY
_inserted_timestamp DESC)) = 1
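
The `QUALIFY ROW_NUMBER() ... = 1` clause keeps only the most recently inserted record per `block_number`; a minimal Python sketch of the same dedup rule (illustrative, not the dbt implementation):

```python
# Keep the newest record per block_number, mirroring the QUALIFY clause above.
def dedup_latest(rows):
    latest = {}
    for row in rows:
        key = row["block_number"]
        if key not in latest or row["_inserted_timestamp"] > latest[key]["_inserted_timestamp"]:
            latest[key] = row
    return list(latest.values())

rows = [
    {"block_number": 1, "_inserted_timestamp": "2023-08-01"},
    {"block_number": 1, "_inserted_timestamp": "2023-08-02"},  # newer duplicate wins
]
print(dedup_latest(rows))  # [{'block_number': 1, '_inserted_timestamp': '2023-08-02'}]
```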

View File

@ -0,0 +1,40 @@
version: 2
models:
- name: silver__streamline_blocks
description: |-
Initial table for the gRPC blocks response, loading data into Snowflake from the external AWS table.
columns:
- name: BLOCK_NUMBER
description: "{{ doc('block_number') }}"
- name: BLOCK_HEIGHT
description: "{{ doc('block_height') }}"
- name: BLOCK_ID
description: "{{ doc('block_id') }}"
- name: BLOCK_TIMESTAMP
description: "{{ doc('block_timestamp') }}"
- name: COLLECTION_COUNT
description: "{{ doc('collection_count') }}"
- name: PARENT_ID
description: "{{ doc('parent_id') }}"
- name: SIGNATURES
description: "{{ doc('signatures') }}"
- name: COLLECTION_GUARANTEES
description: "{{ doc('collection_guarantees') }}"
- name: BLOCK_SEALS
description: "{{ doc('block_seals') }}"
- name: _PARTITION_BY_BLOCK_ID
description: "{{ doc('_partition_by_block_id') }}"
- name: _INSERTED_TIMESTAMP
description: "{{ doc('_inserted_timestamp') }}"

View File

@ -0,0 +1,35 @@
-- depends_on: {{ ref('bronze__streamline_collections') }}
{{ config(
materialized = 'incremental',
unique_key = "collection_id",
cluster_by = "block_number",
tags = ['streamline_load', 'core']
) }}
SELECT
block_number,
DATA: id :: STRING AS collection_id,
ARRAY_SIZE(
DATA :transaction_ids :: ARRAY
) AS tx_count,
DATA: transaction_ids :: ARRAY AS transaction_ids,
_partition_by_block_id,
_inserted_timestamp
FROM
{% if is_incremental() %}
{{ ref('bronze__streamline_collections') }}
WHERE
_inserted_timestamp >= (
SELECT
MAX(_inserted_timestamp) _inserted_timestamp
FROM
{{ this }}
)
{% else %}
{{ ref('bronze__streamline_fr_collections') }}
{% endif %}
qualify(ROW_NUMBER() over (PARTITION BY block_number
ORDER BY
_inserted_timestamp DESC)) = 1

View File

@ -0,0 +1,25 @@
version: 2
models:
- name: silver__streamline_collections
description: |-
Initial table for the gRPC collections response, loading data into Snowflake from the external AWS table.
columns:
- name: BLOCK_NUMBER
description: "{{ doc('block_number') }}"
- name: COLLECTION_ID
description: "{{ doc('collection_id') }}"
- name: TX_COUNT
description: "{{ doc('tx_count') }}"
- name: TRANSACTION_IDS
description: "{{ doc('transaction_ids') }}"
- name: _PARTITION_BY_BLOCK_ID
description: "{{ doc('_partition_by_block_id') }}"
- name: _INSERTED_TIMESTAMP
description: "{{ doc('_inserted_timestamp') }}"

View File

@ -0,0 +1,35 @@
-- depends_on: {{ ref('bronze__streamline_transaction_results') }}
{{ config(
materialized = 'incremental',
unique_key = "tx_id",
cluster_by = "_inserted_timestamp::date",
tags = ['streamline_load', 'core']
) }}
SELECT
block_number,
id AS tx_id,
DATA: error_message :: STRING AS error_message,
DATA: events :: ARRAY AS events,
DATA :status :: INT AS status,
DATA :status_code :: INT AS status_code,
_partition_by_block_id,
_inserted_timestamp
FROM
{% if is_incremental() %}
{{ ref('bronze__streamline_transaction_results') }} AS t
WHERE
_inserted_timestamp >= (
SELECT
MAX(_inserted_timestamp) _inserted_timestamp
FROM
{{ this }}
)
{% else %}
{{ ref('bronze__streamline_fr_transaction_results') }} AS t
{% endif %}
qualify(ROW_NUMBER() over (PARTITION BY t.block_number
ORDER BY
t._inserted_timestamp DESC)) = 1

View File

@ -0,0 +1,31 @@
version: 2
models:
- name: silver__streamline_transaction_results
description: |-
Initial table for the gRPC transaction results response, loading data into Snowflake from the external AWS table.
columns:
- name: BLOCK_NUMBER
description: "{{ doc('block_number') }}"
- name: TX_ID
description: "{{ doc('tx_id') }}"
- name: ERROR_MESSAGE
description: "{{ doc('error_message') }}"
- name: EVENTS
description: "{{ doc('events') }}"
- name: STATUS
description: "{{ doc('status') }}"
- name: STATUS_CODE
description: "{{ doc('status_code') }}"
- name: _PARTITION_BY_BLOCK_ID
description: "{{ doc('_partition_by_block_id') }}"
- name: _INSERTED_TIMESTAMP
description: "{{ doc('_inserted_timestamp') }}"

View File

@ -0,0 +1,40 @@
-- depends_on: {{ ref('bronze__streamline_transactions') }}
{{ config(
materialized = 'incremental',
unique_key = "tx_id",
cluster_by = "_inserted_timestamp::date",
tags = ['streamline_load', 'core']
) }}
SELECT
block_number,
DATA: reference_block_id :: STRING AS block_id,
id AS tx_id,
DATA: gas_limit :: NUMBER AS gas_limit,
DATA: payer :: STRING AS payer,
DATA: arguments :: ARRAY AS arguments,
DATA: authorizers :: ARRAY AS authorizers,
DATA: envelope_signatures :: ARRAY AS envelope_signatures,
DATA: payload_signatures :: ARRAY AS payload_signatures,
DATA: proposal_key :: variant AS proposal_key,
DATA: script :: STRING AS script,
_partition_by_block_id,
_inserted_timestamp
FROM
{% if is_incremental() %}
{{ ref('bronze__streamline_transactions') }}
WHERE
_inserted_timestamp >= (
SELECT
MAX(_inserted_timestamp) _inserted_timestamp
FROM
{{ this }}
)
{% else %}
{{ ref('bronze__streamline_fr_transactions') }}
{% endif %}
qualify(ROW_NUMBER() over (PARTITION BY block_number
ORDER BY
_inserted_timestamp DESC)) = 1

View File

@ -0,0 +1,46 @@
version: 2
models:
- name: silver__streamline_transactions
description: |-
Initial table for the gRPC transactions response, loading data into Snowflake from the external AWS table.
columns:
- name: BLOCK_NUMBER
description: "{{ doc('block_number') }}"
- name: BLOCK_ID
description: "{{ doc('block_id') }}"
- name: TX_ID
description: "{{ doc('tx_id') }}"
- name: GAS_LIMIT
description: "{{ doc('gas_limit') }}"
- name: PAYER
description: "{{ doc('payer') }}"
- name: ARGUMENTS
description: "{{ doc('arguments') }}"
- name: AUTHORIZERS
description: "{{ doc('authorizers') }}"
- name: ENVELOPE_SIGNATURES
description: "{{ doc('envelope_signatures') }}"
- name: PAYLOAD_SIGNATURES
description: "{{ doc('payload_signatures') }}"
- name: PROPOSAL_KEY
description: "{{ doc('proposal_key') }}"
- name: SCRIPT
description: "{{ doc('script') }}"
- name: _PARTITION_BY_BLOCK_ID
description: "{{ doc('_partition_by_block_id') }}"
- name: _INSERTED_TIMESTAMP
description: "{{ doc('_inserted_timestamp') }}"

View File

@ -5,7 +5,8 @@
unique_key = "block_number",
cluster_by = "ROUND(block_number, -3)",
merge_update_columns = ["block_number"],
post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION on equality(block_number)"
post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION on equality(block_number)",
tags = ['streamline_complete']
) }}
SELECT
@ -25,7 +26,7 @@ WHERE
FROM
{{ this }}
),
'1900-01-01'::timestamp
'1900-01-01'::timestamp_ntz
)
{% else %}
{{ ref('bronze__streamline_fr_blocks') }}

View File

@ -4,7 +4,8 @@
unique_key = "id",
cluster_by = "ROUND(block_number, -3)",
merge_update_columns = ["id"],
post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION on equality(id)"
post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION on equality(id)",
tags = ['streamline_complete']
) }}
SELECT
@ -25,7 +26,7 @@ WHERE
FROM
{{ this }}
),
'1900-01-01'::timestamp
'1900-01-01'::timestamp_ntz
)
{% else %}
{{ ref('bronze__streamline_fr_collections') }}

View File

@ -4,7 +4,8 @@
unique_key = "id",
cluster_by = "ROUND(block_number, -3)",
merge_update_columns = ["id"],
post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION on equality(id)"
post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION on equality(id)",
tags = ['streamline_complete']
) }}
SELECT
@ -25,7 +26,7 @@ WHERE
FROM
{{ this }}
),
'1900-01-01'::timestamp
'1900-01-01'::timestamp_ntz
)
{% else %}
{{ ref('bronze__streamline_fr_transaction_results') }}

View File

@ -4,7 +4,8 @@
unique_key = "id",
cluster_by = "ROUND(block_number, -3)",
merge_update_columns = ["id"],
post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION on equality(id)"
post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION on equality(id)",
tags = ['streamline_complete']
) }}
SELECT
@ -25,7 +26,7 @@ WHERE
FROM
{{ this }}
),
'1900-01-01'::timestamp
'1900-01-01'::timestamp_ntz
)
{% else %}
{{ ref('bronze__streamline_fr_transactions') }}

View File

@ -1,47 +0,0 @@
{{ config (
materialized = "view",
post_hook = if_data_call_function(
func = "{{this.schema}}.udf_bulk_grpc(object_construct('sql_source', '{{this.identifier}}','node_url','access-001.mainnet22.nodes.onflow.org:9000','external_table', 'collections', 'sql_limit', {{var('sql_limit','500000')}}, 'producer_batch_size', {{var('producer_batch_size','10000')}}, 'worker_batch_size', {{var('worker_batch_size','1000')}}, 'batch_call_limit', {{var('batch_call_limit','1')}}))",
target = "{{this.schema}}.{{this.identifier}}"
)
) }}
-- CTE to get all block_heights and their associated collection_ids from the complete_get_blocks table
WITH block_collections AS (
SELECT
cb.block_number AS block_height,
collection_guarantee.value:collection_id AS collection_id
FROM
{{ ref("streamline__complete_get_blocks") }} cb,
LATERAL FLATTEN(input => cb.data:collection_guarantees) AS collection_guarantee
),
-- CTE to identify collections that haven't been ingested yet
collections_to_ingest AS (
SELECT
bc.block_height,
bc.collection_id
FROM
block_collections bc
LEFT JOIN
{{ ref("streamline__complete_get_collections") }} c
ON bc.block_height = c.block_number
AND bc.collection_id = c.id
WHERE
c.id IS NULL
)
-- Generate the requests based on the missing collections
SELECT
OBJECT_CONSTRUCT(
'grpc', 'proto3',
'method', 'get_collection_by_i_d',
'block_height', block_height::INTEGER,
'method_params', OBJECT_CONSTRUCT('id', collection_id)
) AS request
FROM
collections_to_ingest
WHERE
block_height BETWEEN 47169687 AND 55114466 -- Mainnet22 block range
ORDER BY
block_height ASC

View File

@ -1,13 +1,13 @@
{{ config (
{{ config(
materialized = "view",
post_hook = if_data_call_function(
func = "{{this.schema}}.udf_bulk_grpc(object_construct('sql_source', '{{this.identifier}}','node_url','access-001.mainnet22.nodes.onflow.org:9000','external_table', 'blocks', 'sql_limit', {{var('sql_limit','500000')}}, 'producer_batch_size', {{var('producer_batch_size','10000')}}, 'worker_batch_size', {{var('worker_batch_size','1000')}}, 'batch_call_limit', {{var('batch_call_limit','1')}}))",
func = "{{this.schema}}.udf_bulk_grpc(object_construct('sql_source', '{{this.identifier}}','node_url', '{{ var('node_url', Null) }}','external_table', 'blocks', 'sql_limit', {{var('sql_limit','500000')}}, 'producer_batch_size', {{var('producer_batch_size','10000')}}, 'worker_batch_size', {{var('worker_batch_size','1000')}}, 'batch_call_limit', {{var('batch_call_limit','1')}}))",
target = "{{this.schema}}.{{this.identifier}}"
)
),
tags = ['streamline_history']
) }}
WITH blocks AS (
SELECT
block_height
FROM
@ -28,6 +28,6 @@ SELECT
FROM
blocks
WHERE
block_height BETWEEN 47169687 AND 55114466 -- Mainnet22 block range
block_height BETWEEN {{ var('start_block', Null) }} AND {{ var('end_block', Null) }}
ORDER BY
block_height ASC

View File

@ -0,0 +1,76 @@
{{ config (
materialized = "view",
post_hook = if_data_call_function(
func = "{{this.schema}}.udf_bulk_grpc(object_construct('sql_source', '{{this.identifier}}','node_url','{{ var('node_url', Null) }}','external_table', 'collections', 'sql_limit', {{var('sql_limit','500000')}}, 'producer_batch_size', {{var('producer_batch_size','10000')}}, 'worker_batch_size', {{var('worker_batch_size','1000')}}, 'batch_call_limit', {{var('batch_call_limit','1')}}))",
target = "{{this.schema}}.{{this.identifier}}"
),
tags = ['streamline_history']
) }}
WITH blocks AS (
SELECT
block_height
FROM
{{ ref("streamline__blocks") }}
EXCEPT
SELECT
block_number AS block_height
FROM
{{ ref("streamline__complete_get_collections") }}
),
collections AS (
SELECT
block_number AS block_height,
DATA
FROM
{{ ref('streamline__complete_get_blocks') }}
JOIN blocks
ON blocks.block_height = block_number
),
-- CTE to get all block_heights and their associated collection_ids from the complete_get_blocks table
block_collections AS (
SELECT
cb.block_number AS block_height,
collection_guarantee.value :collection_id AS collection_id
FROM
{{ ref("streamline__complete_get_blocks") }}
cb,
LATERAL FLATTEN(
input => cb.data :collection_guarantees
) AS collection_guarantee
),
-- CTE to identify collections that haven't been ingested yet
collections_to_ingest AS (
SELECT
bc.block_height,
bc.collection_id
FROM
block_collections bc
LEFT JOIN {{ ref("streamline__complete_get_collections") }} C
ON bc.block_height = C.block_number
AND bc.collection_id = C.id
WHERE
C.id IS NULL
) -- Generate the requests based on the missing collections
SELECT
OBJECT_CONSTRUCT(
'grpc',
'proto3',
'method',
'get_collection_by_i_d',
'block_height',
block_height :: INTEGER,
'method_params',
OBJECT_CONSTRUCT(
'id',
collection_id
)
) AS request
FROM
collections_to_ingest
WHERE
block_height BETWEEN {{ var('start_block', Null) }}
AND {{ var('end_block', Null) }}
ORDER BY
block_height ASC

View File

@ -0,0 +1,43 @@
{{ config (
materialized = "view",
post_hook = if_data_call_function(
func = "{{this.schema}}.udf_bulk_grpc(object_construct('sql_source', '{{this.identifier}}', 'node_url', '{{ var('node_url', Null) }}', 'external_table', 'transaction_results', 'sql_limit', {{var('sql_limit','500000')}}, 'producer_batch_size', {{var('producer_batch_size','30000')}}, 'worker_batch_size', {{var('worker_batch_size','3000')}}, 'batch_call_limit', {{var('batch_call_limit','1')}}))",
target = "{{this.schema}}.{{this.identifier}}"
),
tags = ['streamline_history']
) }}
WITH blocks AS (
SELECT
block_height
FROM
{{ ref("streamline__blocks") }}
EXCEPT
SELECT
block_number as block_height
FROM
{{ ref("streamline__complete_get_transaction_results") }}
),
tx AS (
SELECT
block_number as block_height,
data
FROM
{{ ref('streamline__complete_get_collections') }}
JOIN blocks ON blocks.block_height = block_number
)
SELECT
OBJECT_CONSTRUCT(
'grpc', 'proto3',
'method', 'get_transaction_result',
'block_height', block_height::INTEGER,
'transaction_id', transaction_id.value::string,
'method_params', OBJECT_CONSTRUCT('id', transaction_id.value::string)
) AS request
FROM
tx,
LATERAL FLATTEN(input => TRY_PARSE_JSON(data):transaction_ids) AS transaction_id
WHERE
block_height BETWEEN {{ var('start_block', Null) }} AND {{ var('end_block', Null) }}
ORDER BY
block_height ASC

View File

@ -0,0 +1,43 @@
{{ config (
materialized = "view",
post_hook = if_data_call_function(
func = "{{this.schema}}.udf_bulk_grpc(object_construct('sql_source', '{{this.identifier}}', 'node_url', '{{ var('node_url', Null) }}', 'external_table', 'transactions', 'sql_limit', {{var('sql_limit','500000')}}, 'producer_batch_size', {{var('producer_batch_size','30000')}}, 'worker_batch_size', {{var('worker_batch_size','3000')}}, 'batch_call_limit', {{var('batch_call_limit','1')}}))",
target = "{{this.schema}}.{{this.identifier}}"
),
tags = ['streamline_history']
) }}
WITH blocks AS (
SELECT
block_height
FROM
{{ ref("streamline__blocks") }}
EXCEPT
SELECT
block_number as block_height
FROM
{{ ref("streamline__complete_get_transactions") }}
),
tx AS (
SELECT
block_number as block_height,
data
FROM
{{ ref('streamline__complete_get_collections') }}
JOIN blocks ON blocks.block_height = block_number
)
SELECT
OBJECT_CONSTRUCT(
'grpc', 'proto3',
'method', 'get_transaction',
'block_height', block_height::INTEGER,
'transaction_id', transaction_id.value::string,
'method_params', OBJECT_CONSTRUCT('id', transaction_id.value::string)
) AS request
FROM
tx,
LATERAL FLATTEN(input => TRY_PARSE_JSON(data):transaction_ids) AS transaction_id
WHERE
block_height BETWEEN {{ var('start_block', Null) }} AND {{ var('end_block', Null) }}
ORDER BY
block_height ASC

View File

@ -1,44 +0,0 @@
{{ config (
materialized = "view",
post_hook = if_data_call_function(
func = "{{this.schema}}.udf_bulk_grpc(object_construct('sql_source', '{{this.identifier}}','node_url','access-001.mainnet22.nodes.onflow.org:9000','external_table', 'transaction_results', 'sql_limit', {{var('sql_limit','500000')}}, 'producer_batch_size', {{var('producer_batch_size','30000')}}, 'worker_batch_size', {{var('worker_batch_size','3000')}}, 'batch_call_limit', {{var('batch_call_limit','1')}}))",
target = "{{this.schema}}.{{this.identifier}}"
)
) }}
-- CTE to get all transaction_ids from the complete_get_collections table
WITH collection_transactions AS (
SELECT
block_number AS block_height,
transaction.value::STRING AS transaction_id
FROM
{{ ref("streamline__complete_get_collections") }} cc,
LATERAL FLATTEN(input => cc.data:transaction_ids) AS transaction
),
-- CTE to identify transaction_results that haven't been ingested yet
transaction_results_to_ingest AS (
SELECT
ct.block_height,
ct.transaction_id
FROM
collection_transactions ct
LEFT JOIN
{{ ref("streamline__complete_get_transaction_results") }} tr ON ct.transaction_id = tr.id
WHERE
tr.id IS NULL
)
-- Generate the requests column based on the missing transactions
SELECT
OBJECT_CONSTRUCT(
'grpc', 'proto3',
'method', 'get_transaction_result',
'block_height', block_height::INTEGER,
'transaction_id', transaction_id::STRING,
'method_params', OBJECT_CONSTRUCT('id', transaction_id::STRING)
) AS request
FROM
transaction_results_to_ingest
ORDER BY
block_height ASC

View File

@ -1,45 +0,0 @@
{{ config (
materialized = "view",
post_hook = if_data_call_function(
func = "{{this.schema}}.udf_bulk_grpc(object_construct('sql_source', '{{this.identifier}}','node_url','access-001.mainnet22.nodes.onflow.org:9000','external_table', 'transactions', 'sql_limit', {{var('sql_limit','500000')}}, 'producer_batch_size', {{var('producer_batch_size','30000')}}, 'worker_batch_size', {{var('worker_batch_size','3000')}}, 'batch_call_limit', {{var('batch_call_limit','1')}}))",
target = "{{this.schema}}.{{this.identifier}}"
)
) }}
-- CTE to get all transaction_ids from the complete_get_collections table
WITH collection_transactions AS (
SELECT
block_number AS block_height,
transaction.value::STRING AS transaction_id
FROM
{{ ref("streamline__complete_get_collections") }} cc,
LATERAL FLATTEN(input => cc.data:transaction_ids) AS transaction
),
-- CTE to identify transactions that haven't been ingested yet
transactions_to_ingest AS (
SELECT
ct.block_height,
ct.transaction_id
FROM
collection_transactions ct
LEFT JOIN
{{ ref("streamline__complete_get_transactions") }} t ON ct.transaction_id = t.id
WHERE
t.id IS NULL
)
-- Generate the requests based on the missing transactions
SELECT
OBJECT_CONSTRUCT(
'grpc', 'proto3',
'method', 'get_transaction',
'block_height', block_height::INTEGER,
'transaction_id', transaction_id::STRING,
'method_params', OBJECT_CONSTRUCT('id', transaction_id::STRING)
) AS request
FROM
transactions_to_ingest
ORDER BY
block_height ASC

View File

@ -0,0 +1,64 @@
{{ config (
materialized = "view",
post_hook = if_data_call_function(
func = "{{this.schema}}.udf_bulk_grpc(object_construct('sql_source', '{{this.identifier}}','node_url','access.mainnet.nodes.onflow.org:9000','external_table', 'blocks', 'sql_limit', {{var('sql_limit','500000')}}, 'producer_batch_size', {{var('producer_batch_size','10000')}}, 'worker_batch_size', {{var('worker_batch_size','1000')}}, 'batch_call_limit', {{var('batch_call_limit','1')}}))",
target = "{{this.schema}}.{{this.identifier}}"
),
tags = ['streamline_realtime']
) }}
WITH last_3_days AS ({% if var('STREAMLINE_RUN_HISTORY') %}
{# TODO - this can't be 0, has to be block height of current spork #}
SELECT
0 AS block_height
{% else %}
SELECT
MAX(block_height) - 210000 AS block_height
FROM
{{ ref('streamline__blocks') }}
{% endif %}),
tbl AS (
SELECT
block_height
FROM
{{ ref('streamline__blocks') }}
WHERE
(
block_height >= (
SELECT
block_height
FROM
last_3_days
)
)
AND block_height IS NOT NULL
EXCEPT
SELECT
block_number AS block_height
FROM
{{ ref('streamline__complete_get_blocks') }}
WHERE
(
block_height >= (
SELECT
block_height
FROM
last_3_days
)
)
AND block_height IS NOT NULL
)
SELECT
OBJECT_CONSTRUCT(
'grpc', 'proto3',
'method', 'get_block_by_height',
'block_height', block_height :: INTEGER,
'method_params', OBJECT_CONSTRUCT(
'height',
block_height
)
)
FROM
tbl
ORDER BY
block_height ASC
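
The realtime selection reduces to "expected heights within the lookback window, minus heights already ingested"; a small illustrative sketch of that gap detection (the 210,000-block lookback is the model's own constant):

```python
# Mirrors the EXCEPT above: heights in the lookback window not yet ingested.
def missing_heights(known_heights, ingested_heights, lookback=210_000):
    floor = max(known_heights) - lookback
    window = {h for h in known_heights if h >= floor}
    return sorted(window - ingested_heights)

print(missing_heights({100, 101, 102, 103}, {100, 102}, lookback=10))  # [101, 103]
```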

View File

@ -0,0 +1,98 @@
{{ config (
materialized = "view",
post_hook = if_data_call_function(
func = "{{this.schema}}.udf_bulk_grpc(object_construct('sql_source', '{{this.identifier}}','node_url','access.mainnet.nodes.onflow.org:9000','external_table', 'collections', 'sql_limit', {{var('sql_limit','500000')}}, 'producer_batch_size', {{var('producer_batch_size','10000')}}, 'worker_batch_size', {{var('worker_batch_size','1000')}}, 'batch_call_limit', {{var('batch_call_limit','1')}}))",
target = "{{this.schema}}.{{this.identifier}}"
),
tags = ['streamline_realtime']
) }}
WITH last_3_days AS ({% if var('STREAMLINE_RUN_HISTORY') %}
{# TODO - this can't be 0, has to be block height of current spork #}
SELECT
0 AS block_height
{% else %}
SELECT
MAX(block_height) - 210000 AS block_height
FROM
{{ ref('streamline__blocks') }}
{% endif %}),
tbl AS (
SELECT
block_height
FROM
{{ ref('streamline__blocks') }}
WHERE
(
block_height >= (
SELECT
block_height
FROM
last_3_days
)
)
AND block_height IS NOT NULL
EXCEPT
SELECT
block_number AS block_height
FROM
{{ ref('streamline__complete_get_collections') }}
WHERE
(
block_height >= (
SELECT
block_height
FROM
last_3_days
)
)
AND block_height IS NOT NULL
),
collections AS (
SELECT
block_number AS block_height,
DATA
FROM
{{ ref('streamline__complete_get_blocks') }}
JOIN tbl
ON tbl.block_height = block_number
),
-- CTE to get all block_heights and their associated collection_ids from the complete_get_blocks table
block_collections AS (
SELECT
cb.block_number AS block_height,
collection_guarantee.value :collection_id AS collection_id
FROM
{{ ref("streamline__complete_get_blocks") }}
cb,
LATERAL FLATTEN(
input => cb.data :collection_guarantees
) AS collection_guarantee
),
-- CTE to identify collections that haven't been ingested yet
collections_to_ingest AS (
SELECT
bc.block_height,
bc.collection_id
FROM
block_collections bc
LEFT JOIN {{ ref("streamline__complete_get_collections") }} C
ON bc.block_height = C.block_number
AND bc.collection_id = C.id
WHERE
C.id IS NULL
)
SELECT
OBJECT_CONSTRUCT(
'grpc', 'proto3',
'method', 'get_collection_by_i_d',
'block_height', block_height :: INTEGER,
'method_params', OBJECT_CONSTRUCT(
'id',
collection_id
)
) AS request
FROM
collections_to_ingest
ORDER BY
block_height ASC

View File

@ -0,0 +1,96 @@
{{ config (
materialized = "view",
post_hook = if_data_call_function(
func = "{{this.schema}}.udf_bulk_grpc(object_construct('sql_source', '{{this.identifier}}','node_url','access.mainnet.nodes.onflow.org:9000','external_table', 'transaction_results', 'sql_limit', {{var('sql_limit','500000')}}, 'producer_batch_size', {{var('producer_batch_size','10000')}}, 'worker_batch_size', {{var('worker_batch_size','1000')}}, 'batch_call_limit', {{var('batch_call_limit','1')}}))",
target = "{{this.schema}}.{{this.identifier}}"
),
tags = ['streamline_realtime']
) }}
WITH last_3_days AS ({% if var('STREAMLINE_RUN_HISTORY') %}
{# TODO - this can't be 0, has to be block height of current spork #}
SELECT
0 AS block_height
{% else %}
SELECT
MAX(block_height) - 210000 AS block_height
FROM
{{ ref('streamline__blocks') }}
{% endif %}),
tbl AS (
SELECT
block_height
FROM
{{ ref('streamline__blocks') }}
WHERE
(
block_height >= (
SELECT
block_height
FROM
last_3_days
)
)
AND block_height IS NOT NULL
EXCEPT
SELECT
block_number as block_height
FROM
{{ ref('streamline__complete_get_transactions') }}
WHERE
(
block_height >= (
SELECT
block_height
FROM
last_3_days
)
)
AND block_height IS NOT NULL
),
collection_transactions AS (
SELECT
block_number AS block_height,
TRANSACTION.value :: STRING AS transaction_id
FROM
{{ ref("streamline__complete_get_collections") }}
cc,
LATERAL FLATTEN(
input => cc.data :transaction_ids
) AS TRANSACTION
WHERE
block_height IN (
SELECT
block_height
FROM
tbl
)
),
-- CTE to identify transactions that haven't been ingested yet
transactions_to_ingest AS (
SELECT
ct.block_height,
ct.transaction_id
FROM
collection_transactions ct
LEFT JOIN {{ ref("streamline__complete_get_transaction_results") }}
t
ON ct.transaction_id = t.id
WHERE
t.id IS NULL
) -- Generate the requests based on the missing transactions
SELECT
OBJECT_CONSTRUCT(
'grpc', 'proto3',
'method', 'get_transaction_result',
'block_height', block_height :: INTEGER,
'transaction_id', transaction_id :: STRING,
'method_params', OBJECT_CONSTRUCT(
'id',
transaction_id :: STRING
)
) AS request
FROM
transactions_to_ingest
ORDER BY
block_height ASC

View File

@ -0,0 +1,96 @@
{{ config (
materialized = "view",
post_hook = if_data_call_function(
func = "{{this.schema}}.udf_bulk_grpc(object_construct('sql_source', '{{this.identifier}}','node_url','access.mainnet.nodes.onflow.org:9000','external_table', 'transactions', 'sql_limit', {{var('sql_limit','500000')}}, 'producer_batch_size', {{var('producer_batch_size','10000')}}, 'worker_batch_size', {{var('worker_batch_size','1000')}}, 'batch_call_limit', {{var('batch_call_limit','1')}}))",
target = "{{this.schema}}.{{this.identifier}}"
),
tags = ['streamline_realtime']
) }}
WITH last_3_days AS ({% if var('STREAMLINE_RUN_HISTORY') %}
{# TODO - this can't be 0, has to be block height of current spork #}
SELECT
0 AS block_height
{% else %}
SELECT
MAX(block_height) - 210000 AS block_height
FROM
{{ ref('streamline__blocks') }}
{% endif %}),
tbl AS (
SELECT
block_height
FROM
{{ ref('streamline__blocks') }}
WHERE
(
block_height >= (
SELECT
block_height
FROM
last_3_days
)
)
AND block_height IS NOT NULL
EXCEPT
SELECT
block_number AS block_height
FROM
{{ ref('streamline__complete_get_transactions') }}
WHERE
(
block_height >= (
SELECT
block_height
FROM
last_3_days
)
)
AND block_height IS NOT NULL
),
collection_transactions AS (
SELECT
block_number AS block_height,
TRANSACTION.value :: STRING AS transaction_id
FROM
{{ ref("streamline__complete_get_collections") }}
cc,
LATERAL FLATTEN(
input => cc.data :transaction_ids
) AS TRANSACTION
WHERE
block_height IN (
SELECT
block_height
FROM
tbl
)
),
-- CTE to identify transactions that haven't been ingested yet
transactions_to_ingest AS (
SELECT
ct.block_height,
ct.transaction_id
FROM
collection_transactions ct
LEFT JOIN {{ ref("streamline__complete_get_transactions") }}
t
ON ct.transaction_id = t.id
WHERE
t.id IS NULL
) -- Generate the requests based on the missing transactions
SELECT
OBJECT_CONSTRUCT(
'grpc', 'proto3',
'method', 'get_transaction',
'block_height', block_height :: INTEGER,
'transaction_id', transaction_id :: STRING,
'method_params', OBJECT_CONSTRUCT(
'id',
transaction_id :: STRING
)
) AS request
FROM
transactions_to_ingest
ORDER BY
block_height ASC

View File

@ -0,0 +1,57 @@
# run_dbt_for_seed.py
# Usage (assumed invocation): python run_dbt_for_seed.py <model_name> [use_dev]
#   model_name: blocks | collections | transactions | transaction_results
#   use_dev:    "True" or "False" (optional, defaults to False)
import csv
import subprocess
import sys


def run_dbt_for_model(model_name, node_url, root_height, end_height, use_dev=False):
    # Run the 1+ history selector for the given model, scoped to one spork's
    # access node and block range via dbt --vars.
    cmd = [
        "dbt",
        "run",
        "--threads",
        "8",
        "--vars",
        f'{{"node_url":"{node_url}", "start_block":{root_height}, "end_block":{end_height},"STREAMLINE_INVOKE_STREAMS":True, "STREAMLINE_USE_DEV_FOR_EXTERNAL_TABLES":{use_dev}}}',
        "-s",
        f"1+streamline__get_{model_name}_history",
    ]
    subprocess.run(cmd)


def main(model_name, use_dev=False):
    seed_file = "./data/seeds__network_version.csv"
    with open(seed_file, "r") as file:
        reader = csv.DictReader(file)
        for i, row in enumerate(reader):
            root_height = row["root_height"]
            node_url = row["node_url"]
            end_height = row["end_height"]
            # segment the backfill into batches of 5 networks at a time, starting with the most recent 5
            # source CSV contains 29 networks, but the first 3 (candidates 3-6) are inaccessible
            # so, valid rows are 4-29, or 25 rows
            if i >= 25:
                run_dbt_for_model(model_name, node_url, root_height, end_height, use_dev)
            else:
                continue


if __name__ == "__main__":
    # accept model name as cli argument and pass to main
    model_name = sys.argv[1]
    # acceptable model names: blocks, collections, transactions, transaction_results
    if model_name not in ["blocks", "collections", "transactions", "transaction_results"]:
        raise ValueError("model_name must be one of the following: blocks, collections, transactions, transaction_results")
    # use_dev is an optional cli argument that accepts only True or False
    use_dev = False
    if len(sys.argv) > 2:
        use_dev = sys.argv[2]
        if use_dev not in ["True", "False"]:
            raise ValueError("use_dev must be True or False")
    main(model_name, use_dev)