updated to dedicated node

This commit is contained in:
xiuy001 2023-08-24 08:23:00 -04:00
parent b141bea16e
commit cb440daa2d
16 changed files with 242 additions and 83 deletions

View File

@ -1,5 +1,11 @@
{# Macro: create_udfs
   Conditionally (re)creates this project's UDFs/external functions.
   Runs DDL only when the UPDATE_UDFS_AND_SPS dbt var is truthy, so
   ordinary dbt runs skip the (slow, privileged) CREATE statements. #}
{% macro create_udfs() %}
{% if var("UPDATE_UDFS_AND_SPS") %}
{# Shared UDFs provided by the fsc_utils package #}
{{- fsc_utils.create_udfs() -}}
{# Collect the chain-specific UDF DDL into one script ... #}
{% set sql %}
{{ create_udf_get_chainhead() }}
{{ create_udf_json_rpc() }}
{% endset %}
{# ... and execute it against the warehouse #}
{% do run_query(sql) %}
{% endif %}
{% endmacro %}

View File

@ -8,8 +8,8 @@
{% do run_query(sql) %}
{% else %}
{% set sql %}
CREATE api integration IF NOT EXISTS aws_aurora_dev_api api_provider = aws_api_gateway api_aws_role_arn = 'arn:aws:iam::490041342817:role/aurora-api-dev-rolesnowflakeudfsAF733095-1MX4LOX4UFE9M' api_allowed_prefixes = (
'https://66lx4fxkui.execute-api.us-east-1.amazonaws.com/dev/'
CREATE OR REPLACE api integration aws_aurora_dev_api api_provider = aws_api_gateway api_aws_role_arn = 'arn:aws:iam::490041342817:role/aurora-api-dev-rolesnowflakeudfsAF733095-AN4Q3176CUYA' api_allowed_prefixes = (
'https://xh409mek2a.execute-api.us-east-1.amazonaws.com/dev/'
) enabled = TRUE;
{% endset %}
{% do run_query(sql) %}

View File

@ -4,7 +4,7 @@
'https://sl2f5beopl.execute-api.us-east-1.amazonaws.com/prod/get_chainhead'
{% else %}
CREATE OR REPLACE EXTERNAL FUNCTION streamline.udf_get_chainhead() returns variant api_integration = aws_aurora_dev_api AS
'https://66lx4fxkui.execute-api.us-east-1.amazonaws.com/dev/get_chainhead'
'https://xh409mek2a.execute-api.us-east-1.amazonaws.com/dev/get_chainhead'
{%- endif %};
{% endmacro %}
@ -18,6 +18,6 @@
CREATE OR REPLACE EXTERNAL FUNCTION streamline.udf_json_rpc(
json OBJECT
) returns ARRAY api_integration = aws_aurora_dev_api AS
'https://66lx4fxkui.execute-api.us-east-1.amazonaws.com/dev/bulk_get_json_rpc'
'https://xh409mek2a.execute-api.us-east-1.amazonaws.com/dev/bulk_get_json_rpc'
{%- endif %};
{% endmacro %}

View File

@ -2,10 +2,54 @@
materialized = 'view'
) }}
{% set model = this.identifier.split("_") [-1] %}
{{ streamline_external_table_FR_query(
model,
partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 4), '_', 1) AS INTEGER )",
partition_name = "_partition_by_block_id",
unique_key = "block_number"
) }}
-- meta: one row per file currently registered to the external table, carrying
-- the file's registration time and a partition id parsed from the file path
-- (4th '/' segment, text before the first '_').
-- NOTE(review): presumably that path segment encodes the starting block id of
-- the file — confirm against the S3 key layout.
WITH meta AS (
SELECT
registered_on AS _inserted_timestamp,
file_name,
CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 4), '_', 1) AS INTEGER) AS _partition_by_block_id
FROM
TABLE(
information_schema.external_table_files(
table_name => '{{ source( "bronze_streamline", "transactions") }}'
)
) A
)
SELECT
block_number,
-- transaction hash lifted from the raw JSON payload
DATA :hash ::STRING AS tx_hash,
_inserted_timestamp,
-- deterministic surrogate key: MD5 over "block_number_-_tx_hash"
MD5(
CAST(
COALESCE(CAST(CONCAT(block_number, '_-_', COALESCE(DATA :hash ::STRING, '')) AS text), '' :: STRING) AS text
)
) AS id,
s._partition_by_block_id,
s.value AS VALUE
FROM
{{ source(
"bronze_streamline",
"transactions"
) }}
s
-- attach file-level metadata (registration timestamp) to each row by
-- matching the row's originating file and partition
JOIN meta b
ON b.file_name = metadata$filename
AND b._partition_by_block_id = s._partition_by_block_id
WHERE
-- NOTE(review): duplicates the JOIN condition above; redundant but harmless
-- (may help Snowflake partition pruning on the external table).
b._partition_by_block_id = s._partition_by_block_id
AND (
-- drop rows whose payload is a JSON-RPC error response
-- (standard JSON-RPC 2.0 server-error code range -32000..-32010)
DATA :error :code IS NULL
OR DATA :error :code NOT IN (
'-32000',
'-32001',
'-32002',
'-32003',
'-32004',
'-32005',
'-32006',
'-32007',
'-32008',
'-32009',
'-32010'
)
)

