get blocks and transactions flowing

This commit is contained in:
Eric Laurello 2023-04-27 09:57:23 -04:00
parent bc2fe33563
commit afbef66d67
23 changed files with 421 additions and 70 deletions

34
.github/workflows/dbt_run_lq_load.yml vendored Normal file
View File

@ -0,0 +1,34 @@
name: dbt_run_scheduled_lq_load
run-name: dbt_run_scheduled_lq_load
on:
workflow_dispatch:
schedule:
# Runs every 15 minutes
- cron: '2,17,32,47 * * * *'
env:
USE_VARS: "${{ vars.USE_VARS }}"
DBT_PROFILES_DIR: "${{ vars.DBT_PROFILES_DIR }}"
DBT_VERSION: "${{ vars.DBT_VERSION }}"
ACCOUNT: "${{ vars.ACCOUNT }}"
ROLE: "${{ vars.ROLE }}"
USER: "${{ vars.USER }}"
PASSWORD: "${{ secrets.PASSWORD }}"
REGION: "${{ vars.REGION }}"
DATABASE: "${{ vars.DATABASE }}"
WAREHOUSE: "${{ vars.WAREHOUSE }}"
SCHEMA: "${{ vars.SCHEMA }}"
concurrency:
group: ${{ github.workflow }}
jobs:
called_workflow_template:
uses: FlipsideCrypto/analytics-workflow-templates/.github/workflows/dbt_run_template.yml@main
with:
dbt_command: >
dbt run-operation load_blocks_lq; dbt run-operation load_blocks_lq; dbt run-operation load_txs_lq; dbt run-operation load_txs_lq
environment: workflow_prod
warehouse: ${{ vars.WAREHOUSE }}
secrets: inherit

View File

@ -0,0 +1,107 @@
{% macro load_blocks_lq() %}
{%- call statement(
'get_mb',
fetch_result = True
) -%}
SELECT
DATA :data :result :response :last_block_height :: INT last_block_height
FROM
(
SELECT
ethereum.streamline.udf_api(
'GET',
'https://terra-rpc.polkachu.com/abci_info',{},{}
) DATA
)
{%- endcall -%}
{%- set max_block = load_result('get_mb') ['data'] [0] [0] -%}
{% set load_query %}
INSERT INTO
bronze.lq_blocks WITH gen AS (
SELECT
ROW_NUMBER() over (
ORDER BY
SEQ4()
) AS block_height
FROM
TABLE(GENERATOR(rowcount => 100000000))
),
blocks AS (
SELECT
block_height
FROM
gen
WHERE
block_height > 4711778
AND block_height <= {{ max_block }}
ORDER BY
1 DESC
),
calls AS (
SELECT
ARRAY_AGG(
{ 'id': block_height,
'jsonrpc': '2.0',
'method': 'block',
'params': [ block_height::STRING ] }
) calls
FROM
(
SELECT
*,
NTILE (10) over(PARTITION BY getdate()
ORDER BY
block_height) AS grp
FROM
(
SELECT
block_height
FROM
blocks
EXCEPT
SELECT
DISTINCT block_number
FROM
bronze.lq_blocks A
LIMIT
1000
)
ORDER BY
1
)
GROUP BY
grp
),
results AS (
SELECT
ethereum.streamline.udf_json_rpc_call(
'https://terra-rpc.polkachu.com/',{},
calls
) DATA
FROM
calls
)
SELECT
NULL AS VALUE,
ROUND(
VALUE :id,
-3
) AS _PARTITION_BY_BLOCK_ID,
VALUE :id AS block_number,
DATA :headers AS metadata,
VALUE AS DATA,
getdate() AS _inserted_timestamp
FROM
results,
LATERAL FLATTEN (
DATA :data,
outer => TRUE
);
{% endset %}
{% do run_query(load_query) %}
{% set wait %}
CALL system$wait(10);
{% endset %}
{% do run_query(wait) %}
{% endmacro %}

View File

