This commit is contained in:
tarikceric 2024-11-13 12:42:07 -08:00
parent 92d78b6b88
commit 03fcaa7b33
20 changed files with 489 additions and 68 deletions

View File

@ -39,19 +39,6 @@ models:
columns: true
+on_schema_change: "append_new_columns"
vars:
"dbt_date:time_zone": GMT
OBSERV_FULL_TEST: FALSE
UPDATE_SNOWFLAKE_TAGS: TRUE
STREAMLINE_INVOKE_STREAMS: FALSE
STREAMLINE_USE_DEV_FOR_EXTERNAL_TABLES: FALSE
UPDATE_UDFS_AND_SPS: FALSE
START_GHA_TASKS: False
API_INTEGRATION: '{{ var("config")[target.name]["API_INTEGRATION"] if var("config")[target.name] else var("config")["dev"]["API_INTEGRATION"] }}'
EXTERNAL_FUNCTION_URI: '{{ var("config")[target.name]["EXTERNAL_FUNCTION_URI"] if var("config")[target.name] else var("config")["dev"]["EXTERNAL_FUNCTION_URI"] }}'
ROLES: |
["INTERNAL_DEV"]
dbt_constraints_sources_nn_enabled: false
tests:
@ -73,4 +60,37 @@ dispatch:
query-comment:
comment: '{{ dbt_snowflake_query_tags.get_query_comment(node) }}'
append: true # Snowflake removes prefixed comments.
append: true # Snowflake removes prefixed comments.
# Project-level variables.
# API_INTEGRATION, EXTERNAL_FUNCTION_URI and ROLES are resolved per dbt target
# from the nested `config` map below; when the active target has no entry in
# that map, the `dev` entry is used instead.
vars:
  "dbt_date:time_zone": GMT
  STREAMLINE_INVOKE_STREAMS: FALSE
  STREAMLINE_USE_DEV_FOR_EXTERNAL_TABLES: TRUE
  UPDATE_SNOWFLAKE_TAGS: TRUE
  OBSERV_FULL_TEST: FALSE
  START_GHA_TASKS: FALSE
  UPDATE_UDFS_AND_SPS: FALSE
  #### STREAMLINE 2.0 BEGIN ####
  API_INTEGRATION: '{{ var("config")[target.name]["API_INTEGRATION"] if var("config")[target.name] else var("config")["dev"]["API_INTEGRATION"] }}'
  EXTERNAL_FUNCTION_URI: '{{ var("config")[target.name]["EXTERNAL_FUNCTION_URI"] if var("config")[target.name] else var("config")["dev"]["EXTERNAL_FUNCTION_URI"] }}'
  # Same dev fallback as the two lookups above, so a target that is missing
  # from `config` does not fail while rendering ROLES.
  ROLES: '{{ var("config")[target.name]["ROLES"] if var("config")[target.name] else var("config")["dev"]["ROLES"] }}'
  config:
    # The keys correspond to dbt profiles and are case sensitive
    dev:
      API_INTEGRATION: aws_aptos_api_dev
      EXTERNAL_FUNCTION_URI: 9v6g64rv1e.execute-api.us-east-1.amazonaws.com/stg/
      API_AWS_ROLE_ARN: arn:aws:iam::704693948482:role/aptos-api-stg-rolesnowflakeudfsAF733095-k23uBmqxZRsN
      ROLES:
        - AWS_LAMBDA_APTOS_API
        - INTERNAL_DEV
    prod:
      # NOTE(review): unlike dev, no API_AWS_ROLE_ARN is set for prod - confirm this is intentional.
      API_INTEGRATION: aws_aptos_api
      EXTERNAL_FUNCTION_URI: sfl36j9j2c.execute-api.us-east-1.amazonaws.com/prod/
      ROLES:
        - AWS_LAMBDA_APTOS_API
        - DBT_CLOUD_APTOS
        - INTERNAL_DEV

View File

