added tx & tx_result backfill models

This commit is contained in:
shah 2023-08-11 10:01:31 -07:00
parent 03ccb67d7c
commit 1e17c6378b
14 changed files with 149 additions and 58 deletions

View File

@ -22,6 +22,15 @@ udfs:
--target $(DBT_TARGET) \
--profiles-dir ~/.dbt/
complete:
dbt run \
--vars '{"STREAMLINE_INVOKE_STREAMS":True, "STREAMLINE_USE_DEV_FOR_EXTERNAL_TABLES": True}' \
-m 1+models/silver/streamline/core/complete \
--profile flow \
--target $(DBT_TARGET) \
--profiles-dir ~/.dbt
grant-streamline-privileges:
dbt run-operation grant_streamline_privileges \
--profile flow \
@ -29,4 +38,12 @@ grant-streamline-privileges:
--profiles-dir ~/.dbt/ \
--args '{role: $(AWS_LAMBDA_ROLE)}'
undo_clone_purge: sl-flow-api udfs grant-streamline-privileges
undo_clone_purge: sl-flow-api udfs grant-streamline-privileges
streamline_bronze:
dbt run \
--vars '{"STREAMLINE_INVOKE_STREAMS":True, "STREAMLINE_USE_DEV_FOR_EXTERNAL_TABLES": True}' \
-m 1+models/silver/streamline/bronze \
--profiles-dir ~/.dbt \
--target $(DBT_TARGET) \
--profile flow

View File

@ -56,3 +56,4 @@ vars:
"dbt_date:time_zone": GMT
UPDATE_SNOWFLAKE_TAGS: True
UPDATE_UDFS_AND_SPS: False
STREAMLINE_USE_DEV_FOR_EXTERNAL_TABLES: True

View File

@ -1,29 +0,0 @@
{% macro generate_blocks_grpc_request(block_height) %}
PARSE_JSON(
CONCAT(
'{"grpc": "proto3",',
'"method": "get_block_by_height",',
'"block_height":"',
block_height :: INTEGER,
'",',
'"method_params": {"height":',
block_height :: INTEGER,
'}}'
)
)
{% endmacro %}
{% macro generate_collections_grpc_request(block_height, collection_guarantee) %}
PARSE_JSON(
CONCAT(
'{"grpc": "proto3",',
'"method": "get_collection_by_i_d",',
'"block_height":"',
block_height :: INTEGER,
'",',
'"method_params": {"id":"',
collection_guarantee.value:collection_id,
'"}}'
)
)
{% endmacro %}

View File

