streamline ingestion pipeline

This commit is contained in:
Jack Forgash 2022-12-14 18:02:10 -07:00
parent f05df435dc
commit 8ad35b5956
28 changed files with 461 additions and 71 deletions

View File

@ -46,6 +46,7 @@ tests:
+severity: warn # default to warn for all tests
core:
+severity: error # but error if a core view is having issues
+store_failures: true # all tests
vars:
"dbt_date:time_zone": GMT

View File

@ -0,0 +1,24 @@
{% macro partition_batch_load(batch_size) %}
{#
    Emits a WHERE clause that restricts a load model to one batch of
    _partition_by_block_number values.

    Args:
        batch_size: width of the batch, in _partition_by_block_number units.

    Incremental runs read from the highest partition already present in the
    target relation up to that partition plus batch_size. BETWEEN is inclusive
    on both ends, so the newest loaded partition is read again.
    Non-incremental runs read the fixed seed range defined below.

    If the target relation exists but holds no rows, MAX() is NULL and a bare
    BETWEEN would match nothing on every later run. COALESCE falls back to the
    seed range so the model can recover from an empty target.
#}
{% set seed_first_partition = 9820000 %}
{% set seed_last_partition = 10000000 %}
{% if is_incremental() %}
WHERE
    _partition_by_block_number BETWEEN (
        SELECT
            COALESCE(
                MAX(_partition_by_block_number),
                {{ seed_first_partition }}
            )
        FROM
            {{ this }}
    )
    AND (
        SELECT
            COALESCE(
                MAX(_partition_by_block_number) + {{ batch_size }},
                {{ seed_last_partition }}
            )
        FROM
            {{ this }}
    )
{%- else -%}
WHERE
    _partition_by_block_number BETWEEN {{ seed_first_partition }}
    AND {{ seed_last_partition }}
{% endif %}
{%- endmacro %}

View File

@ -3,10 +3,14 @@
) }}
SELECT
metadata$filename as filename,
data,
metadata$filename AS _filename,
SPLIT(
_filename,
'/'
) [0] :: NUMBER AS block_id,
CURRENT_TIMESTAMP :: TIMESTAMP_NTZ as _load_timestamp,
VALUE,
_partition_by_block_number
FROM
{{ source(
"streamline_dev",

View File

@ -3,10 +3,15 @@
) }}
SELECT
metadata$filename as filename,
data,
metadata$filename AS _filename,
SPLIT(
_filename,
'/'
) [0] :: NUMBER AS block_id,
CURRENT_TIMESTAMP :: TIMESTAMP_NTZ AS _load_timestamp,
RIGHT(SPLIT(_filename, '.') [0], 1) :: NUMBER AS _shard_number,
VALUE,
_partition_by_block_number
FROM
{{ source(
"streamline_dev",

View File

@ -99,19 +99,3 @@ models:
column_type_list:
- STRING
- VARCHAR
- name: _INGESTED_AT
description: "{{ doc('_ingested_at') }}"
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- TIMESTAMP_NTZ
- name: _INSERTED_TIMESTAMP
description: "{{ doc('_inserted_timestamp') }}"
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- TIMESTAMP_NTZ

View File

@ -0,0 +1,5 @@
{% docs _load_timestamp %}
Timestamp from when the data was loaded into the database.
{% enddocs %}

View File

@ -0,0 +1,5 @@
{% docs _partition_by_block_number %}
The block-number grouping used to partition data during ingestion.
{% enddocs %}

View File

@ -0,0 +1,5 @@
{% docs receipt_execution_outcomes %}
The outcome of the execution of a receipt. This is the result of the execution of the receipt on the shard. It contains the status of the execution, the logs, and the result of the execution.
{% enddocs %}

View File

@ -0,0 +1,5 @@
{% docs shard_id %}
The unique id for this shard, formed by joining block_id and the shard number with a hyphen.
{% enddocs %}

View File

@ -0,0 +1,5 @@
{% docs shard_number %}
The numeric identifier for this shard, indexed at 0.
{% enddocs %}

View File

@ -0,0 +1,5 @@
{% docs state_changes %}
Account and access key updates to contracts as recorded in the block shard.
{% enddocs %}

View File

@ -74,11 +74,6 @@ models:
- name: _INGESTED_AT
description: "{{ doc('_ingested_at')}}"
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- TIMESTAMP_NTZ
- name: _INSERTED_TIMESTAMP
description: "{{ doc('_inserted_timestamp')}}"

View File

@ -105,11 +105,6 @@ models:
- name: _INGESTED_AT
description: "{{ doc('_ingested_at')}}"
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- TIMESTAMP_NTZ
- name: _INSERTED_TIMESTAMP
description: "{{ doc('_inserted_timestamp')}}"

View File

@ -91,11 +91,6 @@ models:
- name: _INGESTED_AT
description: "{{ doc('_ingested_at')}}"
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- TIMESTAMP_NTZ
- name: _INSERTED_TIMESTAMP
description: "{{ doc('_inserted_timestamp')}}"

View File

@ -304,11 +304,6 @@ models:
- name: _INGESTED_AT
description: "{{ doc('_ingested_at')}}"
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- TIMESTAMP_NTZ
- name: _INSERTED_TIMESTAMP
description: "{{ doc('_inserted_timestamp')}}"

View File

@ -0,0 +1,24 @@
{{ config(
    materialized = 'incremental',
    incremental_strategy = 'merge',
    unique_key = 'block_id',
    cluster_by = ['_partition_by_block_number', '_load_timestamp::DATE'],
    full_refresh = False
) }}
-- Copies raw block rows from the bronze model into an incremental table,
-- merged on block_id. The WHERE clause is generated by the
-- partition_batch_load macro, called here with a batch size of 500000.
WITH bronze_block_batch AS (

    SELECT
        block_id,
        VALUE,
        _filename,
        _load_timestamp,
        _partition_by_block_number
    FROM
        {{ ref('bronze__streamline_blocks') }}
        {{ partition_batch_load(500000) }}
)
SELECT
    block_id,
    VALUE,
    _filename,
    _load_timestamp,
    _partition_by_block_number
FROM
    bronze_block_batch

View File

@ -0,0 +1,30 @@
{{ config(
    materialized = 'incremental',
    incremental_strategy = 'merge',
    unique_key = 'shard_id',
    cluster_by = ['_partition_by_block_number', '_load_timestamp::DATE'],
    full_refresh = False
) }}
-- Copies raw shard rows from the bronze model into an incremental table,
-- merged on shard_id. The WHERE clause is generated by the
-- partition_batch_load macro, called here with a batch size of 300000.
WITH bronze_shard_batch AS (

    SELECT
        block_id,
        -- shard_id is block_id and _shard_number joined with a hyphen
        concat_ws(
            '-',
            block_id :: STRING,
            _shard_number :: STRING
        ) AS shard_id,
        _shard_number,
        VALUE,
        _filename,
        _load_timestamp,
        _partition_by_block_number
    FROM
        {{ ref('bronze__streamline_shards') }}
        {{ partition_batch_load(300000) }}
)
SELECT
    block_id,
    shard_id,
    _shard_number,
    VALUE,
    _filename,
    _load_timestamp,
    _partition_by_block_number
FROM
    bronze_shard_batch

View File

@ -113,9 +113,3 @@ models:
- name: _INSERTED_TIMESTAMP
description: "{{ doc('_inserted_timestamp') }}"
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- TIMESTAMP_NTZ

View File

@ -131,11 +131,6 @@ models:
- name: _INGESTED_AT
description: "{{ doc('_ingested_at')}}"
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- TIMESTAMP_NTZ
- name: _INSERTED_TIMESTAMP
description: "{{ doc('_inserted_timestamp')}}"

View File

@ -0,0 +1,76 @@
{{ config(
    materialized = 'incremental',
    incremental_strategy = 'delete+insert',
    unique_key = 'block_id',
    cluster_by = ['_partition_by_block_number', 'block_timestamp::DATE'],
    enabled = true
) }}
-- Parses block JSON from silver__load_blocks into one typed row per block.
-- Step 1: keep only the most recently loaded row for each block_id.
WITH latest_block_loads AS (

    SELECT
        block_id,
        VALUE,
        _partition_by_block_number,
        _load_timestamp
    FROM
        {{ ref('silver__load_blocks') }}
    WHERE
        {{ incremental_load_filter('_load_timestamp') }}
        qualify ROW_NUMBER() over (
            PARTITION BY block_id
            ORDER BY
                _load_timestamp DESC
        ) = 1
),
-- Step 2: pull the typed fields out of the JSON payload.
parsed_blocks AS (
    SELECT
        VALUE :header :height :: NUMBER AS block_id,
        TO_TIMESTAMP_NTZ(
            VALUE :header :timestamp :: STRING
        ) AS block_timestamp,
        VALUE :header :hash :: STRING AS block_hash,
        VALUE :header :prev_hash :: STRING AS prev_hash,
        VALUE :author :: STRING AS block_author,
        VALUE :header :gas_price :: NUMBER AS gas_price,
        VALUE :header :total_supply :: NUMBER AS total_supply,
        VALUE :header :validator_proposals :: ARRAY AS validator_proposals,
        VALUE :header :validator_reward :: NUMBER AS validator_reward,
        VALUE :header :latest_protocol_version :: NUMBER AS latest_protocol_version,
        VALUE :header :epoch_id :: STRING AS epoch_id,
        VALUE :header :next_epoch_id :: STRING AS next_epoch_id,
        -- TODO tx_count not included in the data
        NULL AS tx_count,
        -- events does not exist, Figment created this
        [] AS events,
        VALUE :chunks :: ARRAY AS chunks,
        VALUE :header :: OBJECT AS header,
        _partition_by_block_number,
        _load_timestamp
    FROM
        latest_block_loads
)
SELECT
    block_id,
    block_timestamp,
    block_hash,
    prev_hash,
    block_author,
    gas_price,
    total_supply,
    validator_proposals,
    validator_reward,
    latest_protocol_version,
    epoch_id,
    next_epoch_id,
    tx_count,
    events,
    chunks,
    header,
    _partition_by_block_number,
    _load_timestamp
FROM
    parsed_blocks

View File

@ -0,0 +1,156 @@
# Documentation and tests for the silver__streamline_blocks model.
version: 2
models:
- name: silver__streamline_blocks
description: |-
Parses the blocks JSON files for NEAR.
# Model-level test: block_id must be unique across rows.
tests:
- dbt_utils.unique_combination_of_columns:
combination_of_columns:
- block_id
columns:
- name: BLOCK_ID
description: "{{ doc('block_id')}}"
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- NUMBER
- INTEGER
- name: BLOCK_TIMESTAMP
description: "{{ doc('block_timestamp')}}"
tests:
- not_null
# Recency check on block_timestamp, configured for a 1-day window.
- dbt_expectations.expect_row_values_to_have_recent_data:
datepart: day
interval: 1
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- TIMESTAMP_NTZ
- name: BLOCK_HASH
description: "{{ doc('block_hash')}}"
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- STRING
- VARCHAR
# TX_COUNT is documented only; the model currently selects NULL AS tx_count.
- name: TX_COUNT
description: "{{ doc('tx_count')}}"
- name: BLOCK_AUTHOR
description: "{{ doc('block_author')}}"
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- STRING
- VARCHAR
- name: CHUNKS
description: "{{ doc('chunks')}}"
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- ARRAY
- VARIANT
- OBJECT
- name: EPOCH_ID
description: "{{ doc('epoch_id')}}"
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- STRING
- VARCHAR
# EVENTS is type-checked only (no not_null test); the model selects an empty array for it.
- name: EVENTS
description: "{{ doc('events')}}"
tests:
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- ARRAY
- name: GAS_PRICE
description: "{{ doc('gas_price')}}"
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- NUMBER
- FLOAT
- name: LATEST_PROTOCOL_VERSION
description: "{{ doc('latest_protocol_version')}}"
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- NUMBER
- FLOAT
- name: NEXT_EPOCH_ID
description: "{{ doc('next_epoch_id')}}"
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- STRING
- VARCHAR
- name: PREV_HASH
description: "{{ doc('prev_hash')}}"
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- STRING
- VARCHAR
- name: TOTAL_SUPPLY
description: "{{ doc('total_supply')}}"
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- NUMBER
- FLOAT
- name: VALIDATOR_PROPOSALS
description: "{{ doc('validator_proposals')}}"
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- ARRAY
- VARIANT
- OBJECT
- name: VALIDATOR_REWARD
description: "{{ doc('validator_reward')}}"
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- NUMBER
- FLOAT
# Ingestion metadata columns. _partition_by_block_number is documented only.
- name: _partition_by_block_number
description: "{{ doc('_partition_by_block_number')}}"
- name: _load_timestamp
description: "{{ doc('_load_timestamp')}}"
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- TIMESTAMP_NTZ
# Recency check on _load_timestamp, configured for a 1-day window.
- dbt_expectations.expect_row_values_to_have_recent_data:
datepart: day
interval: 1

View File

@ -0,0 +1,34 @@
{{ config(
    materialized = 'incremental',
    incremental_strategy = 'delete+insert',
    unique_key = ['shard_id'],
    cluster_by = ['_partition_by_block_number'],
    enabled = true
) }}
-- Parses shard JSON from silver__load_shards into one typed row per shard.
-- Step 1: pick up the rows selected by the incremental load filter.
WITH loaded_shards AS (

    SELECT
        block_id,
        shard_id,
        VALUE,
        _partition_by_block_number,
        _load_timestamp
    FROM
        {{ ref('silver__load_shards') }}
    WHERE
        {{ incremental_load_filter('_load_timestamp') }}
),
-- Step 2: split the payload into its top-level sections.
parsed_shards AS (
    SELECT
        block_id,
        shard_id,
        VALUE :chunk :: VARIANT AS chunk,
        VALUE :receipt_execution_outcomes :: VARIANT AS receipt_execution_outcomes,
        VALUE :shard_id :: NUMBER AS shard_number,
        VALUE :state_changes :: VARIANT AS state_changes,
        _partition_by_block_number,
        _load_timestamp
    FROM
        loaded_shards
)
SELECT
    block_id,
    shard_id,
    chunk,
    receipt_execution_outcomes,
    shard_number,
    state_changes,
    _partition_by_block_number,
    _load_timestamp
FROM
    parsed_shards

View File

@ -0,0 +1,68 @@
# Documentation and tests for the silver__streamline_shards model.
version: 2
models:
- name: silver__streamline_shards
description: |-
Parses the shards JSON files for NEAR.
# Model-level test: shard_id must be unique across rows.
tests:
- dbt_utils.unique_combination_of_columns:
combination_of_columns:
- shard_id
columns:
- name: BLOCK_ID
description: "{{ doc('block_id')}}"
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- NUMBER
- INTEGER
- name: SHARD_ID
description: "{{ doc('shard_id')}}"
tests:
- not_null
# NOTE(review): CHUNK reuses the 'chunks' doc block; confirm that description fits a single chunk.
- name: CHUNK
description: "{{ doc('chunks')}}"
tests:
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- VARIANT
- OBJECT
# RECEIPT_EXECUTION_OUTCOMES is documented only; no tests attached.
- name: RECEIPT_EXECUTION_OUTCOMES
description: "{{ doc('receipt_execution_outcomes')}}"
- name: SHARD_NUMBER
description: "{{ doc('shard_number')}}"
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- NUMBER
- INTEGER
- name: STATE_CHANGES
description: "{{ doc('state_changes')}}"
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- ARRAY
- VARIANT
# Ingestion metadata columns. _PARTITION_BY_BLOCK_NUMBER is documented only.
- name: _PARTITION_BY_BLOCK_NUMBER
description: "{{ doc('_partition_by_block_number')}}"
- name: _load_timestamp
description: "{{ doc('_load_timestamp')}}"
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- TIMESTAMP_NTZ
# Recency check on _load_timestamp, configured for a 1-day window.
- dbt_expectations.expect_row_values_to_have_recent_data:
datepart: day
interval: 1

View File

@ -125,11 +125,6 @@ models:
- name: _INGESTED_AT
description: "{{ doc('_ingested_at')}}"
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- TIMESTAMP_NTZ
- name: _INSERTED_TIMESTAMP
description: "{{ doc('_inserted_timestamp')}}"

View File

@ -109,11 +109,6 @@ models:
- name: _INGESTED_AT
description: "{{ doc('_ingested_at')}}"
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- TIMESTAMP_NTZ
- name: _INSERTED_TIMESTAMP
description: "{{ doc('_inserted_timestamp')}}"

View File

@ -78,11 +78,6 @@ models:
- name: _INGESTED_AT
description: "{{ doc('_ingested_at')}}"
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- TIMESTAMP_NTZ
- name: _INSERTED_TIMESTAMP
description: "{{ doc('_inserted_timestamp')}}"

View File

@ -68,4 +68,4 @@ sources:
schema: near_dev
tables:
- name: blocks
- name: shards
- name: shards