@ -9,6 +9,7 @@
{% set sql %}
{{ create_udf_bulk_json_rpc() }}
{{ create_udf_bulk_rest_api() }}
{{ create_udf_bulk_rest_api_v2() }}
{% endset %}

View File

@ -12,8 +12,8 @@
{% do run_query(sql) %}
{% elif target.name == "dev" %}
{% set sql %}
CREATE api integration IF NOT EXISTS aws_aptos_api_dev api_provider = aws_api_gateway api_aws_role_arn = 'arn:aws:iam::490041342817:role/aptos-api-dev-rolesnowflakeudfsAF733095-sLREQ0qf4XVH' api_allowed_prefixes = (
'https://jx15f84555.execute-api.us-east-1.amazonaws.com/dev/'
CREATE api integration IF NOT EXISTS aws_aptos_api_dev api_provider = aws_api_gateway api_aws_role_arn = 'arn:aws:iam::704693948482:role/aptos-api-stg-rolesnowflakeudfsAF733095-k23uBmqxZRsN' api_allowed_prefixes = (
'https://9v6g64rv1e.execute-api.us-east-1.amazonaws.com/stg/'
) enabled = TRUE;
{% endset %}
{% do run_query(sql) %}

View File

@ -76,21 +76,19 @@ WHERE
FROM
TABLE(
information_schema.external_table_file_registration_history(
start_time => DATEADD('day', -3, CURRENT_TIMESTAMP()),
start_time => DATEADD('day', -7, CURRENT_TIMESTAMP()),
table_name => '{{ source( "bronze_streamline", model) }}')
) A
)
SELECT
{{ unique_key }},
DATA,
_inserted_timestamp,
MD5(
CAST(
COALESCE(CAST({{ unique_key }} AS text), '' :: STRING) AS text
)
) AS id,
s.{{ partition_name }},
s.value AS VALUE
s.*,
b.file_name,
_inserted_timestamp
FROM
{{ source(
"bronze_streamline",
@ -115,7 +113,9 @@ WHERE
'-32007',
'-32008',
'-32009',
'-32010'
'-32010',
'-32602',
'-32603'
)
)
{% endmacro %}
@ -139,16 +139,14 @@ WHERE
) A
)
SELECT
{{ unique_key }},
DATA,
_inserted_timestamp,
MD5(
CAST(
COALESCE(CAST({{ unique_key }} AS text), '' :: STRING) AS text
)
) AS id,
s.{{ partition_name }},
s.value AS VALUE
s.*,
b.file_name,
_inserted_timestamp
FROM
{{ source(
"bronze_streamline",
@ -173,7 +171,78 @@ WHERE
'-32007',
'-32008',
'-32009',
'-32010'
'-32010',
'-32602',
'-32603'
)
)
{% endmacro %}
{#
    streamline_external_table_query_v2

    Renders a query over a bronze_streamline external table restricted to
    files registered during the last 3 days.

    Args:
        model: name of the source table inside the "bronze_streamline" source.
        partition_function: SQL expression evaluated against file_name that
            yields the partition key of each registered file.

    Output columns: every column of the external table, plus file_name and
    _inserted_timestamp (the job_created_time of the file registration).
    Rows whose DATA:error is not NULL are excluded.
#}
{% macro streamline_external_table_query_v2(
        model,
        partition_function
    ) %}
    WITH meta AS (
        -- One row per file registered in the look-back window.
        SELECT
            job_created_time AS _inserted_timestamp,
            file_name,
            {{ partition_function }} AS partition_key
        FROM
            TABLE(
                information_schema.external_table_file_registration_history(
                    start_time => DATEADD('day', -3, CURRENT_TIMESTAMP()),
                    table_name => '{{ source("bronze_streamline", model) }}'
                )
            ) A
    )
    SELECT
        s.*,
        b.file_name,
        _inserted_timestamp
    FROM
        {{ source("bronze_streamline", model) }} s
        INNER JOIN meta b
            ON b.file_name = metadata$filename
            AND b.partition_key = s.partition_key
    WHERE
        -- Partition-key equality is stated both in the join and here, as in the original.
        b.partition_key = s.partition_key
        AND DATA :error IS NULL
{% endmacro %}
{#
    streamline_external_table_FR_query_v2

    Renders a query over a bronze_streamline external table using every file
    listed by information_schema.external_table_files (no time window).

    Args:
        model: name of the source table inside the "bronze_streamline" source.
        partition_function: SQL expression evaluated against file_name that
            yields the partition key of each file.

    Output columns: every column of the external table, plus file_name and
    _inserted_timestamp (the registered_on value of the file).
    Rows whose DATA:error is not NULL are excluded.
#}
{% macro streamline_external_table_FR_query_v2(
        model,
        partition_function
    ) %}
    WITH meta AS (
        -- One row per file currently registered on the external table.
        SELECT
            registered_on AS _inserted_timestamp,
            file_name,
            {{ partition_function }} AS partition_key
        FROM
            TABLE(
                information_schema.external_table_files(
                    table_name => '{{ source("bronze_streamline", model) }}'
                )
            ) A
    )
    SELECT
        s.*,
        b.file_name,
        _inserted_timestamp
    FROM
        {{ source("bronze_streamline", model) }} s
        INNER JOIN meta b
            ON b.file_name = metadata$filename
            AND b.partition_key = s.partition_key
    WHERE
        -- Partition-key equality is stated both in the join and here, as in the original.
        b.partition_key = s.partition_key
        AND DATA :error IS NULL
{% endmacro %}

View File

@ -22,4 +22,18 @@
) returns ARRAY api_integration = aws_aptos_api_dev AS
'https://jx15f84555.execute-api.us-east-1.amazonaws.com/dev/udf_bulk_rest_api'
{%- endif %};
{% endmacro %}
{#
    create_udf_bulk_rest_api_v2

    Emits the CREATE OR REPLACE EXTERNAL FUNCTION statement for
    streamline.udf_bulk_rest_api_v2. The "prod" target binds the function to
    the aws_aptos_api integration and the prod endpoint; every other target
    binds it to aws_aptos_api_dev and the stg endpoint.

    Both branches create the _v2 function. Previously the non-prod branch
    created streamline.udf_bulk_rest_api, which replaced the v1 function and
    left udf_bulk_rest_api_v2 undefined outside prod.

    NOTE(review): the prod host used here (dedvhh9fi1) differs from the prod
    EXTERNAL_FUNCTION_URI host configured in the project vars (sfl36j9j2c) -
    confirm which one is correct.
#}
{% macro create_udf_bulk_rest_api_v2() %}
    {% if target.name == "prod" %}
        CREATE OR REPLACE EXTERNAL FUNCTION streamline.udf_bulk_rest_api_v2(
            json OBJECT
        ) returns ARRAY api_integration = aws_aptos_api AS
        'https://dedvhh9fi1.execute-api.us-east-1.amazonaws.com/prod/udf_bulk_rest_api'
    {% else %}
        CREATE OR REPLACE EXTERNAL FUNCTION streamline.udf_bulk_rest_api_v2(
            json OBJECT
        ) returns ARRAY api_integration = aws_aptos_api_dev AS
        'https://9v6g64rv1e.execute-api.us-east-1.amazonaws.com/stg/udf_bulk_rest_api'
    {%- endif %};
{% endmacro %}

View File

@ -1,12 +1,23 @@
{{ config (
materialized = 'view',
tags = ['core']
materialized = 'view'
) }}
{% set model = this.identifier.split("_") [-1] %}
{{ streamline_external_table_FR_query(
model = "blocks_tx",
partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 3), '_', 1) AS INTEGER )",
partition_name = "_partition_by_block_id",
unique_key = "partition_key"
) }}
SELECT
VALUE,
partition_key,
metadata,
DATA,
file_name,
_INSERTED_TIMESTAMP
FROM
{{ ref('bronze__streamline_FR_blocks_tx_v2') }}
UNION ALL
SELECT
VALUE,
_partition_by_block_id AS partition_key,
metadata,
DATA,
file_name,
_INSERTED_TIMESTAMP
FROM
{{ ref('bronze__streamline_FR_blocks_tx_v1') }}

