AN-4842 SL 2.0 (#55)

* SL 2.0

* prod endpoints
This commit is contained in:
eric-laurello 2024-05-29 10:09:14 -04:00 committed by GitHub
parent a59d7cc54e
commit 8181586da5
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
51 changed files with 797 additions and 692 deletions

View File

@ -1,5 +1,5 @@
name: dbt_run_streamline_blocks
run-name: dbt_run_streamline_blocks
name: dbt_run_streamline_blocks_tx_counts
run-name: dbt_run_streamline_blocks_tx_counts
on:
workflow_dispatch:
@ -43,4 +43,4 @@ jobs:
dbt deps
- name: Run DBT Jobs
run: |
dbt run --vars '{"STREAMLINE_INVOKE_STREAMS":True}' -m 1+models/streamline/streamline__blocks_realtime.sql
dbt run --vars '{"STREAMLINE_INVOKE_STREAMS":True}' -m 2+models/streamline/silver/realtime/streamline__blocks_realtime.sql 1+models/streamline/silver/realtime/streamline__tx_counts_realtime.sql

View File

@ -43,4 +43,4 @@ jobs:
dbt deps
- name: Run DBT Jobs
run: |
dbt run --vars '{"STREAMLINE_INVOKE_STREAMS":True}' -m 1+models/streamline/streamline__transactions_realtime.sql
dbt run --vars '{"STREAMLINE_INVOKE_STREAMS":True}' -m 1+models/streamline/silver/realtime/streamline__transactions_realtime.sql

View File

@ -1,45 +0,0 @@
name: dbt_run_streamline_validators
run-name: dbt_run_streamline_validators
on:
workflow_dispatch:
schedule:
- cron: '0 2,8,14,20 * * *'
env:
USE_VARS: "${{ vars.USE_VARS }}"
DBT_PROFILES_DIR: "${{ vars.DBT_PROFILES_DIR }}"
DBT_VERSION: "${{ vars.DBT_VERSION }}"
ACCOUNT: "${{ vars.ACCOUNT }}"
ROLE: "${{ vars.ROLE }}"
USER: "${{ vars.USER }}"
PASSWORD: "${{ secrets.PASSWORD }}"
REGION: "${{ vars.REGION }}"
DATABASE: "${{ vars.DATABASE }}"
WAREHOUSE: "${{ vars.WAREHOUSE }}"
SCHEMA: "${{ vars.SCHEMA }}"
concurrency:
group: ${{ github.workflow }}
jobs:
run_dbt_jobs:
runs-on: ubuntu-latest
environment:
name: workflow_prod
steps:
- uses: actions/checkout@v3
- uses: actions/setup-python@v4
with:
python-version: "${{ vars.PYTHON_VERSION }}"
cache: "pip"
- name: install dependencies
run: |
pip install -r requirements.txt
dbt deps
- name: Run DBT Jobs
run: |
dbt run --vars '{"STREAMLINE_INVOKE_STREAMS":True}' -m 1+models/streamline/streamline__validators_realtime.sql

View File

@ -1,6 +1,6 @@
workflow_name,workflow_schedule
dbt_run_incremental_core,"20,50 * * * *"
dbt_run_incremental_non_core,"33 * * * *"
dbt_run_streamline_blocks,"9,39 * * * *"
dbt_run_streamline_blocks_tx_counts,"9,39 * * * *"
dbt_run_streamline_transactions,"14,44 * * * *"
dbt_test_tasks,"0,30 * * * *"

1 workflow_name workflow_schedule
2 dbt_run_incremental_core 20,50 * * * *
3 dbt_run_incremental_non_core 33 * * * *
4 dbt_run_streamline_blocks dbt_run_streamline_blocks_tx_counts 9,39 * * * *
5 dbt_run_streamline_transactions 14,44 * * * *
6 dbt_test_tasks 0,30 * * * *

View File

@ -53,18 +53,6 @@ on-run-end:
# as tables. These settings can be overridden in the individual model files
# using the `{{ config(...) }}` macro.
vars:
"dbt_date:time_zone": GMT
UPDATE_UDFS_AND_SPS: False
STREAMLINE_INVOKE_STREAMS: False
STREAMLINE_USE_DEV_FOR_EXTERNAL_TABLES: False
"UPDATE_SNOWFLAKE_TAGS": True
OBSERV_FULL_TEST: TRUE
START_GHA_TASKS: False
API_INTEGRATION: '{{ var("config")[target.name]["API_INTEGRATION"] if var("config")[target.name] else var("config")["dev"]["API_INTEGRATION"] }}'
EXTERNAL_FUNCTION_URI: '{{ var("config")[target.name]["EXTERNAL_FUNCTION_URI"] if var("config")[target.name] else var("config")["dev"]["EXTERNAL_FUNCTION_URI"] }}'
ROLES: |
["INTERNAL_DEV"]
dispatch:
- macro_namespace: dbt
@ -75,4 +63,42 @@ dispatch:
query-comment:
comment: '{{ dbt_snowflake_query_tags.get_query_comment(node) }}'
append: true # Snowflake removes prefixed comments.
append: true # Snowflake removes prefixed comments.
vars:
"dbt_date:time_zone": GMT
STREAMLINE_INVOKE_STREAMS: False
STREAMLINE_USE_DEV_FOR_EXTERNAL_TABLES: False
UPDATE_UDFS_AND_SPS: False
UPDATE_SNOWFLAKE_TAGS: True
OBSERV_FULL_TEST: False
START_GHA_TASKS: False
#### STREAMLINE 2.0 BEGIN ####
API_INTEGRATION: '{{ var("config")[target.name]["API_INTEGRATION"] if var("config")[target.name] else var("config")["dev"]["API_INTEGRATION"] }}'
EXTERNAL_FUNCTION_URI: '{{ var("config")[target.name]["EXTERNAL_FUNCTION_URI"] if var("config")[target.name] else var("config")["dev"]["EXTERNAL_FUNCTION_URI"] }}'
ROLES: '{{ var("config")[target.name]["ROLES"] }}'
config:
# The keys correspond to dbt profiles and are case sensitive
dev:
API_INTEGRATION: aws_cosmos_api_stg
EXTERNAL_FUNCTION_URI: e8nbzsw4r9.execute-api.us-east-1.amazonaws.com/stg/
ROLES:
- AWS_LAMBDA_COSMOS_API
- INTERNAL_DEV
prod:
API_INTEGRATION: aws_cosmos_api
EXTERNAL_FUNCTION_URI: kpg3w2qkm4.execute-api.us-east-1.amazonaws.com/prod/
ROLES:
- AWS_LAMBDA_COSMOS_API
- INTERNAL_DEV
- DBT_CLOUD_LAVA
prod-2xl:
API_INTEGRATION: aws_cosmos_api
dev-2xl:
API_INTEGRATION: aws_cosmos_api_stg
#### STREAMLINE 2.0 END ####

View File

@ -5,16 +5,9 @@
{{ create_udtf_get_base_table(
schema = "streamline"
) }}
{{ create_udf_bulk_rest_api_v2() }}
{% endset %}
{% do run_query(sql) %}
{% set sql %}
{{ create_udf_get_cosmos_blocks() }}
{{ create_udf_get_cosmos_transactions() }}
{{ create_udf_get_cosmos_validators() }}
{{ create_udf_get_cosmos_generic() }}
{{ create_udf_get_cosmos_chainhead() }}
{% endset %}
{% do run_query(sql) %}
{% endif %}
{% endmacro %}

View File

@ -0,0 +1,16 @@
{% macro get_merge_sql(
target,
source,
unique_key,
dest_columns,
incremental_predicates
) -%}
{% set merge_sql = fsc_utils.get_merge_sql(
target,
source,
unique_key,
dest_columns,
incremental_predicates
) %}
{{ return(merge_sql) }}
{% endmacro %}

View File

@ -0,0 +1,8 @@
{% macro dbt_snowflake_get_tmp_relation_type(
strategy,
unique_key,
language
) %}
-- always table
{{ return('table') }}
{% endmacro %}

View File

@ -1,17 +1,17 @@
{% macro create_aws_cosmos_api() %}
{% if target.name == "prod" %}
{% set sql %}
CREATE api integration IF NOT EXISTS aws_cosmos_api api_provider = aws_api_gateway api_aws_role_arn = 'arn:aws:iam::490041342817:role/cosmos-api-prod-rolesnowflakeudfsAF733095-14KYNLUQ3CWV2' api_allowed_prefixes = (
'https://bp6s0ib6fk.execute-api.us-east-1.amazonaws.com/prod/'
CREATE api integration IF NOT EXISTS aws_cosmos_api api_provider = aws_api_gateway api_aws_role_arn = 'arn:aws:iam::924682671219:role/cosmos-api-prod-rolesnowflakeudfsAF733095-p9k0jZbJcQpS' api_allowed_prefixes = (
'https://kpg3w2qkm4.execute-api.us-east-1.amazonaws.com/prod/'
) enabled = TRUE;
{% endset %}
{% endset %}
{% do run_query(sql) %}
{% elif target.name == "dev" %}
{% elif target.name == "dev" %}
{% set sql %}
CREATE api integration IF NOT EXISTS aws_cosmos_api_dev api_provider = aws_api_gateway api_aws_role_arn = 'arn:aws:iam::490041342817:role/cosmos-api-dev-rolesnowflakeudfsAF733095-1SQ9TX1BQRUE' api_allowed_prefixes = (
'https://qkwbozz9l0.execute-api.us-east-1.amazonaws.com/dev/'
CREATE api integration IF NOT EXISTS aws_cosmos_api_stg api_provider = aws_api_gateway api_aws_role_arn = 'arn:aws:iam::704693948482:role/cosmos-api-stg-rolesnowflakeudfsAF733095-MWKNVHtNSA9n' api_allowed_prefixes = (
'https://e8nbzsw4r9.execute-api.us-east-1.amazonaws.com/stg/'
) enabled = TRUE;
{% endset %}
{% endset %}
{% do run_query(sql) %}
{% endif %}
{% endmacro %}

View File

@ -12,7 +12,7 @@
FROM
TABLE(
information_schema.external_table_file_registration_history(
start_time => DATEADD('day', -7, CURRENT_TIMESTAMP()),
start_time => DATEADD('day', -3, CURRENT_TIMESTAMP()),
table_name => '{{ source( "bronze", model) }}')
) A
)
@ -87,3 +87,71 @@ WHERE
b.{{ partition_name }} = s.{{ partition_name }}
AND DATA :error :code IS NULL
{% endmacro %}
{% macro streamline_external_table_query_v2(
model,
partition_function
) %}
WITH meta AS (
SELECT
job_created_time AS inserted_timestamp,
file_name,
{{ partition_function }} AS partition_key
FROM
TABLE(
information_schema.external_table_file_registration_history(
start_time => DATEADD('day', -3, CURRENT_TIMESTAMP()),
table_name => '{{ source( "bronze", model) }}')
) A
)
SELECT
s.*,
b.file_name,
inserted_timestamp
FROM
{{ source(
"bronze",
model
) }}
s
JOIN meta b
ON b.file_name = metadata$filename
AND b.partition_key = s.partition_key
WHERE
b.partition_key = s.partition_key
AND DATA :error IS NULL
{% endmacro %}
{% macro streamline_external_table_FR_query_v2(
model,
partition_function
) %}
WITH meta AS (
SELECT
registered_on AS inserted_timestamp,
file_name,
{{ partition_function }} AS partition_key
FROM
TABLE(
information_schema.external_table_files(
table_name => '{{ source( "bronze", model) }}'
)
) A
)
SELECT
s.*,
b.file_name,
inserted_timestamp
FROM
{{ source(
"bronze",
model
) }}
s
JOIN meta b
ON b.file_name = metadata$filename
AND b.partition_key = s.partition_key
WHERE
b.partition_key = s.partition_key
AND DATA :error IS NULL
{% endmacro %}

View File

@ -1,26 +0,0 @@
{% macro create_sp_get_cosmos_blocks_history() %}
{% set sql %}
CREATE
OR REPLACE PROCEDURE streamline.sp_get_cosmos_blocks_history() returns variant LANGUAGE SQL AS $$
DECLARE
RESULT variant;
row_cnt INTEGER;
BEGIN
row_cnt:= (
SELECT
COUNT(1)
FROM
{{ ref('streamline__blocks_history') }}
);
if (
row_cnt > 0
) THEN RESULT:= (
SELECT
streamline.udf_get_cosmos_blocks()
);
ELSE RESULT:= NULL;
END if;
RETURN RESULT;
END;$$ {% endset %}
{% do run_query(sql) %}
{% endmacro %}

View File

@ -1,26 +0,0 @@
{% macro create_sp_get_cosmos_blocks_realtime() %}
{% set sql %}
CREATE
OR REPLACE PROCEDURE streamline.sp_get_cosmos_blocks_realtime() returns variant LANGUAGE SQL AS $$
DECLARE
RESULT variant;
row_cnt INTEGER;
BEGIN
row_cnt:= (
SELECT
COUNT(1)
FROM
{{ ref('streamline__blocks_realtime') }}
);
if (
row_cnt > 0
) THEN RESULT:= (
SELECT
streamline.udf_get_cosmos_blocks()
);
ELSE RESULT:= NULL;
END if;
RETURN RESULT;
END;$$ {% endset %}
{% do run_query(sql) %}
{% endmacro %}

View File

@ -1,26 +0,0 @@
{% macro create_sp_get_cosmos_transactions_history() %}
{% set sql %}
CREATE
OR REPLACE PROCEDURE streamline.sp_get_cosmos_transactions_history() returns variant LANGUAGE SQL AS $$
DECLARE
RESULT variant;
row_cnt INTEGER;
BEGIN
row_cnt:= (
SELECT
COUNT(1)
FROM
{{ ref('streamline__transactions_history') }}
);
if (
row_cnt > 0
) THEN RESULT:= (
SELECT
streamline.udf_get_cosmos_transactions()
);
ELSE RESULT:= NULL;
END if;
RETURN RESULT;
END;$$ {% endset %}
{% do run_query(sql) %}
{% endmacro %}

View File

@ -1,26 +0,0 @@
{% macro create_sp_get_cosmos_transactions_realtime() %}
{% set sql %}
CREATE
OR REPLACE PROCEDURE streamline.sp_get_cosmos_transactions_realtime() returns variant LANGUAGE SQL AS $$
DECLARE
RESULT variant;
row_cnt INTEGER;
BEGIN
row_cnt:= (
SELECT
COUNT(1)
FROM
{{ ref('streamline__transactions_realtime') }}
);
if (
row_cnt > 0
) THEN RESULT:= (
SELECT
streamline.udf_get_cosmos_transactions()
);
ELSE RESULT:= NULL;
END if;
RETURN RESULT;
END;$$ {% endset %}
{% do run_query(sql) %}
{% endmacro %}

View File

@ -1,26 +0,0 @@
{% macro create_sp_get_cosmos_validators_history() %}
{% set sql %}
CREATE
OR REPLACE PROCEDURE streamline.sp_get_cosmos_validators_history() returns variant LANGUAGE SQL AS $$
DECLARE
RESULT variant;
row_cnt INTEGER;
BEGIN
row_cnt:= (
SELECT
COUNT(1)
FROM
{{ ref('streamline__validators_history') }}
);
if (
row_cnt > 0
) THEN RESULT:= (
SELECT
streamline.udf_get_cosmos_validators()
);
ELSE RESULT:= NULL;
END if;
RETURN RESULT;
END;$$ {% endset %}
{% do run_query(sql) %}
{% endmacro %}

View File

@ -1,26 +0,0 @@
{% macro create_sp_get_cosmos_validators_realtime() %}
{% set sql %}
CREATE
OR REPLACE PROCEDURE streamline.sp_get_cosmos_validators_realtime() returns variant LANGUAGE SQL AS $$
DECLARE
RESULT variant;
row_cnt INTEGER;
BEGIN
row_cnt:= (
SELECT
COUNT(1)
FROM
{{ ref('streamline__validators_realtime') }}
);
if (
row_cnt > 0
) THEN RESULT:= (
SELECT
streamline.udf_get_cosmos_validators()
);
ELSE RESULT:= NULL;
END if;
RETURN RESULT;
END;$$ {% endset %}
{% do run_query(sql) %}
{% endmacro %}

View File

@ -1,54 +1,10 @@
{% macro create_udf_get_cosmos_blocks() %}
{% macro create_udf_bulk_rest_api_v2() %}
CREATE
OR REPLACE EXTERNAL FUNCTION streamline.udf_get_cosmos_blocks(
json variant
) returns text {% if target.name == "prod" %}
api_integration = aws_cosmos_api AS 'https://bp6s0ib6fk.execute-api.us-east-1.amazonaws.com/prod/bulk_get_cosmos_blocks'
OR REPLACE EXTERNAL FUNCTION streamline.udf_bulk_rest_api_v2(
json OBJECT
) returns ARRAY api_integration = {% if target.name == "prod" %}
aws_cosmos_api_prod AS 'https://kpg3w2qkm4.execute-api.us-east-1.amazonaws.com/prod/udf_bulk_rest_api'
{% else %}
api_integration = aws_cosmos_api_dev AS 'https://qkwbozz9l0.execute-api.us-east-1.amazonaws.com/dev/bulk_get_cosmos_blocks'
{%- endif %};
{% endmacro %}
{% macro create_udf_get_cosmos_transactions() %}
CREATE
OR REPLACE EXTERNAL FUNCTION streamline.udf_get_cosmos_transactions(
json variant
) returns text {% if target.name == "prod" %}
api_integration = aws_cosmos_api AS 'https://bp6s0ib6fk.execute-api.us-east-1.amazonaws.com/prod/bulk_get_cosmos_transactions'
{% else %}
api_integration = aws_cosmos_api_dev AS 'https://qkwbozz9l0.execute-api.us-east-1.amazonaws.com/dev/bulk_get_cosmos_transactions'
{%- endif %};
{% endmacro %}
{% macro create_udf_get_cosmos_validators() %}
CREATE
OR REPLACE EXTERNAL FUNCTION streamline.udf_get_cosmos_validators(
json variant
) returns text {% if target.name == "prod" %}
api_integration = aws_cosmos_api AS 'https://bp6s0ib6fk.execute-api.us-east-1.amazonaws.com/prod/bulk_get_cosmos_validators'
{% else %}
api_integration = aws_cosmos_api_dev AS 'https://qkwbozz9l0.execute-api.us-east-1.amazonaws.com/dev/bulk_get_cosmos_validators'
{%- endif %};
{% endmacro %}
{% macro create_udf_get_cosmos_generic() %}
CREATE
OR REPLACE EXTERNAL FUNCTION streamline.udf_get_cosmos_generic(
json variant
) returns text{% if target.name == "prod" %}
api_integration = aws_cosmos_api AS 'https://bp6s0ib6fk.execute-api.us-east-1.amazonaws.com/prod/bulk_get_cosmos_generic'
{% else %}
api_integration = aws_cosmos_api_dev AS 'https://qkwbozz9l0.execute-api.us-east-1.amazonaws.com/dev/bulk_get_cosmos_generic'
{%- endif %};
{% endmacro %}
{% macro create_udf_get_cosmos_chainhead() %}
CREATE
OR REPLACE EXTERNAL FUNCTION streamline.udf_get_cosmos_chainhead()
returns variant {% if target.name == "prod" %}
api_integration = aws_cosmos_api AS 'https://bp6s0ib6fk.execute-api.us-east-1.amazonaws.com/prod/get_cosmos_chainhead'
{% else %}
api_integration = aws_cosmos_api_dev AS'https://qkwbozz9l0.execute-api.us-east-1.amazonaws.com/dev/get_cosmos_chainhead'
aws_cosmos_api_stg AS 'https://e8nbzsw4r9.execute-api.us-east-1.amazonaws.com/stg/udf_bulk_rest_api'
{%- endif %};
{% endmacro %}

View File

@ -8,12 +8,12 @@
SELECT
VALUE,
_partition_by_block_id,
block_number AS block_id,
partition_key AS _partition_by_block_id,
DATA :block :header :height :: INT AS block_id,
metadata,
DATA,
TO_TIMESTAMP(
_inserted_timestamp
inserted_timestamp
) AS _inserted_timestamp
FROM
@ -25,7 +25,7 @@ FROM
{% if is_incremental() %}
WHERE
_inserted_timestamp >= (
inserted_timestamp >= (
SELECT
MAX(_inserted_timestamp)
FROM
@ -33,6 +33,6 @@ WHERE
)
{% endif %}
qualify(ROW_NUMBER() over (PARTITION BY block_number
qualify(ROW_NUMBER() over (PARTITION BY block_id
ORDER BY
_inserted_timestamp DESC)) = 1

View File

@ -1,11 +1,24 @@
{{ config (
materialized = 'view'
materialized = 'view',
tags = ['core']
) }}
{% set model = this.identifier.split("_") [-1] %}
{{ streamline_external_table_FR_query(
model,
partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 3), '_', 1) AS INTEGER)",
partition_name = "_partition_by_block_id",
unique_key = "block_number"
) }}
SELECT
VALUE,
partition_key,
metadata,
DATA,
file_name,
inserted_timestamp
FROM
{{ ref('bronze__streamline_FR_blocks_v2') }}
UNION ALL
SELECT
VALUE,
_partition_by_block_id AS partition_key,
metadata,
DATA,
file_name,
_INSERTED_TIMESTAMP
FROM
{{ ref('bronze__streamline_FR_blocks_v1') }}

View File

@ -0,0 +1,9 @@
{{ config (
materialized = 'view'
) }}
{{ streamline_external_table_FR_query(
model = "blocks",
partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 3), '_', 1) AS INTEGER)",
partition_name = "_partition_by_block_id",
unique_key = "block_number"
) }}

