bug fix for non block spine coll/tx ingestion (#157)

Co-authored-by: shah <info@shahnewazkhan.ca>
This commit is contained in:
Shah Newaz Khan 2023-08-23 12:13:40 -07:00 committed by GitHub
parent f3a5b863e1
commit 7e98bd6029
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 99 additions and 63 deletions

View File

@ -2,7 +2,7 @@ SHELL := /bin/bash
# set default target
DBT_TARGET ?= sbx
AWS_LAMBDA_ROLE ?= aws_lambda_flow_api_sbx
AWS_LAMBDA_ROLE ?= aws_lambda_flow_api_dev
dbt-console:
docker-compose run dbt_console
@ -42,8 +42,40 @@ streamline: sl-flow-api udfs grant-streamline-privileges streamline_bronze
streamline_bronze:
dbt run \
--vars '{"STREAMLINE_USE_DEV_FOR_EXTERNAL_TABLES": False}' \
--vars '{"STREAMLINE_USE_DEV_FOR_EXTERNAL_TABLES": True}' \
-m 1+models/silver/streamline/bronze \
--profiles-dir ~/.dbt \
--target $(DBT_TARGET) \
--profile flow
--profile flow
blocks_history:
dbt run \
--vars '{"STREAMLINE_INVOKE_STREAMS":True, "STREAMLINE_USE_DEV_FOR_EXTERNAL_TABLES": True}' \
-m 1+models/silver/streamline/core/history/blocks/streamline__get_blocks_history_mainnet22.sql \
--profile flow \
--target $(DBT_TARGET) \
--profiles-dir ~/.dbt
collections_history:
dbt run \
--vars '{"STREAMLINE_INVOKE_STREAMS":True, "STREAMLINE_USE_DEV_FOR_EXTERNAL_TABLES": True}' \
-m 1+models/silver/streamline/core/history/collections/streamline__get_collections_history_mainnet22.sql \
--profile flow \
--target $(DBT_TARGET) \
--profiles-dir ~/.dbt
tx_history:
dbt run \
--vars '{"STREAMLINE_INVOKE_STREAMS":True, "STREAMLINE_USE_DEV_FOR_EXTERNAL_TABLES": True}' \
-m 1+models/silver/streamline/core/history/transactions/streamline__get_transactions_history_mainnet22.sql \
--profile flow \
--target $(DBT_TARGET) \
--profiles-dir ~/.dbt
tx_results_history:
dbt run \
--vars '{"STREAMLINE_INVOKE_STREAMS":True, "STREAMLINE_USE_DEV_FOR_EXTERNAL_TABLES": True}' \
-m 1+models/silver/streamline/core/history/transaction_results/streamline__get_transaction_results_history_mainnet22.sql \
--profile flow \
--target $(DBT_TARGET) \
--profiles-dir ~/.dbt

View File

@ -6,39 +6,42 @@
)
) }}
WITH blocks AS (
-- CTE to get all block_heights and their associated collection_ids from the complete_get_blocks table
WITH block_collections AS (
SELECT
block_height
cb.block_number AS block_height,
collection_guarantee.value:collection_id AS collection_id
FROM
{{ ref("streamline__blocks") }}
EXCEPT
SELECT
block_number as block_height
FROM
{{ ref("streamline__complete_get_collections") }}
{{ ref("streamline__complete_get_blocks") }} cb,
LATERAL FLATTEN(input => cb.data:collection_guarantees) AS collection_guarantee
),
collections AS (
-- CTE to identify collections that haven't been ingested yet
collections_to_ingest AS (
SELECT
block_number as block_height,
data
bc.block_height,
bc.collection_id
FROM
{{ ref('streamline__complete_get_blocks') }}
JOIN blocks ON blocks.block_height = block_number
block_collections bc
LEFT JOIN
{{ ref("streamline__complete_get_collections") }} c
ON bc.block_height = c.block_number
AND bc.collection_id = c.id
WHERE
c.id IS NULL
)
-- Generate the requests based on the missing collections
SELECT
OBJECT_CONSTRUCT(
'grpc', 'proto3',
'method', 'get_collection_by_i_d',
'block_height', block_height::INTEGER,
'method_params', OBJECT_CONSTRUCT('id', collection_guarantee.value:collection_id)
'method_params', OBJECT_CONSTRUCT('id', collection_id)
) AS request
FROM
collections,
LATERAL FLATTEN(input => data:collection_guarantees) AS collection_guarantee
collections_to_ingest
WHERE
block_height BETWEEN 47169687 AND 55114466 -- Mainnet22 block range
ORDER BY
block_height ASC
block_height ASC

View File

@ -6,39 +6,39 @@
)
) }}
WITH blocks AS (
-- CTE to get all transaction_ids from the complete_get_collections table
WITH collection_transactions AS (
SELECT
block_height
block_number AS block_height,
transaction.value::STRING AS transaction_id
FROM
{{ ref("streamline__blocks") }}
EXCEPT
SELECT
block_number as block_height
FROM
{{ ref("streamline__complete_get_transaction_results") }}
{{ ref("streamline__complete_get_collections") }} cc,
LATERAL FLATTEN(input => cc.data:transaction_ids) AS transaction
),
tx AS (
-- CTE to identify transaction_results that haven't been ingested yet
transaction_results_to_ingest AS (
SELECT
block_number as block_height,
data
ct.block_height,
ct.transaction_id
FROM
{{ ref('streamline__complete_get_collections') }}
JOIN blocks ON blocks.block_height = block_number
collection_transactions ct
LEFT JOIN
{{ ref("streamline__complete_get_transaction_results") }} tr ON ct.transaction_id = tr.id
WHERE
tr.id IS NULL
)
-- Generate the requests column based on the missing transactions
SELECT
OBJECT_CONSTRUCT(
'grpc', 'proto3',
'method', 'get_transaction_result',
'block_height', block_height::INTEGER,
'transaction_id', transaction_id.value::string,
'method_params', OBJECT_CONSTRUCT('id', transaction_id.value::string)
'transaction_id', transaction_id::STRING,
'method_params', OBJECT_CONSTRUCT('id', transaction_id::STRING)
) AS request
FROM
tx,
LATERAL FLATTEN(input => TRY_PARSE_JSON(data):transaction_ids) AS transaction_id
WHERE
block_height BETWEEN 47169687 AND 55114466 -- Mainnet22 block range
transaction_results_to_ingest
ORDER BY
block_height ASC
block_height ASC

View File

@ -6,39 +6,40 @@
)
) }}
WITH blocks AS (
-- CTE to get all transaction_ids from the complete_get_collections table
WITH collection_transactions AS (
SELECT
block_height
block_number AS block_height,
transaction.value::STRING AS transaction_id
FROM
{{ ref("streamline__blocks") }}
EXCEPT
SELECT
block_number as block_height
FROM
{{ ref("streamline__complete_get_transactions") }}
{{ ref("streamline__complete_get_collections") }} cc,
LATERAL FLATTEN(input => cc.data:transaction_ids) AS transaction
),
tx AS (
-- CTE to identify transactions that haven't been ingested yet
transactions_to_ingest AS (
SELECT
block_number as block_height,
data
ct.block_height,
ct.transaction_id
FROM
{{ ref('streamline__complete_get_collections') }}
JOIN blocks ON blocks.block_height = block_number
collection_transactions ct
LEFT JOIN
{{ ref("streamline__complete_get_transactions") }} t ON ct.transaction_id = t.id
WHERE
t.id IS NULL
)
-- Generate the requests based on the missing transactions
SELECT
OBJECT_CONSTRUCT(
'grpc', 'proto3',
'method', 'get_transaction',
'block_height', block_height::INTEGER,
'transaction_id', transaction_id.value::string,
'method_params', OBJECT_CONSTRUCT('id', transaction_id.value::string)
'transaction_id', transaction_id::STRING,
'method_params', OBJECT_CONSTRUCT('id', transaction_id::STRING)
) AS request
FROM
tx,
LATERAL FLATTEN(input => TRY_PARSE_JSON(data):transaction_ids) AS transaction_id
WHERE
block_height BETWEEN 47169687 AND 55114466 -- Mainnet22 block range
transactions_to_ingest
ORDER BY
block_height ASC