View File

@ -2,10 +2,54 @@
materialized = 'view'
) }}
{% set model = this.identifier.split("_") [-2] + '_' + this.identifier.split("_") [-1] %}
{{ streamline_external_table_FR_query(
model,
partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 4), '_', 1) AS INTEGER )",
partition_name = "_partition_by_block_id",
unique_key = "block_number"
) }}
-- meta: one row per file currently registered to the external table, with the
-- registration time and a partition id parsed from the file path
-- (4th '/' segment, text before the first '_').
WITH meta AS (
SELECT
registered_on AS _inserted_timestamp,
file_name,
CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 4), '_', 1) AS INTEGER) AS _partition_by_block_id
FROM
TABLE(
information_schema.external_table_files(
table_name => '{{ source( "bronze_streamline", "tx_receipts") }}'
)
) A
)
SELECT
block_number,
-- receipt's transaction hash, read from the nested RPC response payload
value :data :result :transactionHash ::STRING AS tx_hash,
_inserted_timestamp,
-- deterministic surrogate key: MD5 over "block_number_-_tx_hash"
MD5(
CAST(
COALESCE(CAST(CONCAT(block_number, '_-_', COALESCE(value :data :result :transactionHash ::STRING, '')) AS text), '' :: STRING) AS text
)
) AS id,
s._partition_by_block_id,
s.value AS VALUE
FROM
{{ source(
"bronze_streamline",
"tx_receipts"
) }}
s
-- attach file-level metadata to each row via originating file + partition
JOIN meta b
ON b.file_name = metadata$filename
AND b._partition_by_block_id = s._partition_by_block_id
WHERE
-- NOTE(review): duplicates the JOIN condition above; redundant but harmless.
b._partition_by_block_id = s._partition_by_block_id
AND (
-- drop JSON-RPC error responses (server-error code range -32000..-32010).
-- NOTE(review): this filter reads DATA while tx_hash reads value:data —
-- confirm the external table defines DATA over the same payload.
DATA :error :code IS NULL
OR DATA :error :code NOT IN (
'-32000',
'-32001',
'-32002',
'-32003',
'-32004',
'-32005',
'-32006',
'-32007',
'-32008',
'-32009',
'-32010'
)
)

View File

@ -2,10 +2,55 @@
materialized = 'view'
) }}
{% set model = this.identifier.split("_") [-1] %}
{{ streamline_external_table_query(
model,
partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 4), '_', 1) AS INTEGER )",
partition_name = "_partition_by_block_id",
unique_key = "block_number"
) }}
-- meta: recently registered files for the external table, from the file
-- registration history. The lookback window is the last day, floored at
-- 2023-08-01 18:44 so nothing earlier than the backfill start is rescanned.
WITH meta AS (
SELECT
job_created_time AS _inserted_timestamp,
file_name,
CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 4), '_', 1) AS INTEGER) AS _partition_by_block_id
FROM
TABLE(
information_schema.external_table_file_registration_history(
start_time => GREATEST(DATEADD('day', -1, CURRENT_TIMESTAMP), '2023-08-01 18:44:00.000' :: timestamp_ntz),
table_name => '{{ source( "bronze_streamline", "transactions") }}'
)
) A
)
SELECT
block_number,
-- transaction hash lifted from the raw JSON payload
DATA :hash ::STRING AS tx_hash,
_inserted_timestamp,
-- deterministic surrogate key: MD5 over "block_number_-_tx_hash"
MD5(
CAST(
COALESCE(CAST(CONCAT(block_number, '_-_', COALESCE(DATA :hash ::STRING, '')) AS text), '' :: STRING) AS text
)
) AS id,
s._partition_by_block_id,
s.value AS VALUE
FROM
{{ source(
"bronze_streamline",
"transactions"
) }}
s
-- restrict to rows coming from the recently-registered files above
JOIN meta b
ON b.file_name = metadata$filename
AND b._partition_by_block_id = s._partition_by_block_id
WHERE
-- NOTE(review): duplicates the JOIN condition above; redundant but harmless
-- (may help Snowflake partition pruning on the external table).
b._partition_by_block_id = s._partition_by_block_id
AND (
-- drop rows whose payload is a JSON-RPC error response
-- (standard JSON-RPC 2.0 server-error code range -32000..-32010)
DATA :error :code IS NULL
OR DATA :error :code NOT IN (
'-32000',
'-32001',
'-32002',
'-32003',
'-32004',
'-32005',
'-32006',
'-32007',
'-32008',
'-32009',
'-32010'
)
)