View File

@ -0,0 +1,7 @@
{{ config (
materialized = 'view'
) }}
{{ streamline_external_table_FR_query_v2(
model = "blocks_v2",
partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 4), '_', 1) AS INTEGER )"
) }}

View File

@ -1,11 +1,24 @@
{{ config (
materialized = 'view'
materialized = 'view',
tags = ['core']
) }}
{% set model = this.identifier.split("_") [-1] %}
{{ streamline_external_table_FR_query(
model,
partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 3), '_', 1) AS INTEGER)",
partition_name = "_partition_by_block_id",
unique_key = "block_number"
) }}
SELECT
VALUE,
partition_key,
metadata,
DATA,
file_name,
inserted_timestamp
FROM
{{ ref('bronze__streamline_FR_transactions_v2') }}
UNION ALL
SELECT
VALUE,
_partition_by_block_id AS partition_key,
metadata,
DATA,
file_name,
_INSERTED_TIMESTAMP
FROM
{{ ref('bronze__streamline_FR_transactions_v1') }}

View File

@ -0,0 +1,11 @@
{{ config (
materialized = 'view'
) }}
{% set model = this.identifier.split("_") [-1] %}
{{ streamline_external_table_FR_query(
model = "transactions",
partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 3), '_', 1) AS INTEGER)",
partition_name = "_partition_by_block_id",
unique_key = "block_number"
) }}

