fixed incremental complete runs (#152)

Co-authored-by: shah <info@shahnewazkhan.ca>
This commit is contained in:
Shah Newaz Khan 2023-08-22 11:24:22 -07:00 committed by GitHub
parent cb1000d082
commit f6bbfed2f7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 36 additions and 29 deletions

View File

@ -42,7 +42,7 @@ streamline: sl-flow-api udfs grant-streamline-privileges streamline_bronze
streamline_bronze:
dbt run \
--vars '{"STREAMLINE_INVOKE_STREAMS":True, "STREAMLINE_USE_DEV_FOR_EXTERNAL_TABLES": True}' \
--vars '{"STREAMLINE_USE_DEV_FOR_EXTERNAL_TABLES": False}' \
-m 1+models/silver/streamline/bronze \
--profiles-dir ~/.dbt \
--target $(DBT_TARGET) \

View File

@ -26,8 +26,7 @@ SELECT
s._partition_by_block_id,
s.value AS VALUE
FROM
streamline.FLOW_DEV.blocks
s
{{ source("bronze_streamline","blocks") }} s
JOIN meta b
ON b.file_name = metadata$filename
AND b._partition_by_block_id = s._partition_by_block_id

View File

@ -26,8 +26,7 @@ WITH meta AS (
s._partition_by_block_id,
s.value AS VALUE
FROM
streamline.FLOW_DEV.blocks
s
{{ source("bronze_streamline","blocks") }} s
JOIN meta b
ON b.file_name = metadata$filename
AND b._partition_by_block_id = s._partition_by_block_id

View File

@ -18,11 +18,14 @@ FROM
{% if is_incremental() %}
{{ ref('bronze__streamline_blocks') }}
WHERE
_inserted_timestamp >= (
SELECT
_inserted_timestamp >= COALESCE(
(
SELECT
MAX(_inserted_timestamp) _inserted_timestamp
FROM
{{ this }}
FROM
{{ this }}
),
'1900-01-01'::timestamp
)
{% else %}
{{ ref('bronze__streamline_fr_blocks') }}

View File

@ -18,17 +18,19 @@ FROM
{% if is_incremental() %}
{{ ref('bronze__streamline_collections') }}
WHERE
_inserted_timestamp >= (
SELECT
_inserted_timestamp >= COALESCE(
(
SELECT
MAX(_inserted_timestamp) _inserted_timestamp
FROM
{{ this }}
FROM
{{ this }}
),
'1900-01-01'::timestamp
)
{% else %}
{{ ref('bronze__streamline_fr_collections') }}
{% endif %}
qualify(ROW_NUMBER() over (PARTITION BY _partition_by_block_id
qualify(ROW_NUMBER() over (PARTITION BY id
ORDER BY
_inserted_timestamp DESC)) = 1

View File

@ -18,13 +18,15 @@ FROM
{% if is_incremental() %}
{{ ref('bronze__streamline_transaction_results') }}
WHERE
_inserted_timestamp >= (
SELECT
_inserted_timestamp >= COALESCE(
(
SELECT
MAX(_inserted_timestamp) _inserted_timestamp
FROM
{{ this }}
FROM
{{ this }}
),
'1900-01-01'::timestamp
)
{% else %}
{{ ref('bronze__streamline_fr_transaction_results') }}
{% endif %}

View File

@ -18,13 +18,15 @@ FROM
{% if is_incremental() %}
{{ ref('bronze__streamline_transactions') }}
WHERE
_inserted_timestamp >= (
SELECT
_inserted_timestamp >= COALESCE(
(
SELECT
MAX(_inserted_timestamp) _inserted_timestamp
FROM
{{ this }}
FROM
{{ this }}
),
'1900-01-01'::timestamp
)
{% else %}
{{ ref('bronze__streamline_fr_transactions') }}
{% endif %}

View File

@ -1,7 +1,7 @@
{{ config (
materialized = "view",
post_hook = if_data_call_function(
func = "{{this.schema}}.udf_bulk_grpc(object_construct('sql_source', '{{this.identifier}}','method', 'get_block_by_height','node_url','access-001.mainnet22.nodes.onflow.org:9000','external_table', 'blocks', 'sql_limit', {{var('sql_limit','10000')}}, 'producer_batch_size', {{var('producer_batch_size','100')}}, 'worker_batch_size', {{var('worker_batch_size','10')}}, 'batch_call_limit', {{var('batch_call_limit','1')}}))",
func = "{{this.schema}}.udf_bulk_grpc(object_construct('sql_source', '{{this.identifier}}','node_url','access-001.mainnet22.nodes.onflow.org:9000','external_table', 'blocks', 'sql_limit', {{var('sql_limit','500000')}}, 'producer_batch_size', {{var('producer_batch_size','10000')}}, 'worker_batch_size', {{var('worker_batch_size','1000')}}, 'batch_call_limit', {{var('batch_call_limit','1')}}))",
target = "{{this.schema}}.{{this.identifier}}"
)
) }}

View File

@ -1,7 +1,7 @@
{{ config (
materialized = "view",
post_hook = if_data_call_function(
func = "{{this.schema}}.udf_bulk_grpc(object_construct('sql_source', '{{this.identifier}}','method', 'get_collection_by_i_d','node_url','access-001.mainnet22.nodes.onflow.org:9000','external_table', 'collections', 'sql_limit', {{var('sql_limit','10000')}}, 'producer_batch_size', {{var('producer_batch_size','10000')}}, 'worker_batch_size', {{var('worker_batch_size','10000')}}, 'batch_call_limit', {{var('batch_call_limit','1')}}))",
func = "{{this.schema}}.udf_bulk_grpc(object_construct('sql_source', '{{this.identifier}}','node_url','access-001.mainnet22.nodes.onflow.org:9000','external_table', 'collections', 'sql_limit', {{var('sql_limit','500000')}}, 'producer_batch_size', {{var('producer_batch_size','10000')}}, 'worker_batch_size', {{var('worker_batch_size','1000')}}, 'batch_call_limit', {{var('batch_call_limit','1')}}))",
target = "{{this.schema}}.{{this.identifier}}"
)
) }}

View File

@ -1,7 +1,7 @@
{{ config (
materialized = "view",
post_hook = if_data_call_function(
func = "{{this.schema}}.udf_bulk_grpc(object_construct('sql_source', '{{this.identifier}}','method', 'get_transaction_result','node_url','access-001.mainnet22.nodes.onflow.org:9000','external_table', 'transaction_results', 'sql_limit', {{var('sql_limit','10000')}}, 'producer_batch_size', {{var('producer_batch_size','10000')}}, 'worker_batch_size', {{var('worker_batch_size','10000')}}, 'batch_call_limit', {{var('batch_call_limit','1')}}))",
func = "{{this.schema}}.udf_bulk_grpc(object_construct('sql_source', '{{this.identifier}}','node_url','access-001.mainnet22.nodes.onflow.org:9000','external_table', 'transaction_results', 'sql_limit', {{var('sql_limit','500000')}}, 'producer_batch_size', {{var('producer_batch_size','30000')}}, 'worker_batch_size', {{var('worker_batch_size','3000')}}, 'batch_call_limit', {{var('batch_call_limit','1')}}))",
target = "{{this.schema}}.{{this.identifier}}"
)
) }}

View File

@ -1,7 +1,7 @@
{{ config (
materialized = "view",
post_hook = if_data_call_function(
func = "{{this.schema}}.udf_bulk_grpc(object_construct('sql_source', '{{this.identifier}}','method', 'get_transaction','node_url','access-001.mainnet22.nodes.onflow.org:9000','external_table', 'transactions', 'sql_limit', {{var('sql_limit','10000')}}, 'producer_batch_size', {{var('producer_batch_size','10000')}}, 'worker_batch_size', {{var('worker_batch_size','10000')}}, 'batch_call_limit', {{var('batch_call_limit','1')}}))",
func = "{{this.schema}}.udf_bulk_grpc(object_construct('sql_source', '{{this.identifier}}','node_url','access-001.mainnet22.nodes.onflow.org:9000','external_table', 'transactions', 'sql_limit', {{var('sql_limit','500000')}}, 'producer_batch_size', {{var('producer_batch_size','30000')}}, 'worker_batch_size', {{var('worker_batch_size','3000')}}, 'batch_call_limit', {{var('batch_call_limit','1')}}))",
target = "{{this.schema}}.{{this.identifier}}"
)
) }}