chunks and receipts models

This commit is contained in:
forgash_ 2023-01-04 15:16:34 -07:00
parent b817e0ecd5
commit 6cf2e1fa0b
21 changed files with 162 additions and 0 deletions

View File

@ -0,0 +1,39 @@
{{ config(
materialized = 'incremental',
incremental_strategy = 'merge',
unique_key = 'chunk_hash',
cluster_by = ['_load_timestamp::date','height_created','height_included']
) }}
WITH shards AS (
SELECT
*
FROM
{{ ref('silver__streamline_shards') }}
WHERE
chunk != 'null'
AND {{ incremental_load_filter('_load_timestamp') }}
-- sample for dev testing TODO remove before prod merge
AND block_id BETWEEN 52800000 AND 53000000
),
FINAL AS (
SELECT
block_id,
shard_id,
_load_timestamp,
chunk,
chunk :header :height_created :: NUMBER AS height_created,
chunk :header :height_included :: NUMBER AS height_included,
chunk :author :: STRING AS author,
chunk :header :chunk_hash :: STRING AS chunk_hash,
chunk :header :: OBJECT AS header,
chunk :receipts :: ARRAY AS receipts,
chunk :transactions :: ARRAY AS chunk_transactions
FROM
shards
)
SELECT
*
FROM
FINAL

View File

@ -0,0 +1,39 @@
{{ config(
materialized = 'incremental',
incremental_strategy = 'merge',
unique_key = 'receipt_execution_outcome_id',
cluster_by = ['_load_timestamp::date','block_id','chunk_hash']
) }}
WITH shards AS (
SELECT
*
FROM
{{ ref('silver__streamline_shards') }}
WHERE
ARRAY_SIZE(receipt_execution_outcomes) > 0
AND {{ incremental_load_filter('_load_timestamp') }}
-- sample for dev testing TODO remove before prod merge
AND block_id BETWEEN 52800000 AND 53000000
),
FINAL AS (
SELECT
block_id,
shard_id,
INDEX AS receipt_outcome_execution_index,
CONCAT_WS('-',shard_id,INDEX) as receipt_execution_outcome_id,
_load_timestamp,
chunk :header :chunk_hash :: STRING AS chunk_hash,
VALUE :execution_outcome :: OBJECT AS execution_outcome,
VALUE :receipt :: OBJECT AS receipt
FROM
shards,
LATERAL FLATTEN(
input => receipt_execution_outcomes
)
)
SELECT
*
FROM
FINAL

View File

@ -0,0 +1,84 @@
{{ config(
materialized = 'incremental',
incremental_strategy = 'merge',
unique_key = '',
cluster_by = ['_load_timestamp::date']
) }}
WITH chunks AS (
SELECT
*
FROM
{{ ref('silver__streamline_chunks') }}
WHERE
ARRAY_SIZE(receipts) > 0
AND {{ incremental_load_filter('_load_timestamp') }}
),
receipt_execution_outcomes AS (
SELECT
*
FROM
{{ ref('silver__streamline_receipt_execution_outcome') }}
WHERE
{{ incremental_load_filter('_load_timestamp') }}
),
chunk_receipts AS (
SELECT
block_id,
shard_id,
'chunk' AS source_object,
INDEX AS object_receipt_index,
_load_timestamp,
chunk_hash,
VALUE AS receipt
FROM
chunks,
LATERAL FLATTEN(
input => receipts
)
WHERE
ARRAY_SIZE(receipts) > 0
),
reo_receipts AS (
SELECT
block_id,
shard_id,
'receipt_execution_outcomes' AS source_object,
receipt_outcome_execution_index AS object_receipt_index,
_load_timestamp,
chunk_hash,
receipt
FROM
receipt_execution_outcomes
),
receipts AS (
SELECT
*
FROM
chunk_receipts
UNION
SELECT
*
FROM
reo_receipts
),
FINAL AS (
SELECT
block_id,
shard_id,
source_object,
object_receipt_index AS receipt_index,
_load_timestamp,
chunk_hash,
receipt,
receipt :receipt_id :: STRING AS receipt_id,
receipt :receiver_id :: STRING AS receiver_id,
receipt :receipt :Action :signer_id :: STRING AS signer_id
FROM
receipts
)
SELECT
*
FROM
FINAL