View File

@ -0,0 +1,7 @@
{{ config (
materialized = 'view'
) }}
{{ streamline_external_table_FR_query_v2(
model = "transactions_v2",
partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 4), '_', 1) AS INTEGER )"
) }}

View File

@ -0,0 +1,7 @@
{{ config (
materialized = 'view'
) }}
{{ streamline_external_table_FR_query_v2(
model = "txcount_v2",
partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 4), '_', 1) AS INTEGER )"
) }}

View File

@ -1,11 +1,7 @@
{{ config (
materialized = 'view'
) }}
{% set model = this.identifier.split("_") [-1] %}
{{ streamline_external_table_query(
model,
partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 3), '_', 1) AS INTEGER)",
partition_name = "_partition_by_block_id",
unique_key = "block_number"
{{ streamline_external_table_query_v2(
model = "blocks_v2",
partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 4), '_', 1) AS INTEGER )"
) }}

View File

@ -1,11 +1,8 @@
{{ config (
materialized = 'view'
materialized = 'view',
tags = ['core']
) }}
{% set model = this.identifier.split("_") [-1] %}
{{ streamline_external_table_query(
model,
partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 3), '_', 1) AS INTEGER)",
partition_name = "_partition_by_block_id",
unique_key = "block_number"
{{ streamline_external_table_query_v2(
model = "transactions_v2",
partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 4), '_', 1) AS INTEGER)"
) }}

