flow-models/macros/streamline/models.sql
Jack Forgash 88df737fa2
AN-5203/Deploy flowEVM (#343)
* flow evm init

* flow evm testnet - blocks

* deploy dev udfs, update typo

* send block height as hex

* dev limit

* use stg external table

* some links

* add query to side doc

* upd external tbl to stg & testnet

* silver and evm txs

* testnet silver models pt1

* testnet events final, decode hash udf

* udf get evm chainhead

* upd resource list

* new udf. receipts pipeline. upd testnet model cols. expand readme

* receipts silver

* traces

* vault

* reorg to evm dir

* rm testnet, blocks lookback

* lookbacks, move qualify

* macro and align naming

* del testnet, reset namespace to gen

* silver_evm - receipts and txs

* core_evm fact_blocks & fact_transactions

* fix vault path in get blocks. Logs v1 (need sample)

* core_evm fact_logs

* CR updates

* del readme and add workflow

* upd vault path, batch size

* incr logic on modified to core_evm

* use local utils.udf, add blockNumber col to complete blocks check

* rm col

* correct _invocation_id

* upd nv csv

* incr batch limit due to late start

* add evm tag to model run
2024-09-05 20:43:33 -06:00

223 lines
6.1 KiB
SQL

{% macro streamline_external_table_query(
model,
partition_function,
partition_name,
unique_key
) %}
WITH meta AS (
SELECT
last_modified AS _inserted_timestamp,
file_name,
{{ partition_function }} AS {{ partition_name }}
FROM
TABLE(
information_schema.external_table_file_registration_history(
start_time => DATEADD('day', -3, CURRENT_TIMESTAMP()),
table_name => '{{ source( "bronze_streamline", model) }}')
) A
)
SELECT
block_number,
{{ unique_key }},
DATA,
_inserted_timestamp,
MD5(
CAST(
COALESCE(CAST({{ unique_key }} AS text), '' :: STRING) AS text
)
) AS _fsc_id,
s.{{ partition_name }},
s.value AS VALUE
FROM
{{ source(
"bronze_streamline",
model
) }}
s
JOIN meta b
ON b.file_name = metadata$filename
AND b.{{ partition_name }} = s.{{ partition_name }}
WHERE
b.{{ partition_name }} = s.{{ partition_name }}
{% endmacro %}
{% macro streamline_external_table_FR_query(
model,
partition_function,
partition_name,
unique_key
) %}
WITH meta AS (
SELECT
registered_on AS _inserted_timestamp,
file_name,
{{ partition_function }} AS {{ partition_name }}
FROM
TABLE(
information_schema.external_table_files(
table_name => '{{ source( "bronze_streamline", model) }}'
)
) A
)
SELECT
block_number,
{{ unique_key }},
DATA,
_inserted_timestamp,
MD5(
CAST(
COALESCE(CAST({{ unique_key }} AS text), '' :: STRING) AS text
)
) AS _fsc_id,
s.{{ partition_name }},
s.value AS VALUE
FROM
{{ source(
"bronze_streamline",
model
) }}
s
JOIN meta b
ON b.file_name = metadata$filename
AND b.{{ partition_name }} = s.{{ partition_name }}
WHERE
b.{{ partition_name }} = s.{{ partition_name }}
{% endmacro %}
{% macro streamline_multiple_external_table_query(
table_names,
partition_function,
partition_name,
unique_key
)%}
WITH
{% for table_name in table_names %}
meta_{{ table_name }} AS (
SELECT
last_modified AS _inserted_timestamp,
file_name,
{{ partition_function }} AS {{ partition_name }}
FROM
TABLE(
information_schema.external_table_file_registration_history(
start_time => DATEADD('hour', -36, SYSDATE()),
table_name => '{{ source( "bronze_streamline", table_name ) }}')
) A
),
{{ table_name }} AS (
SELECT
block_number,
{{ unique_key }},
DATA,
_inserted_timestamp,
MD5(
CAST(
COALESCE(CAST(block_number AS text), '' :: STRING) AS text
)
) AS _fsc_id,
s.{{ partition_name }},
s.value AS VALUE
FROM
{{ source(
"bronze_streamline",
table_name
) }}
s
JOIN meta_{{ table_name }}
b
ON b.file_name = metadata$filename
AND b.{{ partition_name }} = s.{{ partition_name }}
WHERE
b.{{ partition_name }} = s.{{ partition_name }}
),
{% endfor %}
FINAL AS ({% for table_name in table_names %}
SELECT
*
FROM
{{ table_name }}
{% if not loop.last %}
UNION ALL
{% endif %}
{% endfor %}
)
SELECT
*
FROM
FINAL
{% endmacro %}
{# Added v2 external table query for flowEVM Deployment Sept 2024 #}
{% macro streamline_external_table_query_v2(
model,
partition_function
) %}
WITH meta AS (
SELECT
job_created_time AS _inserted_timestamp,
file_name,
{{ partition_function }} AS partition_key
FROM
TABLE(
information_schema.external_table_file_registration_history(
start_time => DATEADD('day', -3, SYSDATE()),
table_name => '{{ source( "bronze_streamline", model) }}')
) A
)
SELECT
s.*,
b.file_name,
_inserted_timestamp
FROM
{{ source(
"bronze_streamline",
model
) }}
s
JOIN meta b
ON b.file_name = metadata$filename
AND b.partition_key = s.partition_key
WHERE
b.partition_key = s.partition_key
AND DATA :error IS NULL
{% endmacro %}
{% macro streamline_external_table_FR_query_v2(
model,
partition_function
) %}
WITH meta AS (
SELECT
registered_on AS _inserted_timestamp,
file_name,
{{ partition_function }} AS partition_key
FROM
TABLE(
information_schema.external_table_files(
table_name => '{{ source( "bronze_streamline", model) }}'
)
) A
)
SELECT
s.*,
b.file_name,
_inserted_timestamp
FROM
{{ source(
"bronze_streamline",
model
) }}
s
JOIN meta b
ON b.file_name = metadata$filename
AND b.partition_key = s.partition_key
WHERE
b.partition_key = s.partition_key
AND DATA :error IS NULL
{% endmacro %}