View File

@ -2,10 +2,55 @@
materialized = 'view'
) }}
{% set model = this.identifier.split("_") [-2] + '_' + this.identifier.split("_") [-1] %}
{{ streamline_external_table_query(
model,
partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 4), '_', 1) AS INTEGER )",
partition_name = "_partition_by_block_id",
unique_key = "block_number"
) }}
-- meta: recently registered files for the external table, from the file
-- registration history (last day, floored at the 2023-08-01 18:44 backfill
-- start), with a partition id parsed from the file path.
WITH meta AS (
SELECT
job_created_time AS _inserted_timestamp,
file_name,
CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 4), '_', 1) AS INTEGER) AS _partition_by_block_id
FROM
TABLE(
information_schema.external_table_file_registration_history(
start_time => GREATEST(DATEADD('day', -1, CURRENT_TIMESTAMP), '2023-08-01 18:44:00.000' :: timestamp_ntz),
table_name => '{{ source( "bronze_streamline", "tx_receipts") }}'
)
) A
)
SELECT
block_number,
-- receipt's transaction hash, read from the nested RPC response payload
value :data :result :transactionHash ::STRING AS tx_hash,
_inserted_timestamp,
-- deterministic surrogate key: MD5 over "block_number_-_tx_hash"
MD5(
CAST(
COALESCE(CAST(CONCAT(block_number, '_-_', COALESCE(value :data :result :transactionHash ::STRING, '')) AS text), '' :: STRING) AS text
)
) AS id,
s._partition_by_block_id,
s.value AS VALUE
FROM
{{ source(
"bronze_streamline",
"tx_receipts"
) }}
s
-- restrict to rows coming from the recently-registered files above
JOIN meta b
ON b.file_name = metadata$filename
AND b._partition_by_block_id = s._partition_by_block_id
WHERE
-- NOTE(review): duplicates the JOIN condition above; redundant but harmless.
b._partition_by_block_id = s._partition_by_block_id
AND (
-- drop JSON-RPC error responses (server-error code range -32000..-32010).
-- NOTE(review): this filter reads DATA while tx_hash reads value:data —
-- confirm the external table defines DATA over the same payload.
DATA :error :code IS NULL
OR DATA :error :code NOT IN (
'-32000',
'-32001',
'-32002',
'-32003',
'-32004',
'-32005',
'-32006',
'-32007',
'-32008',
'-32009',
'-32010'
)
)

View File

@ -1,7 +1,5 @@
{{ config(
materialized = 'incremental',
incremental_strategy = 'delete+insert',
unique_key = "logs_id",
cluster_by = "block_timestamp::date, _inserted_timestamp::date",
tags = ['core']
) }}

View File