View File

@ -0,0 +1,7 @@
{{ config (
materialized = 'view'
) }}
{{ streamline_external_table_query_v2(
model = "txcount_v2",
partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 4), '_', 1) AS INTEGER )"
) }}

View File

@ -0,0 +1,109 @@
{{ config(
materialized = 'incremental',
incremental_predicates = ["dynamic_range_predicate", "partition_key"],
unique_key = ['block_id_requested','tx_id'],
incremental_strategy = 'merge',
merge_exclude_columns = ["inserted_timestamp"],
cluster_by = ['_inserted_timestamp::DATE' ]
) }}
-- depends_on: {{ ref('bronze__streamline_transactions') }}
-- depends_on: {{ ref('bronze__streamline_FR_transactions') }}
SELECT
COALESCE(
DATA :height,
t.value :height
) :: INT AS block_id,
COALESCE(
DATA :hash,
t.value :hash
) :: STRING AS tx_id,
COALESCE(
DATA :index,
t.index
) AS tx_index,
COALESCE(
DATA :tx_result :codespace,
t.value :tx_result :codespace
) :: STRING AS codespace,
COALESCE(
DATA :tx_result :gas_used,
t.value :tx_result :gas_used
) :: NUMBER AS gas_used,
COALESCE(
DATA :tx_result :gas_wanted,
t.value :tx_result :gas_wanted
) :: NUMBER AS gas_wanted,
COALESCE(
DATA :tx_result :code,
t.value :tx_result :code
) :: INT AS tx_code,
CASE
WHEN NULLIF(
tx_code,
0
) IS NOT NULL THEN FALSE
ELSE TRUE
END AS tx_succeeded,
COALESCE(
DATA :tx_result :events,
t.value :tx_result :events
) AS msgs,
COALESCE(
TRY_PARSE_JSON(
COALESCE(
DATA :tx_result :log,
t.value :tx_result :log
)
),
COALESCE(
DATA :tx_result :log,
t.value :tx_result :log
)
) AS tx_log,
CASE
WHEN t.value IS NOT NULL THEN t.value
ELSE DATA
END AS DATA,
partition_key,
COALESCE(
A.value :BLOCK_NUMBER_REQUESTED,
REPLACE(
metadata :request :params [0],
'tx.height='
)
) AS block_id_requested,
inserted_timestamp AS _inserted_timestamp,
{{ dbt_utils.generate_surrogate_key(
['block_id_requested','tx_id']
) }} AS transactions_id,
SYSDATE() AS inserted_timestamp,
SYSDATE() AS modified_timestamp,
'{{ invocation_id }}' AS _invocation_id
FROM
{% if is_incremental() %}
{{ ref('bronze__streamline_transactions') }}
{% else %}
{{ ref('bronze__streamline_FR_transactions') }}
{% endif %}
A
JOIN LATERAL FLATTEN(
DATA :result :txs,
outer => TRUE
) t
{% if is_incremental() %}
WHERE
inserted_timestamp >= (
SELECT
MAX(_inserted_timestamp)
FROM
{{ this }}
)
{% endif %}
qualify(ROW_NUMBER() over(PARTITION BY block_id_requested, tx_id
ORDER BY
_inserted_timestamp DESC)) = 1