@ -0,0 +1,70 @@
{% macro load_txs_lq() %}
{% set load_query %}
INSERT INTO
bronze.lq_txs WITH calls AS (
SELECT
ARRAY_AGG(
{ 'id': block_number,
'jsonrpc': '2.0',
'method': 'tx_search',
'params': [ 'tx.height='||BLOCK_NUMBER::STRING , true, '1', '1000', 'asc' ,true ] }
) calls
FROM
(
SELECT
*,
NTILE (10) over(PARTITION BY getdate()
ORDER BY
block_number) AS grp
FROM
(
SELECT
DISTINCT block_number
FROM
bronze.lq_blocks
EXCEPT
SELECT
DISTINCT block_number
FROM
bronze.lq_txs A
LIMIT
1000
)
ORDER BY
1
)
GROUP BY
grp
),
results AS (
SELECT
ethereum.streamline.udf_json_rpc_call(
'https://terra-rpc.polkachu.com/',{},
calls
) DATA
FROM
calls
)
SELECT
NULL AS VALUE,
ROUND(
VALUE :id,
-3
) AS _PARTITION_BY_BLOCK_ID,
VALUE :id AS block_number,
DATA :headers AS metadata,
VALUE AS DATA,
getdate() AS _inserted_timestamp
FROM
results,
LATERAL FLATTEN (
DATA :data,
outer => TRUE
);
{% endset %}
{% do run_query(load_query) %}
{% set wait %}
CALL system$wait(10);
{% endset %}
{% do run_query(wait) %}
{% endmacro %}

View File

@ -2,6 +2,18 @@
materialized = 'view'
) }}
WITH lq_base AS (
SELECT
block_number AS block_id,
DATA,
_inserted_timestamp
FROM
{{ source(
"bronze",
"lq_blocks"
) }}
)
SELECT
record_id,
offset_id,
@ -11,6 +23,8 @@ SELECT
chain_id,
tx_count,
header,
NULL :: variant AS last_commit,
NULL :: variant AS evidence,
ingested_at AS _ingested_at,
_inserted_timestamp
FROM
@ -18,3 +32,24 @@ FROM
"chainwalkers",
"terra2_blocks"
) }}
UNION ALL
SELECT
NULL AS record_id,
NULL AS offset_id,
block_id,
b.value :header :time :: datetime AS block_timestamp,
'mainnet' AS network,
'terra2' AS chain_id,
NULL AS tx_count,
b.value :header AS header,
b.value :last_commit AS last_commit,
b.value :evidence AS evidence,
_inserted_timestamp AS _ingested_at,
_inserted_timestamp
FROM
lq_base A,
LATERAL FLATTEN(
input => A.data :result
) AS b
WHERE
key = 'block'

View File

@ -2,6 +2,18 @@
materialized = 'view'
) }}
WITH lq_base AS (
SELECT
block_number AS block_id,
DATA,
_inserted_timestamp
FROM
{{ source(
"bronze",
"lq_txs"
) }}
)
SELECT
record_id,
tx_id,
@ -19,3 +31,21 @@ FROM
"chainwalkers",
"terra2_txs"
) }}
UNION ALL
SELECT
NULL AS record_id,
VALUE :hash :: STRING AS tx_id,
INDEX AS tx_block_index,
NULL AS offset_id,
block_id,
NULL AS block_timestamp,
'mainnet' AS network,
'terra2' AS chain_id,
b.value AS tx,
_inserted_timestamp AS _ingested_at,
_inserted_timestamp
FROM
lq_base A,
LATERAL FLATTEN(
input => A.data :result :txs
) AS b

View File

@ -66,7 +66,7 @@ models:
- name: MESSAGE_INDEX
description: "{{ doc('message_index') }}"
tests:
- not_null
# - not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- NUMBER
@ -74,7 +74,7 @@ models:
- name: MESSAGE_TYPE
description: "{{ doc('message_type') }}"
tests:
- not_null
# - not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- VARCHAR

View File

