add new streamline model for tx_search

This commit is contained in:
Eric Laurello 2024-01-31 17:20:03 -05:00
parent 6f7dce1345
commit 56523573a5
6 changed files with 153 additions and 1 deletions

View File

@ -42,4 +42,4 @@ jobs:
dbt deps
- name: Run DBT Jobs
run: |
dbt run --vars '{"STREAMLINE_INVOKE_STREAMS":True}' -m 1+models/streamline/core/realtime/streamline__transactions_realtime.sql
dbt run --vars '{"STREAMLINE_INVOKE_STREAMS":True}' -m 1+models/streamline/core/realtime/streamline__transactions_realtime.sql +models/streamline/core/realtime/streamline__tx_search_realtime.sql

View File

@ -0,0 +1,12 @@
{{ config (
materialized = 'view',
tags = ['core']
) }}
{% set model = this.identifier.split("_") [-1] %}
{{ streamline_external_table_FR_query(
model = 'tx_search',
partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 3), '_', 1) AS INTEGER)",
partition_name = "_partition_by_block_id",
unique_key = "COALESCE( block_number,metadata:request:params[2]::INT )"
) }}

View File

@ -0,0 +1,12 @@
{{ config (
materialized = 'view',
tags = ['core']
) }}
{% set model = this.identifier.split("_") [-1] %}
{{ streamline_external_table_query(
model = 'tx_search',
partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 3), '_', 1) AS INTEGER)",
partition_name = "_partition_by_block_id",
unique_key = "COALESCE( block_number,metadata:request:params[2]::INT )"
) }}

View File

@ -29,6 +29,7 @@ sources:
- name: blocks
- name: transactions
- name: txcount
- name: tx_search
- name: pool_balances
- name: balances
- name: crosschain_silver

View File

@ -0,0 +1,36 @@
-- depends_on: {{ ref('bronze__streamline_transactions') }}
{{ config (
materialized = "incremental",
unique_key = "id",
cluster_by = "ROUND(block_number, -3)",
merge_update_columns = ["id"],
post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION on equality(id)"
) }}
SELECT
id,
block_number,
metadata :request :params [2] :: INT AS page,
_inserted_timestamp
FROM
{% if is_incremental() %}
{{ ref('bronze__streamline_tx_search') }}
{% else %}
{{ ref('bronze__streamline_FR_tx_search') }}
{% endif %}
WHERE
DATA <> '[]'
{% if is_incremental() %}
AND _inserted_timestamp >= (
SELECT
MAX(_inserted_timestamp) _inserted_timestamp
FROM
{{ this }}
)
{% endif %}
qualify(ROW_NUMBER() over (PARTITION BY id
ORDER BY
_inserted_timestamp DESC)) = 1

View File

@ -0,0 +1,91 @@
{{ config (
materialized = "view",
post_hook = if_data_call_function(
func = "{{this.schema}}.udf_bulk_json_rpc(object_construct('sql_source', '{{this.identifier}}', 'external_table', 'tx_search', 'sql_limit', {{var('sql_limit','6000000')}}, 'producer_batch_size', {{var('producer_batch_size','3000000')}}, 'worker_batch_size', {{var('worker_batch_size','15000')}}, 'batch_call_limit', {{var('batch_call_limit','10')}}, 'exploded_key', '[\"result\", \"txs\"]', 'call_type', 'batch'))",
target = "{{this.schema}}.{{this.identifier}}"
)
) }}
WITH blocks AS (
SELECT
block_number
FROM
{{ ref("streamline__complete_txcount") }}
WHERE
block_number >= 12834102
),
transactions_counts_by_block AS (
SELECT
tc.block_number,
tc.txcount
FROM
{{ ref("streamline__complete_txcount") }}
tc
INNER JOIN blocks b
ON tc.block_number = b.block_number
),
numbers AS (
-- Recursive CTE to generate numbers. We'll use the maximum txcount value to limit our recursion.
SELECT
1 AS n
UNION ALL
SELECT
n + 1
FROM
numbers
WHERE
n < (
SELECT
CEIL(MAX(txcount) / 100.0)
FROM
transactions_counts_by_block)
),
blocks_with_page_numbers AS (
SELECT
tt.block_number AS block_number,
n.n AS page_number
FROM
transactions_counts_by_block tt
JOIN numbers n
ON n.n <= CASE
WHEN tt.txcount % 100 = 0 THEN tt.txcount / 100
ELSE FLOOR(
tt.txcount / 100
) + 1
END
{% if is_incremental() %}
EXCEPT
SELECT
block_number,
page
FROM
{{ ref("streamline__complete_tx_search") }}
{% endif %}
)
SELECT
PARSE_JSON(
CONCAT(
'{"jsonrpc": "2.0",',
'"method": "tx_search", "params":["',
'tx.height=',
block_number :: STRING,
'",',
TRUE,
',"',
page_number :: STRING,
'",',
'"100",',
'"asc"',
'],"id":"',
block_number :: STRING,
'"}'
)
) AS request
FROM
blocks_with_page_numbers
ORDER BY
block_number ASC
LIMIT
1000