View File

@ -9,8 +9,8 @@
SELECT
VALUE,
_partition_by_block_id,
block_number AS block_id,
partition_key AS _partition_by_block_id,
DATA :height :: INT AS block_id,
REPLACE(
metadata :request :params [0],
'tx.height='
@ -19,7 +19,7 @@ SELECT
DATA,
DATA :id AS unique_key,
TO_TIMESTAMP(
_inserted_timestamp
inserted_timestamp
) AS _inserted_timestamp
FROM

View File

@ -10,32 +10,29 @@ WITH base_transactions AS (
SELECT
block_id,
t.value :hash :: STRING AS tx_id,
t.value :tx_result :codespace AS codespace,
t.value :tx_result :gas_used :: NUMBER AS gas_used,
t.value :tx_result :gas_wanted :: NUMBER AS gas_wanted,
CASE
WHEN t.value :tx_result :code :: NUMBER = 0 THEN TRUE
ELSE FALSE
END AS tx_succeeded,
t.value :tx_result :code :: NUMBER AS tx_code,
t.value :tx_result :events AS msgs,
t.value :tx_result :log :: STRING AS tx_log,
_inserted_timestamp
tx_id,
codespace :: variant AS codespace,
gas_used,
gas_wanted,
tx_succeeded,
tx_code,
msgs,
tx_log,
TO_TIMESTAMP(
_inserted_timestamp
) AS _inserted_timestamp
FROM
{{ ref('bronze__tx_search') }},
TABLE(FLATTEN(DATA :result :txs)) t
{{ ref('bronze__transactions') }}
{% if is_incremental() %}
WHERE
_inserted_timestamp :: DATE >= (
SELECT
MAX(_inserted_timestamp) :: DATE - 2
DATEADD('minute', -15, MAX(_inserted_timestamp))
FROM
{{ this }}
{{ this }})
{% endif %}
)
{% endif %}
)
SELECT
t.block_id,
b.block_timestamp,
@ -46,7 +43,7 @@ SELECT
tx_succeeded,
tx_code,
msgs,
tx_log,
tx_log :: STRING AS tx_log,
concat_ws(
'-',
t.block_id,
@ -63,20 +60,8 @@ FROM
base_transactions t
JOIN {{ ref('silver__blocks') }}
b
ON t.block_id = b.block_id
{% if is_incremental() %}
WHERE
b._inserted_timestamp :: DATE >= (
SELECT
MAX(_inserted_timestamp) :: DATE - 2
FROM
{{ this }}
)
{% endif %}
qualify ROW_NUMBER() over (
PARTITION BY tx_id
ORDER BY
t._inserted_timestamp DESC
) = 1
ON t.block_id = b.block_id qualify ROW_NUMBER() over (
PARTITION BY tx_id
ORDER BY
t._inserted_timestamp DESC
) = 1

View File

@ -20,6 +20,9 @@ sources:
- name: blocks_ch3
- name: tx_search_ch3
- name: validators_ch3
- name: blocks_v2
- name: txcount_v2
- name: transactions_v2
- name: osmo
database: osmosis

View File

@ -0,0 +1,37 @@
-- depends_on: {{ ref('bronze__streamline_blocks') }}
{{ config (
materialized = "incremental",
incremental_strategy = 'merge',
unique_key = "block_number",
cluster_by = "ROUND(block_number, -3)",
merge_exclude_columns = ["inserted_timestamp"],
post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION on equality(block_number)"
) }}
SELECT
DATA :result :block :header :height :: INT AS block_number,
{{ dbt_utils.generate_surrogate_key(
['block_number']
) }} AS complete_blocks_id,
SYSDATE() AS inserted_timestamp,
SYSDATE() AS modified_timestamp,
file_name,
'{{ invocation_id }}' AS _invocation_id
FROM
{% if is_incremental() %}
{{ ref('bronze__streamline_blocks') }}
WHERE
inserted_timestamp >= (
SELECT
MAX(modified_timestamp) modified_timestamp
FROM
{{ this }}
)
{% else %}
{{ ref('bronze__streamline_FR_blocks') }}
{% endif %}
qualify(ROW_NUMBER() over (PARTITION BY block_number
ORDER BY
inserted_timestamp DESC)) = 1

View File

@ -0,0 +1,42 @@
-- depends_on: {{ ref('bronze__streamline_transactions') }}
{{ config (
materialized = "incremental",
incremental_strategy = 'merge',
unique_key = "complete_transactions_id",
cluster_by = "ROUND(block_number, -3)",
merge_exclude_columns = ["inserted_timestamp"],
post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION on equality(block_number)"
) }}
SELECT
DATA :height :: INT AS block_number,
VALUE :PAGE_NUMBER :: INT AS page_number,
{{ dbt_utils.generate_surrogate_key(
['block_number','page_number']
) }} AS complete_transactions_id,
SYSDATE() AS inserted_timestamp,
SYSDATE() AS modified_timestamp,
file_name,
'{{ invocation_id }}' AS _invocation_id
FROM
{% if is_incremental() %}
{{ ref('bronze__streamline_transactions') }}
{% else %}
{{ ref('bronze__streamline_FR_transactions') }}
{% endif %}
WHERE
DATA <> '[]'
{% if is_incremental() %}
AND inserted_timestamp >= (
SELECT
MAX(modified_timestamp) modified_timestamp
FROM
{{ this }}
)
{% endif %}
qualify(ROW_NUMBER() over (PARTITION BY complete_transactions_id
ORDER BY
inserted_timestamp DESC)) = 1

View File

@ -0,0 +1,38 @@
-- depends_on: {{ ref('bronze__streamline_tx_counts') }}
{{ config (
materialized = "incremental",
incremental_strategy = 'merge',
unique_key = "block_number",
cluster_by = "ROUND(block_number, -3)",
merge_exclude_columns = ["inserted_timestamp"],
post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION on equality(block_number)"
) }}
SELECT
VALUE :BLOCK_NUMBER :: INT AS block_number,
DATA :result :total_count :: INT AS tx_count,
{{ dbt_utils.generate_surrogate_key(
['block_number']
) }} AS complete_tx_counts_id,
SYSDATE() AS inserted_timestamp,
SYSDATE() AS modified_timestamp,
file_name,
'{{ invocation_id }}' AS _invocation_id
FROM
{% if is_incremental() %}
{{ ref('bronze__streamline_tx_counts') }}
WHERE
inserted_timestamp >= (
SELECT
MAX(modified_timestamp) modified_timestamp
FROM
{{ this }}
)
{% else %}
{{ ref('bronze__streamline_FR_tx_counts') }}
{% endif %}
qualify(ROW_NUMBER() over (PARTITION BY block_number
ORDER BY
inserted_timestamp DESC)) = 1

View File

@ -0,0 +1,59 @@
{{ config (
materialized = "view",
post_hook = fsc_utils.if_data_call_function_v2(
func = 'streamline.udf_bulk_rest_api_v2',
target = "{{this.schema}}.{{this.identifier}}",
params ={ "external_table" :"blocks_v2",
"sql_limit" :"100000",
"producer_batch_size" :"1000",
"worker_batch_size" :"100",
"sql_source" :"{{this.identifier}}" }
)
) }}
WITH blocks AS (
SELECT
block_number
FROM
{{ ref("streamline__blocks") }}
EXCEPT
SELECT
block_number
FROM
{{ ref("streamline__complete_blocks") }}
ORDER BY
1 DESC
LIMIT
50000
)
SELECT
ROUND(
block_number,
-4
) :: INT AS partition_key,
{{ target.database }}.live.udf_api(
'POST',
'{service}/{x-allthatnode-api-key}',
OBJECT_CONSTRUCT(
'Content-Type',
'application/json'
),
OBJECT_CONSTRUCT(
'id',
block_number,
'jsonrpc',
'2.0',
'method',
'block',
'params',
ARRAY_CONSTRUCT(
block_number :: STRING
)
),
'vault/prod/cosmos/allthatnode/mainnet-archive/rpc'
) AS request
FROM
blocks
ORDER BY
block_number

View File

@ -0,0 +1,100 @@
{{ config (
materialized = "view",
post_hook = fsc_utils.if_data_call_function_v2(
func = 'streamline.udf_bulk_rest_api_v2',
target = "{{this.schema}}.{{this.identifier}}",
params ={ "external_table" :"transactions_v2",
"sql_limit" :"100000",
"producer_batch_size" :"10000",
"worker_batch_size" :"1000",
"exploded_key": "[\"result.txs\"]",
"sql_source" :"{{this.identifier}}" }
)
) }}
WITH blocks AS (
SELECT
A.block_number,
tx_count
FROM
{{ ref("streamline__complete_tx_counts") }} A
WHERE
tx_count > 0
),
numbers AS (
-- Recursive CTE to generate numbers. We'll use the maximum txcount value to limit our recursion.
SELECT
1 AS n
UNION ALL
SELECT
n + 1
FROM
numbers
WHERE
n < (
SELECT
CEIL(MAX(tx_count) / 100.0)
FROM
blocks)
),
blocks_with_page_numbers AS (
SELECT
tt.block_number :: INT AS block_number,
n.n AS page_number
FROM
blocks tt
JOIN numbers n
ON n.n <= CASE
WHEN tt.tx_count % 100 = 0 THEN tt.tx_count / 100
ELSE FLOOR(
tt.tx_count / 100
) + 1
END
EXCEPT
SELECT
block_number,
page_number
FROM
{{ ref("streamline__complete_transactions") }}
ORDER BY
1
LIMIT
50000
)
SELECT
ROUND(
block_number,
-3
) :: INT AS partition_key,
{{ target.database }}.live.udf_api(
'POST',
'{service}/{x-allthatnode-api-key}',
OBJECT_CONSTRUCT(
'Content-Type',
'application/json'
),
OBJECT_CONSTRUCT(
'id',
block_number,
'jsonrpc',
'2.0',
'method',
'tx_search',
'params',
ARRAY_CONSTRUCT(
'tx.height=' || block_number :: STRING,
TRUE,
page_number :: STRING,
'100',
'asc'
)
),
'vault/prod/cosmos/allthatnode/mainnet-archive/rpc'
) AS request,
page_number,
block_number AS block_number_requested
FROM
blocks_with_page_numbers
ORDER BY
block_number

View File

@ -0,0 +1,85 @@
{{ config (
materialized = "view",
post_hook = fsc_utils.if_data_call_function_v2(
func = 'streamline.udf_bulk_rest_api_v2',
target = "{{this.schema}}.{{this.identifier}}",
params ={ "external_table" :"txcount_v2",
"sql_limit" :"100000",
"producer_batch_size" :"10000",
"worker_batch_size" :"1000",
"sql_source" :"{{this.identifier}}" }
)
) }}
-- depends_on: {{ ref('streamline__complete_tx_counts') }}
WITH blocks AS (
SELECT
block_number
FROM
{{ ref("streamline__blocks") }}
EXCEPT
SELECT
block_number
FROM
{{ ref("streamline__complete_tx_counts") }}
ORDER BY
1
LIMIT
50000
), {# retry AS (
SELECT
NULL AS A.block_number
FROM
{{ ref("streamline__complete_tx_counts") }} A
JOIN {{ ref("silver__blockchain") }}
b
ON A.block_number = b.block_id
WHERE
A.tx_count <> b.num_txs
),
#}
combo AS (
SELECT
block_number
FROM
blocks {# UNION
SELECT
block_number
FROM
retry #}
)
SELECT
ROUND(
block_number,
-3
) :: INT AS partition_key,
{{ target.database }}.live.udf_api(
'POST',
'{service}/{x-allthatnode-api-key}',
OBJECT_CONSTRUCT(
'Content-Type',
'application/json'
),
OBJECT_CONSTRUCT(
'id',
block_number,
'jsonrpc',
'2.0',
'method',
'tx_search',
'params',
ARRAY_CONSTRUCT(
'tx.height=' || block_number :: STRING,
TRUE,
'1',
'1',
'asc'
)
),
'vault/prod/cosmos/allthatnode/mainnet-archive/rpc'
) AS request,
block_number
FROM
combo
ORDER BY
block_number

View File

@ -3,17 +3,18 @@
tags = ['streamline_view']
) }}
{% if execute %}
{% set height = run_query('SELECT streamline.udf_get_cosmos_chainhead()') %}
{% set block_height = height.columns[0].values()[0] %}
{% else %}
{% set block_height = 13000000 %}
{% endif %}
SELECT
height as block_number
_id AS block_number
FROM
TABLE(streamline.udtf_get_base_table({{block_height}}))
{{ source(
'crosschain_silver',
'number_sequence'
) }}
WHERE
height >= 5200791 -- Highest block the archive has available
_id >= 5200791
AND _id <= (
SELECT
MAX(block_number)
FROM
{{ ref('streamline__chainhead') }}
)