@ -8,13 +8,11 @@
) }}
WITH base AS (
SELECT
block_number,
DATA :result AS DATA,
DATA:result AS DATA,
_inserted_timestamp
FROM
{% if is_incremental() %}
{{ ref('bronze__streamline_tx_receipts') }}
WHERE
@ -35,12 +33,14 @@ WHERE
)
{% endif %}
),
blocks AS (
SELECT
SELECT
*
FROM
{{ ref('silver__blocks') }}
{{ ref('silver__blocks') }}
),
FINAL AS (
SELECT
b.block_number,
@ -86,15 +86,16 @@ FINAL AS (
b._inserted_timestamp
FROM
base b
LEFT JOIN blocks
LEFT JOIN blocks
ON blocks.block_number = b.block_number
)
SELECT
*,
{{ dbt_utils.generate_surrogate_key(['BLOCK_NUMBER', 'TX_HASH']) }} AS receipts_id
{{ dbt_utils.generate_surrogate_key(['BLOCK_NUMBER', 'TX_HASH']) }} AS receipts_id,
*
FROM
FINAL
WHERE
tx_hash IS NOT NULL qualify(ROW_NUMBER() over (PARTITION BY block_number, POSITION
tx_hash IS NOT NULL qualify(ROW_NUMBER() over (PARTITION BY TX_HASH
ORDER BY
_inserted_timestamp DESC)) = 1
_inserted_timestamp DESC)) = 1

View File

@ -1,8 +1,6 @@
-- depends_on: {{ ref('bronze__streamline_transactions') }}
{{ config(
materialized = 'incremental',
incremental_strategy = 'delete+insert',
unique_key = "tx_id",
cluster_by = "block_timestamp::date, _inserted_timestamp::date",
tags = ['core']
) }}
@ -286,8 +284,7 @@ FROM
{% endif %}
)
SELECT
*,
{{ dbt_utils.generate_surrogate_key(['BLOCK_NUMBER', 'TX_HASH', 'POSITION']) }} AS tx_id
*
FROM
FINAL qualify(ROW_NUMBER() over (PARTITION BY block_number, tx_hash
ORDER BY

View File

@ -9,7 +9,7 @@
SELECT
id,
block_number,
DATA :hash :: STRING AS tx_hash,
tx_hash,
_inserted_timestamp
FROM

View File

@ -2,14 +2,12 @@
{{ config (
materialized = "incremental",
unique_key = "id",
cluster_by = "ROUND(block_number, -3)",
post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION on equality(id)"
) }}
SELECT
id,
block_number,
DATA :result :transactionHash :: STRING AS tx_hash,
value:data:result:transactionHash::string AS tx_hash,
_inserted_timestamp
FROM

View File

@ -1,7 +1,7 @@
{{ config (
materialized = "view",
post_hook = if_data_call_function(
func = "{{this.schema}}.udf_json_rpc(object_construct('sql_source', '{{this.identifier}}', 'external_table','blocks', 'producer_batch_size',10000, 'producer_limit_size', 10000000, 'worker_batch_size',200))",
func = "{{this.schema}}.udf_json_rpc(object_construct('sql_source', '{{this.identifier}}', 'external_table','blocks', 'producer_batch_size',5000, 'producer_limit_size', 5000, 'worker_batch_size',50))",
target = "{{this.schema}}.{{this.identifier}}"
)
) }}
@ -62,4 +62,5 @@ SELECT
'false'
) AS params
FROM
tbl
{{ ref("streamline__blocks") }}
WHERE block_number <= 39511995

View File

@ -1,7 +1,7 @@
{{ config (
materialized = "view",
post_hook = if_data_call_function(
func = "{{this.schema}}.udf_json_rpc(object_construct('sql_source', '{{this.identifier}}', 'external_table', 'transactions', 'exploded_key','[\"result\", \"transactions\"]', 'producer_batch_size',10000, 'producer_limit_size', 10000000, 'worker_batch_size',200))",
func = "{{this.schema}}.udf_json_rpc(object_construct('sql_source', '{{this.identifier}}', 'external_table', 'transactions', 'exploded_key','[\"result\", \"transactions\"]', 'producer_batch_size',500, 'producer_limit_size', 5000000, 'worker_batch_size',50))",
target = "{{this.schema}}.{{this.identifier}}"
)
) }}
@ -41,7 +41,7 @@ WITH last_3_days AS ({% if var('STREAMLINE_RUN_HISTORY') %}
''
) AS block_number_hex
FROM
{{ ref("streamline__complete_blocks") }}
{{ ref("streamline__complete_transactions") }}
WHERE
(
block_number >= (
@ -62,4 +62,5 @@ SELECT
'true'
) AS params
FROM
tbl
{{ ref("streamline__blocks") }}
WHERE block_number <= 39511995

View File

@ -1,7 +1,7 @@
{{ config (
materialized = "view",
post_hook = if_data_call_function(
func = "{{this.schema}}.udf_json_rpc(object_construct('sql_source', '{{this.identifier}}', 'external_table', 'tx_receipts', 'producer_batch_size',10000, 'producer_limit_size', 10000000, 'worker_batch_size',200))",
func = "{{this.schema}}.udf_json_rpc(object_construct('sql_source', '{{this.identifier}}', 'external_table', 'tx_receipts', 'producer_batch_size',500, 'producer_limit_size', 5000000, 'worker_batch_size',50))",
target = "{{this.schema}}.{{this.identifier}}"
)
) }}
@ -14,26 +14,14 @@ WITH last_3_days AS ({% if var('STREAMLINE_RUN_HISTORY') %}
SELECT
MAX(block_number) - 50000 AS block_number -- aprox 3 days
FROM
{{ ref("streamline__complete_blocks") }}
{{ ref("streamline__complete_transactions") }}
{% endif %}),
flattened_tbl AS (
SELECT
block_number,
VALUE :: STRING AS tx_hash
FROM
{{ ref("streamline__complete_blocks") }},
LATERAL FLATTEN(
input => transactions
)
WHERE
transactions IS NOT NULL
),
tbl AS (
SELECT
block_number,
tx_hash
FROM
flattened_tbl
{{ ref("streamline__complete_transactions") }}
WHERE
(
block_number >= (
@ -50,16 +38,6 @@ WITH last_3_days AS ({% if var('STREAMLINE_RUN_HISTORY') %}
tx_hash
FROM
{{ ref("streamline__complete_tx_receipts") }}
WHERE
(
block_number >= (
SELECT
block_number
FROM
last_3_days
)
)
AND block_number IS NOT NULL
)
)
SELECT

View File

@ -7,3 +7,4 @@ SELECT
*
FROM
{{ ref('silver__transactions') }}
WHERE tx_id IS NOT NULL