View File

@ -0,0 +1,12 @@
{#
    Bronze view over the "blocks_tx" external table, built with
    streamline_external_table_FR_query.

    The partition value is parsed from the third '/'-separated segment of
    file_name (the part before the first '_') and is exposed under the name
    _partition_by_block_id.

    The unused Jinja variable `model` (derived from this.identifier) was
    removed; the macro call passes the source name explicitly.
#}
{{ config (
    materialized = 'view',
    tags = ['core']
) }}
{{ streamline_external_table_FR_query(
    model = "blocks_tx",
    partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 3), '_', 1) AS INTEGER )",
    partition_name = "_partition_by_block_id",
    unique_key = "partition_key"
) }}

View File

@ -0,0 +1,7 @@
{#
    Bronze view over the "blocks_tx_v2" external table, built with
    streamline_external_table_FR_query_v2.

    The partition key is parsed from the fourth '/'-separated segment of
    file_name (the part before the first '_') and cast to INTEGER.
#}
{{ config(materialized = 'view') }}

{{ streamline_external_table_FR_query_v2(
    model = "blocks_tx_v2",
    partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 4), '_', 1) AS INTEGER )"
) }}

View File

@ -1,10 +1,23 @@
{{ config (
materialized = 'view',
tags = ['core']
) }}
{{ streamline_external_table_FR_query(
model = "transaction_batch",
partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 3), '_', 1) AS INTEGER )",
partition_name = "_partition_by_block_id",
unique_key = "partition_key"
materialized = 'view'
) }}
SELECT
VALUE,
partition_key,
metadata,
DATA,
file_name,
_INSERTED_TIMESTAMP
FROM
{{ ref('bronze__streamline_FR_transaction_batch_v2') }}
UNION ALL
SELECT
VALUE,
_partition_by_block_id AS partition_key,
metadata,
DATA,
file_name,
_INSERTED_TIMESTAMP
FROM
{{ ref('bronze__streamline_FR_transaction_batch_v1') }}