View File

@ -1,40 +0,0 @@
{{ config (
materialized = "view",
post_hook = if_data_call_function(
func = "{{this.schema}}.udf_get_cosmos_blocks(object_construct('sql_source', '{{this.identifier}}'))",
target = "{{this.schema}}.{{this.identifier}}"
)
) }}
{% for item in range(13) %}
(
SELECT
{{ dbt_utils.generate_surrogate_key(
['block_number']
) }} AS id,
block_number
FROM
{{ ref("streamline__blocks") }}
WHERE
block_number BETWEEN {{ item * 1000000 + 1 }}
AND {{(
item + 1
) * 1000000 }}
EXCEPT
SELECT
id,
block_number
FROM
{{ ref("streamline__complete_blocks") }}
WHERE
block_number BETWEEN {{ item * 1000000 + 1 }}
AND {{(
item + 1
) * 1000000 }}
ORDER BY
block_number
) {% if not loop.last %}
UNION ALL
{% endif %}
{% endfor %}

View File

@ -1,26 +0,0 @@
-- Realtime model: block heights above 13,000,000 that still need to be
-- fetched. The post_hook hands this view's identifier to the external
-- udf_get_cosmos_blocks function (NON_BATCH, batch_call_limit 20) whenever
-- the build yields rows.
{{ config (
    materialized = "view",
    post_hook = if_data_call_function(
        func = "{{this.schema}}.udf_get_cosmos_blocks(object_construct('sql_source', '{{this.identifier}}', 'call_type','NON_BATCH', 'batch_call_limit', 20))",
        target = "{{this.schema}}.{{this.identifier}}"
    )
) }}
-- Candidate heights known to the chainhead-driven block sequence.
WITH candidate_blocks AS (
    SELECT
        {{ dbt_utils.generate_surrogate_key(
            ['block_number']
        ) }} AS id,
        block_number
    FROM
        {{ ref("streamline__blocks") }}
    WHERE
        block_number IS NOT NULL
        AND block_number > 13000000
),
-- Heights whose payloads have already landed.
already_complete AS (
    SELECT
        id,
        block_number
    FROM
        {{ ref("streamline__complete_blocks") }}
    WHERE
        block_number > 13000000
)
SELECT
    id,
    block_number