@ -35,17 +35,29 @@ dbt run --vars '{"STREAMLINE_INVOKE_STREAMS":True, "STREAMLINE_USE_DEV_FOR_EXTER
dbt run --vars '{"STREAMLINE_INVOKE_STREAMS":True, "STREAMLINE_USE_DEV_FOR_EXTERNAL_TABLES": True}' -m 1+models/silver/streamline/core/complete/streamline__complete_get_collections.sql --profile flow --target dev --profiles-dir ~/.dbt
# dev complete_get_transactions
dbt run --vars '{"STREAMLINE_INVOKE_STREAMS":True, "STREAMLINE_USE_DEV_FOR_EXTERNAL_TABLES": True}' -m 1+models/silver/streamline/core/complete/streamline__complete_get_transactionss.sql --profile flow --target dev --profiles-dir ~/.dbt
--
dbt run --vars '{"STREAMLINE_INVOKE_STREAMS":True, "STREAMLINE_USE_DEV_FOR_EXTERNAL_TABLES": True}' -m 1+models/silver/streamline/core/complete/streamline__complete_get_transactions.sql --profile flow --target dev --profiles-dir ~/.dbt
# dev get_blocks_history
dbt run --vars '{"STREAMLINE_INVOKE_STREAMS":True, "STREAMLINE_USE_DEV_FOR_EXTERNAL_TABLES": True}' -m 1+models/silver/streamline/core/history/blocks/streamline__get_blocks_history.sql --profile flow --target dev --profiles-dir ~/.dbt
dbt run --vars '{"STREAMLINE_INVOKE_STREAMS":True, "STREAMLINE_USE_DEV_FOR_EXTERNAL_TABLES": True}' -m 1+models/silver/streamline/core/history/blocks/streamline__get_blocks_history_mainnet22.sql --profile flow --target dev --profiles-dir ~/.dbt
# dev get_collections_history
dbt run --vars '{"STREAMLINE_INVOKE_STREAMS":True, "STREAMLINE_USE_DEV_FOR_EXTERNAL_TABLES": True}' -m 1+models/silver/streamline/core/history/collections/streamline__get_collections_history_mainnet22.sql --profile flow --target dev --profiles-dir ~/.dbt
# dev get_transactions_history
dbt run --vars '{"STREAMLINE_INVOKE_STREAMS":True, "STREAMLINE_USE_DEV_FOR_EXTERNAL_TABLES": True}' -m 1+models/silver/streamline/core/history/transactions/streamline__get_transactions_history_mainnet22.sql --profile flow --target dev --profiles-dir ~/.dbt
# dev complete_get_collections
dbt run --vars '{"STREAMLINE_INVOKE_STREAMS":True, "STREAMLINE_USE_DEV_FOR_EXTERNAL_TABLES": True}' -m 1+models/silver/streamline/core/complete/streamline__complete_get_collections.sql --profile flow --target dev --profiles-dir ~/.dbt
# dev get_transaction_results_history
dbt run --vars '{"STREAMLINE_INVOKE_STREAMS":True, "STREAMLINE_USE_DEV_FOR_EXTERNAL_TABLES": True}' -m 1+models/silver/streamline/core/history/transaction_results/streamline__get_transaction_results_history_mainnet22.sql --profile flow --target dev --profiles-dir ~/.dbt
# dev bronze__streamline_blocks.sql
dbt run --select bronze__streamline_blocks.sql --profiles-dir ~/.dbt --target dev --profile flow --vars '{"STREAMLINE_USE_DEV_FOR_EXTERNAL_TABLES": True}'
```
```zsh
DBT_TARGET=dev \
AWS_LAMBDA_ROLE=AWS_LAMBDA_FLOW_API_DEV \
make undo_clone_purge
DBT_TARGET=dev make streamline_bronze
```

View File

@ -0,0 +1,11 @@
{{ config (
materialized = 'view'
) }}
{% set model = this.identifier.split("_")[-2:] | join('_') %}
{{ streamline_external_table_query(
model,
partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 3), '_', 1) AS INTEGER)",
partition_name = "_partition_by_block_id",
unique_key = "block_number"
) }}

View File

@ -1,4 +1,3 @@
-- depends_on: {{ ref('bronze__streamline_blocks') }}
{{ config (
materialized = "incremental",
unique_key = "id",
@ -29,6 +28,6 @@ WHERE
{{ ref('bronze__streamline_blocks') }} -- TODO: change to bronze__streamline_FR_blocks
{% endif %}
qualify(ROW_NUMBER() over (PARTITION BY _partition_by_block_id
qualify(ROW_NUMBER() over (PARTITION BY id
ORDER BY
_inserted_timestamp DESC)) = 1

View File

@ -1,4 +1,3 @@
-- depends_on: {{ ref('bronze__streamline_collections') }}
{{ config (
materialized = "incremental",
unique_key = "id",

View File

@ -0,0 +1,34 @@
-- depends_on: {{ ref('bronze__streamline_transactions') }}
{{ config (
materialized = "incremental",
unique_key = "id",
cluster_by = "ROUND(block_number, -3)",
merge_update_columns = ["id"],
post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION on equality(id)"
) }}
SELECT
id,
data,
block_number,
_partition_by_block_id,
_inserted_timestamp
FROM
{% if is_incremental() %}
{{ ref('bronze__streamline_transaction_results') }}
WHERE
_inserted_timestamp >= (
SELECT
MAX(_inserted_timestamp) _inserted_timestamp
FROM
{{ this }}
)
{% else %}
{{ ref('bronze__streamline_transaction_results') }} -- TODO: change to bronze__streamline_FR_transaction_results
{% endif %}
qualify(ROW_NUMBER() over (PARTITION BY id
ORDER BY
_inserted_timestamp DESC)) = 1

View File

@ -29,6 +29,6 @@ WHERE
{{ ref('bronze__streamline_transactions') }} -- TODO: change to bronze__streamline_FR_transactions
{% endif %}
qualify(ROW_NUMBER() over (PARTITION BY _partition_by_block_id
qualify(ROW_NUMBER() over (PARTITION BY id
ORDER BY
_inserted_timestamp DESC)) = 1

View File

@ -1,7 +1,7 @@
{{ config (
materialized = "view",
post_hook = if_data_call_function(
func = "{{this.schema}}.udf_bulk_grpc(object_construct('sql_source', '{{this.identifier}}','method', 'get_block_by_height','node_url','access-001.mainnet22.nodes.onflow.org:9000','external_table', 'blocks', 'sql_limit', {{var('sql_limit','100000')}}, 'producer_batch_size', {{var('producer_batch_size','10000')}}, 'worker_batch_size', {{var('worker_batch_size','1000')}}, 'batch_call_limit', {{var('batch_call_limit','1')}}))",
func = "{{this.schema}}.udf_bulk_grpc(object_construct('sql_source', '{{this.identifier}}','method', 'get_block_by_height','node_url','access-001.mainnet22.nodes.onflow.org:9000','external_table', 'blocks', 'sql_limit', {{var('sql_limit','10000')}}, 'producer_batch_size', {{var('producer_batch_size','100')}}, 'worker_batch_size', {{var('worker_batch_size','10')}}, 'batch_call_limit', {{var('batch_call_limit','1')}}))",
target = "{{this.schema}}.{{this.identifier}}"
)
) }}
@ -19,7 +19,12 @@ WITH blocks AS (
{{ ref("streamline__complete_get_blocks") }}
)
SELECT
{{ generate_blocks_grpc_request(block_height) }} AS request
OBJECT_CONSTRUCT(
'grpc', 'proto3',
'method', 'get_block_by_height',
'block_height', block_height,
'method_params', OBJECT_CONSTRUCT('height', block_height)
) AS request
FROM
blocks
WHERE

View File

@ -1,4 +1,3 @@
-- depends_on: {{ ref('streamline__get_blocks_history_mainnet22') }}
{{ config (
materialized = "view",
post_hook = if_data_call_function(
@ -29,7 +28,12 @@ collections AS (
JOIN blocks ON blocks.block_height = block_number
)
SELECT
{{ generate_collections_grpc_request(block_height, collection_guarantee) }} AS request
OBJECT_CONSTRUCT(
'grpc', 'proto3',
'method', 'get_collection_by_i_d',
'block_height', block_height::INTEGER,
'method_params', OBJECT_CONSTRUCT('id', collection_guarantee.value:collection_id)
) AS request
FROM
collections,
LATERAL FLATTEN(input => data:collection_guarantees) AS collection_guarantee

View File

@ -0,0 +1,43 @@
{{ config (
materialized = "view",
post_hook = if_data_call_function(
func = "{{this.schema}}.udf_bulk_grpc(object_construct('sql_source', '{{this.identifier}}','method', 'get_transaction_result','node_url','access-001.mainnet22.nodes.onflow.org:9000','external_table', 'transaction_results', 'sql_limit', {{var('sql_limit','10000')}}, 'producer_batch_size', {{var('producer_batch_size','10000')}}, 'worker_batch_size', {{var('worker_batch_size','10000')}}, 'batch_call_limit', {{var('batch_call_limit','1')}}))",
target = "{{this.schema}}.{{this.identifier}}"
)
) }}
WITH blocks AS (
SELECT
block_height
FROM
{{ ref("streamline__blocks") }}
EXCEPT
SELECT
block_number as block_height
FROM
{{ ref("streamline__complete_get_transaction_results") }}
),
tx AS (
SELECT
block_number as block_height,
data
FROM
{{ ref('streamline__complete_get_collections') }}
JOIN blocks ON blocks.block_height = block_number
)
SELECT
OBJECT_CONSTRUCT(
'grpc', 'proto3',
'method', 'get_transaction_result',
'block_height', block_height::INTEGER,
'method_params', OBJECT_CONSTRUCT('id', transaction_id.value::string)
) AS request
FROM
tx,
LATERAL FLATTEN(input => TRY_PARSE_JSON(data):transaction_ids) AS transaction_id
WHERE
block_height BETWEEN 47169687 AND 55114466 -- Mainnet22 block range
ORDER BY
block_height ASC

View File

@ -24,25 +24,19 @@ tx AS (
block_number as block_height,
data
FROM
{{ ref('bronze__streamline_collections') }}
{{ ref('streamline__complete_get_collections') }}
JOIN blocks ON blocks.block_height = block_number
)
SELECT
PARSE_JSON(
CONCAT(
'{"grpc": "proto3",',
'"method": "get_collection_by_i_d",',
'"block_height":"',
block_height :: INTEGER,
'",',
'"method_params": {"id":"',
transaction_ids,
'"}}'
)
OBJECT_CONSTRUCT(
'grpc', 'proto3',
'method', 'get_transaction',
'block_height', block_height::INTEGER,
'method_params', OBJECT_CONSTRUCT('id', transaction_id.value::string)
) AS request
FROM
tx,
LATERAL FLATTEN(input => data:transaction_ids) AS transaction_ids
LATERAL FLATTEN(input => TRY_PARSE_JSON(data):transaction_ids) AS transaction_id
WHERE
block_height BETWEEN 47169687 AND 55114466 -- Mainnet22 block range
ORDER BY

View File

@ -30,6 +30,7 @@ sources:
- name: blocks
- name: collections
- name: transactions
- name: transaction_results
- name: crosschain_v2
database: crosschain