Merge pull request #62 from FlipsideCrypto/fix-realtime-views

revert speedup
This commit is contained in:
eric-laurello 2023-04-26 08:21:31 -04:00 committed by GitHub
commit aaa07c9b38
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 305 additions and 59 deletions

View File

@ -0,0 +1,59 @@
{{ config(
materialized = 'incremental',
unique_key = 'block_id',
cluster_by = ['_inserted_timestamp::date'],
merge_update_columns = ["block_id"],
) }}
WITH meta AS (
SELECT
registered_on,
last_modified,
LEAST(
last_modified,
registered_on
) AS _inserted_timestamp,
file_name
FROM
TABLE(
information_schema.external_table_files(
table_name => '{{ source( "bronze", "blocks") }}'
)
) A
{% if is_incremental() %}
WHERE
LEAST(
registered_on,
last_modified
) >= (
SELECT
COALESCE(MAX(_INSERTED_TIMESTAMP), '1970-01-01' :: DATE) max_INSERTED_TIMESTAMP
FROM
{{ this }})
)
{% else %}
)
{% endif %}
SELECT
VALUE,
_partition_by_block_id,
block_number AS block_id,
metadata,
DATA,
TO_TIMESTAMP(
m._inserted_timestamp
) AS _inserted_timestamp
FROM
{{ source(
'bronze',
'blocks'
) }}
JOIN meta m
ON m.file_name = metadata$filename
WHERE
DATA: error IS NULL qualify(ROW_NUMBER() over (PARTITION BY block_number
ORDER BY
_inserted_timestamp DESC)) = 1

View File

@ -0,0 +1,60 @@
{{ config(
materialized = 'incremental',
unique_key = 'tx_id',
cluster_by = ['_inserted_timestamp::date'],
merge_update_columns = ["block_id"],
) }}
WITH meta AS (
SELECT
registered_on,
last_modified,
LEAST(
last_modified,
registered_on
) AS _inserted_timestamp,
file_name
FROM
TABLE(
information_schema.external_table_files(
table_name => '{{ source( "bronze", "txs_details") }}'
)
) A
{% if is_incremental() %}
WHERE
LEAST(
registered_on,
last_modified
) >= (
SELECT
COALESCE(MAX(_INSERTED_TIMESTAMP), '1970-01-01' :: DATE) max_INSERTED_TIMESTAMP
FROM
{{ this }})
)
{% else %}
)
{% endif %}
SELECT
VALUE,
_partition_by_block_id,
block_number AS block_id,
metadata,
DATA,
tx_hash :: STRING AS tx_id,
tx_result,
TO_TIMESTAMP(
m._inserted_timestamp
) AS _inserted_timestamp
FROM
{{ source(
'bronze',
'txs_details'
) }}
JOIN meta m
ON m.file_name = metadata$filename
WHERE
DATA: error IS NULL qualify(ROW_NUMBER() over (PARTITION BY tx_hash :: STRING
ORDER BY
_inserted_timestamp DESC)) = 1

View File

@ -4,11 +4,9 @@
incremental_strategy = 'delete+insert',
cluster_by = ['block_timestamp::DATE'],
) }}
-- depends_on: {{ ref('streamline__blocks_history') }}
-- depends_on: {{ ref('streamline__blocks_history_FR') }}
SELECT
block_number AS block_id,
block_id,
COALESCE(
DATA :result :block :header :time :: TIMESTAMP,
DATA :block :header :time :: TIMESTAMP,
@ -39,29 +37,26 @@ SELECT
DATA :result :block :header,
DATA :block :header
) AS header,
TO_TIMESTAMP(
_inserted_timestamp
) AS _inserted_timestamp,
_inserted_timestamp,
concat_ws(
'-',
chain_id,
block_id
) AS _unique_key
FROM
{{ ref('bronze__blocks') }}
WHERE
DATA :error :code IS NULL
{% if is_incremental() %}
{{ ref('streamline__blocks_history') }}
WHERE
_inserted_timestamp >= (
SELECT
MAX(_inserted_timestamp)
FROM
{{ this }}
)
{% else %}
{{ ref('streamline__blocks_history_FR') }}
AND _inserted_timestamp >= (
SELECT
MAX(_inserted_timestamp) :: DATE - 1
FROM
{{ this }}
)
{% endif %}
qualify(ROW_NUMBER() over (PARTITION BY block_number
qualify(ROW_NUMBER() over (PARTITION BY block_id
ORDER BY
_inserted_timestamp DESC)) = 1

View File

@ -5,13 +5,12 @@
cluster_by = 'block_timestamp::DATE',
post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION",
) }}
-- depends_on: {{ ref('streamline__txs_history') }}
-- depends_on: {{ ref('streamline__txs_history_FR') }}
WITH base_table AS (
SELECT
block_number AS block_id,
DATA :hash :: STRING AS tx_id,
block_id,
tx_id,
DATA :tx_result :codespace AS codespace,
DATA :tx_result :gas_used :: NUMBER AS gas_used,
DATA :tx_result :gas_wanted :: NUMBER AS gas_wanted,
@ -22,25 +21,22 @@ WITH base_table AS (
DATA :tx_result :code :: NUMBER AS tx_code,
DATA :tx_result :events AS msgs,
DATA :tx_result :log AS tx_log,
TO_TIMESTAMP(
_inserted_timestamp
) AS _inserted_timestamp
_inserted_timestamp
FROM
{{ ref('bronze__transactions') }}
WHERE
DATA :error :code IS NULL
{% if is_incremental() %}
{{ ref('streamline__txs_history') }}
WHERE
_inserted_timestamp >= (
SELECT
MAX(_inserted_timestamp)
FROM
{{ this }}
)
{% else %}
{{ ref('streamline__txs_history_FR') }}
AND _inserted_timestamp >= (
SELECT
MAX(_inserted_timestamp)
FROM
{{ this }}
)
{% endif %}
qualify(ROW_NUMBER() over (PARTITION BY DATA :hash :: STRING
qualify(ROW_NUMBER() over (PARTITION BY tx_id
ORDER BY
_inserted_timestamp DESC)) = 1
)

View File

@ -1,9 +1,55 @@
{{ config (
materialized = 'view'
) }}
{{ streamline_external_table_query(
model = "blocks",
partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 3), '_', 1) AS INTEGER)",
partition_name = "_partition_by_block_id",
unique_key = "block_number"
materialized = "incremental",
unique_key = "id",
cluster_by = "ROUND(block_number, -3)",
merge_update_columns = ["id"]
) }}
WITH meta AS (
SELECT
last_modified,
file_name
FROM
TABLE(
information_schema.external_table_files(
table_name => '{{ source( "bronze", "blocks") }}'
)
) A
)
{% if is_incremental() %},
max_date AS (
SELECT
COALESCE(MAX(_INSERTED_TIMESTAMP), '1970-01-01' :: DATE) max_INSERTED_TIMESTAMP
FROM
{{ this }})
{% endif %}
SELECT
{{ dbt_utils.generate_surrogate_key(
['block_number']
) }} AS id,
block_number,
last_modified AS _inserted_timestamp
FROM
{{ source(
"bronze",
"blocks"
) }}
JOIN meta b
ON b.file_name = metadata$filename
{% if is_incremental() %}
WHERE
b.last_modified > (
SELECT
max_INSERTED_TIMESTAMP
FROM
max_date
)
{% endif %}
qualify(ROW_NUMBER() over (PARTITION BY id
ORDER BY
_inserted_timestamp DESC)) = 1