FROM
    candidate_blocks
EXCEPT
SELECT
    id,
    block_number
FROM
    already_complete

View File

@ -0,0 +1,27 @@
-- Current chain head: issues a JSON-RPC `status` request to the allthatnode
-- Cosmos archive node via live.udf_api and extracts
-- result.sync_info.latest_block_height as an integer. Downstream sequence
-- models cap their block ranges at MAX(block_number) from this view.
{{ config (
    materialized = "view",
    tags = ['streamline_view']
) }}
SELECT
    {{ target.database }}.live.udf_api(
        'POST',
        '{service}/{x-allthatnode-api-key}',
        OBJECT_CONSTRUCT(
            'Content-Type',
            'application/json',
            'fsc-quantum-state',
            'livequery'
        ),
        OBJECT_CONSTRUCT(
            'id',
            0,
            'jsonrpc',
            '2.0',
            'method',
            'status',
            'params',
            -- `status` takes no params; use ARRAY_CONSTRUCT() for the empty
            -- array rather than the [] literal, matching the other udf_api
            -- request builders in this project.
            ARRAY_CONSTRUCT()
        ),
        'vault/prod/cosmos/allthatnode/mainnet-archive/rpc'
    ) :data :result :sync_info :latest_block_height :: INT AS block_number

View File

@ -1,31 +0,0 @@
-- Tracking table of block heights whose block payloads have already been
-- ingested; the realtime/history views EXCEPT against it. Merged on the
-- surrogate id, clustered by rounded block_number, with search optimization
-- on id for fast point lookups.
{{ config (
materialized = "incremental",
unique_key = "id",
cluster_by = "ROUND(block_number, -3)",
merge_update_columns = ["id"],
post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION on equality(id)"
) }}
-- depends_on: {{ ref('bronze__streamline_blocks') }}
SELECT
id,
block_number,
_inserted_timestamp
FROM
-- Incremental runs scan only bronze rows at/after this table's high-water
-- mark; a full refresh reads the FR (full-replay) bronze source instead.
{% if is_incremental() %}
{{ ref('bronze__streamline_blocks') }}
WHERE
_inserted_timestamp >= (
SELECT
MAX(_inserted_timestamp) _inserted_timestamp
FROM
{{ this }}
)
{% else %}
{{ ref('bronze__streamline_FR_blocks') }}
{% endif %}
-- Deduplicate: keep the most recently inserted record per id.
qualify(ROW_NUMBER() over (PARTITION BY id
ORDER BY
_inserted_timestamp DESC)) = 1

View File

@ -1,31 +0,0 @@
-- Tracking table of block heights whose transactions have already been
-- ingested; mirrors streamline__complete_blocks. Merged on the surrogate id,
-- clustered by rounded block_number, with search optimization on id.
{{ config (
materialized = "incremental",
unique_key = "id",
cluster_by = "ROUND(block_number, -3)",
merge_update_columns = ["id"],
post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION on equality(id)"
) }}
-- depends_on: {{ ref('bronze__streamline_transactions') }}
SELECT
id,
block_number,
_inserted_timestamp
FROM
-- Incremental runs scan only bronze rows at/after this table's high-water
-- mark; a full refresh reads the FR (full-replay) bronze source instead.
{% if is_incremental() %}
{{ ref('bronze__streamline_transactions') }}
WHERE
_inserted_timestamp >= (
SELECT
MAX(_inserted_timestamp) _inserted_timestamp
FROM
{{ this }}
)
{% else %}
{{ ref('bronze__streamline_FR_transactions') }}
{% endif %}
-- Deduplicate: keep the most recently inserted record per id.
qualify(ROW_NUMBER() over (PARTITION BY id
ORDER BY
_inserted_timestamp DESC)) = 1

View File

@ -1,54 +0,0 @@
-- Tracking table of block heights whose validator snapshots have already
-- landed in the bronze external table. Unlike the blocks/transactions
-- complete tables, the watermark comes from external-stage file metadata
-- (last_modified) rather than a bronze _inserted_timestamp column.
{{ config (
materialized = "incremental",
unique_key = "id",
cluster_by = "ROUND(block_number, -3)",
merge_update_columns = ["id"]
) }}
-- File-level metadata for the validators external table; joined back to the
-- data rows via metadata$filename to recover each row's load time.
WITH meta AS (
SELECT
last_modified,
file_name
FROM
TABLE(
information_schema.external_table_files(
table_name => '{{ source( "bronze", "validators") }}'
)
) A
)
{% if is_incremental() %},
-- High-water mark; COALESCE to epoch so the first incremental run after an
-- empty table still scans everything.
max_date AS (
SELECT
COALESCE(MAX(_INSERTED_TIMESTAMP), '1970-01-01' :: DATE) max_INSERTED_TIMESTAMP
FROM
{{ this }})
{% endif %}
SELECT
{{ dbt_utils.generate_surrogate_key(
['block_number']
) }} AS id,
block_number,
last_modified AS _inserted_timestamp
FROM
{{ source(
"bronze",
"validators"
) }}
JOIN meta b
ON b.file_name = metadata$filename
{% if is_incremental() %}
WHERE
b.last_modified > (
SELECT
max_INSERTED_TIMESTAMP
FROM
max_date
)
{% endif %}
-- Deduplicate: keep the most recently loaded record per id.
qualify(ROW_NUMBER() over (PARTITION BY id
ORDER BY
_inserted_timestamp DESC)) = 1

View File

