Merge branch 'main' into upd-fsc-utils

Merge Main
forgash_ 2024-08-21 16:32:39 -06:00
commit e45e5e330f
238 changed files with 3501 additions and 8463 deletions

.github/workflows/dbt.yml

@ -0,0 +1,59 @@
name: dbt
on:
workflow_call:
inputs:
warehouse:
required: false
type: string
environment:
required: true
type: string
command:
required: true
type: string
python_version:
required: false
type: string
default: "3.10"
env:
DBT_PROFILES_DIR: ./
ACCOUNT: ${{ vars.ACCOUNT }}
ROLE: ${{ vars.ROLE }}
USER: ${{ vars.USER }}
PASSWORD: ${{ secrets.PASSWORD }}
REGION: ${{ vars.REGION }}
DATABASE: ${{ vars.DATABASE }}
WAREHOUSE: ${{ inputs.warehouse }}
SCHEMA: ${{ vars.SCHEMA }}
TARGET: ${{ vars.TARGET }}
jobs:
dbt:
runs-on: ubuntu-latest
environment:
name: ${{ inputs.environment }}
env:
WAREHOUSE: ${{ inputs.warehouse || vars.WAREHOUSE }}
steps:
- uses: actions/checkout@v3
- uses: actions/setup-python@v4
with:
python-version: ${{ inputs.python_version }}
cache: "pip"
- name: install dependencies
run: |
pip install -r requirements.txt
dbt deps
- name: Run DBT Jobs
run: |
${{ inputs.command }}
- name: Store logs
if: always()
uses: actions/upload-artifact@v3
with:
name: logs-${{ inputs.environment }}-${{ github.run_number }}-${{ github.run_attempt }}
path: |
logs
target


@ -0,0 +1,19 @@
name: integration test
run-name: ${{ github.event.inputs.branch }}
on:
workflow_dispatch:
concurrency: ${{ github.workflow }}
jobs:
test:
name: livequery_integration_test
uses: ./.github/workflows/dbt.yml
secrets: inherit
with:
warehouse: DBT_CLOUD
environment: ${{ github.ref == 'refs/heads/main' && 'prod' || 'stg' }}
command: dbt test -s 'livequery_models.deploy.core.*'


@ -0,0 +1,88 @@
name: dbt_run_adhoc_with_alert
run-name: ${{ inputs.dbt_command }}
on:
workflow_dispatch:
inputs:
environment:
type: choice
description: DBT Run Environment
required: true
options:
- dev
- prod
default: dev
warehouse:
type: choice
description: Snowflake warehouse
required: true
options:
- DBT
- DBT_CLOUD
- DBT_EMERGENCY
default: DBT
dbt_command:
type: string
description: 'DBT Run Command'
required: true
env:
SLACK_WEBHOOK_URL: "${{ secrets.SLACK_WEBHOOK_URL }}"
USE_VARS: "${{ vars.USE_VARS }}"
DBT_PROFILES_DIR: "${{ vars.DBT_PROFILES_DIR }}"
DBT_VERSION: "${{ vars.DBT_VERSION }}"
ACCOUNT: "${{ vars.ACCOUNT }}"
ROLE: "${{ vars.ROLE }}"
USER: "${{ vars.USER }}"
PASSWORD: "${{ secrets.PASSWORD }}"
REGION: "${{ vars.REGION }}"
DATABASE: "${{ vars.DATABASE }}"
SCHEMA: "${{ vars.SCHEMA }}"
WAREHOUSE: "${{ inputs.warehouse }}"
ENVIRONMENT: "${{ inputs.environment }}"
DBT_COMMAND: "${{ inputs.dbt_command }}"
concurrency:
group: ${{ github.workflow }}
jobs:
run_dbt_jobs:
runs-on: ubuntu-latest
environment:
name: workflow_${{ inputs.environment }}
steps:
- uses: actions/checkout@v3
- uses: actions/setup-python@v4
with:
python-version: "3.10"
cache: "pip"
- name: install dependencies
run: |
pip install -r requirements.txt
dbt deps
- name: Run DBT Jobs
id: dbt_run_command
run: |
start_time=$(date +%s)
${{ inputs.dbt_command }}
end_time=$(date +%s)
elapsed_time=$(expr $end_time - $start_time)
echo "ELAPSED_TIME=$elapsed_time" >> $GITHUB_ENV
continue-on-error: true
- name: Log run status
run: echo "DBT_RUN_STATUS=${{ steps.dbt_run_command.outcome }}" >> $GITHUB_ENV
- name: Send Notification
run: |
python python/dbt_slack_notification.py
- name: Store logs
uses: actions/upload-artifact@v3
with:
name: dbt-logs
path: logs


@ -1,57 +0,0 @@
name: dbt_run_history
run-name: dbt_run_history
on:
workflow_dispatch:
schedule:
# Runs every 2 hours
# - cron: "0 */2 * * *"
# Runs every hour
- cron: "0 * * * *"
env:
USE_VARS: "${{ vars.USE_VARS }}"
DBT_PROFILES_DIR: "${{ vars.DBT_PROFILES_DIR }}"
DBT_VERSION: "${{ vars.DBT_VERSION }}"
ACCOUNT: "${{ vars.ACCOUNT }}"
ROLE: "${{ vars.ROLE }}"
USER: "${{ vars.USER }}"
PASSWORD: "${{ secrets.PASSWORD }}"
REGION: "${{ vars.REGION }}"
DATABASE: "${{ vars.DATABASE }}"
WAREHOUSE: "${{ vars.WAREHOUSE }}"
SCHEMA: "${{ vars.SCHEMA }}"
concurrency:
group: ${{ github.workflow }}
jobs:
dbt:
runs-on: ubuntu-latest
environment:
name: workflow_prod
steps:
- uses: actions/checkout@v3
- uses: actions/setup-python@v4
with:
python-version: "3.10"
cache: "pip"
- name: install dependencies
run: |
pip install -r requirements.txt
dbt deps
- name: Run DBT Jobs
run: |
dbt run -s \
2+streamline__get_transactions_history_mainnet_18 \
2+streamline__get_transactions_history_mainnet_19 \
2+streamline__get_transaction_results_history_mainnet_14 \
2+streamline__get_transaction_results_history_mainnet_15 \
2+streamline__get_transaction_results_history_mainnet_16 \
2+streamline__get_batch_transaction_results_history_mainnet_17 \
2+streamline__get_batch_transaction_results_history_mainnet_18 \
2+streamline__get_batch_transaction_results_history_mainnet_19 \
2+streamline__get_batch_transaction_results_history_mainnet_22 \
--vars '{"STREAMLINE_INVOKE_STREAMS": True}'


@ -7,7 +7,9 @@ on:
# 4x/hour schedule = Every 15 minutes (see https://crontab.guru)
# - cron: "*/15 * * * *"
# 3x/hour schedule = Every 20 minutes (see https://crontab.guru)
- cron: "*/20 * * * *"
# - cron: "*/20 * * * *"
# 2x/hour schedule = Every 30 minutes (see https://crontab.guru)
- cron: "*/30 * * * *"
env:
USE_VARS: "${{ vars.USE_VARS }}"
@ -46,7 +48,7 @@ jobs:
run: >
dbt run-operation stage_external_sources --vars "ext_full_refresh: true";
dbt seed;
dbt run -s tag:scheduled_core tag:streamline_complete "flow_models,models/gold" --vars '{"STREAMLINE_START_BLOCK": ${{ vars.STREAMLINE_START_BLOCK }}}'
dbt run -s tag:scheduled_core tag:streamline_complete "flow_models,models/gold"
- name: Store logs
uses: actions/upload-artifact@v3


@ -1,5 +1,5 @@
name: dbt_run_scheduled
run-name: dbt_run_scheduled
name: dbt_run_scheduled_non_core
run-name: dbt_run_scheduled_non_core
on:
workflow_dispatch:
@ -44,7 +44,7 @@ jobs:
run: >
dbt run-operation stage_external_sources --vars "ext_full_refresh: true";
dbt seed;
dbt run -s tag:scheduled_non_core --vars '{"STREAMLINE_START_BLOCK": ${{ vars.STREAMLINE_START_BLOCK }}}'
dbt run -s tag:scheduled_non_core
- name: Store logs
uses: actions/upload-artifact@v3


@ -0,0 +1,46 @@
name: dbt_run_streamline_blocks_testnet
run-name: dbt_run_streamline_blocks_testnet
on:
workflow_dispatch:
schedule:
# 6x/day schedule = At minutes 7, 27, and 47 past hours 10 and 20 (see https://crontab.guru)
- cron: "7,27,47 10,20 * * *"
env:
USE_VARS: "${{ vars.USE_VARS }}"
DBT_PROFILES_DIR: "${{ vars.DBT_PROFILES_DIR }}"
ACCOUNT: "${{ vars.ACCOUNT }}"
ROLE: "${{ vars.ROLE }}"
USER: "${{ vars.USER }}"
PASSWORD: "${{ secrets.PASSWORD }}"
REGION: "${{ vars.REGION }}"
DATABASE: "${{ vars.DATABASE }}"
WAREHOUSE: "${{ vars.WAREHOUSE }}"
SCHEMA: "${{ vars.SCHEMA }}"
concurrency:
group: ${{ github.workflow }}
jobs:
run_dbt_jobs:
runs-on: ubuntu-latest
environment:
name: workflow_prod
steps:
- uses: actions/checkout@v3
- uses: actions/setup-python@v4
with:
python-version: "3.10"
cache: "pip"
- name: install dependencies
run: |
pip install -r requirements.txt
dbt deps
- name: Run DBT Realtime
run: |
dbt run -s 2+streamline__get_testnet_blocks_realtime --vars '{"STREAMLINE_INVOKE_STREAMS": True}'


@ -0,0 +1,46 @@
name: dbt_run_streamline_collections_testnet
run-name: dbt_run_streamline_collections_testnet
on:
workflow_dispatch:
schedule:
# 6x/day schedule = At minutes 11, 31, and 51 past hours 10 and 20 (see https://crontab.guru)
- cron: "11,31,51 10,20 * * *"
env:
USE_VARS: "${{ vars.USE_VARS }}"
DBT_PROFILES_DIR: "${{ vars.DBT_PROFILES_DIR }}"
ACCOUNT: "${{ vars.ACCOUNT }}"
ROLE: "${{ vars.ROLE }}"
USER: "${{ vars.USER }}"
PASSWORD: "${{ secrets.PASSWORD }}"
REGION: "${{ vars.REGION }}"
DATABASE: "${{ vars.DATABASE }}"
WAREHOUSE: "${{ vars.WAREHOUSE }}"
SCHEMA: "${{ vars.SCHEMA }}"
concurrency:
group: ${{ github.workflow }}
jobs:
run_dbt_jobs:
runs-on: ubuntu-latest
environment:
name: workflow_prod
steps:
- uses: actions/checkout@v3
- uses: actions/setup-python@v4
with:
python-version: "3.10"
cache: "pip"
- name: install dependencies
run: |
pip install -r requirements.txt
dbt deps
- name: Run DBT Realtime
run: |
dbt run -s 2+streamline__get_testnet_collections_realtime --vars '{"STREAMLINE_INVOKE_STREAMS": True}'


@ -0,0 +1,46 @@
name: dbt_run_streamline_transaction_results_testnet
run-name: dbt_run_streamline_transaction_results_testnet
on:
workflow_dispatch:
schedule:
# 6x/day schedule = At minutes 15, 35, and 55 past hours 10 and 20 (see https://crontab.guru)
- cron: "15,35,55 10,20 * * *"
env:
USE_VARS: "${{ vars.USE_VARS }}"
DBT_PROFILES_DIR: "${{ vars.DBT_PROFILES_DIR }}"
ACCOUNT: "${{ vars.ACCOUNT }}"
ROLE: "${{ vars.ROLE }}"
USER: "${{ vars.USER }}"
PASSWORD: "${{ secrets.PASSWORD }}"
REGION: "${{ vars.REGION }}"
DATABASE: "${{ vars.DATABASE }}"
WAREHOUSE: "${{ vars.WAREHOUSE }}"
SCHEMA: "${{ vars.SCHEMA }}"
concurrency:
group: ${{ github.workflow }}
jobs:
run_dbt_jobs:
runs-on: ubuntu-latest
environment:
name: workflow_prod
steps:
- uses: actions/checkout@v3
- uses: actions/setup-python@v4
with:
python-version: "3.10"
cache: "pip"
- name: install dependencies
run: |
pip install -r requirements.txt
dbt deps
- name: Run DBT Realtime
run: |
dbt run -s 1+streamline__get_testnet_transaction_results_realtime --vars '{"STREAMLINE_INVOKE_STREAMS": True}'


@ -0,0 +1,46 @@
name: dbt_run_streamline_transactions_testnet
run-name: dbt_run_streamline_transactions_testnet
on:
workflow_dispatch:
schedule:
# 6x/day schedule = At minutes 15, 35, and 55 past hours 10 and 20 (see https://crontab.guru)
- cron: "15,35,55 10,20 * * *"
env:
USE_VARS: "${{ vars.USE_VARS }}"
DBT_PROFILES_DIR: "${{ vars.DBT_PROFILES_DIR }}"
ACCOUNT: "${{ vars.ACCOUNT }}"
ROLE: "${{ vars.ROLE }}"
USER: "${{ vars.USER }}"
PASSWORD: "${{ secrets.PASSWORD }}"
REGION: "${{ vars.REGION }}"
DATABASE: "${{ vars.DATABASE }}"
WAREHOUSE: "${{ vars.WAREHOUSE }}"
SCHEMA: "${{ vars.SCHEMA }}"
concurrency:
group: ${{ github.workflow }}
jobs:
run_dbt_jobs:
runs-on: ubuntu-latest
environment:
name: workflow_prod
steps:
- uses: actions/checkout@v3
- uses: actions/setup-python@v4
with:
python-version: "3.10"
cache: "pip"
- name: install dependencies
run: |
pip install -r requirements.txt
dbt deps
- name: Run DBT Realtime
run: |
dbt run -s 1+streamline__get_testnet_transactions_realtime --vars '{"STREAMLINE_INVOKE_STREAMS": True}'


@ -2,7 +2,7 @@
# and underscores. A good package name should reflect your organization's
# name or the intended use of these models
name: "flow_models"
version: "1.7.0"
version: "1.8.0"
config-version: 2
require-dbt-version: ">=1.7.0"
@ -64,8 +64,9 @@ vars:
STREAMLINE_RUN_HISTORY: False
DROP_UDFS_AND_SPS: False
REST_API_PREFIX_PROD: quxfxtl934.execute-api.us-east-1.amazonaws.com/prod/
REST_API_PREFIX_DEV: ul6x832e8l.execute-api.us-east-1.amazonaws.com/dev/
REST_API_PREFIX_DEV: sicl8dvvv9.execute-api.us-east-1.amazonaws.com/dev/
STREAMLINE_START_BLOCK: 55114467
LOAD_BACKFILL_VERSION: CANDIDATE_07
dispatch:
- macro_namespace: dbt
@ -73,6 +74,6 @@ dispatch:
- flow-models
- dbt_snowflake_query_tags
- dbt
# query-comment:
# comment: '{{ dbt_snowflake_query_tags.get_query_comment(node) }}'
# append: true # Snowflake removes prefixed comments.
query-comment:
comment: '{{ dbt_snowflake_query_tags.get_query_comment(node) }}'
append: true # Snowflake removes prefixed comments.


@ -1,12 +1,12 @@
{% macro create_udfs() %}
{% if var("UPDATE_UDFS_AND_SPS") %}
{% if target.database != "FLOW_COMMUNITY_DEV" %}
{% set sql %}
{{ create_udtf_get_base_table(
schema = "streamline"
) }}
{{ create_udf_get_chainhead() }}
{{ create_udf_get_chainhead_testnet() }}
{{ create_udf_bulk_grpc() }}
{{ run_create_udf_array_disjunctive_union() }}
@ -15,6 +15,5 @@
{% endset %}
{% do run_query(sql) %}
{{- fsc_utils.create_udfs() -}}
{% endif %}
{% endif %}
{% endmacro %}


@ -12,14 +12,23 @@
CREATE api integration IF NOT EXISTS aws_flow_api_prod_us_east_2 api_provider = aws_api_gateway api_aws_role_arn = 'arn:aws:iam::490041342817:role/flow-api-prod-rolesnowflakeudfsAF733095-F6SPYWFGQX9Z' api_allowed_prefixes = (
'https://78rpbojpue.execute-api.us-east-2.amazonaws.com/prod/'
) enabled = TRUE;
{% endset %}
{% do run_query(sql) %}
{% set sql %}
CREATE api integration IF NOT EXISTS aws_flow_evm_api_prod api_provider = aws_api_gateway api_aws_role_arn = 'arn:aws:iam::924682671219:role/flow-api-prod-rolesnowflakeudfsAF733095-RmrgKIWbzoFL' api_allowed_prefixes = (
'https://rajpkbgko9.execute-api.us-east-1.amazonaws.com/prod/'
) enabled = TRUE;
{% endset %}
{% do run_query(sql) %}
{% elif target.name == "dev" %}
{{ log("Generating api integration for target:" ~ target.name, info=True) }}
{% set sql %}
CREATE api integration IF NOT EXISTS aws_flow_api_dev_2 api_provider = aws_api_gateway api_aws_role_arn = 'arn:aws:iam::490041342817:role/flow-api-dev-rolesnowflakeudfsAF733095-1IP9GV997U5RM' api_allowed_prefixes = (
'https://ul6x832e8l.execute-api.us-east-1.amazonaws.com/dev/'
CREATE api integration IF NOT EXISTS aws_flow_api_dev_2 api_provider = aws_api_gateway api_aws_role_arn = 'arn:aws:iam::490041342817:role/flow-api-dev-rolesnowflakeudfsAF733095-i1JsMNTpSzX0' api_allowed_prefixes = (
'https://sicl8dvvv9.execute-api.us-east-1.amazonaws.com/dev/'
) enabled = TRUE;
{% endset %}
{% do run_query(sql) %}
@ -30,6 +39,14 @@
) enabled = TRUE;
{% endset %}
{% do run_query(sql) %}
{% set sql %}
CREATE api integration IF NOT EXISTS aws_flow_evm_api_dev api_provider = aws_api_gateway api_aws_role_arn = 'arn:aws:iam::704693948482:role/flow-api-stg-rolesnowflakeudfsAF733095-tPEdygwPC6IV' api_allowed_prefixes = (
'https://pfv9lhg3kg.execute-api.us-east-1.amazonaws.com/stg/'
) enabled = TRUE;
{% endset %}
{% do run_query(sql) %}
{% elif target.name == "sbx" %}
{{ log("Generating api integration for target:" ~ target.name, info=True) }}
{% set sql %}


@ -10,7 +10,7 @@ $$
seq4()
) as id
from
table(generator(rowcount => 100000000)) -- July 2023 Flow Chain head is at 57M
table(generator(rowcount => 1000000000))
)
select
id as height


@ -6,13 +6,28 @@
{% if target.name == "prod" %}
aws_flow_api_prod AS 'https://quxfxtl934.execute-api.us-east-1.amazonaws.com/prod/get_chainhead'
{% elif target.name == "dev" %}
aws_flow_api_dev_2 AS 'https://ul6x832e8l.execute-api.us-east-1.amazonaws.com/dev/get_chainhead'
aws_flow_api_dev_2 AS 'https://sicl8dvvv9.execute-api.us-east-1.amazonaws.com/dev/get_chainhead'
{% elif target.name == "sbx" %}
{{ log("Creating sbx get_chainhead", info=True) }}
aws_flow_api_sbx AS 'https://bc5ejedoq8.execute-api.us-east-1.amazonaws.com/sbx/get_chainhead'
{%- endif %};
{% endmacro %}
{% macro create_udf_get_chainhead_testnet() %}
{{ log("Creating udf get_chainhead_testnet for target:" ~ target.name ~ ", schema: " ~ target.schema, info=True) }}
{{ log("role:" ~ target.role ~ ", user:" ~ target.user, info=True) }}
CREATE
OR REPLACE EXTERNAL FUNCTION streamline.udf_get_chainhead_testnet() returns variant api_integration =
{% if target.name == "prod" %}
aws_flow_api_prod_us_east_2 AS 'https://78rpbojpue.execute-api.us-east-2.amazonaws.com/prod/get_chainhead_testnet'
{% elif target.name == "dev" %}
aws_flow_api_dev_2 AS 'https://sicl8dvvv9.execute-api.us-east-1.amazonaws.com/dev/get_chainhead_testnet'
{% elif target.name == "sbx" %}
{{ log("Creating sbx get_chainhead_testnet", info=True) }}
aws_flow_api_sbx AS 'https://bc5ejedoq8.execute-api.us-east-1.amazonaws.com/sbx/get_chainhead_testnet'
{%- endif %};
{% endmacro %}
{% macro create_udf_bulk_grpc() %}
{{ log("Creating udf udf_bulk_grpc for target:" ~ target.name ~ ", schema: " ~ target.schema, info=True) }}
{{ log("role:" ~ target.role ~ ", user:" ~ target.user, info=True) }}
@ -21,7 +36,7 @@
{% if target.name == "prod" %}
aws_flow_api_prod AS 'https://quxfxtl934.execute-api.us-east-1.amazonaws.com/prod/udf_bulk_grpc'
{% elif target.name == "dev" %}
aws_flow_api_dev_2 AS 'https://ul6x832e8l.execute-api.us-east-1.amazonaws.com/dev/udf_bulk_grpc'
aws_flow_api_dev_2 AS 'https://sicl8dvvv9.execute-api.us-east-1.amazonaws.com/dev/udf_bulk_grpc'
{% elif target.name == "sbx" %}
{{ log("Creating sbx udf_bulk_grpc", info=True) }}
aws_flow_api_sbx AS 'https://bc5ejedoq8.execute-api.us-east-1.amazonaws.com/sbx/udf_bulk_grpc'
@ -58,10 +73,32 @@
{% if target.name == "prod" %}
aws_flow_api_prod AS 'https://quxfxtl934.execute-api.us-east-1.amazonaws.com/prod/udf_api'
{% elif target.name == "dev" %}
aws_flow_api_dev_2 AS 'https://ul6x832e8l.execute-api.us-east-1.amazonaws.com/dev/udf_api'
aws_flow_api_dev_2 AS 'https://sicl8dvvv9.execute-api.us-east-1.amazonaws.com/dev/udf_api'
{% elif target.name == "sbx" %}
{{ log("Creating sbx udf_api", info=True) }}
aws_flow_api_sbx AS 'https://bc5ejedoq8.execute-api.us-east-1.amazonaws.com/sbx/udf_api'
{%- endif %};
{% endmacro %}
{% macro create_udf_bulk_rest_api_v2() %}
CREATE
OR REPLACE EXTERNAL FUNCTION streamline.udf_bulk_rest_api_v2(
json OBJECT
) returns ARRAY api_integration =
{% if target.name == "prod" %}
aws_flow_evm_api_prod AS 'https://rajpkbgko9.execute-api.us-east-1.amazonaws.com/prod/udf_bulk_rest_api'
{% else %}
aws_flow_evm_api_dev AS 'https://pfv9lhg3kg.execute-api.us-east-1.amazonaws.com/stg/udf_bulk_rest_api'
{%- endif %};
{% endmacro %}
{% macro create_udf_bulk_decode_logs() %}
CREATE
OR REPLACE EXTERNAL FUNCTION streamline.udf_bulk_decode_logs(
json OBJECT
) returns ARRAY api_integration = {% if target.name == "prod" %}
aws_flow_evm_api_prod AS 'https://rajpkbgko9.execute-api.us-east-1.amazonaws.com/prod/bulk_decode_logs'
{% else %}
aws_flow_evm_api_dev AS 'https://pfv9lhg3kg.execute-api.us-east-1.amazonaws.com/stg/bulk_decode_logs'
{%- endif %};
{% endmacro %}


@ -19,7 +19,7 @@ WITH labels AS (
labels_combined_id
FROM
{{ source(
'crosschain_silver',
'silver_crosschain',
'labels_combined'
) }}
WHERE


@ -1,41 +0,0 @@
{{ config(
materialized = 'view',
enabled = False
) }}
WITH token_prices AS (
SELECT
*
FROM
{{ source(
'silver',
'prices_v2'
) }}
WHERE
asset_id IN (
'4558',
-- Flow
'6993',
-- Revv
'8075',
-- Rally
'12182',
-- Blocto Token
'15139',
-- Starly
'15194',
-- Sportium
'flow',
'rally-2',
'revv',
'sportium',
'starly',
'blocto-token'
)
AND provider IS NOT NULL
)
SELECT
*
FROM
token_prices


@ -1,82 +0,0 @@
{{ config(
materialized = 'view',
tags = ['scheduled']
) }}
WITH coingecko AS (
SELECT
'coingecko' AS provider,
id :: STRING AS id,
recorded_hour,
OPEN,
high,
low,
CLOSE,
_inserted_timestamp
FROM
{{ source(
'crosschain_silver',
'hourly_prices_coin_gecko'
) }}
),
coinmarketcap AS (
SELECT
'coinmarketcap' AS provider,
id :: STRING AS id,
recorded_hour,
OPEN,
high,
low,
CLOSE,
_inserted_timestamp
FROM
{{ source(
'crosschain_silver',
'hourly_prices_coin_market_cap'
) }}
),
token_prices AS (
SELECT
*
FROM
coingecko
UNION ALL
SELECT
*
FROM
coinmarketcap
)
SELECT
*
FROM
token_prices
WHERE
-- numeric ids are cmc, alpha are coingecko
id IN (
'4558',
-- Flow
'6993',
-- Revv
'8075',
-- Rally
'12182',
-- Blocto Token
'15139',
-- Starly
'15194',
-- Sportium
'flow',
'rally-2',
'revv',
'sportium',
'starly',
'blocto-token'
)
AND provider IS NOT NULL -- tokens on increment that are not on either provider:
-- my
-- ozone
-- sdm
-- stFLOVATAR
-- thul
-- ce tokens


@ -0,0 +1,26 @@
{{ config (
materialized = 'view'
) }}
SELECT
asset_id,
symbol,
NAME,
decimals,
blockchain,
is_deprecated,
provider,
source,
_inserted_timestamp,
inserted_timestamp,
modified_timestamp,
complete_native_asset_metadata_id,
_invocation_id
FROM
{{ source(
'silver_crosschain',
'complete_native_asset_metadata'
) }}
WHERE
blockchain = 'flow'
AND symbol = 'FLOW'


@ -0,0 +1,29 @@
{{ config (
materialized = 'view'
) }}
SELECT
HOUR,
asset_id,
symbol,
NAME,
decimals,
price,
blockchain,
is_imputed,
is_deprecated,
provider,
source,
_inserted_timestamp,
inserted_timestamp,
modified_timestamp,
complete_native_prices_id,
_invocation_id
FROM
{{ source(
'silver_crosschain',
'complete_native_prices'
) }}
WHERE
blockchain = 'flow'
AND symbol = 'FLOW'


@ -0,0 +1,25 @@
{{ config (
materialized = 'view'
) }}
SELECT
asset_id,
token_address,
NAME,
symbol,
platform,
platform_id,
provider,
source,
_inserted_timestamp,
inserted_timestamp,
modified_timestamp,
complete_provider_asset_metadata_id,
_invocation_id
FROM
{{ source(
'silver_crosschain',
'complete_provider_asset_metadata'
) }}
WHERE
platform = 'flow'


@ -0,0 +1,24 @@
{{ config (
materialized = 'view'
) }}
SELECT
asset_id,
recorded_hour,
OPEN,
high,
low,
CLOSE,
provider,
source,
_inserted_timestamp,
inserted_timestamp,
modified_timestamp,
complete_provider_prices_id,
_invocation_id
FROM
{{ source(
'silver_crosschain',
'complete_provider_prices'
) }}
-- prices for all ids


@ -0,0 +1,28 @@
{{ config (
materialized = 'view'
) }}
SELECT
token_address,
asset_id,
symbol,
NAME,
decimals,
blockchain,
blockchain_name,
blockchain_id,
is_deprecated,
provider,
source,
_inserted_timestamp,
inserted_timestamp,
modified_timestamp,
complete_token_asset_metadata_id,
_invocation_id
FROM
{{ source(
'silver_crosschain',
'complete_token_asset_metadata'
) }}
WHERE
blockchain = 'flow'


@ -0,0 +1,31 @@
{{ config (
materialized = 'view'
) }}
SELECT
HOUR,
token_address,
asset_id,
symbol,
NAME,
decimals,
price,
blockchain,
blockchain_name,
blockchain_id,
is_imputed,
is_deprecated,
provider,
source,
_inserted_timestamp,
inserted_timestamp,
modified_timestamp,
complete_token_prices_id,
_invocation_id
FROM
{{ source(
'silver_crosschain',
'complete_token_prices'
) }}
WHERE
blockchain = 'flow'


@ -0,0 +1,54 @@
# Backfill
Bronze backfill models have been parameterized to load one network version (NV) at a time, as each combination of network version and method response is a separate bucket and external table.
Run either an individual model type (blocks, collections, transactions, transaction_results) or all four at once with `tag:streamline_load`.
```shell
dbt run -s 1+tag:streamline_load --vars '{"LOAD_BACKFILL": True, "LOAD_BACKFILL_VERSION": "<NV>"}'
```
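For example, to load the MAINNET_01 network version across all four model types:
```shell
dbt run -s 1+tag:streamline_load --vars '{"LOAD_BACKFILL": True, "LOAD_BACKFILL_VERSION": "MAINNET_01"}'
```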
## Valid Network Versions
- CANDIDATE_07
- CANDIDATE_08
- CANDIDATE_09
- MAINNET_01
- MAINNET_02
- MAINNET_03
- MAINNET_04
- MAINNET_05
- MAINNET_06
- MAINNET_07
- MAINNET_08
- MAINNET_09
- MAINNET_10
- MAINNET_11
- MAINNET_12
- MAINNET_13
- MAINNET_14
- MAINNET_15
- MAINNET_16
- MAINNET_17
- MAINNET_18
- MAINNET_19
- MAINNET_20
- MAINNET_21
- MAINNET_22
## View Types
Views with the word `complete` in the name are used in the complete history models at `models/silver/streamline/core/complete`. These use a macro to scan multiple external tables in one call and feed the streamline backfill process.
The `bronze__streamline_<method>_history` views query just one network version, based on the `LOAD_BACKFILL_VERSION` variable passed at runtime. No default is set for this variable, so execution fails if it is forgotten.
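As a minimal sketch, each history view resolves its source table name from the variable, so dbt compilation fails fast when `LOAD_BACKFILL_VERSION` is missing:
```sql
-- sketch from the bronze blocks history model; the NV suffix comes from
-- LOAD_BACKFILL_VERSION, which intentionally has no default
{% set history_model = "BLOCKS_" ~ var('LOAD_BACKFILL_VERSION') %}

SELECT
    *
FROM
    {{ source("bronze_streamline", history_model) }}
```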
## Running Streamline Backfill
If a network version requires further backfill due to missing blocks or transactions (at present, there are 5800 missing transaction results), run the following command, as the dbt_run_history workflow has been deleted.
```shell
dbt run -s 2+streamline__get_<method>_history_<network_version> --vars '{"STREAMLINE_INVOKE_STREAMS": True}'
```
e.g.
```shell
dbt run -s \
2+streamline__get_transaction_results_history_mainnet_22 \
--vars '{"STREAMLINE_INVOKE_STREAMS": True}'
```


@ -0,0 +1,36 @@
{{ config (
materialized = 'ephemeral'
) }}
{% set history_model = "BLOCKS_" ~ var('LOAD_BACKFILL_VERSION') %}
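-- meta reads the external table's file-registration metadata so each row can be
-- tagged with the file's registration time (_inserted_timestamp) and the block-id
-- prefix parsed from the file path (_partition_by_block_id)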
WITH meta AS (
SELECT
registered_on AS _inserted_timestamp,
file_name,
CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 3), '_', 1) AS INTEGER) AS _partition_by_block_id
FROM
TABLE(
information_schema.external_table_files(
table_name => '{{ source( "bronze_streamline", history_model ) }}'
)
) A
)
SELECT
block_number,
DATA,
_inserted_timestamp,
MD5(
CAST(
COALESCE(CAST(block_number AS text), '' :: STRING) AS text
)
) AS _fsc_id,
s._partition_by_block_id,
s.value AS VALUE
FROM
{{ source("bronze_streamline", history_model ) }} s
JOIN meta b
ON b.file_name = metadata$filename
AND b._partition_by_block_id = s._partition_by_block_id
WHERE
b._partition_by_block_id = s._partition_by_block_id


@ -0,0 +1,37 @@
{{ config (
materialized = 'ephemeral'
) }}
{% set history_model = "COLLECTIONS_" ~ var('LOAD_BACKFILL_VERSION') %}
WITH meta AS (
SELECT
registered_on AS _inserted_timestamp,
file_name,
CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 3), '_', 1) AS INTEGER) AS _partition_by_block_id
FROM
TABLE(
information_schema.external_table_files(
table_name => '{{ source( "bronze_streamline", history_model ) }}'
)
) A
)
SELECT
block_number,
id,
DATA,
_inserted_timestamp,
MD5(
CAST(
COALESCE(CAST(block_number AS text), '' :: STRING) AS text
)
) AS _fsc_id,
s._partition_by_block_id,
s.value AS VALUE
FROM
{{ source("bronze_streamline", history_model ) }} s
JOIN meta b
ON b.file_name = metadata$filename
AND b._partition_by_block_id = s._partition_by_block_id
WHERE
b._partition_by_block_id = s._partition_by_block_id


@ -0,0 +1,37 @@
{{ config (
materialized = 'ephemeral'
) }}
{% set history_model = "TRANSACTION_RESULTS_" ~ var('LOAD_BACKFILL_VERSION') %}
WITH meta AS (
SELECT
registered_on AS _inserted_timestamp,
file_name,
CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 3), '_', 1) AS INTEGER) AS _partition_by_block_id
FROM
TABLE(
information_schema.external_table_files(
table_name => '{{ source( "bronze_streamline", history_model ) }}'
)
) A
)
SELECT
block_number,
id,
DATA,
_inserted_timestamp,
MD5(
CAST(
COALESCE(CAST(block_number AS text), '' :: STRING) AS text
)
) AS _fsc_id,
s._partition_by_block_id,
s.value AS VALUE
FROM
{{ source("bronze_streamline", history_model ) }} s
JOIN meta b
ON b.file_name = metadata$filename
AND b._partition_by_block_id = s._partition_by_block_id
WHERE
b._partition_by_block_id = s._partition_by_block_id


@ -0,0 +1,37 @@
{{ config (
materialized = 'ephemeral'
) }}
{% set history_model = "TRANSACTIONS_" ~ var('LOAD_BACKFILL_VERSION') %}
WITH meta AS (
SELECT
registered_on AS _inserted_timestamp,
file_name,
CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 3), '_', 1) AS INTEGER) AS _partition_by_block_id
FROM
TABLE(
information_schema.external_table_files(
table_name => '{{ source( "bronze_streamline", history_model ) }}'
)
) A
)
SELECT
block_number,
id,
DATA,
_inserted_timestamp,
MD5(
CAST(
COALESCE(CAST(block_number AS text), '' :: STRING) AS text
)
) AS _fsc_id,
s._partition_by_block_id,
s.value AS VALUE
FROM
{{ source("bronze_streamline", history_model ) }} s
JOIN meta b
ON b.file_name = metadata$filename
AND b._partition_by_block_id = s._partition_by_block_id
WHERE
b._partition_by_block_id = s._partition_by_block_id


@ -0,0 +1,3 @@
# Streamline Realtime Models
Flow migrated to Streamline with Mainnet-23. These views point to the external tables from that network version forward.


@ -0,0 +1,34 @@
{{ config (
materialized = 'view'
) }}
WITH meta AS (
SELECT
registered_on AS _inserted_timestamp,
file_name,
CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 3), '_', 1) AS INTEGER) AS _partition_by_block_id
FROM
TABLE(
information_schema.external_table_files(
table_name => '{{ source( "bronze_streamline", "testnet_blocks") }}'
)
) A
)
SELECT
block_number,
DATA,
_inserted_timestamp,
MD5(
CAST(
COALESCE(CAST(block_number AS text), '' :: STRING) AS text
)
) AS _fsc_id,
s._partition_by_block_id,
s.value AS VALUE
FROM
{{ source("bronze_streamline", "testnet_blocks") }} s
JOIN meta b
ON b.file_name = metadata$filename
AND b._partition_by_block_id = s._partition_by_block_id
WHERE
b._partition_by_block_id = s._partition_by_block_id


@ -0,0 +1,11 @@
{{ config (
materialized = 'view'
) }}
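{# assumption: the view is named so its last two identifier segments form the source table name, e.g. "...__streamline_FR_testnet_blocks" -> "testnet_blocks" #}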
{% set model = this.identifier.split("_")[-2:] | join('_') %}
{{ streamline_external_table_FR_query(
model,
partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 3), '_', 1) AS INTEGER)",
partition_name = "_partition_by_block_id",
unique_key = "id"
) }}


@ -0,0 +1,13 @@
{{ config (
materialized = 'view'
) }}
{% set model = this.identifier.split("_")[-3:] | join('_') %}
{{ streamline_external_table_FR_query(
model,
partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 3), '_', 1) AS INTEGER)",
partition_name = "_partition_by_block_id",
unique_key = "id"
) }}


@ -0,0 +1,11 @@
{{ config (
materialized = 'view'
) }}
{% set model = this.identifier.split("_")[-2:] | join('_') %}
{{ streamline_external_table_FR_query(
model,
partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 3), '_', 1) AS INTEGER)",
partition_name = "_partition_by_block_id",
unique_key = "id"
) }}


@ -0,0 +1,35 @@
{{ config (
materialized = 'view'
) }}
WITH meta AS (
SELECT
last_modified AS _inserted_timestamp,
file_name,
CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 3), '_', 1) AS INTEGER) AS _partition_by_block_id
FROM
TABLE(
information_schema.external_table_file_registration_history(
start_time => DATEADD('day', -3, CURRENT_TIMESTAMP()),
table_name => '{{ source( "bronze_streamline", "testnet_blocks") }}')
) A
)
SELECT
block_number,
DATA,
_inserted_timestamp,
MD5(
CAST(
COALESCE(CAST(block_number AS text), '' :: STRING) AS text
)
) AS _fsc_id,
s._partition_by_block_id,
s.value AS VALUE
FROM
{{ source("bronze_streamline","testnet_blocks") }} s
JOIN meta b
ON b.file_name = metadata$filename
AND b._partition_by_block_id = s._partition_by_block_id
WHERE
b._partition_by_block_id = s._partition_by_block_id


@ -0,0 +1,11 @@
{{ config (
materialized = 'view'
) }}
{% set model = this.identifier.split("_")[-2:] | join('_') %}
{{ streamline_external_table_query(
model,
partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 3), '_', 1) AS INTEGER)",
partition_name = "_partition_by_block_id",
unique_key = "id"
) }}


@ -0,0 +1,11 @@
{{ config (
materialized = 'view'
) }}
{% set model = this.identifier.split("_")[-3:] | join('_') %}
{{ streamline_external_table_query(
model,
partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 3), '_', 1) AS INTEGER)",
partition_name = "_partition_by_block_id",
unique_key = "id"
) }}


@ -0,0 +1,11 @@
{{ config (
materialized = 'view'
) }}
{% set model = this.identifier.split("_")[-2:] | join('_') %}
{{ streamline_external_table_query(
model,
partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 3), '_', 1) AS INTEGER)",
partition_name = "_partition_by_block_id",
unique_key = "id"
) }}


@ -0,0 +1,5 @@
{% docs amount_fee_usd %}
The fee amount, denominated in USD, where pricing data is available.
{% enddocs %}


@ -0,0 +1,5 @@
{% docs amount_usd %}
Amount denominated in USD, where pricing data is available.
{% enddocs %}


@ -1,5 +1,5 @@
{% docs arguments %}
The arguments included in the transaction body.
The arguments passed into the Cadence script when the transaction was submitted.
{% enddocs %}


@ -0,0 +1,137 @@
{% docs prices_dim_asset_metadata_table_doc %}
A comprehensive dimensional table holding asset metadata and other relevant details pertaining to each id, from multiple providers. This data set includes raw, non-transformed data coming directly from the provider APIs and rows are not intended to be unique. As a result, there may be data quality issues persisting in the APIs that flow through to this dimensional model. If you are interested in using a curated data set instead, please utilize ez_asset_metadata.
{% enddocs %}
{% docs prices_ez_asset_metadata_table_doc %}
A convenience table holding prioritized asset metadata and other relevant details pertaining to each token_address and native asset. This data set is highly curated and contains metadata for one unique asset per blockchain.
{% enddocs %}
{% docs prices_fact_prices_ohlc_hourly_table_doc %}
A comprehensive fact table holding id and provider specific open, high, low, close hourly prices, from multiple providers. This data set includes raw, non-transformed data coming directly from the provider APIs and rows are not intended to be unique. As a result, there may be data quality issues persisting in the APIs that flow through to this fact based model. If you are interested in using a curated data set instead, please utilize ez_prices_hourly.
{% enddocs %}
{% docs prices_ez_prices_hourly_table_doc %}
A convenience table for determining token prices by address and blockchain, and native asset prices by symbol and blockchain. This data set is highly curated and contains metadata for one price per hour per unique asset and blockchain.
{% enddocs %}
{% docs prices_provider %}
The provider or source of the data.
{% enddocs %}
{% docs prices_asset_id %}
The unique identifier representing the asset.
{% enddocs %}
{% docs prices_name %}
The name of the asset.
{% enddocs %}
{% docs prices_symbol %}
The symbol of the asset.
{% enddocs %}
{% docs prices_token_address %}
The specific address representing the asset on a specific platform. This will be NULL if referring to a native asset.
{% enddocs %}
{% docs prices_blockchain %}
The Blockchain, Network, or Platform for this asset.
{% enddocs %}
{% docs prices_blockchain_id %}
The unique identifier of the Blockchain, Network, or Platform for this asset.
{% enddocs %}
{% docs prices_decimals %}
The number of decimals for the asset. May be NULL.
{% enddocs %}
{% docs prices_is_native %}
A flag indicating assets native to the respective blockchain.
{% enddocs %}
{% docs prices_is_deprecated %}
A flag indicating if the asset is deprecated or no longer supported by the provider.
{% enddocs %}
{% docs prices_id_deprecation %}
Deprecating soon! Please use the `asset_id` column instead.
{% enddocs %}
{% docs prices_decimals_deprecation %}
Deprecating soon! Please use the decimals column in `ez_asset_metadata` or join in `dim_contracts` instead.
{% enddocs %}
{% docs prices_hour %}
Hour that the price was recorded at.
{% enddocs %}
{% docs prices_price %}
Closing price of the recorded hour in USD.
{% enddocs %}
{% docs prices_is_imputed %}
A flag indicating if the price was imputed, i.e. carried forward from the last arriving record. This is generally used for tokens with low liquidity or inconsistent reporting.
{% enddocs %}
{% docs prices_open %}
Opening price of the recorded hour in USD.
{% enddocs %}
{% docs prices_high %}
Highest price of the recorded hour in USD.
{% enddocs %}
{% docs prices_low %}
Lowest price of the recorded hour in USD.
{% enddocs %}
{% docs prices_close %}
Closing price of the recorded hour in USD.
{% enddocs %}


@ -0,0 +1,5 @@
{% docs token_symbol %}
Abbreviated symbol for the asset.
{% enddocs %}


@ -3,61 +3,16 @@
tags = ['scheduled']
) }}
WITH chainwalkers AS (
SELECT
NULL AS event_contract_id,
event_contract,
contract_name,
account_address,
_inserted_timestamp,
NULL AS inserted_timestamp,
NULL AS modified_timestamp
FROM
{{ ref('silver__contract_labels') }}
),
streamline AS (
SELECT
event_contract_id,
event_contract,
contract_name,
account_address,
_inserted_timestamp,
inserted_timestamp,
modified_timestamp
FROM
{{ ref('silver__contract_labels_s') }}
),
FINAL AS (
SELECT
*
FROM
chainwalkers
UNION ALL
SELECT
*
FROM
streamline
)
SELECT
event_contract_id,
event_contract,
contract_name,
account_address,
inserted_timestamp,
modified_timestamp,
COALESCE (
event_contract_id,
{{ dbt_utils.generate_surrogate_key(['event_contract']) }}
) AS dim_contract_labels_id,
COALESCE (
inserted_timestamp,
_inserted_timestamp
) AS inserted_timestamp,
COALESCE (
modified_timestamp,
_inserted_timestamp
) AS modified_timestamp
) AS dim_contract_labels_id
FROM
FINAL qualify ROW_NUMBER() over (
PARTITION BY event_contract
ORDER BY
_inserted_timestamp DESC
) = 1
{{ ref('silver__contract_labels_s') }}


@ -3,68 +3,6 @@
tags = ['ez', 'scheduled']
) }}
WITH chainwalkers AS (
SELECT
NULL AS token_transfers_id,
block_height,
block_timestamp,
tx_id,
sender,
recipient,
token_contract,
amount,
tx_succeeded,
_inserted_timestamp,
NULL AS inserted_timestamp,
NULL AS modified_timestamp
FROM
{{ ref('silver__token_transfers') }}
WHERE
token_contract NOT IN (
'A.c38aea683c0c4d38.ZelosAccountingToken',
'A.f1b97c06745f37ad.SwapPair'
)
AND block_height < {{ var(
'STREAMLINE_START_BLOCK'
) }}
),
streamline AS (
SELECT
token_transfers_id,
block_height,
block_timestamp,
tx_id,
sender,
recipient,
token_contract,
amount,
tx_succeeded,
_inserted_timestamp,
inserted_timestamp,
modified_timestamp
FROM
{{ ref('silver__token_transfers_s') }}
WHERE
token_contract NOT IN (
'A.c38aea683c0c4d38.ZelosAccountingToken',
'A.f1b97c06745f37ad.SwapPair'
)
AND block_height >= {{ var(
'STREAMLINE_START_BLOCK'
) }}
),
FINAL AS (
SELECT
*
FROM
streamline
UNION ALL
SELECT
*
FROM
chainwalkers
)
SELECT
block_height,
block_timestamp,
@ -80,13 +18,12 @@ SELECT
['tx_id','sender', 'recipient','token_contract', 'amount']
) }}
) AS ez_token_transfers_id,
COALESCE (
inserted_timestamp,
_inserted_timestamp
) AS inserted_timestamp,
COALESCE (
modified_timestamp,
_inserted_timestamp
) AS modified_timestamp
inserted_timestamp,
modified_timestamp
FROM
FINAL
{{ ref('silver__token_transfers_s') }}
WHERE
token_contract NOT IN (
'A.c38aea683c0c4d38.ZelosAccountingToken',
'A.f1b97c06745f37ad.SwapPair'
)


@ -15,7 +15,6 @@ models:
- amount
config:
severity: warn
error_if: ">10"
columns:
- name: TX_ID


@ -3,67 +3,12 @@
tags = ['scheduled']
) }}
WITH chainwalkers AS (
SELECT
NULL AS blocks_id,
block_height,
block_timestamp,
network,
network_version,
chain_id,
tx_count,
id,
parent_id,
_inserted_timestamp,
NULL AS inserted_timestamp,
NULL AS modified_timestamp
FROM
{{ ref('silver__blocks') }}
WHERE
block_height < {{ var(
'STREAMLINE_START_BLOCK'
) }}
),
streamline AS (
SELECT
blocks_id,
block_height,
block_timestamp,
'mainnet' AS network,
network_version,
'flow' AS chain_id,
tx_count,
id,
parent_id,
_inserted_timestamp,
inserted_timestamp,
modified_timestamp
FROM
{{ ref('silver__streamline_blocks') }}
WHERE
block_height >= {{ var(
'STREAMLINE_START_BLOCK'
) }}
),
FINAL AS (
SELECT
*
FROM
chainwalkers
UNION ALL
SELECT
*
FROM
streamline
)
SELECT
blocks_id,
block_height,
block_height :: INT AS block_height,
block_timestamp,
network,
'mainnet' AS network,
network_version,
chain_id,
'flow' AS chain_id,
tx_count,
id,
parent_id,
@ -71,13 +16,8 @@ SELECT
blocks_id,
{{ dbt_utils.generate_surrogate_key(['block_height']) }}
) AS fact_blocks_id,
COALESCE (
inserted_timestamp,
_inserted_timestamp
) AS inserted_timestamp,
COALESCE (
modified_timestamp,
_inserted_timestamp
) AS modified_timestamp
_inserted_timestamp,
inserted_timestamp,
modified_timestamp
FROM
FINAL
{{ ref('silver__streamline_blocks') }}


@ -3,64 +3,10 @@
tags = ['scheduled']
) }}
WITH chainwalkers AS (
SELECT
NULL AS streamline_event_id,
tx_id,
block_timestamp,
block_height,
tx_succeeded,
event_index,
event_contract,
event_type,
event_data,
_inserted_timestamp,
NULL AS inserted_timestamp,
NULL AS modified_timestamp
FROM
{{ ref('silver__events_final') }}
WHERE
block_height < {{ var(
'STREAMLINE_START_BLOCK'
) }}
),
streamline AS (
SELECT
streamline_event_id,
tx_id,
block_timestamp,
block_height,
tx_succeeded,
event_index,
event_contract,
event_type,
event_data,
_inserted_timestamp,
inserted_timestamp,
modified_timestamp
FROM
{{ ref('silver__streamline_events') }}
WHERE
block_height >= {{ var(
'STREAMLINE_START_BLOCK'
) }}
),
FINAL AS (
SELECT
*
FROM
chainwalkers
UNION ALL
SELECT
*
FROM
streamline
)
SELECT
tx_id,
block_timestamp,
block_height,
block_height :: INT AS block_height,
tx_succeeded,
event_index,
event_contract,
@ -70,13 +16,8 @@ SELECT
streamline_event_id,
{{ dbt_utils.generate_surrogate_key(['tx_id']) }}
) AS fact_events_id,
COALESCE (
inserted_timestamp,
_inserted_timestamp
) AS inserted_timestamp,
COALESCE (
modified_timestamp,
_inserted_timestamp
) AS modified_timestamp
_inserted_timestamp,
inserted_timestamp,
modified_timestamp
FROM
FINAL
{{ ref('silver__streamline_events') }}


@ -3,103 +3,35 @@
tags = ['scheduled']
) }}
WITH chainwalkers AS (
SELECT
NULL AS streamline_transaction_id,
tx_id,
block_timestamp,
block_height,
chain_id,
tx_index,
proposer,
payer,
authorizers,
count_authorizers,
gas_limit,
transaction_result,
tx_succeeded,
error_msg,
_inserted_timestamp,
NULL AS inserted_timestamp,
NULL AS modified_timestamp
FROM
{{ ref('silver__transactions') }}
WHERE
block_height < {{ var(
'STREAMLINE_START_BLOCK'
) }}
),
streamline AS (
SELECT
streamline_transaction_id,
tx_id,
block_timestamp,
block_height,
'flow' AS chain_id,
NULL AS tx_index,
proposer,
payer,
authorizers,
count_authorizers,
gas_limit,
OBJECT_CONSTRUCT(
'error',
error_message,
'events',
events,
'status',
status
) AS transaction_result,
tx_succeeded,
error_message AS error_msg,
_inserted_timestamp,
NULL AS inserted_timestamp,
NULL AS modified_timestamp
FROM
{{ ref('silver__streamline_transactions_final') }}
WHERE
NOT pending_result_response
AND block_height >= {{ var(
'STREAMLINE_START_BLOCK'
) }}
),
FINAL AS (
SELECT
*
FROM
chainwalkers
UNION ALL
SELECT
*
FROM
streamline
)
SELECT
tx_id,
block_timestamp,
block_height,
chain_id,
tx_index,
block_height :: INT AS block_height,
'flow' AS chain_id,
proposer,
payer,
authorizers,
count_authorizers,
gas_limit,
transaction_result,
script,
arguments,
OBJECT_CONSTRUCT(
'error',
error_message,
'events',
events,
'status',
status
) AS transaction_result,
tx_succeeded,
error_msg,
error_message AS error_msg,
COALESCE (
streamline_transaction_id,
{{ dbt_utils.generate_surrogate_key(['tx_id']) }}
) AS fact_transactions_id,
COALESCE (
inserted_timestamp,
_inserted_timestamp
) AS inserted_timestamp,
COALESCE (
modified_timestamp,
_inserted_timestamp
) AS modified_timestamp
inserted_timestamp,
modified_timestamp
FROM
FINAL
{{ ref('silver__streamline_transactions_final') }}
WHERE
NOT pending_result_response


@ -85,6 +85,12 @@ models:
column_type_list:
- NUMBER
- name: SCRIPT
description: "{{ doc('script') }}"
- name: ARGUMENTS
description: "{{ doc('arguments') }}"
- name: TRANSACTION_RESULT
description: "{{ doc('transaction_result') }}"
tests:
@ -92,6 +98,7 @@ models:
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- VARIANT
- OBJECT
- name: TX_SUCCEEDED
description: "{{ doc('tx_succeeded') }}"


@ -3,40 +3,8 @@
tags = ['scheduled']
) }}
WITH pairs_cw AS (
WITH pairs_s AS (
SELECT
tx_id,
NULL AS labels_pools_metapier_id,
swap_contract,
deployment_timestamp,
token0_contract,
token1_contract,
pool_id,
vault_address,
NULL AS inserted_timestamp,
_inserted_timestamp,
NULL AS modified_timestamp
FROM
{{ ref('silver__labels_pools') }}
),
metapier_cw AS (
SELECT
tx_id,
NULL AS labels_pools_metapier_id,
swap_contract,
deployment_timestamp,
token0_contract,
token1_contract,
pool_id,
vault_address,
NULL AS inserted_timestamp,
_inserted_timestamp,
NULL AS modified_timestamp
FROM
{{ ref('silver__labels_pools_metapier') }}
),
pairs_s AS (
SELECT
tx_id,
labels_pools_id AS labels_pools_metapier_id,
@ -47,7 +15,6 @@ pairs_s AS (
pool_id,
vault_address,
inserted_timestamp,
_inserted_timestamp,
modified_timestamp
FROM
{{ ref('silver__labels_pools_s') }}
@ -63,27 +30,16 @@ metapier_s AS (
pool_id,
vault_address,
inserted_timestamp,
_inserted_timestamp,
modified_timestamp
FROM
{{ ref('silver__labels_pools_metapier_s') }}
),
FINAL AS (
SELECT
*
FROM
pairs_cw
UNION
SELECT
*
FROM
metapier_cw
UNION
SELECT
*
FROM
pairs_s
UNION
UNION ALL
SELECT
*
FROM
@ -100,13 +56,7 @@ SELECT
labels_pools_metapier_id,
{{ dbt_utils.generate_surrogate_key(['tx_id']) }}
) AS dim_swap_pool_labels_id,
COALESCE (
inserted_timestamp,
_inserted_timestamp
) AS inserted_timestamp,
COALESCE (
modified_timestamp,
_inserted_timestamp
) AS modified_timestamp
inserted_timestamp,
modified_timestamp
FROM
FINAL
FINAL


@ -0,0 +1,56 @@
{{ config(
materialized = 'view',
tags = ['ez', 'bridge', 'scheduled'],
meta ={ 'database_tags':{ 'table':{ 'PURPOSE': 'BRIDGE' }} }
) }}
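-- union provider token prices with native FLOW prices, giving the native asset
-- the FlowToken contract address so both sides join on token_address and hour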
WITH prices AS (
SELECT
hour,
token_address,
symbol,
price
FROM
{{ ref('silver__complete_token_prices') }}
UNION ALL
SELECT
hour,
'A.1654653399040a61.FlowToken' AS token_address,
symbol,
price
FROM
{{ ref('silver__complete_native_prices') }}
)
SELECT
tx_id,
block_timestamp,
block_height,
bridge_address,
b.token_address,
p.symbol AS token_symbol,
gross_amount AS amount,
amount_fee,
gross_amount * p.price AS amount_usd,
amount_fee * p.price AS amount_fee_usd,
source_address,
destination_address,
source_chain,
destination_chain,
platform,
bridge_complete_id AS ez_bridge_activity_id,
inserted_timestamp,
modified_timestamp
FROM
{{ ref('silver__bridge_complete') }} b
LEFT JOIN prices p
ON LOWER(
b.token_address
) = LOWER(
p.token_address
)
AND DATE_TRUNC(
'hour',
b.block_timestamp
) = p.hour


@ -1,32 +1,29 @@
version: 2
models:
- name: silver__bridge_blocto
- name: defi__ez_bridge_activity
description: |-
This table parses transactions where tokens are bridged to or from the Flow network via Blocto Teleport.
tests:
- dbt_utils.unique_combination_of_columns:
combination_of_columns:
- tx_id
This table parses transactions where tokens are bridged to or from the Flow network using Blocto teleport or the Celer bridge.
columns:
- name: tx_id
- name: TX_ID
description: "{{ doc('tx_id') }}"
tests:
- not_null
- name: block_timestamp
- name: BLOCK_TIMESTAMP
description: "{{ doc('block_timestamp') }}"
tests:
- not_null
- dbt_expectations.expect_row_values_to_have_recent_data:
datepart: day
interval: 3
where: block_height >= 55114467
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- TIMESTAMP_NTZ
- name: block_height
- name: BLOCK_HEIGHT
description: "{{ doc('block_height') }}"
tests:
- not_null
@ -35,8 +32,8 @@ models:
- NUMBER
- FLOAT
- name: teleport_contract
description: "{{ doc('teleport_contract') }}"
- name: BRIDGE_ADDRESS
description: "{{ doc('bridge_contract') }}"
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
@ -44,7 +41,7 @@ models:
- STRING
- VARCHAR
- name: token_contract
- name: TOKEN_ADDRESS
description: "{{ doc('token_contract') }}"
tests:
- not_null
@ -53,8 +50,11 @@ models:
- STRING
- VARCHAR
- name: gross_amount
description: "{{ doc('gross_amount') }}"
- name: TOKEN_SYMBOL
description: "{{ doc('token_symbol') }}"
- name: AMOUNT
description: "{{ doc('amount') }}"
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
@ -63,45 +63,39 @@ models:
- DOUBLE
- FLOAT
- name: amount_fee
- name: AMOUNT_FEE
description: "{{ doc('amount_fee') }}"
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- NUMBER
- DOUBLE
- FLOAT
- not_null:
where: platform = 'blocto'
- name: net_amount
description: "{{ doc('net_amount') }}"
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- NUMBER
- DOUBLE
- FLOAT
- name: AMOUNT_USD
description: "{{ doc('amount_usd') }}"
- name: AMOUNT_FEE_USD
description: "{{ doc('amount_fee_usd') }}"
- name: flow_wallet_address
- name: SOURCE_ADDRESS
description: "{{ doc('flow_wallet_address') }}"
tests:
- not_null
- not_null:
where: source_chain = 'flow'
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- STRING
- VARCHAR
- name: teleport_direction
description: "{{ doc('teleport_direction') }}"
- name: DESTINATION_ADDRESS
description: "{{ doc('flow_wallet_address') }}"
tests:
- not_null
- not_null:
where: destination_chain = 'flow'
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- STRING
- VARCHAR
- name: blockchain
- name: SOURCE_CHAIN
description: "{{ doc('blockchain') }}"
tests:
- not_null
@ -110,7 +104,16 @@ models:
- STRING
- VARCHAR
- name: bridge
- name: DESTINATION_CHAIN
description: "{{ doc('blockchain') }}"
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- STRING
- VARCHAR
- name: PLATFORM
description: "{{ doc('bridge') }}"
tests:
- not_null
@ -119,13 +122,11 @@ models:
- STRING
- VARCHAR
- name: _ingested_at
description: "{{ doc('_ingested_at') }}"
- name: FACT_BRIDGE_TRANSACTIONS_ID
description: "{{ doc('pk_id') }}"
- name: _inserted_timestamp
description: "{{ doc('_inserted_timestamp') }}"
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- TIMESTAMP_NTZ
- name: INSERTED_TIMESTAMP
description: "{{ doc('inserted_timestamp') }}"
- name: MODIFIED_TIMESTAMP
description: "{{ doc('modified_timestamp') }}"


@ -4,84 +4,20 @@
meta ={ 'database_tags':{ 'table':{ 'PURPOSE': 'BRIDGE' }} }
) }}
WITH blocto_cw AS (
SELECT
*
FROM
{{ ref('silver__bridge_blocto') }}
WHERE
block_height < {{ var(
'STREAMLINE_START_BLOCK'
) }}
),
celer_cw AS (
SELECT
*
FROM
{{ ref('silver__bridge_celer') }}
WHERE
block_height < {{ var(
'STREAMLINE_START_BLOCK'
) }}
),
WITH
blocto_s AS (
SELECT
*
FROM
{{ ref('silver__bridge_blocto_s') }}
WHERE
block_height >= {{ var(
'STREAMLINE_START_BLOCK'
) }}
),
celer_s AS (
SELECT
*
FROM
{{ ref('silver__bridge_celer_s') }}
WHERE
block_height >= {{ var(
'STREAMLINE_START_BLOCK'
) }}
),
combo AS (
SELECT
NULL AS bridge_id,
tx_id,
block_timestamp,
block_height,
teleport_contract AS bridge_contract,
token_contract,
gross_amount AS amount,
flow_wallet_address,
blockchain,
teleport_direction AS direction,
bridge,
_inserted_timestamp,
NULL AS inserted_timestamp,
NULL AS modified_timestamp
FROM
blocto_cw
UNION ALL
SELECT
NULL AS bridge_id,
tx_id,
block_timestamp,
block_height,
bridge_contract,
token_contract,
amount,
flow_wallet_address,
blockchain,
direction,
bridge,
_inserted_timestamp,
NULL AS inserted_timestamp,
NULL AS modified_timestamp
FROM
celer_cw
UNION ALL
SELECT
bridge_blocto_id AS bridge_id,
tx_id,
@ -94,7 +30,6 @@ combo AS (
blockchain,
teleport_direction AS direction,
bridge,
_inserted_timestamp,
inserted_timestamp,
modified_timestamp
FROM
@ -112,7 +47,6 @@ combo AS (
blockchain,
direction,
bridge,
_inserted_timestamp,
inserted_timestamp,
modified_timestamp
FROM
@ -133,13 +67,7 @@ SELECT
bridge_id,
{{ dbt_utils.generate_surrogate_key(['tx_id']) }}
) AS ez_bridge_transactions_id,
COALESCE (
inserted_timestamp,
_inserted_timestamp
) AS inserted_timestamp,
COALESCE (
modified_timestamp,
_inserted_timestamp
) AS modified_timestamp
inserted_timestamp,
modified_timestamp
FROM
combo


@ -3,8 +3,8 @@ version: 2
models:
- name: defi__ez_bridge_transactions
description: |-
This table parses transactions where tokens are bridged to or from the Flow network using Blocto teleport or the Celer bridge.
Deprecating Soon! This table will be deprecated on September 1st; please migrate work to `defi__ez_bridge_activity` instead.
tests:
- dbt_utils.unique_combination_of_columns:
combination_of_columns:


@ -0,0 +1,69 @@
{{ config (
materialized = 'view',
tags = ['ez', 'scheduled'],
meta ={ 'database_tags':{ 'table':{ 'PURPOSE': 'SWAPS' }} }
) }}
WITH prices AS (
SELECT
hour,
token_address,
symbol,
price
FROM
{{ ref('silver__complete_token_prices') }}
UNION ALL
SELECT
hour,
'A.1654653399040a61.FlowToken' AS token_address,
symbol,
price
FROM
{{ ref('silver__complete_native_prices') }}
)
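-- prices is joined twice below: po prices the token the trader sent out
-- (token_out), pi prices the token received (token_in)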
SELECT
tx_id,
block_timestamp,
block_height,
swap_contract AS contract_address,
swap_index,
trader,
platform,
token_out_source AS origin_from_address,
token_out_contract AS token_out,
po.symbol AS token_out_symbol,
token_out_amount AS amount_out,
token_out_amount * po.price AS amount_out_usd,
token_in_destination AS origin_to_address,
token_in_contract AS token_in,
pi.symbol AS token_in_symbol,
token_in_amount AS amount_in,
token_in_amount * pi.price AS amount_in_usd,
swaps_final_id AS ez_dex_swaps_id,
inserted_timestamp,
modified_timestamp
FROM
{{ ref('silver__swaps_final') }} s
LEFT JOIN prices po
ON LOWER(
s.token_out_contract
) = LOWER(
po.token_address
)
AND DATE_TRUNC(
'hour',
s.block_timestamp
) = po.hour
LEFT JOIN prices pi
ON LOWER(
s.token_in_contract
) = LOWER(
pi.token_address
)
AND DATE_TRUNC(
'hour',
s.block_timestamp
) = pi.hour
WHERE
token_in_contract IS NOT NULL


@ -0,0 +1,88 @@
version: 2
models:
- name: defi__ez_dex_swaps
description: |-
This table records asset swaps on the Flow blockchain, with price and label information.
columns:
- name: TX_ID
description: "{{ doc('tx_id') }}"
tests:
- not_null
- name: BLOCK_TIMESTAMP
description: "{{ doc('block_timestamp') }}"
tests:
- not_null
- name: BLOCK_HEIGHT
description: "{{ doc('block_height') }}"
tests:
- not_null
- name: CONTRACT_ADDRESS
description: "{{ doc('swap_contract') }}"
tests:
- not_null
- name: SWAP_INDEX
description: "{{ doc('swap_index') }}"
tests:
- not_null
- name: TRADER
description: "{{ doc('trader') }}"
tests:
- not_null:
severity: warn
- name: PLATFORM
description: "{{ doc('platform') }}"
- name: ORIGIN_FROM_ADDRESS
description: "{{ doc('token_out_source') }}"
- name: TOKEN_OUT
description: "{{ doc('token_out_contract') }}"
tests:
- not_null
- name: TOKEN_OUT_SYMBOL
description: "{{ doc('symbol') }}"
- name: AMOUNT_OUT
description: "{{ doc('token_out_amount') }}"
tests:
- not_null
- name: AMOUNT_OUT_USD
description: "{{ doc('amount_usd') }}"
- name: ORIGIN_TO_ADDRESS
description: "{{ doc('token_in_destination') }}"
- name: TOKEN_IN
description: "{{ doc('token_in_contract') }}"
tests:
- not_null
- name: TOKEN_IN_SYMBOL
description: "{{ doc('symbol') }}"
- name: AMOUNT_IN
description: "{{ doc('token_in_amount') }}"
tests:
- not_null
- name: AMOUNT_IN_USD
description: "{{ doc('amount_usd') }}"
- name: EZ_DEX_SWAPS_ID
description: "{{ doc('pk_id') }}"
- name: INSERTED_TIMESTAMP
description: "{{ doc('inserted_timestamp') }}"
- name: MODIFIED_TIMESTAMP
description: "{{ doc('modified_timestamp') }}"


@ -4,67 +4,8 @@
meta ={ 'database_tags':{ 'table':{ 'PURPOSE': 'SWAPS' }} }
) }}
WITH chainwalkers AS (
SELECT
NULL AS swaps_id,
tx_id,
block_timestamp,
block_height,
swap_contract,
swap_index,
trader,
token_out_source,
token_out_contract,
token_out_amount,
token_in_destination,
token_in_contract,
token_in_amount,
NULL AS inserted_timestamp,
NULL AS modified_timestamp
FROM
{{ ref('silver__swaps') }}
WHERE
block_height < {{ var(
'STREAMLINE_START_BLOCK'
) }}
),
streamline AS (
SELECT
swaps_final_id AS swaps_id,
tx_id,
block_timestamp,
block_height,
swap_contract,
swap_index,
trader,
token_out_source,
token_out_contract,
token_out_amount,
token_in_destination,
token_in_contract,
token_in_amount,
inserted_timestamp,
modified_timestamp
FROM
{{ ref('silver__swaps_final') }}
WHERE
block_height >= {{ var(
'STREAMLINE_START_BLOCK'
) }}
),
FINAL AS (
SELECT
*
FROM
chainwalkers
UNION ALL
SELECT
*
FROM
streamline
)
SELECT
swaps_final_id AS swaps_id,
tx_id,
block_timestamp,
block_height,
@ -81,9 +22,9 @@ SELECT
swaps_id,
{{ dbt_utils.generate_surrogate_key(['tx_id', 'swap_index']) }}
) AS ez_swaps_id,
COALESCE(inserted_timestamp, '2000-01-01' :: TIMESTAMP_NTZ) AS inserted_timestamp,
COALESCE(modified_timestamp, '2000-01-01' :: TIMESTAMP_NTZ) AS modified_timestamp
inserted_timestamp,
modified_timestamp
FROM
FINAL
{{ ref('silver__swaps_final') }}
WHERE
token_in_contract IS NOT NULL


@ -3,7 +3,7 @@ version: 2
models:
- name: defi__ez_swaps
description: |-
This table records asset swaps on the Flow blockchain.
Deprecating Soon! Please migrate work to the new `defi.ez_dex_swaps` table by September 1st.
columns:
- name: TX_ID
@ -34,7 +34,8 @@ models:
- name: TRADER
description: "{{ doc('trader') }}"
tests:
- not_null
- not_null:
severity: warn
- name: TOKEN_OUT_SOURCE
description: "{{ doc('token_out_source') }}"


@ -0,0 +1,24 @@
{{ config(
materialized = 'view',
tags = ['ez', 'bridge', 'scheduled'],
meta ={ 'database_tags':{ 'table':{ 'PURPOSE': 'BRIDGE' }} }
) }}
SELECT
tx_id,
block_timestamp,
block_height,
bridge_address,
token_address,
gross_amount AS amount,
amount_fee,
source_address,
destination_address,
source_chain,
destination_chain,
platform,
bridge_complete_id AS fact_bridge_activity_id,
inserted_timestamp,
modified_timestamp
FROM
{{ ref('silver__bridge_complete') }}


@ -1,32 +1,29 @@
version: 2
models:
- name: silver__bridge_celer
- name: defi__fact_bridge_activity
description: |-
This table parses transactions where tokens are bridged to or from the Flow network using the Celer cBridge.
tests:
- dbt_utils.unique_combination_of_columns:
combination_of_columns:
- tx_id
This table parses transactions where tokens are bridged to or from the Flow network using Blocto teleport or the Celer bridge.
columns:
- name: tx_id
- name: TX_ID
description: "{{ doc('tx_id') }}"
tests:
- not_null
- name: block_timestamp
- name: BLOCK_TIMESTAMP
description: "{{ doc('block_timestamp') }}"
tests:
- not_null
- dbt_expectations.expect_row_values_to_have_recent_data:
datepart: day
interval: 3
where: block_height >= 55114467
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- TIMESTAMP_NTZ
- name: block_height
- name: BLOCK_HEIGHT
description: "{{ doc('block_height') }}"
tests:
- not_null
@ -35,7 +32,7 @@ models:
- NUMBER
- FLOAT
- name: bridge_contract
- name: BRIDGE_ADDRESS
description: "{{ doc('bridge_contract') }}"
tests:
- not_null
@ -44,7 +41,7 @@ models:
- STRING
- VARCHAR
- name: token_contract
- name: TOKEN_ADDRESS
description: "{{ doc('token_contract') }}"
tests:
- not_null
@ -53,7 +50,7 @@ models:
- STRING
- VARCHAR
- name: amount
- name: AMOUNT
description: "{{ doc('amount') }}"
tests:
- not_null
@ -63,33 +60,33 @@ models:
- DOUBLE
- FLOAT
- name: flow_wallet_address
- name: AMOUNT_FEE
description: "{{ doc('amount_fee') }}"
tests:
- not_null:
where: platform = 'blocto'
- name: SOURCE_ADDRESS
description: "{{ doc('flow_wallet_address') }}"
tests:
- not_null
- not_null:
where: source_chain = 'flow'
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- STRING
- VARCHAR
- name: counterparty
description: "{{ doc('counterparty') }}"
- name: DESTINATION_ADDRESS
description: "{{ doc('flow_wallet_address') }}"
tests:
- not_null
- not_null:
where: destination_chain = 'flow'
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- STRING
- VARCHAR
- name: chain_id
description: "{{ doc('chain_id') }}"
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- NUMBER
- name: blockchain
- name: SOURCE_CHAIN
description: "{{ doc('blockchain') }}"
tests:
- not_null
@ -98,8 +95,8 @@ models:
- STRING
- VARCHAR
- name: direction
description: "{{ doc('direction') }}"
- name: DESTINATION_CHAIN
description: "{{ doc('blockchain') }}"
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
@ -107,7 +104,7 @@ models:
- STRING
- VARCHAR
- name: bridge
- name: PLATFORM
description: "{{ doc('bridge') }}"
tests:
- not_null
@ -116,13 +113,11 @@ models:
- STRING
- VARCHAR
- name: _ingested_at
description: "{{ doc('_ingested_at') }}"
- name: FACT_BRIDGE_ACTIVITY_ID
description: "{{ doc('pk_id') }}"
- name: _inserted_timestamp
description: "{{ doc('_inserted_timestamp') }}"
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- TIMESTAMP_NTZ
- name: INSERTED_TIMESTAMP
description: "{{ doc('inserted_timestamp') }}"
- name: MODIFIED_TIMESTAMP
description: "{{ doc('modified_timestamp') }}"

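The conditional not_null tests above scope their assertion with a where: filter. dbt compiles such a test to a query of roughly this shape (simplified sketch; the exact SQL varies by dbt version):

SELECT source_address
FROM defi.fact_bridge_activity
WHERE source_chain = 'flow' -- the where: config narrows the tested rows
    AND source_address IS NULL; -- any returned row fails the test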

@ -0,0 +1,27 @@
{{ config (
materialized = 'view',
tags = ['ez', 'scheduled'],
meta ={ 'database_tags':{ 'table':{ 'PURPOSE': 'SWAPS' }} }
) }}
SELECT
tx_id,
block_timestamp,
block_height,
swap_contract AS contract_address,
swap_index,
trader,
platform,
token_out_source AS origin_from_address,
token_out_contract AS token_out,
token_out_amount AS amount_out,
token_in_destination AS origin_to_address,
token_in_contract AS token_in,
token_in_amount AS amount_in,
swaps_final_id AS fact_dex_swaps_id,
inserted_timestamp,
modified_timestamp
FROM
{{ ref('silver__swaps_final') }}
WHERE
token_in_contract IS NOT NULL

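A quick sketch against the renamed columns, e.g. daily swap counts per platform (schema assumed):

SELECT
    DATE_TRUNC('day', block_timestamp) AS swap_day,
    platform,
    COUNT(*) AS swaps
FROM defi.fact_dex_swaps
GROUP BY 1, 2
ORDER BY 1 DESC;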

@ -0,0 +1,76 @@
version: 2
models:
- name: defi__fact_dex_swaps
description: |-
This table records asset swaps on the Flow blockchain.
columns:
- name: TX_ID
description: "{{ doc('tx_id') }}"
tests:
- not_null
- name: BLOCK_TIMESTAMP
description: "{{ doc('block_timestamp') }}"
tests:
- not_null
- name: BLOCK_HEIGHT
description: "{{ doc('block_height') }}"
tests:
- not_null
- name: CONTRACT_ADDRESS
description: "{{ doc('swap_contract') }}"
tests:
- not_null
- name: SWAP_INDEX
description: "{{ doc('swap_index') }}"
tests:
- not_null
- name: TRADER
description: "{{ doc('trader') }}"
tests:
- not_null:
severity: warn
- name: PLATFORM
description: "{{ doc('platform') }}"
- name: ORIGIN_FROM_ADDRESS
description: "{{ doc('token_out_source') }}"
- name: AMOUNT_OUT
description: "{{ doc('token_out_amount') }}"
tests:
- not_null
- name: TOKEN_OUT
description: "{{ doc('token_out_contract') }}"
tests:
- not_null
- name: ORIGIN_TO_ADDRESS
description: "{{ doc('token_in_destination') }}"
- name: AMOUNT_IN
description: "{{ doc('token_in_amount') }}"
tests:
- not_null
- name: TOKEN_IN
description: "{{ doc('token_in_contract') }}"
tests:
- not_null
- name: FACT_DEX_SWAPS_ID
description: "{{ doc('pk_id') }}"
- name: INSERTED_TIMESTAMP
description: "{{ doc('inserted_timestamp') }}"
- name: MODIFIED_TIMESTAMP
description: "{{ doc('modified_timestamp') }}"


@ -1,65 +1,9 @@
{{ config(
materialized = 'view',
tags = ['ez', 'scheduled'],
meta ={ 'database_tags':{ 'table':{ 'PURPOSE': 'STAKING' }}}
meta ={ 'database_tags':{ 'table':{ 'PURPOSE': 'STAKING' }} }
) }}
WITH chainwalkers AS (
SELECT
*
FROM
{{ ref('silver__staking_actions') }}
WHERE
block_height < {{ var(
'STREAMLINE_START_BLOCK'
) }}
),
streamline AS (
SELECT
*
FROM
{{ ref('silver__staking_actions_s') }}
WHERE
block_height >= {{ var(
'STREAMLINE_START_BLOCK'
) }}
),
FINAL AS (
SELECT
NULL AS staking_actions_id,
tx_id,
event_index,
block_timestamp,
block_height,
tx_succeeded,
delegator,
action,
amount,
node_id,
_inserted_timestamp,
NULL AS inserted_timestamp,
NULL AS modified_timestamp
FROM
chainwalkers
UNION ALL
SELECT
staking_actions_id,
tx_id,
event_index,
block_timestamp,
block_height,
tx_succeeded,
delegator,
action,
amount,
node_id,
_inserted_timestamp,
inserted_timestamp,
modified_timestamp
FROM
streamline
)
SELECT
tx_id,
event_index,
@ -74,13 +18,7 @@ SELECT
staking_actions_id,
{{ dbt_utils.generate_surrogate_key(['tx_id']) }}
) AS ez_staking_actions_id,
COALESCE (
inserted_timestamp,
_inserted_timestamp
) AS inserted_timestamp,
COALESCE (
modified_timestamp,
_inserted_timestamp
) AS modified_timestamp
inserted_timestamp,
modified_timestamp
FROM
FINAL
{{ ref('silver__staking_actions_s') }}


@ -6,32 +6,6 @@
WITH allday AS (
SELECT
NULL AS nft_unique_id,
nft_id,
nft_collection,
nflallday_id,
serial_number,
moment_tier,
total_circulation,
moment_description,
player,
team,
season,
week,
classification,
play_type,
moment_date,
series,
set_name,
video_urls,
moment_stats_full,
_inserted_timestamp,
_inserted_timestamp AS inserted_timestamp,
_inserted_timestamp AS modified_timestamp
FROM
{{ ref('silver__nft_allday_metadata') }}
UNION
SELECT
nft_allday_metadata_s_id AS nft_unique_id,
nft_id,
@ -52,7 +26,6 @@ WITH allday AS (
set_name,
video_urls,
moment_stats_full,
_inserted_timestamp,
inserted_timestamp,
modified_timestamp
FROM
@ -78,7 +51,6 @@ WITH allday AS (
set_name,
video_urls,
moment_stats_full,
_inserted_timestamp,
_inserted_timestamp AS inserted_timestamp,
_inserted_timestamp AS modified_timestamp
FROM
@ -107,13 +79,7 @@ SELECT
nft_unique_id,
{{ dbt_utils.generate_surrogate_key(['nft_id']) }}
) AS dim_allday_metadata_id,
COALESCE (
inserted_timestamp,
_inserted_timestamp
) AS inserted_timestamp,
COALESCE (
modified_timestamp,
_inserted_timestamp
) AS modified_timestamp
inserted_timestamp,
modified_timestamp
FROM
allday


@ -5,99 +5,56 @@ models:
description: |-
Data for NFL AllDay Moments, including player, team, stats and more. This is produced via API and may differ in structure from metadata available on-chain in the `dim_moment_metadata` table.
tests:
- dbt_utils.unique_combination_of_columns:
combination_of_columns:
- nft_id
severity: error
error_if: ">10"
warn_if: "<10"
columns:
- name: NFT_ID
description: "{{ doc('nft_id') }}"
tests:
- not_null:
severity: error
error_if: ">10"
warn_if: "<10"
- unique:
severity: error
error_if: ">10"
warn_if: "<10"
- not_null
- unique
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- STRING
- VARCHAR
severity: error
error_if: ">10"
warn_if: "<10"
- name: NFT_COLLECTION
description: "{{ doc('nft_collection') }}"
tests:
- not_null:
severity: error
error_if: ">10"
warn_if: "<10"
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- STRING
- VARCHAR
severity: error
error_if: ">10"
warn_if: "<10"
- name: NFLALLDAY_ID
description: "{{ doc('nflallday_id') }}"
tests:
- not_null:
severity: error
error_if: ">10"
warn_if: "<10"
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- STRING
- VARCHAR
severity: error
error_if: ">10"
warn_if: "<10"
- name: SERIAL_NUMBER
description: "{{ doc('serial_number') }}"
tests:
- not_null:
severity: error
error_if: ">10"
warn_if: "<10"
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- NUMBER
severity: error
error_if: ">10"
warn_if: "<10"
- name: TOTAL_CIRCULATION
description: "{{ doc('total_circulation') }}"
tests:
- not_null:
severity: error
error_if: ">10"
warn_if: "<10"
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- NUMBER
severity: error
error_if: ">10"
warn_if: "<10"
- name: MOMENT_DESCRIPTION
description: "{{ doc('moment_description') }}"
tests:
- not_null:
severity: error
error_if: ">10"
warn_if: "<10"
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- STRING
@ -106,10 +63,7 @@ models:
- name: PLAYER
description: "{{ doc('player') }}"
tests:
- not_null:
severity: error
error_if: ">10"
warn_if: "<10"
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- STRING
@ -118,10 +72,7 @@ models:
- name: TEAM
description: "{{ doc('team') }}"
tests:
- not_null:
severity: error
error_if: ">10"
warn_if: "<10"
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- STRING
@ -130,10 +81,7 @@ models:
- name: SEASON
description: "{{ doc('season') }}"
tests:
- not_null:
severity: error
error_if: ">10"
warn_if: "<10"
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- STRING
@ -142,10 +90,7 @@ models:
- name: WEEK
description: "{{ doc('week') }}"
tests:
- not_null:
severity: error
error_if: ">10"
warn_if: "<10"
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- STRING
@ -154,10 +99,7 @@ models:
- name: CLASSIFICATION
description: "{{ doc('classification') }}"
tests:
- not_null:
severity: error
error_if: ">10"
warn_if: "<10"
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- STRING
@ -166,10 +108,7 @@ models:
- name: PLAY_TYPE
description: "{{ doc('play_type') }}"
tests:
- not_null:
severity: error
error_if: ">10"
warn_if: "<10"
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- STRING
@ -178,10 +117,7 @@ models:
- name: MOMENT_DATE
description: "{{ doc('moment_date') }}"
tests:
- not_null:
severity: error
error_if: ">10"
warn_if: "<10"
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- TIMESTAMP_NTZ
@ -189,10 +125,7 @@ models:
- name: SERIES
description: "{{ doc('series') }}"
tests:
- not_null:
severity: error
error_if: ">10"
warn_if: "<10"
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- STRING
@ -201,10 +134,7 @@ models:
- name: SET_NAME
description: "{{ doc('set_name') }}"
tests:
- not_null:
severity: error
error_if: ">10"
warn_if: "<10"
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- STRING
@ -213,10 +143,7 @@ models:
- name: VIDEO_URLS
description: "{{ doc('video_urls') }}"
tests:
- not_null:
severity: error
error_if: ">10"
warn_if: "<10"
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- ARRAY
@ -224,10 +151,7 @@ models:
- name: MOMENT_STATS_FULL
description: "{{ doc('moment_stats_full') }}"
tests:
- not_null:
severity: error
error_if: ">10"
warn_if: "<10"
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- OBJECT


@ -4,80 +4,8 @@
meta ={ 'database_tags':{ 'table':{ 'PURPOSE': 'NFT, ALLDAY, GOLAZOS, TOPSHOT' }} }
) }}
WITH chainwalkers AS (
SELECT
NULL AS nft_moment_metadata_id,
event_contract AS nft_collection,
nft_id,
serial_number,
max_mint_size,
play_id,
series_id,
series_name,
set_id,
set_name,
edition_id,
tier,
metadata,
_inserted_timestamp,
NULL AS inserted_timestamp,
NULL AS modified_timestamp
FROM
{{ ref('silver__nft_moment_metadata_final') }}
WHERE
NOT (
nft_collection = 'A.87ca73a41bb50ad5.Golazos'
AND edition_id = 486
)
AND NOT (
nft_collection = 'A.e4cf4bdc1751c65d.AllDay'
AND edition_id = 1486
)
),
streamline AS (
SELECT
nft_moment_metadata_id,
event_contract AS nft_collection,
nft_id,
serial_number,
max_mint_size,
play_id,
series_id,
series_name,
set_id,
set_name,
edition_id,
tier,
metadata,
_inserted_timestamp,
inserted_timestamp,
modified_timestamp
FROM
{{ ref('silver__nft_moment_metadata_final_s') }}
WHERE
NOT (
nft_collection = 'A.87ca73a41bb50ad5.Golazos'
AND edition_id = 486
)
AND NOT (
nft_collection = 'A.e4cf4bdc1751c65d.AllDay'
AND edition_id = 1486
)
),
FINAL AS (
SELECT
*
FROM
chainwalkers
UNION ALL
SELECT
*
FROM
streamline
)
SELECT
nft_collection,
event_contract AS nft_collection,
nft_id,
serial_number,
max_mint_size,
@ -95,17 +23,23 @@ SELECT
['nft_collection','edition_id','nft_id']
) }}
) AS dim_moment_metadata_id,
COALESCE (
inserted_timestamp,
_inserted_timestamp
) AS inserted_timestamp,
COALESCE (
modified_timestamp,
_inserted_timestamp
) AS modified_timestamp
inserted_timestamp,
modified_timestamp
FROM
FINAL qualify ROW_NUMBER() over (
PARTITION BY dim_moment_metadata_id
ORDER BY
_inserted_timestamp DESC
) = 1
{{ ref('silver__nft_moment_metadata_final_s') }}
WHERE
NOT (
nft_collection = 'A.87ca73a41bb50ad5.Golazos'
AND edition_id = 486
)
AND NOT (
nft_collection = 'A.e4cf4bdc1751c65d.AllDay'
AND edition_id = 1486
)
AND metadata IS NOT NULL
qualify ROW_NUMBER() over (
PARTITION BY dim_moment_metadata_id
ORDER BY
inserted_timestamp DESC
) = 1

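The qualify clause above filters on a window function without a wrapping subquery, which is how these models dedupe to the newest row per key. The pattern in isolation, with illustrative names:

SELECT *
FROM some_table
QUALIFY ROW_NUMBER() OVER (
    PARTITION BY natural_key
    ORDER BY inserted_timestamp DESC
) = 1; -- row 1 is the most recently inserted version of each key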

@ -5,54 +5,47 @@ models:
description: |-
NFT Moment Metadata scraped from on-chain activity, where available. This should be joinable on sales or mints using the nft_collection (event_contract) and nft_id.
tests:
- dbt_utils.unique_combination_of_columns:
combination_of_columns:
- nft_collection
- edition_id
- nft_id
columns:
- name: NFT_COLLECTION
description: "{{ doc('nft_collection') }}"
tests:
- not_null
- name: NFT_ID
description: "{{ doc('nft_id') }}"
tests:
- not_null
- name: SERIAL_NUMBER
description: "{{ doc('serial_number') }}"
tests:
- not_null
- name: MAX_MINT_SIZE
description: "{{ doc('max_mint_size') }}"
tests:
- not_null
- name: PLAY_ID
description: "{{ doc('play_id') }}"
tests:
- not_null
- name: SERIES_ID
description: "{{ doc('series_id') }}"
tests:
- not_null
- name: SERIES_NAME
description: "{{ doc('series_name') }}"
- name: SET_ID
description: "{{ doc('set_id') }}"
tests:
- not_null
- name: SET_NAME
description: "{{ doc('set_name') }}"
- name: EDITION_ID
description: "{{ doc('edition_id') }}"
tests:
- not_null
- name: TIER
description: "{{ doc('tier') }}"
tests:
- not_null
- name: METADATA
description: "{{ doc('metadata') }}"
tests:

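Per the description, the metadata is meant to join to sales or mints on collection and id; a hedged sketch against the ez_nft_sales model elsewhere in this diff (the nft schema qualifier is an assumption):

SELECT
    s.tx_id,
    s.price,
    s.currency,
    m.set_name,
    m.tier
FROM nft.ez_nft_sales s
JOIN nft.dim_moment_metadata m
    ON s.nft_collection = m.nft_collection
    AND s.nft_id = m.nft_id;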

@ -4,70 +4,6 @@
meta ={ 'database_tags':{ 'table':{ 'PURPOSE': 'NFT' }} }
) }}
WITH chainwalkers AS (
SELECT
*
FROM
{{ ref('silver__nft_sales') }}
WHERE
block_height < {{ var(
'STREAMLINE_START_BLOCK'
) }}
),
streamline AS (
SELECT
*
FROM
{{ ref('silver__nft_sales_s') }}
WHERE
block_height >= {{ var(
'STREAMLINE_START_BLOCK'
) }}
),
FINAL AS (
SELECT
NULL AS nft_sales_id,
tx_id,
block_height,
block_timestamp,
marketplace,
nft_collection,
nft_id,
buyer,
seller,
price,
currency,
tx_succeeded,
tokenflow,
counterparties,
NULL AS inserted_timestamp,
_inserted_timestamp,
NULL AS modified_timestamp
FROM
chainwalkers
UNION ALL
SELECT
nft_sales_id,
tx_id,
block_height,
block_timestamp,
marketplace,
nft_collection,
nft_id,
buyer,
seller,
price,
currency,
tx_succeeded,
tokenflow,
counterparties,
inserted_timestamp,
_inserted_timestamp,
modified_timestamp
FROM
streamline
)
SELECT
tx_id,
block_height,
@ -86,13 +22,7 @@ SELECT
nft_sales_id,
{{ dbt_utils.generate_surrogate_key(['tx_id']) }}
) AS ez_nft_sales_id,
COALESCE (
inserted_timestamp,
_inserted_timestamp
) AS inserted_timestamp,
COALESCE (
modified_timestamp,
_inserted_timestamp
) AS modified_timestamp
inserted_timestamp,
modified_timestamp
FROM
FINAL
{{ ref('silver__nft_sales_s') }}


@ -13,9 +13,8 @@ models:
- buyer
- nft_collection
- nft_id
severity: error
error_if: ">10"
warn_if: "<10"
severity: warn
error_if: ">5000"
columns:
- name: TX_ID


@ -0,0 +1,19 @@
{{ config(
materialized = 'view',
persist_docs ={ "relation": true,
"columns": true }
) }}
SELECT
token_address,
asset_id,
symbol,
name,
platform AS blockchain,
platform_id AS blockchain_id,
provider,
inserted_timestamp,
modified_timestamp,
complete_provider_asset_metadata_id AS dim_asset_metadata_id
FROM
{{ ref('silver__complete_provider_asset_metadata') }}


@ -0,0 +1,23 @@
version: 2
models:
- name: price__dim_asset_metadata
description: '{{ doc("prices_dim_asset_metadata_table_doc") }}'
columns:
- name: PROVIDER
description: '{{ doc("prices_provider")}}'
- name: ASSET_ID
description: '{{ doc("prices_asset_id") }}'
- name: NAME
description: '{{ doc("prices_name") }}'
- name: SYMBOL
description: '{{ doc("prices_symbol") }}'
- name: TOKEN_ADDRESS
description: '{{ doc("prices_token_address") }}'
- name: BLOCKCHAIN
description: '{{ doc("prices_blockchain") }}'
- name: BLOCKCHAIN_ID
description: '{{ doc("prices_blockchain_id") }}'
- name: DIM_ASSET_METADATA_ID
description: '{{ doc("pk_id") }}'
- name: INSERTED_TIMESTAMP
description: '{{ doc("inserted_timestamp") }}'

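A usage sketch resolving provider asset ids for the hourly OHLC feed that appears later in this diff; the schema names, the one-row-per-provider expectation, and the provider value are all assumptions:

SELECT
    f.hour,
    m.symbol,
    f.close
FROM price.fact_prices_ohlc_hourly f
JOIN price.dim_asset_metadata m
    ON f.asset_id = m.asset_id
    AND f.provider = m.provider
WHERE m.provider = 'coingecko'; -- hypothetical provider value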

@ -0,0 +1,35 @@
{{ config(
materialized = 'view',
persist_docs ={ "relation": true,
"columns": true }
) }}
SELECT
token_address,
asset_id,
symbol,
NAME,
decimals,
blockchain,
is_deprecated,
FALSE AS is_native,
inserted_timestamp,
modified_timestamp,
complete_token_asset_metadata_id AS ez_asset_metadata_id
FROM
{{ ref('silver__complete_token_asset_metadata') }}
UNION ALL
SELECT
NULL AS token_address,
asset_id,
symbol,
NAME,
decimals,
blockchain,
is_deprecated,
TRUE AS is_native,
inserted_timestamp,
modified_timestamp,
complete_native_asset_metadata_id AS ez_asset_metadata_id
FROM
{{ ref('silver__complete_native_asset_metadata') }}


@ -0,0 +1,28 @@
version: 2
models:
- name: price__ez_asset_metadata
description: '{{ doc("prices_ez_asset_metadata_table_doc") }}'
columns:
- name: ASSET_ID
description: '{{ doc("prices_asset_id") }}'
- name: NAME
description: '{{ doc("prices_name") }}'
- name: SYMBOL
description: '{{ doc("prices_symbol") }}'
- name: TOKEN_ADDRESS
description: '{{ doc("prices_token_address") }}'
- name: IS_DEPRECATED
description: '{{ doc("prices_is_deprecated") }}'
- name: BLOCKCHAIN
description: '{{ doc("prices_blockchain") }}'
- name: DECIMALS
description: '{{ doc("prices_decimals") }}'
- name: IS_NATIVE
description: '{{ doc("prices_is_native") }}'
- name: EZ_ASSET_METADATA_ID
description: '{{ doc("pk_id") }}'
- name: INSERTED_TIMESTAMP
description: '{{ doc("inserted_timestamp") }}'
- name: MODIFIED_TIMESTAMP
description: '{{ doc("modified_timestamp") }}'


@ -0,0 +1,39 @@
{{ config(
materialized = 'view',
persist_docs ={ "relation": true,
"columns": true }
) }}
SELECT
HOUR,
token_address,
symbol,
NAME,
decimals,
price,
blockchain,
FALSE AS is_native,
is_imputed,
is_deprecated,
inserted_timestamp,
modified_timestamp,
complete_token_prices_id AS ez_prices_hourly_id
FROM
{{ ref('silver__complete_token_prices') }}
UNION ALL
SELECT
HOUR,
NULL AS token_address,
symbol,
NAME,
decimals,
price,
blockchain,
TRUE AS is_native,
is_imputed,
is_deprecated,
inserted_timestamp,
modified_timestamp,
complete_native_prices_id AS ez_prices_hourly_id
FROM
{{ ref('silver__complete_native_prices') }}

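A hedged sketch of the typical consumer join, pricing hourly token flows from fact_dex_swaps with this view; schema names and the exact join keys are assumptions:

SELECT
    DATE_TRUNC('hour', s.block_timestamp) AS price_hour,
    p.symbol,
    SUM(s.amount_out * p.price) AS usd_out
FROM defi.fact_dex_swaps s
LEFT JOIN price.ez_prices_hourly p
    ON DATE_TRUNC('hour', s.block_timestamp) = p.hour
    AND s.token_out = p.token_address
GROUP BY 1, 2;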

@ -0,0 +1,30 @@
version: 2
models:
- name: price__ez_prices_hourly
description: '{{ doc("prices_ez_prices_hourly_table_doc") }}'
columns:
- name: HOUR
description: '{{ doc("prices_hour")}}'
- name: TOKEN_ADDRESS
description: '{{ doc("prices_token_address") }}'
- name: SYMBOL
description: '{{ doc("prices_symbol") }}'
- name: BLOCKCHAIN
description: '{{ doc("prices_blockchain") }}'
- name: DECIMALS
description: '{{ doc("prices_decimals") }}'
- name: PRICE
description: '{{ doc("prices_price") }}'
- name: IS_NATIVE
description: '{{ doc("prices_is_native") }}'
- name: IS_IMPUTED
description: '{{ doc("prices_is_imputed") }}'
- name: IS_DEPRECATED
description: '{{ doc("prices_is_deprecated") }}'
- name: EZ_PRICES_HOURLY_ID
description: '{{ doc("pk_id") }}'
- name: INSERTED_TIMESTAMP
description: '{{ doc("inserted_timestamp") }}'
- name: MODIFIED_TIMESTAMP
description: '{{ doc("modified_timestamp") }}'


@ -1,124 +0,0 @@
{{ config(
materialized = 'view',
tag = ['scheduled']
) }}
WITH api AS (
SELECT
NULL AS prices_swaps_hourly_id,
recorded_hour,
id,
token,
OPEN,
high,
low,
CLOSE,
provider,
NULL AS _inserted_timestamp,
NULL AS inserted_timestamp,
NULL AS modified_timestamp
FROM
{{ ref('silver__prices_hourly') }}
),
swaps_cw AS (
SELECT
NULL AS prices_swaps_hourly_id,
recorded_hour,
id,
CASE
WHEN id = 'A.1654653399040a61.FlowToken' THEN 'Flow'
WHEN id = 'A.cfdd90d4a00f7b5b.TeleportedTetherToken' THEN 'USDT'
WHEN id = 'A.3c5959b568896393.FUSD' THEN 'FUSD'
WHEN id = 'A.0f9df91c9121c460.BloctoToken' THEN 'Blocto'
WHEN id = 'A.d01e482eb680ec9f.REVV' THEN 'Revv'
WHEN id = 'A.b19436aae4d94622.FiatToken' THEN 'USDC'
WHEN id = 'A.142fa6570b62fd97.StarlyToken' THEN 'Starly'
WHEN id = 'A.475755d2c9dccc3a.TeleportedSportiumToken' THEN 'Sportium'
ELSE NULL -- will trigger alert if swaps model picks up another token
END AS token,
OPEN,
high,
low,
CLOSE,
provider,
NULL AS _inserted_timestamp,
NULL AS inserted_timestamp,
NULL AS modified_timestamp
FROM
{{ ref('silver__prices_swaps_hourly') }}
),
swaps_s AS (
SELECT
prices_swaps_hourly_id,
recorded_hour,
id,
CASE
WHEN id = 'A.1654653399040a61.FlowToken' THEN 'Flow'
WHEN id = 'A.cfdd90d4a00f7b5b.TeleportedTetherToken' THEN 'USDT'
WHEN id = 'A.3c5959b568896393.FUSD' THEN 'FUSD'
WHEN id = 'A.0f9df91c9121c460.BloctoToken' THEN 'Blocto'
WHEN id = 'A.d01e482eb680ec9f.REVV' THEN 'Revv'
WHEN id = 'A.b19436aae4d94622.FiatToken' THEN 'USDC'
WHEN id = 'A.142fa6570b62fd97.StarlyToken' THEN 'Starly'
WHEN id = 'A.475755d2c9dccc3a.TeleportedSportiumToken' THEN 'Sportium'
ELSE NULL -- will trigger alert if swaps model picks up another token
END AS token,
OPEN,
high,
low,
CLOSE,
provider,
NULL AS _inserted_timestamp,
COALESCE(
inserted_timestamp,
'2000-01-01' :: timestamp_ntz
) AS inserted_timestamp,
COALESCE(
modified_timestamp,
'2000-01-01' :: timestamp_ntz
) AS modified_timestamp
FROM
{{ ref('silver__prices_swaps_hourly_s') }}
),
FINAL AS (
SELECT
*
FROM
api
UNION ALL
SELECT
*
FROM
swaps_cw
UNION ALL
SELECT
*
FROM
swaps_s
)
SELECT
COALESCE (
prices_swaps_hourly_id,
{{ dbt_utils.generate_surrogate_key(['recorded_hour', 'token']) }}
) AS fact_hourly_prices_id,
recorded_hour,
id,
token,
OPEN,
high,
low,
CLOSE,
provider,
COALESCE (
inserted_timestamp,
_inserted_timestamp
) AS inserted_timestamp,
COALESCE (
modified_timestamp,
_inserted_timestamp
) AS modified_timestamp
FROM
FINAL
WHERE
recorded_hour IS NOT NULL


@ -1,86 +0,0 @@
version: 2
models:
- name: price__fact_hourly_prices
description: |-
This table provides hourly token price data for FLOW tokens from CoinGecko and CoinMarketCap.
columns:
- name: RECORDED_HOUR
description: "{{ doc('recorded_hour') }}"
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- TIMESTAMP_NTZ
- name: ID
description: "{{ doc('asset_id') }}"
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- STRING
- VARCHAR
- name: TOKEN
description: "{{ doc('token') }}"
tests:
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- STRING
- VARCHAR
- name: OPEN
description: "{{ doc('open') }}"
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- FLOAT
- DOUBLE
- name: HIGH
description: "{{ doc('high') }}"
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- FLOAT
- DOUBLE
- name: LOW
description: "{{ doc('low') }}"
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- FLOAT
- DOUBLE
- name: CLOSE
description: "{{ doc('asset_id') }}"
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- FLOAT
- DOUBLE
- name: PROVIDER
description: "{{ doc('provider') }}"
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- STRING
- VARCHAR
- name: FACT_HOURLY_PRICES_ID
description: "{{ doc('pk_id') }}"
- name: INSERTED_TIMESTAMP
description: "{{ doc('inserted_timestamp') }}"
- name: MODIFIED_TIMESTAMP
description: "{{ doc('modified_timestamp') }}"


@ -1,132 +0,0 @@
{{ config(
materialized = 'view',
tag = ['scheduled']
) }}
WITH token_labels AS (
SELECT
token,
UPPER(symbol) AS symbol,
token_contract
FROM
{{ ref('seeds__token_labels') }}
),
prices AS (
SELECT
recorded_at AS TIMESTAMP,
token,
UPPER(symbol) AS symbol,
price_usd,
source
FROM
{{ this.database }}.silver.prices
),
prices_swaps_cw AS (
SELECT
tx_id,
block_timestamp AS TIMESTAMP,
token_contract,
swap_price AS price_usd,
source,
_inserted_timestamp,
NULL AS prices_swaps_id,
NULL AS inserted_timestamp,
NULL AS modified_timestamp
FROM
{{ ref('silver__prices_swaps') }}
),
prices_swaps_s AS (
SELECT
prices_swaps_id,
tx_id,
block_timestamp AS TIMESTAMP,
token_contract,
swap_price AS price_usd,
source,
_inserted_timestamp,
NULL AS inserted_timestamp,
NULL AS modified_timestamp
FROM
{{ ref('silver__prices_swaps_s') }}
),
viewnion AS (
SELECT
TIMESTAMP,
p.token,
p.symbol,
l.token_contract,
price_usd,
source,
NULL AS tx_id,
NULL AS prices_swaps_id,
NULL AS _inserted_timestamp,
NULL AS inserted_timestamp,
NULL AS modified_timestamp
FROM
prices p
LEFT JOIN token_labels l USING (symbol)
UNION ALL
SELECT
TIMESTAMP,
l.token,
l.symbol,
ps.token_contract,
price_usd,
source,
tx_id,
prices_swaps_id,
_inserted_timestamp,
NULL AS inserted_timestamp,
NULL AS modified_timestamp
FROM
prices_swaps_cw ps
LEFT JOIN token_labels l USING (token_contract)
UNION ALL
SELECT
TIMESTAMP,
l.token,
l.symbol,
pss.token_contract,
price_usd,
source,
tx_id,
prices_swaps_id,
_inserted_timestamp,
NULL AS inserted_timestamp,
NULL AS modified_timestamp
FROM
prices_swaps_s pss
LEFT JOIN token_labels l USING (token_contract)
)
SELECT
TIMESTAMP,
token,
symbol,
token_contract,
price_usd,
source,
tx_id,
COALESCE (
prices_swaps_id,
{{ dbt_utils.generate_surrogate_key(['TIMESTAMP','TOKEN', 'TOKEN_CONTRACT', 'SOURCE']) }}
) AS fact_prices_id,
COALESCE (
inserted_timestamp,
TIMESTAMP
) AS inserted_timestamp,
COALESCE (
modified_timestamp,
TIMESTAMP
) AS modified_timestamp
FROM
viewnion
WHERE
TIMESTAMP IS NOT NULL qualify ROW_NUMBER() over (
PARTITION BY TIMESTAMP,
token,
token_contract,
source
ORDER BY
_inserted_timestamp DESC
) = 1


@ -1,70 +0,0 @@
version: 2
models:
- name: price__fact_prices
description: |-
This table reports prices derived from various on-chain sources. CoinGecko and CoinMarketCap price feeds can be found in the hourly price table.
Note that prices from swaps may be volatile and are an approximation of price in USD terms, at best. A tx_hash is included for prices from swaps to provide a source.
Low cap coins may have questionable prices due to low liquidity.
columns:
- name: TIMESTAMP
description: "{{ doc('timestamp') }}"
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- TIMESTAMP_NTZ
- name: TOKEN
description: "{{ doc('token') }}"
tests:
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- STRING
- VARCHAR
- name: SYMBOL
description: "{{ doc('symbol') }}"
tests:
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- STRING
- VARCHAR
- name: TOKEN_CONTRACT
description: "{{ doc('token_contract') }}"
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- STRING
- VARCHAR
- name: PRICE_USD
description: "{{ doc('price_usd') }}"
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- NUMBER
- DOUBLE
- FLOAT
- name: SOURCE
description: "{{ doc('source') }}"
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- STRING
- VARCHAR
- name: FACT_PRICES_ID
description: "{{ doc('pk_id') }}"
- name: INSERTED_TIMESTAMP
description: "{{ doc('inserted_timestamp') }}"
- name: MODIFIED_TIMESTAMP
description: "{{ doc('modified_timestamp') }}"


@ -0,0 +1,19 @@
{{ config(
materialized = 'view',
persist_docs ={ "relation": true,
"columns": true }
) }}
SELECT
asset_id,
recorded_hour AS HOUR,
OPEN,
high,
low,
CLOSE,
provider,
inserted_timestamp,
modified_timestamp,
complete_provider_prices_id AS fact_prices_ohlc_hourly_id
FROM
{{ ref('silver__complete_provider_prices') }}

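A small rollup sketch over the new fact view, collapsing hourly candles into daily ranges (schema assumed):

SELECT
    asset_id,
    DATE_TRUNC('day', HOUR) AS price_day,
    MIN(low) AS day_low,
    MAX(high) AS day_high
FROM price.fact_prices_ohlc_hourly
GROUP BY 1, 2;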

@ -0,0 +1,24 @@
version: 2
models:
- name: price__fact_prices_ohlc_hourly
description: '{{ doc("prices_fact_prices_ohlc_hourly_table_doc") }}'
columns:
- name: HOUR
description: '{{ doc("prices_hour")}}'
- name: ASSET_ID
description: '{{ doc("prices_asset_id") }}'
- name: OPEN
description: '{{ doc("prices_open") }}'
- name: HIGH
description: '{{ doc("prices_high") }}'
- name: LOW
description: '{{ doc("prices_low") }}'
- name: CLOSE
description: '{{ doc("prices_close") }}'
- name: FACT_PRICES_OHLC_HOURLY_ID
description: '{{ doc("pk_id") }}'
- name: INSERTED_TIMESTAMP
description: '{{ doc("inserted_timestamp") }}'
- name: MODIFIED_TIMESTAMP
description: '{{ doc("modified_timestamp") }}'


@ -14,7 +14,7 @@ WITH txs AS (
unique_from_count,
total_fees AS total_fees_native,
LAST_VALUE(
p.close ignore nulls
p.price ignore nulls
) over (
ORDER BY
block_timestamp_hour rows unbounded preceding
@ -25,10 +25,10 @@ WITH txs AS (
FROM
{{ ref('silver_stats__core_metrics_hourly') }}
s
LEFT JOIN {{ ref('silver__prices_hourly') }}
LEFT JOIN {{ ref('silver__complete_native_prices') }}
p
ON s.block_timestamp_hour = p.recorded_hour
AND p.id = 'Flow'
ON s.block_timestamp_hour = p.hour
AND p.asset_id = 'flow'
)
SELECT
A.block_timestamp_hour,
@ -41,7 +41,7 @@ SELECT
b.unique_from_count,
b.total_fees_native,
ROUND(
b.total_fees_native * b.imputed_close,
b.total_fees_native * ZEROIFNULL(b.imputed_close),
2
) AS total_fees_usd,
A.core_metrics_block_hourly_id AS ez_core_metrics_hourly_id,

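The imputed_close logic above forward-fills the last observed native price across hours with no price row, and the new ZEROIFNULL guards the USD math before any price exists at all. The window pattern in isolation, with illustrative names:

SELECT
    block_timestamp_hour,
    LAST_VALUE(price IGNORE NULLS) OVER (
        ORDER BY block_timestamp_hour
        ROWS UNBOUNDED PRECEDING
    ) AS imputed_close -- carries the most recent non-null price forward
FROM hourly_metrics;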

@ -66,7 +66,7 @@ block_range AS (
_id AS block_height
FROM
{{ source(
'crosschain_silver',
'silver_crosschain',
'number_sequence'
) }}
WHERE
@ -111,7 +111,7 @@ block_gen AS (
_id AS block_height
FROM
{{ source(
'crosschain_silver',
'silver_crosschain',
'number_sequence'
) }}
WHERE


@ -66,7 +66,7 @@ block_range AS (
_id AS block_height
FROM
{{ source(
'crosschain_silver',
'silver_crosschain',
'number_sequence'
) }}
WHERE


@ -1,4 +1,6 @@
-- depends_on: {{ ref('bronze__streamline_blocks') }}
-- depends_on: {{ ref('bronze__streamline_blocks_history') }}
{{ config(
materialized = 'incremental',
unique_key = "block_number",
@ -10,7 +12,7 @@
WITH
{% if is_incremental() %}
{% if is_incremental() and not var('LOAD_BACKFILL', False) %}
tx_count_lookback AS (
-- lookback to ensure tx count is correct
@ -21,7 +23,7 @@ tx_count_lookback AS (
WHERE
block_height >= {{ var(
'STREAMLINE_START_BLOCK'
) }}
) }} -- TODO, remove AFTER backfill is complete
-- limit to 3 day lookback for performance
AND _inserted_timestamp >= SYSDATE() - INTERVAL '3 days'
AND (
@ -47,7 +49,11 @@ streamline_blocks AS (
_partition_by_block_id,
_inserted_timestamp
FROM
{% if var('LOAD_BACKFILL', False) %}
{{ ref('bronze__streamline_blocks_history') }}
-- TODO need incremental logic of some sort probably (for those 5800 missing txs)
-- where inserted timestamp >= max from this where network version = backfill version OR block range between root and end
{% else %}
{% if is_incremental() %}
{{ ref('bronze__streamline_blocks') }}
WHERE
@ -66,6 +72,7 @@ WHERE
{% else %}
{{ ref('bronze__streamline_fr_blocks') }}
{% endif %}
{% endif %}
qualify(ROW_NUMBER() over (PARTITION BY block_number
ORDER BY
@ -86,7 +93,14 @@ collections AS (
*
FROM
{{ ref('silver__streamline_collections') }}
{% if var('LOAD_BACKFILL', False) %}
WHERE
block_number between (
SELECT root_height FROM network_version WHERE lower(network_version) = lower('{{ var('LOAD_BACKFILL_VERSION').replace('_', '-') }}')
) AND (
SELECT end_height FROM network_version WHERE lower(network_version) = lower('{{ var('LOAD_BACKFILL_VERSION').replace('_', '-') }}')
)
{% else %}
{% if is_incremental() %}
WHERE
_inserted_timestamp >= (
@ -102,6 +116,7 @@ WHERE
tx_count_lookback
)
{% endif %}
{% endif %}
),
tx_count AS (
SELECT

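The LOAD_BACKFILL branch above swaps the incremental sources for their _history counterparts and constrains collections to the network-version block range. A hypothetical invocation of that path; the model selector and version value are placeholders (the model lowercases and de-underscores the version itself):

-- dbt run --select silver__streamline_blocks \
--   --vars '{"LOAD_BACKFILL": true, "LOAD_BACKFILL_VERSION": "mainnet_24"}'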

@ -25,6 +25,12 @@ SELECT
'{{ invocation_id }}' AS _invocation_id
FROM
{% if var('LOAD_BACKFILL', False) %}
{{ ref('bronze__streamline_collections_history') }}
-- TODO need incremental logic of some sort probably (for those 5800 missing txs)
-- where inserted timestamp >= max from this where network version = backfill version OR block range between root and end
{% else %}
{% if is_incremental() %}
{{ ref('bronze__streamline_collections') }}
WHERE
@ -38,6 +44,8 @@ WHERE
{{ ref('bronze__streamline_fr_collections') }}
{% endif %}
{% endif %}
qualify(ROW_NUMBER() over (PARTITION BY collection_id
ORDER BY
_inserted_timestamp DESC)) = 1


@ -3,7 +3,8 @@
unique_key = 'event_id',
incremental_strategy = 'merge',
merge_exclude_columns = ["inserted_timestamp"],
cluster_by = "_inserted_timestamp::date",
cluster_by = "block_timestamp::date",
post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION ON EQUALITY(tx_id,event_id,event_contract,event_type);",
tags = ['core', 'streamline_scheduled', 'scheduled', 'scheduled_core']
) }}
@ -14,12 +15,12 @@ WITH transactions AS (
FROM
{{ ref('silver__streamline_transactions_final') }}
WHERE
NOT pending_result_response -- inserted timestamp will update w TR ingestion, so should flow thru to events and curated
NOT pending_result_response
{% if is_incremental() %}
AND _inserted_timestamp >= (
AND modified_timestamp >= (
SELECT
MAX(_inserted_timestamp)
MAX(modified_timestamp)
FROM
{{ this }}
)
@ -75,6 +76,8 @@ flatten_events AS (
LATERAL FLATTEN(
input => events
) e
QUALIFY ROW_NUMBER() OVER (PARTITION BY event_id ORDER BY _inserted_timestamp DESC) = 1
),
attributes AS (
SELECT

Some files were not shown because too many files have changed in this diff.