View File

@ -0,0 +1,10 @@
{#
    Bronze view over the "transaction_batch" external table, built with
    streamline_external_table_FR_query.

    The partition value is parsed from the third '/'-separated segment of
    file_name (the part before the first '_') and is exposed under the name
    _partition_by_block_id.
#}
{{ config(
    tags = ['core'],
    materialized = 'view'
) }}

{{ streamline_external_table_FR_query(
    model = "transaction_batch",
    unique_key = "partition_key",
    partition_name = "_partition_by_block_id",
    partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 3), '_', 1) AS INTEGER )"
) }}

View File

@ -0,0 +1,7 @@
{#
    Bronze view over the "transaction_batch_v2" external table, built with
    streamline_external_table_FR_query_v2.

    The partition key is parsed from the fourth '/'-separated segment of
    file_name (the part before the first '_') and cast to INTEGER.
#}
{{ config(materialized = 'view') }}

{{ streamline_external_table_FR_query_v2(
    model = "transaction_batch_v2",
    partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 4), '_', 1) AS INTEGER )"
) }}

View File

@ -3,10 +3,7 @@
tags = ['core']
) }}
{% set model = this.identifier.split("_") [-1] %}
{{ streamline_external_table_query(
model = "blocks_tx",
partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 3), '_', 1) AS INTEGER )",
partition_name = "_partition_by_block_id",
unique_key = "partition_key"
{{ streamline_external_table_query_v2(
model = "blocks_tx_v2",
partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 4), '_', 1) AS INTEGER)"
) }}

View File

@ -2,9 +2,7 @@
materialized = 'view',
tags = ['core']
) }}
{{ streamline_external_table_query(
model = "transaction_batch",
partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 3), '_', 1) AS INTEGER )",
partition_name = "_partition_by_block_id",
unique_key = "partition_key"
{{ streamline_external_table_query_v2(
model = "transaction_batch_v2",
partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 4), '_', 1) AS INTEGER)"
) }}

