Stream 98 collection backfill (#145)

* node url join optimized | warehouse privilege grants

* added string interpolation for env based target.database

* revert to candidate node 7 root height

* collections history success

* removed end height from network_version

* removed EOF

* added prod udf

---------

Co-authored-by: shah <info@shahnewazkhan.ca>
This commit is contained in:
Shah Newaz Khan 2023-08-10 14:23:29 -07:00 committed by GitHub
parent d14843bd0c
commit fe7f688cc2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
19 changed files with 368 additions and 90 deletions

View File

@ -1,17 +1,17 @@
-- macro used to create flow api integrations
{% macro create_aws_flow_api() %}
{{ log("Creating integration for target:" ~ target, info=True) }}
{% if target.name == "prod" %}
{% set sql %}
CREATE api integration IF NOT EXISTS aws_flow_api_prod api_provider = aws_api_gateway api_aws_role_arn = 'arn:aws:iam::490041342817:role/snowflake-api-flow' api_allowed_prefixes = (
'https://<PROD_FLOW_API_CHALICE_URL>/prod/'
CREATE api integration IF NOT EXISTS aws_flow_api_prod api_provider = aws_api_gateway api_aws_role_arn = 'arn:aws:iam::490041342817:role/flow-api-prod-rolesnowflakeudfsAF733095-FNY67ODG1RFG' api_allowed_prefixes = (
'https://quxfxtl934.execute-api.us-east-1.amazonaws.com/prod/'
) enabled = TRUE;
{% endset %}
{% do run_query(sql) %}
{% elif target.name == "dev" %}
{{ log("Generating api integration for target:" ~ target.name, info=True) }}
{% set sql %}
CREATE api integration IF NOT EXISTS aws_flow_api_dev api_provider = aws_api_gateway api_aws_role_arn = 'arn:aws:iam::490041342817:role/snowflake-api-flow' api_allowed_prefixes = (
'https://<DEV_FLOW_API_CHALICE_URL>/dev/'
CREATE api integration IF NOT EXISTS aws_flow_api_dev_2 api_provider = aws_api_gateway api_aws_role_arn = 'arn:aws:iam::490041342817:role/flow-api-dev-rolesnowflakeudfsAF733095-1D0U05G1EDT3' api_allowed_prefixes = (
'https://8jjulyhxhj.execute-api.us-east-1.amazonaws.com/dev/'
) enabled = TRUE;
{% endset %}
{% do run_query(sql) %}

View File

@ -1,6 +1,6 @@
{% macro create_udtf_get_base_table(schema) %}
create or replace function {{ schema }}.udtf_get_base_table(max_height integer)
returns table (height number, node_url varchar(16777216))
returns table (height number)
as
$$
with base as (
@ -8,27 +8,14 @@ $$
row_number() over (
order by
seq4()
) as id
) as height
from
table(generator(rowcount => 100000000)) -- July 2023 Flow Chain head is at 57M
),
node_mapping as (
select
base.id as height,
first_value(nv.node_url) over (partition by base.id order by nv.root_height desc) as node_url
from
base
left join {{ target.database }}.seeds.network_version nv
on
base.id >= nv.root_height
and
base.id <= max_height
table(generator(rowcount => 100000000))
)
select
height,
coalesce(node_url, 'access.mainnet.nodes.onflow.org:9000') as node_url
height
from
node_mapping
base
where
height <= max_height
$$

View File

@ -0,0 +1,43 @@
{% macro streamline_external_table_query(
model,
partition_function,
partition_name,
unique_key
) %}
WITH meta AS (
SELECT
last_modified AS _inserted_timestamp,
file_name,
{{ partition_function }} AS {{ partition_name }}
FROM
TABLE(
information_schema.external_table_file_registration_history(
start_time => DATEADD('day', -3, CURRENT_TIMESTAMP()),
table_name => '{{ source( "bronze_streamline", model) }}')
) A
)
SELECT
{{ unique_key }},
DATA,
_inserted_timestamp,
MD5(
CAST(
COALESCE(CAST({{ unique_key }} AS text), '' :: STRING) AS text
)
) AS id,
s.{{ partition_name }},
s.value AS VALUE
FROM
{{ source(
"bronze_streamline",
model
) }}
s
JOIN meta b
ON b.file_name = metadata$filename
AND b.{{ partition_name }} = s.{{ partition_name }}
WHERE
b.{{ partition_name }} = s.{{ partition_name }}
{% endmacro %}

View File

@ -4,9 +4,9 @@
CREATE
OR REPLACE EXTERNAL FUNCTION streamline.udf_get_chainhead() returns variant api_integration =
{% if target.name == "prod" %}
aws_flow_api AS '<PROD_CHALICE_URL>/prod/get_chainhead'
aws_flow_api AS 'https://quxfxtl934.execute-api.us-east-1.amazonaws.com/prod/get_chainhead'
{% elif target.name == "dev" %}
aws_flow_api_dev AS '<DEV_CHALICE_URL>/dev/get_chainhead'
aws_flow_api_dev_2 AS 'https://8jjulyhxhj.execute-api.us-east-1.amazonaws.com/dev/get_chainhead'
{% elif target.name == "sbx" %}
{{ log("Creating sbx get_chainhead", info=True) }}
aws_flow_api_sbx AS 'https://bc5ejedoq8.execute-api.us-east-1.amazonaws.com/sbx/get_chainhead'
@ -19,9 +19,9 @@
CREATE
OR REPLACE EXTERNAL FUNCTION streamline.udf_bulk_grpc(json variant) returns variant api_integration =
{% if target.name == "prod" %}
aws_flow_api AS '<PROD_CHALICE_URL>/prod/udf_bulk_grpc'
aws_flow_api AS 'https://quxfxtl934.execute-api.us-east-1.amazonaws.com/prod/udf_bulk_grpc'
{% elif target.name == "dev" %}
aws_flow_api_dev AS '<DEV_CHALICE_URL>/dev/udf_bulk_grpc'
aws_flow_api_dev_2 AS 'https://8jjulyhxhj.execute-api.us-east-1.amazonaws.com/dev/udf_bulk_grpc'
{% elif target.name == "sbx" %}
{{ log("Creating sbx udf_bulk_grpc", info=True) }}
aws_flow_api_sbx AS 'https://bc5ejedoq8.execute-api.us-east-1.amazonaws.com/sbx/udf_bulk_grpc'

View File

@ -0,0 +1,29 @@
{% macro generate_blocks_grpc_request(block_height) %}
PARSE_JSON(
CONCAT(
'{"grpc": "proto3",',
'"method": "get_block_by_height",',
'"block_height":"',
block_height :: INTEGER,
'",',
'"method_params": {"height":',
block_height :: INTEGER,
'}}'
)
)
{% endmacro %}
{% macro generate_collections_grpc_request(block_height, collection_guarantee) %}
PARSE_JSON(
CONCAT(
'{"grpc": "proto3",',
'"method": "get_collection_by_i_d",',
'"block_height":"',
block_height :: INTEGER,
'",',
'"method_params": {"id":"',
collection_guarantee.value:collection_id,
'"}}'
)
)
{% endmacro %}

View File

@ -100,11 +100,18 @@
{% set sql %}
grant usage on database {{ target.database }} to role {{ role }};
grant usage on schema {{ target.schema }} to role {{ role }};
grant usage on warehouse {{ target.warehouse }} to role {{ role }};
grant select on all tables in schema {{ target.schema }} to role {{ role }};
grant select on all views in schema {{ target.schema }} to role {{ role }};
grant select on future views in schema {{ target.schema }} to role {{ role }}
grant select on future views in schema {{ target.schema }} to role {{ role }};
grant usage on database streamline to role {{ role }};
grant usage on schema streamline.{{ target.schema }} to role {{ role }};
grant select on all tables in schema streamline.{{ target.schema }} to role {{ role }};
grant usage on schema {{target.database}}.bronze to role {{ role }};
{% endset %}
{% do run_query(sql) %}
{% do run_and_log_sql(sql) %}
{% do log("Privileges granted", info=True) %}
{% endmacro %}

View File

@ -21,3 +21,31 @@ DBT_TARGET=sbx make sl-flow-api
dbt run --vars '{"STREAMLINE_INVOKE_STREAMS":True, "STREAMLINE_USE_DEV_FOR_EXTERNAL_TABLES": True}' -m 1+models/silver/streamline/core/history/streamline__get_blocks_history.sql --profile flow --target sbx --profiles-dir ~/.dbt
```
```zsh
# dev bronze__streamline_blocks.sql
dbt run --vars '{"STREAMLINE_INVOKE_STREAMS":True, "STREAMLINE_USE_DEV_FOR_EXTERNAL_TABLES": True}' -m 1+models/silver/streamline/bronze/core/bronze__streamline_blocks.sql --profile flow --target dev --profiles-dir ~/.dbt
```
```zsh
# dev complete_get_blocks
dbt run --vars '{"STREAMLINE_INVOKE_STREAMS":True, "STREAMLINE_USE_DEV_FOR_EXTERNAL_TABLES": True}' -m 1+models/silver/streamline/core/complete/streamline__complete_get_blocks.sql --profile flow --target dev --profiles-dir ~/.dbt
# dev complete_get_collections
dbt run --vars '{"STREAMLINE_INVOKE_STREAMS":True, "STREAMLINE_USE_DEV_FOR_EXTERNAL_TABLES": True}' -m 1+models/silver/streamline/core/complete/streamline__complete_get_collections.sql --profile flow --target dev --profiles-dir ~/.dbt
# dev complete_get_transactions
dbt run --vars '{"STREAMLINE_INVOKE_STREAMS":True, "STREAMLINE_USE_DEV_FOR_EXTERNAL_TABLES": True}' -m 1+models/silver/streamline/core/complete/streamline__complete_get_transactionss.sql --profile flow --target dev --profiles-dir ~/.dbt
--
# dev get_blocks_history
dbt run --vars '{"STREAMLINE_INVOKE_STREAMS":True, "STREAMLINE_USE_DEV_FOR_EXTERNAL_TABLES": True}' -m 1+models/silver/streamline/core/history/blocks/streamline__get_blocks_history.sql --profile flow --target dev --profiles-dir ~/.dbt
# dev get_transactions_history
dbt run --vars '{"STREAMLINE_INVOKE_STREAMS":True, "STREAMLINE_USE_DEV_FOR_EXTERNAL_TABLES": True}' -m 1+models/silver/streamline/core/history/transactions/streamline__get_transactions_history_mainnet22.sql --profile flow --target dev --profiles-dir ~/.dbt
# dev complete_get_collections
dbt run --vars '{"STREAMLINE_INVOKE_STREAMS":True, "STREAMLINE_USE_DEV_FOR_EXTERNAL_TABLES": True}' -m 1+models/silver/streamline/core/complete/streamline__complete_get_collections.sql --profile flow --target dev --profiles-dir ~/.dbt
```

View File

@ -0,0 +1,11 @@
{{ config (
materialized = 'view'
) }}
{% set model = this.identifier.split("_") [-1] %}
{{ streamline_external_table_query(
model,
partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 3), '_', 1) AS INTEGER)",
partition_name = "_partition_by_block_id",
unique_key = "block_number"
) }}

View File

@ -0,0 +1,11 @@
{{ config (
materialized = 'view'
) }}
{% set model = this.identifier.split("_") [-1] %}
{{ streamline_external_table_query(
model,
partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 3), '_', 1) AS INTEGER)",
partition_name = "_partition_by_block_id",
unique_key = "block_number"
) }}

View File

@ -0,0 +1,11 @@
{{ config (
materialized = 'view'
) }}
{% set model = this.identifier.split("_") [-1] %}
{{ streamline_external_table_query(
model,
partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 3), '_', 1) AS INTEGER)",
partition_name = "_partition_by_block_id",
unique_key = "block_number"
) }}

View File

@ -1,20 +1,22 @@
-- depends_on: {{ ref('bronze__streamline_blocks') }}
{{ config (
materialized = "incremental",
unique_key = "record_id",
cluster_by = "ROUND(block_id, -3)",
merge_update_columns = ["record_id"],
post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION on equality(record_id)"
unique_key = "id",
cluster_by = "ROUND(block_number, -3)",
merge_update_columns = ["id"],
post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION on equality(id)"
) }}
SELECT
record_id,
block_id,
_inserted_timestamp,
network AS node_url
id,
data,
block_number,
_partition_by_block_id,
_inserted_timestamp
FROM
{% if is_incremental() %}
{{ ref('bronze__blocks') }} -- TODO: change to bronze__streamline_blocks
{{ ref('bronze__streamline_blocks') }}
WHERE
_inserted_timestamp >= (
SELECT
@ -24,9 +26,9 @@ WHERE
)
{% else %}
{{ ref('bronze__blocks') }} -- TODO: change to bronze__streamline_FR_blocks
{{ ref('bronze__streamline_blocks') }} -- TODO: change to bronze__streamline_FR_blocks
{% endif %}
qualify(ROW_NUMBER() over (PARTITION BY record_id
qualify(ROW_NUMBER() over (PARTITION BY _partition_by_block_id
ORDER BY
_inserted_timestamp DESC)) = 1

View File

@ -0,0 +1,34 @@
-- depends_on: {{ ref('bronze__streamline_collections') }}
{{ config (
materialized = "incremental",
unique_key = "id",
cluster_by = "ROUND(block_number, -3)",
merge_update_columns = ["id"],
post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION on equality(id)"
) }}
SELECT
id,
data,
block_number,
_partition_by_block_id,
_inserted_timestamp
FROM
{% if is_incremental() %}
{{ ref('bronze__streamline_collections') }}
WHERE
_inserted_timestamp >= (
SELECT
MAX(_inserted_timestamp) _inserted_timestamp
FROM
{{ this }}
)
{% else %}
{{ ref('bronze__streamline_collections') }} -- TODO: change to bronze__streamline_FR_collections
{% endif %}
qualify(ROW_NUMBER() over (PARTITION BY _partition_by_block_id
ORDER BY
_inserted_timestamp DESC)) = 1

View File

@ -0,0 +1,34 @@
-- depends_on: {{ ref('bronze__streamline_transactions') }}
{{ config (
materialized = "incremental",
unique_key = "id",
cluster_by = "ROUND(block_number, -3)",
merge_update_columns = ["id"],
post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION on equality(id)"
) }}
SELECT
id,
data,
block_number,
_partition_by_block_id,
_inserted_timestamp
FROM
{% if is_incremental() %}
{{ ref('bronze__streamline_transactions') }}
WHERE
_inserted_timestamp >= (
SELECT
MAX(_inserted_timestamp) _inserted_timestamp
FROM
{{ this }}
)
{% else %}
{{ ref('bronze__streamline_transactions') }} -- TODO: change to bronze__streamline_FR_transactions
{% endif %}
qualify(ROW_NUMBER() over (PARTITION BY _partition_by_block_id
ORDER BY
_inserted_timestamp DESC)) = 1

View File

@ -0,0 +1,28 @@
{{ config (
materialized = "view",
post_hook = if_data_call_function(
func = "{{this.schema}}.udf_bulk_grpc(object_construct('sql_source', '{{this.identifier}}','method', 'get_block_by_height','node_url','access-001.mainnet22.nodes.onflow.org:9000','external_table', 'blocks', 'sql_limit', {{var('sql_limit','100000')}}, 'producer_batch_size', {{var('producer_batch_size','10000')}}, 'worker_batch_size', {{var('worker_batch_size','1000')}}, 'batch_call_limit', {{var('batch_call_limit','1')}}))",
target = "{{this.schema}}.{{this.identifier}}"
)
) }}
WITH blocks AS (
SELECT
block_height
FROM
{{ ref("streamline__blocks") }}
EXCEPT
SELECT
block_number as block_height
FROM
{{ ref("streamline__complete_get_blocks") }}
)
SELECT
{{ generate_blocks_grpc_request(block_height) }} AS request
FROM
blocks
WHERE
block_height BETWEEN 47169687 AND 55114466 -- Mainnet22 block range
ORDER BY
block_height ASC

View File

@ -0,0 +1,39 @@
-- depends_on: {{ ref('streamline__get_blocks_history_mainnet22') }}
{{ config (
materialized = "view",
post_hook = if_data_call_function(
func = "{{this.schema}}.udf_bulk_grpc(object_construct('sql_source', '{{this.identifier}}','method', 'get_collection_by_i_d','node_url','access-001.mainnet22.nodes.onflow.org:9000','external_table', 'collections', 'sql_limit', {{var('sql_limit','10000')}}, 'producer_batch_size', {{var('producer_batch_size','10000')}}, 'worker_batch_size', {{var('worker_batch_size','10000')}}, 'batch_call_limit', {{var('batch_call_limit','1')}}))",
target = "{{this.schema}}.{{this.identifier}}"
)
) }}
WITH blocks AS (
SELECT
block_height
FROM
{{ ref("streamline__blocks") }}
EXCEPT
SELECT
block_number as block_height
FROM
{{ ref("streamline__complete_get_collections") }}
),
collections AS (
SELECT
block_number as block_height,
data
FROM
{{ ref('streamline__complete_get_blocks') }}
JOIN blocks ON blocks.block_height = block_number
)
SELECT
{{ generate_collections_grpc_request(block_height, collection_guarantee) }} AS request
FROM
collections,
LATERAL FLATTEN(input => data:collection_guarantees) AS collection_guarantee
WHERE
block_height BETWEEN 47169687 AND 55114466 -- Mainnet22 block range
ORDER BY
block_height ASC

View File

@ -1,39 +0,0 @@
{{ config (
materialized = "view",
post_hook = if_data_call_function(
func = "{{this.schema}}.udf_bulk_grpc(object_construct('sql_source', '{{this.identifier}}','method', 'get_block_by_height', 'external_table', 'streamline_blocks', 'sql_limit', {{var('sql_limit','1000')}}, 'producer_batch_size', {{var('producer_batch_size','1000')}}, 'worker_batch_size', {{var('worker_batch_size','1000')}}, 'batch_call_limit', {{var('batch_call_limit','1')}}))",
target = "{{this.schema}}.{{this.identifier}}"
)
) }}
WITH blocks AS (
SELECT
block_height,
node_url
FROM
{{ ref("streamline__blocks") }}
EXCEPT
SELECT
block_id as block_height,
node_url
FROM
{{ ref("streamline__complete_get_blocks") }}
)
SELECT
PARSE_JSON(
CONCAT(
'{"grpc": "proto3",',
'"method": "get_block_by_height",',
'"block_height":"',
block_height :: INTEGER,
'",',
'"node_url":"',
node_url,
'"}'
)
) AS request
FROM
blocks
ORDER BY
block_height ASC

View File

@ -0,0 +1,49 @@
{{ config (
materialized = "view",
post_hook = if_data_call_function(
func = "{{this.schema}}.udf_bulk_grpc(object_construct('sql_source', '{{this.identifier}}','method', 'get_transaction','node_url','access-001.mainnet22.nodes.onflow.org:9000','external_table', 'transactions', 'sql_limit', {{var('sql_limit','10000')}}, 'producer_batch_size', {{var('producer_batch_size','10000')}}, 'worker_batch_size', {{var('worker_batch_size','10000')}}, 'batch_call_limit', {{var('batch_call_limit','1')}}))",
target = "{{this.schema}}.{{this.identifier}}"
)
) }}
WITH blocks AS (
SELECT
block_height
FROM
{{ ref("streamline__blocks") }}
EXCEPT
SELECT
block_number as block_height
FROM
{{ ref("streamline__complete_get_transactions") }}
),
tx AS (
SELECT
block_number as block_height,
data
FROM
{{ ref('bronze__streamline_collections') }}
JOIN blocks ON blocks.block_height = block_number
)
SELECT
PARSE_JSON(
CONCAT(
'{"grpc": "proto3",',
'"method": "get_collection_by_i_d",',
'"block_height":"',
block_height :: INTEGER,
'",',
'"method_params": {"id":"',
transaction_ids,
'"}}'
)
) AS request
FROM
tx,
LATERAL FLATTEN(input => data:transaction_ids) AS transaction_ids
WHERE
block_height BETWEEN 47169687 AND 55114466 -- Mainnet22 block range
ORDER BY
block_height ASC

View File

@ -12,12 +12,11 @@
{% endif %}
SELECT
height as block_height,
node_url
height as block_height
FROM
TABLE(streamline.udtf_get_base_table({{block_height}}))
WHERE
block_height > 4132133 -- Root Height for Candidate node 7
-- the earliest available block we can ingest since earlier candidat nodes
-- the earliest available block we can ingest since earlier candidate nodes
-- do not have the get_block_by_height grpc method
-- https://developers.flow.com/concepts/nodes/node-operation/past-sporks#candidate-4

View File

@ -23,9 +23,13 @@ sources:
- name: bronze_streamline
database: streamline
schema: flow
schema: |
{{ "FLOW_DEV" if var("STREAMLINE_USE_DEV_FOR_EXTERNAL_TABLES", False) else "FLOW" }}
tables:
- name: moments_minted_metadata_api
- name: blocks
- name: collections
- name: transactions
- name: crosschain_v2
database: crosschain
@ -33,3 +37,4 @@ sources:
tables:
- name: hourly_prices_coin_gecko
- name: hourly_prices_coin_market_cap