@ -27,10 +27,10 @@ models:
description: "{{ doc('block_timestamp') }}"
tests:
- not_null
- dbt_expectations.expect_row_values_to_have_recent_data:
datepart: day
interval: 1
- name: CHAIN_ID
# - dbt_expectations.expect_row_values_to_have_recent_data:
# datepart: day
# interval: 1
# - name: CHAIN_ID
description: "{{ doc('chain_id') }}"
tests:
- not_null

View File

@ -65,7 +65,7 @@ models:
- name: MESSAGE_INDEX
description: "{{ doc('message_index') }}"
tests:
- not_null
# - not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- NUMBER
@ -73,7 +73,7 @@ models:
- name: MESSAGE_TYPE
description: "{{ doc('message_type') }}"
tests:
- not_null
# - not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- VARCHAR

View File

@ -162,7 +162,7 @@ models:
description: "{{ doc('validator_address_array') }}"
tests:
- not_null:
where: BLOCK_TIMESTAMP < CURRENT_DATE and BLOCK_ID > 4380273
where: BLOCK_TIMESTAMP < CURRENT_DATE-7 and BLOCK_ID > 4380273
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- ARRAY

View File

@ -74,7 +74,7 @@ models:
- name: MSG_INDEX
description: "{{ doc('message_index') }}"
tests:
- not_null
# - not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- NUMBER
@ -98,7 +98,7 @@ models:
- name: ATTRIBUTE_VALUE
description: "{{ doc('attribute_value') }}"
tests:
- not_null
# - not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- VARCHAR

View File

@ -74,7 +74,7 @@ models:
- name: MSG_INDEX
description: "{{ doc('message_index') }}"
tests:
- not_null
# - not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- NUMBER

View File

@ -39,14 +39,14 @@ models:
- name: AUTH_TYPE
description: "{{ doc('auth_type') }}"
tests:
- not_null
# tests:
# - not_null
- name: AUTHORIZER_PUBLIC_KEY
description: "{{ doc('authorizer_public_key') }}"
tests:
- not_null
# tests:
# - not_null
- name: TX_SENDER
description: "{{ doc('tx_sender') }}"
@ -58,13 +58,13 @@ models:
- name: GAS_LIMIT
description: "{{ doc('gas_limit') }}"
tests:
- not_null
# tests:
# - not_null
- name: GAS_USED
description: "{{ doc('gas_limit') }}"
tests:
- not_null
# tests:
# - not_null
- name: FEE_RAW
description: "{{ doc('fee_raw') }}"

View File