View File

@ -38,4 +38,9 @@ sources:
- name: blocks_tx
- name: transactions
- name: transaction_batch
- name: blocks_tx_v2
- name: transactions_v2
- name: transaction_batch_v2

View File

@ -0,0 +1,36 @@
-- depends_on: {{ ref('bronze__streamline_blocks_tx') }}
-- depends_on: {{ ref('bronze__streamline_FR_blocks_tx') }}
{#
    Incremental table with one row per block_number found in the bronze
    blocks_tx data.

    Incremental runs read bronze__streamline_blocks_tx and keep rows whose
    _inserted_timestamp is at or after the maximum already stored in this
    table; the first (non-incremental) build reads
    bronze__streamline_FR_blocks_tx in full. When a block appears more than
    once, the row with the latest _inserted_timestamp wins.

    NOTE(review): block_number is read from DATA:result:block:header:height.
    Confirm this matches the response shape of the blocks-by-height endpoint
    that feeds this table.
#}
{{ config(
    materialized = "incremental",
    unique_key = "block_number",
    cluster_by = "ROUND(block_number, -3)",
    merge_update_columns = ["block_number"],
    post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION on equality(block_number)"
) }}

SELECT
    DATA :result :block :header :height :: INT AS block_number,
    {{ dbt_utils.generate_surrogate_key(['block_number']) }} AS complete_blocks_id,
    SYSDATE() AS inserted_timestamp,
    SYSDATE() AS modified_timestamp,
    _inserted_timestamp,
    '{{ invocation_id }}' AS _invocation_id
FROM

{% if is_incremental() %}
    {{ ref('bronze__streamline_blocks_tx') }}
WHERE
    _inserted_timestamp >= (
        SELECT
            COALESCE(MAX(_INSERTED_TIMESTAMP), '1970-01-01' :: DATE) max_INSERTED_TIMESTAMP
        FROM
            {{ this }}
    )
{% else %}
    {{ ref('bronze__streamline_FR_blocks_tx') }}
{% endif %}

-- Keep only the most recently loaded row per block.
QUALIFY ROW_NUMBER() OVER (
    PARTITION BY block_number
    ORDER BY _inserted_timestamp DESC
) = 1

View File

@ -0,0 +1,39 @@
-- depends_on: {{ ref('bronze__streamline_blocks_tx') }}
-- Incremental table holding one row per block_number for which a transaction batch has been loaded.
-- Incremental runs read the recent bronze transaction_batch view; the first build reads the FR view.
{{ config (
materialized = "incremental",
unique_key = "block_number",
cluster_by = "ROUND(_partition_by_block_id, -3)",
post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION on equality(block_number)"
) }}
-- depends_on: {{ ref('bronze__streamline_transaction_batch') }}
SELECT
-- NOTE(review): block_number is unqualified; presumably it resolves to silver__blocks (alias b) - confirm.
block_number,
-- NOTE(review): confirm _partition_by_block_id is still exposed by the joined relations;
-- the unioned bronze FR transaction_batch view publishes this value as partition_key.
_partition_by_block_id,
A._inserted_timestamp
FROM
{% if is_incremental() %}
{{ ref('bronze__streamline_transaction_batch') }}
{% else %}
{{ ref('bronze__streamline_FR_transaction_batch') }}
{% endif %}
A
-- Map each batch to its block: the version of the first element of DATA must fall within the block's version range.
JOIN {{ ref('silver__blocks') }}
b
ON DATA [0] :version :: INT BETWEEN b.first_version
AND b.last_version
{% if is_incremental() %}
WHERE
-- Only rows loaded at or after the latest _inserted_timestamp already stored in this table.
A._inserted_timestamp >= (
SELECT
COALESCE(MAX(_INSERTED_TIMESTAMP), '1970-01-01' :: DATE) max_INSERTED_TIMESTAMP
FROM
{{ this }})
{% endif %}
-- Keep only the most recently loaded row per block.
qualify(ROW_NUMBER() over (PARTITION BY block_number
ORDER BY
A._inserted_timestamp DESC)) = 1

View File

@ -0,0 +1,62 @@
{#
    Realtime request view for block + transaction ingestion (Streamline v2).

    Each row is one pending request: a partition_key (block_number rounded to
    the nearest 1000) and a udf_api call description for the
    blocks/by_height endpoint with with_transactions=true. The post-hook
    passes this view to streamline.udf_bulk_rest_api_v2 with the
    "blocks_tx_v2" external table as the destination.

    Pending blocks = all blocks in streamline__blocks
        minus blocks at or below 251452492 already present in the v1
              complete table (streamline__complete_blocks_tx)
        minus blocks present in the v2 complete table.

    The former compile-time run_query that computed `next_batch_num` was
    removed: the variable was never referenced, so it only added a warehouse
    query on every compile. The rendered SQL is unchanged.

    NOTE(review): the udf_api call is qualified with the literal database
    name `aptos` rather than the target database - confirm this is intended
    for non-prod targets.
#}
{{ config (
    materialized = "view",
    post_hook = fsc_utils.if_data_call_function_v2(
        func = 'streamline.udf_bulk_rest_api_v2',
        target = "{{this.schema}}.{{this.identifier}}",
        params ={ "external_table" :"blocks_tx_v2",
        "sql_limit" :"1200000",
        "producer_batch_size" :"300000",
        "worker_batch_size" :"50000",
        "sql_source" :"{{this.identifier}}" }
    ),
    tags = ['streamline_core_realtime']
) }}

WITH blocks AS (
    -- Blocks that still need to be requested.
    SELECT
        block_number
    FROM
        {{ ref("streamline__blocks") }}
    EXCEPT
    -- The v1 complete table only counts for blocks at or below the cutover height.
    SELECT
        block_number
    FROM
        {{ ref("streamline__complete_blocks_tx") }}
    WHERE
        block_number <= 251452492
    EXCEPT
    SELECT
        block_number
    FROM
        {{ ref('streamline__complete_blocks_tx_v2') }}
)
SELECT
    ROUND(
        block_number,
        -3
    ) :: INT AS partition_key,
    aptos.live.udf_api(
        'GET',
        '{service}/{Authentication}/v1/blocks/by_height/' || block_number || '?with_transactions=true',
        object_construct(
            'Content-Type',
            'application/json',
            'token',
            '{Authentication}'
        ),
        {},
        'Vault/prod/aptos/node/mainnet'
    ) AS request
FROM
    blocks

View File

@ -0,0 +1,107 @@
{#
    Request view for paged transaction ingestion (Streamline v2).

    For every block in silver__blocks with more than 100 transactions, one
    request is generated per page of 100 transactions beyond the first 100,
    skipping blocks already present in streamline__complete_transaction_batch.
    The post-hook passes this view to streamline.udf_bulk_rest_api_v2.

    The destination external table is "transaction_batch_v2". It was
    previously spelled "transactions_batch_v2", which matches neither the
    declared bronze_streamline source nor the bronze views that read the
    results back.
#}
{{ config (
    materialized = "view",
    post_hook = fsc_utils.if_data_call_function_v2(
        func = 'streamline.udf_bulk_rest_api_v2',
        target = "{{this.schema}}.{{this.identifier}}",
        params ={ "external_table" :"transaction_batch_v2",
        "sql_limit" :"1200000",
        "producer_batch_size" :"300000",
        "worker_batch_size" :"50000",
        "sql_source" :"{{this.identifier}}" }
    )
) }}
-- depends_on: {{ ref('bronze__streamline_transaction_batch') }}
-- TODO: this model has not been migrated to the v2 request format yet; the final SELECT still
-- builds the request as a nested array instead of a udf_api call.
WITH blocks AS (
    -- Blocks with more than 100 transactions. tx_count is the number of transactions beyond the
    -- first 100 and version_start is the version of the 101st transaction.
    -- NOTE(review): presumably the first 100 transactions arrive with the block request - confirm.
    SELECT
        A.block_number,
        tx_count_from_versions -100 AS tx_count,
        first_version + 100 version_start
    FROM
        {{ ref('silver__blocks') }} A
    WHERE
        tx_count_from_versions > 100
        AND (
            block_number >= 184014560
            OR block_number IN (
                183537590,
                183564192,
                183587755,
                183587754,
                183666216
            )
        )
),
numbers AS (
    -- Recursive CTE to generate numbers. We'll use the maximum txcount value to limit our recursion.
    SELECT
        1 AS n
    UNION ALL
    SELECT
        n + 1
    FROM
        numbers
    WHERE
        n < (
            SELECT
                CEIL(MAX(tx_count) / 100.0)
            FROM
                blocks)
),
blocks_with_page_numbers AS (
    -- One row per (block, page); multiplier is the zero-based page index.
    SELECT
        tt.block_number :: INT AS block_number,
        n.n - 1 AS multiplier,
        version_start,
        tx_count
    FROM
        blocks tt
        JOIN numbers n
        ON n.n <= CASE
            WHEN tt.tx_count % 100 = 0 THEN tt.tx_count / 100
            ELSE FLOOR(
                tt.tx_count / 100
            ) + 1
        END
),
WORK AS (
    -- Anti-join: keep only pages of blocks that are not yet recorded as complete.
    SELECT
        A.block_number,
        version_start +(
            100 * multiplier
        ) AS tx_version
    FROM
        blocks_with_page_numbers A
        LEFT JOIN {{ ref('streamline__complete_transaction_batch') }}
        b
        ON A.block_number = b.block_number
    WHERE
        b.block_number IS NULL
),
calls AS (
    -- Endpoint path for each page of 100 transactions.
    SELECT
        '{service}/{Authentication}/v1/transactions?start=' || tx_version || '&limit=100' calls,
        block_number
    FROM
        WORK
)
SELECT
    -- Request layout: [partition (block_number rounded to the nearest 1000), [method, url, {}, {}, '']].
    ARRAY_CONSTRUCT(
        ROUND(
            block_number,
            -3
        ) :: INT,
        ARRAY_CONSTRUCT(
            'GET',
            calls,
            PARSE_JSON('{}'),
            PARSE_JSON('{}'),
            ''
        )
    ) AS request
FROM
    calls
ORDER BY
    block_number

View File

@ -3,13 +3,6 @@
tags = ['streamline_view']
) }}
{% if execute %}
{% set height = run_query("SELECT live.udf_api( 'GET', 'https://fullnode.mainnet.aptoslabs.com/v1', OBJECT_CONSTRUCT( 'Content-Type', 'application/json' ),{} ):data:block_height::INT") %}
{% set block_height = height.columns [0].values() [0] %}
{% else %}
{% set block_height = 0 %}
{% endif %}
SELECT
_id AS block_number
FROM
@ -18,7 +11,9 @@ FROM
'number_sequence'
) }}
WHERE
_id <= {{ block_height }}
UNION ALL
SELECT
0 AS block_number
_id <= (
SELECT
MAX(block_number)
FROM
{{ ref('streamline__chainhead') }}
)

View File

@ -0,0 +1,18 @@
{#
    Single-row view exposing the current chain height.

    Calls aptos.live.udf_api with a GET on the node's /v1 endpoint (with
    credentials resolved from the Vault path given as the last argument) and
    reads data.block_height from the response as block_number.
#}
{{ config(
    materialized = "view",
    tags = ['streamline_view']
) }}

WITH node_info AS (
    SELECT
        aptos.live.udf_api(
            'GET',
            '{service}/{Authentication}/v1',
            OBJECT_CONSTRUCT(
                'Content-Type',
                'application/json',
                'fsc-quantum-state',
                'livequery'
            ),
            OBJECT_CONSTRUCT(),
            'Vault/prod/aptos/node/mainnet'
        ) AS response
)
SELECT
    response :data :block_height :: INT AS block_number
FROM
    node_info