@ -1,40 +0,0 @@
-- Backfill ("history") model: block heights whose transactions have not yet
-- been ingested. The post_hook invokes the external
-- udf_get_cosmos_transactions function with this view's identifier as
-- sql_source whenever the build produces rows (if_data_call_function).
{{ config (
materialized = "view",
post_hook = if_data_call_function(
func = "{{this.schema}}.udf_get_cosmos_transactions(object_construct('sql_source', '{{this.identifier}}'))",
target = "{{this.schema}}.{{this.identifier}}"
)
) }}
-- Partition the block range into 13 one-million-block buckets so each
-- EXCEPT (known blocks minus completed transactions) operates on a bounded
-- slice; the buckets are stitched back together with UNION ALL.
{% for item in range(13) %}
(
SELECT
{{ dbt_utils.generate_surrogate_key(
['block_number']
) }} AS id,
block_number
FROM
{{ ref("streamline__blocks") }}
WHERE
block_number BETWEEN {{ item * 1000000 + 1 }}
AND {{(
item + 1
) * 1000000 }}
-- Anti-join: drop heights whose transactions are already recorded.
EXCEPT
SELECT
id,
block_number
FROM
{{ ref("streamline__complete_transactions") }}
WHERE
block_number BETWEEN {{ item * 1000000 + 1 }}
AND {{(
item + 1
) * 1000000 }}
ORDER BY
block_number
) {% if not loop.last %}
UNION ALL
{% endif %}
{% endfor %}

View File

@ -1,26 +0,0 @@
-- Realtime model: block heights above 6,754,140 whose transactions still
-- need fetching. The post_hook hands this view's identifier to the external
-- udf_get_cosmos_transactions function, with batch sizing overridable via
-- the producer_batch_size / worker_batch_size dbt vars.
{{ config (
materialized = "view",
post_hook = if_data_call_function(
func = "{{this.schema}}.udf_get_cosmos_transactions(object_construct('sql_source', '{{this.identifier}}','batch_call_limit','10','producer_batch_size', {{var('producer_batch_size','2000')}}, 'worker_batch_size', {{var('worker_batch_size','200')}}))",
target = "{{this.schema}}.{{this.identifier}}"
)
) }}
SELECT
{{ dbt_utils.generate_surrogate_key(
['block_number']
) }} AS id,
block_number
FROM
{{ ref("streamline__blocks") }}
WHERE
-- NOTE(review): threshold lowered from the 13,000,000 used by the other
-- realtime models (prior value kept in the Jinja comment) — presumably to
-- backfill the 6.75M-13M transaction range; confirm before changing.
block_number > 6754140 {# block_number > 13000000 #}
AND block_number IS NOT NULL
-- Anti-join: drop heights whose transactions are already recorded.
EXCEPT
SELECT
id,
block_number
FROM
{{ ref("streamline__complete_transactions") }}
WHERE
block_number > 6754140 {# block_number > 13000000 #}

View File

@ -1,40 +0,0 @@
-- Backfill ("history") model: block heights whose validator snapshots have
-- not yet been ingested. The post_hook invokes the external
-- udf_get_cosmos_validators function with this view's identifier as
-- sql_source whenever the build produces rows (if_data_call_function).
{{ config (
materialized = "view",
post_hook = if_data_call_function(
func = "{{this.schema}}.udf_get_cosmos_validators(object_construct('sql_source', '{{this.identifier}}'))",
target = "{{this.schema}}.{{this.identifier}}"
)
) }}
-- Partition the block range into 13 one-million-block buckets so each
-- EXCEPT (known blocks minus completed validators) operates on a bounded
-- slice; the buckets are stitched back together with UNION ALL.
{% for item in range(13) %}
(
SELECT
{{ dbt_utils.generate_surrogate_key(
['block_number']
) }} AS id,
block_number
FROM
{{ ref("streamline__blocks") }}
WHERE
block_number BETWEEN {{ item * 1000000 + 1 }}
AND {{(
item + 1
) * 1000000 }}
-- Anti-join: drop heights whose validator sets are already recorded.
EXCEPT
SELECT
id,
block_number
FROM
{{ ref("streamline__complete_validators") }}
WHERE
block_number BETWEEN {{ item * 1000000 + 1 }}
AND {{(
item + 1
) * 1000000 }}
ORDER BY
block_number
) {% if not loop.last %}
UNION ALL
{% endif %}
{% endfor %}

View File

@ -1,26 +0,0 @@
-- Realtime model: block heights above 13,000,000 whose validator snapshots
-- still need fetching. The post_hook hands this view's identifier to the
-- external udf_get_cosmos_validators function (NON_BATCH, batch_call_limit
-- 20) whenever the build yields rows.
{{ config (
    materialized = "view",
    post_hook = if_data_call_function(
        func = "{{this.schema}}.udf_get_cosmos_validators(object_construct('sql_source', '{{this.identifier}}', 'call_type','NON_BATCH', 'batch_call_limit', 20))",
        target = "{{this.schema}}.{{this.identifier}}"
    )
) }}
-- Candidate heights known to the chainhead-driven block sequence.
WITH candidate_blocks AS (
    SELECT
        {{ dbt_utils.generate_surrogate_key(
            ['block_number']
        ) }} AS id,
        block_number
    FROM
        {{ ref("streamline__blocks") }}
    WHERE
        block_number IS NOT NULL
        AND block_number > 13000000
),
-- Heights whose validator sets have already landed.
already_complete AS (
    SELECT
        id,
        block_number
    FROM
        {{ ref("streamline__complete_validators") }}
    WHERE
        block_number > 13000000
)
SELECT
    id,
    block_number
FROM
    candidate_blocks
EXCEPT
SELECT
    id,
    block_number
FROM
    already_complete

View File

@ -1,14 +1,14 @@
packages:
- package: calogica/dbt_expectations
version: 0.8.0
version: 0.8.5
- package: dbt-labs/dbt_utils
version: 1.0.0
- git: https://github.com/FlipsideCrypto/fsc-utils.git
revision: 80485821ff14c1393b9a533cd06ca2ec6fdb04df
revision: 484e9db07d2060286768bb745e1b0e879178d43b
- package: get-select/dbt_snowflake_query_tags
version: 2.3.3
version: 2.5.0
- package: calogica/dbt_date
version: 0.7.2
- git: https://github.com/FlipsideCrypto/livequery-models.git
revision: 883675b4021cc9a777e12fe6be8114ab039ab365
sha1_hash: d3f55d1deca5bc212326f61245754d646816057d
revision: b024188be4e9c6bc00ed77797ebdc92d351d620e
sha1_hash: c5a90d3c4a3f4e5450031099b59d32e30c7d759e

View File

@ -1,9 +1,9 @@
packages:
- package: calogica/dbt_expectations
version: 0.8.0
version: [">=0.4.0", "<0.9.0"]
- package: dbt-labs/dbt_utils
version: 1.0.0
- git: https://github.com/FlipsideCrypto/fsc-utils.git
revision: v1.20.0
revision: v1.23.0
- package: get-select/dbt_snowflake_query_tags
version: [">=2.0.0", "<3.0.0"]