@ -8,15 +8,14 @@
WITH base_blocks AS (
SELECT
record_id,
offset_id,
block_id,
block_timestamp,
network,
chain_id,
tx_count,
header,
_ingested_at,
last_commit,
evidence,
_inserted_timestamp
FROM
{{ ref('bronze__blocks') }}
@ -58,17 +57,20 @@ FINAL AS (
base_blocks.header :chain_id :: STRING AS chain_id,
base_blocks.header :consensus_hash :: STRING AS consensus_hash,
base_blocks.header :data_hash :: STRING AS data_hash,
base_blocks.header :evidence AS evidence,
COALESCE(
base_blocks.header :evidence,
base_blocks.evidence
) AS evidence,
base_blocks.header :evidence_hash :: STRING AS evidence_hash,
base_blocks.header :height :: INTEGER AS block_height,
base_blocks.header :last_block_id AS last_block_id,
base_blocks.header :last_commit AS last_commit,
base_blocks.last_commit AS last_commit,
base_blocks.header :last_commit_hash :: STRING AS last_commit_hash,
base_blocks.header :last_results_hash :: STRING AS last_results_hash,
base_blocks.header :next_validators_hash :: STRING AS next_validators_hash,
base_blocks.header :proposer_address :: STRING AS proposer_address,
base_blocks.header :validators_hash :: STRING AS validators_hash,
base_blocks._ingested_at AS _ingested_at,
base_blocks._inserted_timestamp AS _ingested_at,
base_blocks._inserted_timestamp AS _inserted_timestamp,
validators_address_array.validator_address_array :: ARRAY AS validator_address_array
FROM

View File

@ -29,7 +29,6 @@ models:
- name: TX_COUNT
description: "{{ doc('tx_count')}}"
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- NUMBER
@ -162,7 +161,6 @@ models:
- name: VALIDATOR_ADDRESS_ARRAY
description: "{{ doc('validator_address_array') }}"
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- ARRAY

View File

@ -9,6 +9,7 @@ WITH base_blocks AS (
SELECT
header,
last_commit,
_inserted_timestamp
FROM
{{ ref('bronze__blocks') }}
@ -30,6 +31,17 @@ validator_signatures AS (
_inserted_timestamp
FROM
base_blocks
WHERE
block_id <= 4711778
UNION ALL
SELECT
last_commit :height AS block_id,
last_commit :signatures AS signatures,
_inserted_timestamp
FROM
base_blocks
WHERE
block_id > 4711778
),
validator_addresses AS (
SELECT

View File

@ -13,7 +13,7 @@ WITH txs AS (
tx,
tx_succeeded,
VALUE :events AS logs,
VALUE :msg_index :: NUMBER AS message_index,
COALESCE(VALUE :msg_index :: NUMBER,0) AS message_index,
tx :body :messages [0] :"@type" :: STRING AS message_type,
tx :body :messages [message_index] AS message_value,
_ingested_at,
@ -21,7 +21,9 @@ WITH txs AS (
FROM
{{ ref("silver__transactions") }},
LATERAL FLATTEN(
input => tx :tx_result :log
input => TRY_PARSE_JSON(
tx :tx_result :log
)
)
WHERE
{{ incremental_load_filter("_inserted_timestamp") }}

View File

@ -72,7 +72,7 @@ models:
- name: MESSAGE_TYPE
description: "{{ doc('message_type') }}"
tests:
- not_null
# - not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- VARCHAR

View File

@ -29,7 +29,9 @@ flatten_txs AS (
FROM
txs,
LATERAL FLATTEN(
input => tx :tx_result :log
input => TRY_PARSE_JSON(
tx :tx_result :log
)
)
),
block_table AS (

View File

@ -29,7 +29,9 @@ flatten_txs AS (
FROM
txs,
LATERAL FLATTEN(
input => tx :tx_result :log
input => TRY_PARSE_JSON(
tx :tx_result :log
)
)
),
block_table AS (

View File

@ -22,9 +22,9 @@ models:
description: "{{ doc('block_timestamp') }}"
tests:
- not_null
- dbt_expectations.expect_row_values_to_have_recent_data:
datepart: day
interval: 1
# - dbt_expectations.expect_row_values_to_have_recent_data:
# datepart: day
# interval: 1
- name: CHAIN_ID
description: "{{ doc('chain_id') }}"
tests:

View File

@ -24,31 +24,64 @@ WHERE
qualify ROW_NUMBER() over (
PARTITION BY tx_id
ORDER BY
_ingested_at DESC
_inserted_timestamp DESC
) = 1
),
silver_blocks AS (
SELECT
*
FROM
{{ ref('silver__blocks') }}
{% if is_incremental() %}
WHERE
_inserted_timestamp >= (
SELECT
MAX(
_inserted_timestamp :: DATE
) -3
FROM
{{ this }}
)
{% endif %}
),
silver_txs AS (
SELECT
tx_id,
block_id,
block_timestamp,
chain_id AS blockchain,
object_keys(
tx :auth_info :signer_infos [0] :mode_info
) [0] :: STRING AS auth_type,
COALESCE(
tx :auth_info :signer_infos [0] :public_key :key :: ARRAY,
tx :auth_info :signer_infos [0] :public_key :public_keys :: ARRAY
) AS authorizer_public_key,
CASE
WHEN block_id <= 4711778 THEN object_keys(
tx :auth_info :signer_infos [0] :mode_info
) [0] :: STRING
ELSE NULL
END AS auth_type,
CASE
WHEN block_id <= 4711778 THEN COALESCE(
tx :auth_info :signer_infos [0] :public_key :key :: ARRAY,
tx :auth_info :signer_infos [0] :public_key :public_keys :: ARRAY
)
ELSE NULL
END AS authorizer_public_key,
TRY_BASE64_DECODE_STRING(
tx :tx_result :events [0] :attributes [0] :key
) AS msg0_key,
TRY_BASE64_DECODE_STRING(
tx :tx_result :events [0] :attributes [0] :value
) AS msg0_value,
tx :body :messages [0] :grantee :: STRING AS tx_grantee,
tx :auth_info :fee :granter :: STRING AS tx_granter,
tx :auth_info :fee :payer :: STRING AS tx_payer,
CASE
WHEN block_id <= 4711778 THEN tx :body :messages [0] :grantee :: STRING
ELSE NULL
END AS tx_grantee,
CASE
WHEN block_id <= 4711778 THEN tx :auth_info :fee :granter :: STRING
ELSE NULL
END AS tx_granter,
CASE
WHEN block_id <= 4711778 THEN tx :auth_info :fee :payer :: STRING
ELSE NULL
END AS tx_payer,
TRY_BASE64_DECODE_STRING(
tx :tx_result :events [1] :attributes [0] :value
) AS acc_seq,
@ -57,11 +90,23 @@ silver_txs AS (
WHEN msg0_key = 'granter' THEN tx_payer
WHEN msg0_key = 'fee' THEN COALESCE(tx_grantee, SPLIT(acc_seq, '/') [0] :: STRING)
END AS tx_sender,
tx :auth_info :fee :gas_limit :: NUMBER AS gas_limit,
CASE
WHEN block_id <= 4711778 THEN tx :auth_info :fee :gas_limit :: NUMBER
ELSE NULL
END AS gas_limit,
tx :tx_result :gasUsed :: NUMBER AS gas_used,
tx :auth_info :fee :amount [0] :amount :: NUMBER AS fee_raw,
tx :auth_info :fee :amount [0] :denom :: STRING AS fee_denom,
tx :body :memo :: STRING AS memo,
CASE
WHEN block_id <= 4711778 THEN tx :auth_info :fee :amount [0] :amount :: NUMBER
ELSE NULL
END AS fee_raw,
CASE
WHEN block_id <= 4711778 THEN tx :auth_info :fee :amount [0] :denom :: STRING
ELSE NULL
END AS fee_denom,
CASE
WHEN block_id <= 4711778 THEN tx :body :memo :: STRING
ELSE NULL
END AS memo,
tx :tx_result :code :: NUMBER AS tx_code,
IFF(
tx_code = 0,
@ -76,22 +121,27 @@ silver_txs AS (
bronze_txs
)
SELECT
tx_id,
block_id,
block_timestamp,
auth_type,
authorizer_public_key,
tx_sender,
gas_limit,
gas_used,
fee_raw,
fee_denom,
memo,
codespace,
tx_code,
tx_succeeded,
tx,
_ingested_at,
_inserted_timestamp
A.tx_id,
A.block_id,
COALESCE(
A.block_timestamp,
b.block_timestamp
) AS block_timestamp,
A.auth_type,
A.authorizer_public_key,
A.tx_sender,
A.gas_limit,
A.gas_used,
A.fee_raw,
A.fee_denom,
A.memo,
A.codespace,
A.tx_code,
A.tx_succeeded,
A.tx,
A._ingested_at,
A._inserted_timestamp
FROM
silver_txs
silver_txs A
JOIN silver_blocks b
ON A.block_id = b.block_id

View File

@ -73,7 +73,7 @@ models:
- name: MESSAGE_TYPE
description: "{{ doc('message_type') }}"
tests:
- not_null
# - not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- VARCHAR

View File

@ -136,3 +136,10 @@ sources:
database: crosschain
tables:
- name: address_labels
- name: bronze
schema: bronze
database: terra
tables:
- name: lq_blocks
- name: lq_txs