View File

@ -22,7 +22,7 @@ SELECT
block_number
FROM
{{ ref(
"streamline__blocks_history_FR"
"streamline__blocks_history"
) }}
ORDER BY
1 ASC

View File

@ -1,9 +1,54 @@
{{ config (
materialized = 'view'
) }}
{{ streamline_external_table_query(
model = "txs_details",
partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 3), '_', 1) AS INTEGER)",
partition_name = "_partition_by_block_id",
unique_key = "block_number"
materialized = "incremental",
unique_key = "id",
cluster_by = "ROUND(block_number, -3)",
merge_update_columns = ["id"]
) }}
WITH meta AS (
SELECT
last_modified,
file_name
FROM
TABLE(
information_schema.external_table_files(
table_name => '{{ source( "bronze", "txs_details") }}'
)
) A
)
{% if is_incremental() %},
max_date AS (
SELECT
COALESCE(MAX(_INSERTED_TIMESTAMP), '1970-01-01' :: DATE) max_INSERTED_TIMESTAMP
FROM
{{ this }})
{% endif %}
SELECT
DISTINCT {{ dbt_utils.generate_surrogate_key(
['block_number']
) }} AS id,
block_number,
last_modified AS _inserted_timestamp
FROM
{{ source(
"bronze",
"txs_details"
) }}
JOIN meta b
ON b.file_name = metadata$filename
{% if is_incremental() %}
WHERE
b.last_modified > (
SELECT
max_INSERTED_TIMESTAMP
FROM
max_date
)
{% endif %}
qualify(ROW_NUMBER() over (PARTITION BY id
ORDER BY
_inserted_timestamp DESC)) = 1

View File

@ -24,7 +24,7 @@ WHERE
block_number
FROM
{{ ref(
"streamline__txs_history_FR"
"streamline__txs_history"
) }}
GROUP BY
1

View File

@ -1,9 +1,54 @@
{{ config (
materialized = 'view'
) }}
{{ streamline_external_table_query(
model = "validators",
partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 3), '_', 1) AS INTEGER)",
partition_name = "_partition_by_block_id",
unique_key = "block_number"
materialized = "incremental",
unique_key = "id",
cluster_by = "ROUND(block_number, -3)",
merge_update_columns = ["id"]
) }}
WITH meta AS (
SELECT
last_modified,
file_name
FROM
TABLE(
information_schema.external_table_files(
table_name => '{{ source( "bronze", "validators") }}'
)
) A
)
{% if is_incremental() %},
max_date AS (
SELECT
COALESCE(MAX(_INSERTED_TIMESTAMP), '1970-01-01' :: DATE) max_INSERTED_TIMESTAMP
FROM
{{ this }})
{% endif %}
SELECT
{{ dbt_utils.generate_surrogate_key(
['block_number']
) }} AS id,
block_number,
last_modified AS _inserted_timestamp
FROM
{{ source(
"bronze",
"validators"
) }}
JOIN meta b
ON b.file_name = metadata$filename
{% if is_incremental() %}
WHERE
b.last_modified > (
SELECT
max_INSERTED_TIMESTAMP
FROM
max_date
)
{% endif %}
qualify(ROW_NUMBER() over (PARTITION BY id
ORDER BY
_inserted_timestamp DESC)) = 1

View File

@ -22,7 +22,7 @@ SELECT
block_number
FROM
{{ ref(
"streamline__validators_history_FR"
"streamline__validators_history"
) }}
ORDER BY
1 ASC