initial setup

Austin 2024-09-25 15:35:33 -04:00
parent aee1a94af9
commit 0f20d8126a
43 changed files with 945 additions and 0 deletions

@ -0,0 +1,17 @@
name: dbt_run_integration_test
run-name: ${{ github.event.inputs.branch }}
on:
workflow_dispatch:
concurrency: ${{ github.workflow }}
jobs:
called_workflow_template:
uses: FlipsideCrypto/analytics-workflow-templates/.github/workflows/dbt.yml@main
with:
command: >
dbt test --selector 'integration_tests'
environment: ${{ github.ref == 'refs/heads/main' && 'workflow_prod' || 'workflow_dev' }}
warehouse: ${{ vars.WAREHOUSE }}
secrets: inherit

.github/workflows/dbt_run_adhoc.yml

@ -0,0 +1,68 @@
name: dbt_run_adhoc
run-name: dbt_run_adhoc
on:
workflow_dispatch:
branches:
- "main"
inputs:
environment:
type: choice
description: DBT Run Environment
required: true
options:
- dev
- prod
default: dev
warehouse:
type: choice
description: Snowflake warehouse
required: true
options:
- DBT
- DBT_CLOUD
- DBT_EMERGENCY
default: DBT
dbt_command:
type: string
description: 'DBT Run Command'
required: true
env:
DBT_PROFILES_DIR: ./
ACCOUNT: "${{ vars.ACCOUNT }}"
ROLE: "${{ vars.ROLE }}"
USER: "${{ vars.USER }}"
PASSWORD: "${{ secrets.PASSWORD }}"
REGION: "${{ vars.REGION }}"
DATABASE: "${{ vars.DATABASE }}"
WAREHOUSE: "${{ inputs.warehouse }}"
SCHEMA: "${{ vars.SCHEMA }}"
concurrency:
group: ${{ github.workflow }}
jobs:
run_dbt_jobs:
runs-on: ubuntu-latest
environment:
name: workflow_${{ inputs.environment }}
steps:
- uses: actions/checkout@v3
- uses: actions/setup-python@v4
with:
python-version: "3.10"
cache: "pip"
- name: install dependencies
run: |
pip install -r requirements.txt
dbt deps
- name: Run DBT Jobs
run: |
${{ inputs.dbt_command }}

@ -0,0 +1,70 @@
name: dbt_run_dev_refresh
run-name: dbt_run_dev_refresh
on:
workflow_dispatch:
schedule:
    # Runs "At 13:20 on Monday." (see https://crontab.guru)
- cron: '20 13 * * 1'
env:
DBT_PROFILES_DIR: ./
ACCOUNT: "${{ vars.ACCOUNT }}"
ROLE: "${{ vars.ROLE }}"
USER: "${{ vars.USER }}"
PASSWORD: "${{ secrets.PASSWORD }}"
REGION: "${{ vars.REGION }}"
DATABASE: "${{ vars.DATABASE }}"
WAREHOUSE: "${{ vars.WAREHOUSE }}"
SCHEMA: "${{ vars.SCHEMA }}"
concurrency:
group: ${{ github.workflow }}
jobs:
run_dbt_jobs_refresh:
runs-on: ubuntu-latest
environment:
name: workflow_prod
steps:
- uses: actions/checkout@v3
- uses: actions/setup-python@v4
with:
python-version: "3.10"
cache: "pip"
- name: install dependencies
run: |
pip install -r requirements.txt
dbt deps
- name: Run Dev Refresh
run: |
dbt run-operation run_sp_create_prod_clone
run_dbt_jobs_udfs:
runs-on: ubuntu-latest
needs: run_dbt_jobs_refresh
environment:
name: workflow_dev
steps:
- uses: actions/checkout@v3
- uses: actions/setup-python@v4
with:
python-version: "3.10"
cache: "pip"
- name: install dependencies
run: |
pip install -r requirements.txt
dbt deps
- name: Run Recreate UDFs
run: |
dbt run-operation fsc_utils.create_evm_streamline_udfs --vars '{"UPDATE_UDFS_AND_SPS":True}' -t dev
dbt run -s livequery_models.deploy.core._live --vars '{"UPDATE_UDFS_AND_SPS":True}' -t dev

.gitignore

@ -0,0 +1,18 @@
target/
dbt_modules/
# newer versions of dbt use this directory instead of dbt_modules for package dependencies
dbt_packages/
logs/
.venv/
.python-version
# Visual Studio Code files
*/.vscode
*.code-workspace
.history/
**/.DS_Store
.vscode/
.env
dbt-env/

README.md

@ -0,0 +1,109 @@
## Profile Set Up
#### Use the following within profiles.yml
----
```yml
core:
target: dev
outputs:
dev:
type: snowflake
account: <ACCOUNT>
role: <ROLE>
user: <USERNAME>
password: <PASSWORD>
region: <REGION>
database: CORE_DEV
warehouse: <WAREHOUSE>
schema: silver
threads: 12
client_session_keep_alive: False
query_tag: <TAG>
prod:
type: snowflake
account: <ACCOUNT>
role: <ROLE>
user: <USERNAME>
password: <PASSWORD>
region: <REGION>
database: CORE
warehouse: <WAREHOUSE>
schema: silver
threads: 12
client_session_keep_alive: False
query_tag: <TAG>
```
### Variables

To control the creation of UDF or SP macros with `dbt run`:
* `UPDATE_UDFS_AND_SPS`
    * When `True`, executes all macros included in the `on-run-start` hooks within `dbt_project.yml` on a model run, as normal
    * When `False`, skips all `on-run-start` macros on a model run
    * Defaults to `False`
* Usage (see the sketch below):
    * `dbt run --vars '{"UPDATE_UDFS_AND_SPS":True}' -m ...`
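
As a minimal sketch of the gating pattern (the `create_my_udf` macro and `silver.my_udf` function below are hypothetical; this repo's `macros/create_udfs.sql` follows the same shape), an `on-run-start` macro renders no SQL unless the variable is set:

```jinja
{% macro create_my_udf() %}
    {% if var("UPDATE_UDFS_AND_SPS") %}
        {# Only rendered when --vars '{"UPDATE_UDFS_AND_SPS":True}' is passed #}
        CREATE OR REPLACE FUNCTION silver.my_udf(x NUMBER)
        RETURNS NUMBER
        AS 'x * 2';
    {% endif %}
{% endmacro %}
```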
To reload records in a curated complete table without a full refresh, such as `silver_bridge.complete_bridge_activity`:
* `HEAL_CURATED_MODEL`
    * Defaults to an empty array `[]`
    * When an item is included in the var array, incremental logic is skipped for that CTE / code block
    * When an item is not included in the var array, or does not match the name specified in the model, incremental logic applies as normal
    * Example setup: `{% if is_incremental() and 'axelar' not in var('HEAL_CURATED_MODEL') %}`
* Usage (see the sketch below):
    * Single CTE: `dbt run --vars '{"HEAL_CURATED_MODEL":"axelar"}' -m ...`
    * Multiple CTEs: `dbt run --vars '{"HEAL_CURATED_MODEL":["axelar","across","celer_cbridge"]}' -m ...`
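
A minimal sketch of the heal pattern inside an incremental model (the model and column names are hypothetical): when `'axelar'` is included in the var array, the incremental filter is skipped, so that CTE re-reads its full source without a `--full-refresh`:

```jinja
WITH axelar AS (
    SELECT
        block_timestamp,
        tx_hash,
        _inserted_timestamp
    FROM {{ ref('silver__axelar_transfers') }}
    {% if is_incremental() and 'axelar' not in var('HEAL_CURATED_MODEL') %}
    -- incremental logic: only load records newer than the latest already in the table
    WHERE _inserted_timestamp >= (SELECT MAX(_inserted_timestamp) FROM {{ this }})
    {% endif %}
)
SELECT * FROM axelar
```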
### Resources:
- Learn more about dbt [in the docs](https://docs.getdbt.com/docs/introduction)
- Check out [Discourse](https://discourse.getdbt.com/) for commonly asked questions and answers
- Join the [chat](https://community.getdbt.com/) on Slack for live discussions and support
- Find [dbt events](https://events.getdbt.com) near you
- Check out [the blog](https://blog.getdbt.com/) for the latest news on dbt's development and best practices
## Applying Model Tags
### Database / Schema level tags
Database and schema tags are applied via the `add_database_or_schema_tags` macro. These tags are inherited by their downstream objects. To add or modify tags, call the appropriate tag-set function within the macro.
```
{{ set_database_tag_value('SOME_DATABASE_TAG_KEY','SOME_DATABASE_TAG_VALUE') }}
{{ set_schema_tag_value('SOME_SCHEMA','SOME_SCHEMA_TAG_KEY','SOME_SCHEMA_TAG_VALUE') }}
```
### Model tags
To add or update a model's Snowflake tags, add or modify the `meta` model property under `config`. Only table-level tags are supported via dbt at this time.
```
{{ config(
...,
meta={
'database_tags':{
'table': {
'PURPOSE': 'SOME_PURPOSE'
}
}
},
...
) }}
```
By default, model tags are pushed to Snowflake on each load. You can disable this by setting the `UPDATE_SNOWFLAKE_TAGS` project variable to `False` during a run.
```
dbt run --vars '{"UPDATE_SNOWFLAKE_TAGS":False}' -s models/testnet/testnet__fact_blocks.sql
```
### Querying for existing tags on a model in Snowflake
```
select *
from table(core.information_schema.tag_references('core.testnet.fact_blocks', 'table'));
```

analysis/.gitkeep

data/.gitkeep

@ -0,0 +1,2 @@
workflow_name,workflow_schedule
dbt_run_streamline_chainhead,"3,33 * * * *"

dbt_project.yml

@ -0,0 +1,99 @@
# Name your project! Project names should contain only lowercase characters
# and underscores. A good package name should reflect your organization's
# name or the intended use of these models
name: "core_models"
version: "1.0.0"
config-version: 2
# This setting configures which "profile" dbt uses for this project.
profile: "core"
# These configurations specify where dbt should look for different types of files.
# The `model-paths` config, for example, states that models in this project can be
# found in the "models/" directory. You probably won't need to change these!
model-paths: ["models"]
analysis-paths: ["analysis"]
test-paths: ["tests"]
seed-paths: ["data"]
macro-paths: ["macros"]
snapshot-paths: ["snapshots"]
target-path: "target" # directory which will store compiled SQL files
clean-targets: # directories to be removed by `dbt clean`
- "target"
- "dbt_modules"
- "dbt_packages"
tests:
+store_failures: true # all tests
on-run-start:
- "{{ create_sps() }}"
- "{{ create_udfs() }}"
on-run-end:
- '{{ apply_meta_as_tags(results) }}'
dispatch:
- macro_namespace: dbt
search_order:
- core_models
- dbt_snowflake_query_tags
- dbt
query-comment:
comment: '{{ dbt_snowflake_query_tags.get_query_comment(node) }}'
append: true # Snowflake removes prefixed comments.
# Configuring models
# Full documentation: https://docs.getdbt.com/docs/configuring-models
models:
+copy_grants: true
+on_schema_change: "append_new_columns"
livequery_models:
+materialized: ephemeral
# Project variables. Override at runtime with: dbt run --vars '{"KEY": value}'
vars:
"dbt_date:time_zone": GMT
STREAMLINE_INVOKE_STREAMS: False
STREAMLINE_USE_DEV_FOR_EXTERNAL_TABLES: False
UPDATE_UDFS_AND_SPS: False
UPDATE_SNOWFLAKE_TAGS: True
OBSERV_FULL_TEST: False
WAIT: 0
HEAL_MODEL: False
HEAL_MODELS: []
START_GHA_TASKS: False
RELOAD_TRACES: False
#### STREAMLINE 2.0 BEGIN ####
API_INTEGRATION: '{{ var("config")[target.name]["API_INTEGRATION"] if var("config")[target.name] else var("config")["dev"]["API_INTEGRATION"] }}'
EXTERNAL_FUNCTION_URI: '{{ var("config")[target.name]["EXTERNAL_FUNCTION_URI"] if var("config")[target.name] else var("config")["dev"]["EXTERNAL_FUNCTION_URI"] }}'
ROLES: |
["INTERNAL_DEV"]
config:
# The keys correspond to dbt profiles and are case sensitive
dev:
API_INTEGRATION: aws_core_api_stg_v2
EXTERNAL_FUNCTION_URI: y5r18zijv5.execute-api.us-east-1.amazonaws.com/stg/
ROLES:
- AWS_LAMBDA_CORE_API
- INTERNAL_DEV
prod:
API_INTEGRATION: aws_core_api_prod_v2
EXTERNAL_FUNCTION_URI:
ROLES:
- AWS_LAMBDA_CORE_API
- INTERNAL_DEV
- DBT_CLOUD_CORE
#### STREAMLINE 2.0 END ####

macros/create_sps.sql

@ -0,0 +1,8 @@
{% macro create_sps() %}
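{# Gated by UPDATE_UDFS_AND_SPS and restricted to the prod CORE database: creates the _internal schema and the prod-clone stored procedure there. #}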
{% if var("UPDATE_UDFS_AND_SPS") %}
{% if target.database == 'CORE' %}
CREATE schema IF NOT EXISTS _internal;
{{ sp_create_prod_clone('_internal') }};
{% endif %}
{% endif %}
{% endmacro %}

macros/create_udfs.sql

@ -0,0 +1,9 @@
{% macro create_udfs() %}
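{# Gated by UPDATE_UDFS_AND_SPS: ensures the silver schema exists, then delegates UDF creation to the fsc_utils package. #}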
{% if var("UPDATE_UDFS_AND_SPS") %}
{% set sql %}
CREATE schema if NOT EXISTS silver;
{% endset %}
{% do run_query(sql) %}
{{- fsc_utils.create_udfs() -}}
{% endif %}
{% endmacro %}

@ -0,0 +1,11 @@
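{# Custom schema/alias resolution: model files are expected to be named <schema>__<alias>.sql; the schema comes from the part before the double underscore and the alias from the part after. #}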
{% macro generate_schema_name(custom_schema_name=none, node=none) -%}
{% set node_name = node.name %}
{% set split_name = node_name.split('__') %}
{{ split_name[0] | trim }}
{%- endmacro %}
{% macro generate_alias_name(custom_alias_name=none, node=none) -%}
{% set node_name = node.name %}
{% set split_name = node_name.split('__') %}
{{ split_name[1] | trim }}
{%- endmacro %}

@ -0,0 +1,44 @@
{% macro get_merge_sql(
target,
source,
unique_key,
dest_columns,
incremental_predicates
) -%}
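{# Custom override of dbt's merge: when incremental_predicates is ["dynamic_range", "<column>"], query the min/max of that column in the incoming batch and restrict the target-side scan to that range; otherwise pass the predicates through unchanged. #}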
{% set predicate_override = "" %}
{% if incremental_predicates[0] == "dynamic_range" %}
-- run some queries to dynamically determine the min + max of this 'input_column' in the new data
{% set input_column = incremental_predicates[1] %}
{% set get_limits_query %}
SELECT
MIN(
{{ input_column }}
) AS lower_limit,
MAX(
{{ input_column }}
) AS upper_limit
FROM
{{ source }}
{% endset %}
{% set limits = run_query(get_limits_query)[0] %}
{% set lower_limit, upper_limit = limits[0], limits[1] %}
-- use those calculated min + max values to limit 'target' scan, to only the days with new data
{% set predicate_override %}
dbt_internal_dest.{{ input_column }} BETWEEN '{{ lower_limit }}'
AND '{{ upper_limit }}' {% endset %}
{% endif %}
{% set predicates = [predicate_override] if predicate_override else incremental_predicates %}
-- standard merge from here
{% set merge_sql = dbt.get_merge_sql(
target,
source,
unique_key,
dest_columns,
predicates
) %}
{{ return(merge_sql) }}
{% endmacro %}

@ -0,0 +1,8 @@
{% macro dbt_snowflake_get_tmp_relation_type(
strategy,
unique_key,
language
) %}
-- Always materialize the incremental temp relation as a table, never a view
{{ return('table') }}
{% endmacro %}

@ -0,0 +1,10 @@
{% macro run_sp_create_prod_clone() %}
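{# Invoked via `dbt run-operation run_sp_create_prod_clone` in the dev refresh workflow: calls the stored procedure created by sp_create_prod_clone to clone CORE into CORE_DEV for the INTERNAL_DEV role. #}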
{% set clone_query %}
call core._internal.create_prod_clone(
'core',
'core_dev',
'internal_dev'
);
{% endset %}
{% do run_query(clone_query) %}
{% endmacro %}

@ -0,0 +1,44 @@
{% macro sp_create_prod_clone(target_schema) -%}
create or replace procedure {{ target_schema }}.create_prod_clone(source_db_name string, destination_db_name string, role_name string)
returns boolean
language javascript
execute as caller
as
$$
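// Clone the source database over the destination, then hand ownership of all
// current and future objects (schemas, functions, procedures, views, stages,
// tables, and tags) to the given role -- all inside a single transaction that
// rolls back on any error.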
snowflake.execute({sqlText: `BEGIN TRANSACTION;`});
try {
snowflake.execute({sqlText: `CREATE OR REPLACE DATABASE ${DESTINATION_DB_NAME} CLONE ${SOURCE_DB_NAME}`});
snowflake.execute({sqlText: `DROP SCHEMA IF EXISTS ${DESTINATION_DB_NAME}._INTERNAL`}); /* this only needs to be in prod */
snowflake.execute({sqlText: `GRANT OWNERSHIP ON ALL SCHEMAS IN DATABASE ${DESTINATION_DB_NAME} TO ROLE ${ROLE_NAME} COPY CURRENT GRANTS;`});
snowflake.execute({sqlText: `GRANT OWNERSHIP ON ALL FUNCTIONS IN DATABASE ${DESTINATION_DB_NAME} TO ROLE ${ROLE_NAME} COPY CURRENT GRANTS;`});
snowflake.execute({sqlText: `GRANT OWNERSHIP ON ALL PROCEDURES IN DATABASE ${DESTINATION_DB_NAME} TO ROLE ${ROLE_NAME} COPY CURRENT GRANTS;`});
snowflake.execute({sqlText: `GRANT OWNERSHIP ON ALL VIEWS IN DATABASE ${DESTINATION_DB_NAME} TO ROLE ${ROLE_NAME} COPY CURRENT GRANTS;`});
snowflake.execute({sqlText: `GRANT OWNERSHIP ON ALL STAGES IN DATABASE ${DESTINATION_DB_NAME} TO ROLE ${ROLE_NAME} COPY CURRENT GRANTS;`});
snowflake.execute({sqlText: `GRANT OWNERSHIP ON ALL TABLES IN DATABASE ${DESTINATION_DB_NAME} TO ROLE ${ROLE_NAME} COPY CURRENT GRANTS;`});
snowflake.execute({sqlText: `GRANT OWNERSHIP ON FUTURE FUNCTIONS IN DATABASE ${DESTINATION_DB_NAME} TO ROLE ${ROLE_NAME};`});
snowflake.execute({sqlText: `GRANT OWNERSHIP ON FUTURE PROCEDURES IN DATABASE ${DESTINATION_DB_NAME} TO ROLE ${ROLE_NAME};`});
snowflake.execute({sqlText: `GRANT OWNERSHIP ON FUTURE VIEWS IN DATABASE ${DESTINATION_DB_NAME} TO ROLE ${ROLE_NAME};`});
snowflake.execute({sqlText: `GRANT OWNERSHIP ON FUTURE STAGES IN DATABASE ${DESTINATION_DB_NAME} TO ROLE ${ROLE_NAME};`});
snowflake.execute({sqlText: `GRANT OWNERSHIP ON FUTURE TABLES IN DATABASE ${DESTINATION_DB_NAME} TO ROLE ${ROLE_NAME};`});
snowflake.execute({sqlText: `GRANT OWNERSHIP ON DATABASE ${DESTINATION_DB_NAME} TO ROLE ${ROLE_NAME} COPY CURRENT GRANTS;`});
var existing_tags = snowflake.execute({sqlText: `SHOW TAGS IN DATABASE ${DESTINATION_DB_NAME};`});
while (existing_tags.next()) {
var schema = existing_tags.getColumnValue(4);
var tag_name = existing_tags.getColumnValue(2);
snowflake.execute({sqlText: `GRANT OWNERSHIP ON TAG ${DESTINATION_DB_NAME}.${schema}.${tag_name} TO ROLE ${ROLE_NAME} COPY CURRENT GRANTS;`});
}
snowflake.execute({sqlText: `COMMIT;`});
} catch (err) {
snowflake.execute({sqlText: `ROLLBACK;`});
throw(err);
}
return true
$$
{%- endmacro %}

@ -0,0 +1,4 @@
{% macro add_database_or_schema_tags() %}
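{# Project-level database tags; downstream schemas and tables inherit them (see "Applying Model Tags" in the README). #}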
{{ set_database_tag_value('BLOCKCHAIN_NAME','BERACHAIN') }}
{{ set_database_tag_value('BLOCKCHAIN_TYPE','EVM') }}
{% endmacro %}

@ -0,0 +1,127 @@
{% macro apply_meta_as_tags(results) %}
{% if var("UPDATE_SNOWFLAKE_TAGS") %}
{{ log('apply_meta_as_tags', info=False) }}
{{ log(results, info=False) }}
{% if execute %}
{%- set tags_by_schema = {} -%}
{% for res in results -%}
{% if res.node.meta.database_tags %}
{%- set model_database = res.node.database -%}
{%- set model_schema = res.node.schema -%}
{%- set model_schema_full = model_database+'.'+model_schema -%}
{%- set model_alias = res.node.alias -%}
{% if model_schema_full not in tags_by_schema.keys() %}
{{ log('need to fetch tags for schema '+model_schema_full, info=False) }}
{%- call statement('main', fetch_result=True) -%}
show tags in {{model_database}}.{{model_schema}}
{%- endcall -%}
{%- set _ = tags_by_schema.update({model_schema_full: load_result('main')['table'].columns.get('name').values()|list}) -%}
{{ log('Added tags to cache', info=False) }}
{% else %}
{{ log('already have tag info for schema', info=False) }}
{% endif %}
{%- set current_tags_in_schema = tags_by_schema[model_schema_full] -%}
{{ log('current_tags_in_schema:', info=False) }}
{{ log(current_tags_in_schema, info=False) }}
{{ log("========== Processing tags for "+model_schema_full+"."+model_alias+" ==========", info=False) }}
{% set line -%}
node: {{ res.node.unique_id }}; status: {{ res.status }} (message: {{ res.message }})
node full: {{ res.node}}
meta: {{ res.node.meta}}
materialized: {{ res.node.config.materialized }}
{%- endset %}
{{ log(line, info=False) }}
{%- call statement('main', fetch_result=True) -%}
select LEVEL,UPPER(TAG_NAME) as TAG_NAME,TAG_VALUE from table(information_schema.tag_references_all_columns('{{model_schema}}.{{model_alias}}', 'table'))
{%- endcall -%}
{%- set existing_tags_for_table = load_result('main')['data'] -%}
{{ log('Existing tags for table:', info=False) }}
{{ log(existing_tags_for_table, info=False) }}
{{ log('--', info=False) }}
{% for table_tag in res.node.meta.database_tags.table %}
{{ create_tag_if_missing(current_tags_in_schema,table_tag|upper) }}
{% set desired_tag_value = res.node.meta.database_tags.table[table_tag] %}
{{set_table_tag_value_if_different(model_schema,model_alias,table_tag,desired_tag_value,existing_tags_for_table)}}
{% endfor %}
{{ log("========== Finished processing tags for "+model_alias+" ==========", info=False) }}
{% endif %}
{% endfor %}
{% endif %}
{% endif %}
{% endmacro %}
{% macro create_tag_if_missing(all_tag_names,table_tag) %}
{% if table_tag not in all_tag_names %}
{{ log('Creating missing tag '+table_tag, info=False) }}
{%- call statement('main', fetch_result=True) -%}
create tag if not exists silver.{{table_tag}}
{%- endcall -%}
{{ log(load_result('main').data, info=False) }}
{% else %}
{{ log('Tag already exists: '+table_tag, info=False) }}
{% endif %}
{% endmacro %}
{% macro set_table_tag_value_if_different(model_schema,table_name,tag_name,desired_tag_value,existing_tags) %}
{{ log('Ensuring tag '+tag_name+' has value '+desired_tag_value+' at table level', info=False) }}
{%- set existing_tag_for_table = existing_tags|selectattr('0','equalto','TABLE')|selectattr('1','equalto',tag_name|upper)|list -%}
{{ log('Filtered tags for table:', info=False) }}
{{ log(existing_tag_for_table[0], info=False) }}
{% if existing_tag_for_table|length > 0 and existing_tag_for_table[0][2]==desired_tag_value %}
{{ log('Correct tag value already exists', info=False) }}
{% else %}
{{ log('Setting tag value for '+tag_name+' to value '+desired_tag_value, info=False) }}
{%- call statement('main', fetch_result=True) -%}
alter table {{model_schema}}.{{table_name}} set tag {{tag_name}} = '{{desired_tag_value}}'
{%- endcall -%}
{{ log(load_result('main').data, info=False) }}
{% endif %}
{% endmacro %}
{% macro set_column_tag_value_if_different(table_name,column_name,tag_name,desired_tag_value,existing_tags) %}
{{ log('Ensuring tag '+tag_name+' has value '+desired_tag_value+' at column level', info=False) }}
{%- set existing_tag_for_column = existing_tags|selectattr('0','equalto','COLUMN')|selectattr('1','equalto',tag_name|upper)|list -%}
{{ log('Filtered tags for column:', info=False) }}
{{ log(existing_tag_for_column[0], info=False) }}
{% if existing_tag_for_column|length > 0 and existing_tag_for_column[0][2]==desired_tag_value %}
{{ log('Correct tag value already exists', info=False) }}
{% else %}
{{ log('Setting tag value for '+tag_name+' to value '+desired_tag_value, info=False) }}
{%- call statement('main', fetch_result=True) -%}
alter table {{table_name}} modify column {{column_name}} set tag {{tag_name}} = '{{desired_tag_value}}'
{%- endcall -%}
{{ log(load_result('main').data, info=False) }}
{% endif %}
{% endmacro %}
{% macro set_database_tag_value(tag_name,tag_value) %}
{% set query %}
create tag if not exists silver.{{tag_name}}
{% endset %}
{% do run_query(query) %}
{% set query %}
alter database {{target.database}} set tag {{target.database}}.silver.{{tag_name}} = '{{tag_value}}'
{% endset %}
{% do run_query(query) %}
{% endmacro %}
{% macro set_schema_tag_value(target_schema,tag_name,tag_value) %}
{% set query %}
create tag if not exists silver.{{tag_name}}
{% endset %}
{% do run_query(query) %}
{% set query %}
alter schema {{target.database}}.{{target_schema}} set tag {{target.database}}.silver.{{tag_name}} = '{{tag_value}}'
{% endset %}
{% do run_query(query) %}
{% endmacro %}

@ -0,0 +1,7 @@
{{ config (
materialized = 'view'
) }}
{{ fsc_evm.streamline_external_table_query(
model = "blocks",
partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 4), '_', 1) AS INTEGER)"
) }}

@ -0,0 +1,7 @@
{{ config (
materialized = 'view'
) }}
{{ fsc_evm.streamline_external_table_query(
model = "receipts",
partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 4), '_', 1) AS INTEGER)"
) }}

@ -0,0 +1,7 @@
{{ config (
materialized = 'view'
) }}
{{ fsc_evm.streamline_external_table_query(
model = "traces",
partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 4), '_', 1) AS INTEGER)"
) }}

@ -0,0 +1,7 @@
{{ config (
materialized = 'view'
) }}
{{ fsc_evm.streamline_external_table_query(
model = "transactions",
partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 4), '_', 1) AS INTEGER)"
) }}

@ -0,0 +1,7 @@
{{ config (
materialized = 'view'
) }}
{{ fsc_evm.streamline_external_table_fr_query(
model = "blocks",
partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 4), '_', 1) AS INTEGER)"
) }}

@ -0,0 +1,7 @@
{{ config (
materialized = 'view'
) }}
{{ fsc_evm.streamline_external_table_fr_query(
model = "receipts",
partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 4), '_', 1) AS INTEGER)"
) }}

@ -0,0 +1,7 @@
{{ config (
materialized = 'view'
) }}
{{ fsc_evm.streamline_external_table_fr_query(
model = "traces",
partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 4), '_', 1) AS INTEGER)"
) }}

@ -0,0 +1,7 @@
{{ config (
materialized = 'view'
) }}
{{ fsc_evm.streamline_external_table_fr_query(
model = "transactions",
partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 4), '_', 1) AS INTEGER)"
) }}

@ -0,0 +1,8 @@
{{ config(
materialized = 'table',
cluster_by = 'round(_id,-3)',
post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION"
) }}
{{ fsc_evm.number_sequence(
max_num = 50000000
) }}

models/sources.yml

@ -0,0 +1,19 @@
version: 2
sources:
- name: github_actions
database: "{{ 'CORE' if target.database == 'CORE' else 'CORE_DEV' }}"
schema: github_actions
tables:
- name: workflows
- name: bronze_streamline
database: streamline
schema: |
{{ "CORE_DEV" if var("STREAMLINE_USE_DEV_FOR_EXTERNAL_TABLES", False) else "CORE" }}
tables:
- name: blocks
- name: transactions
- name: receipts
- name: traces
- name: confirm_blocks
- name: decoded_logs

@ -0,0 +1,11 @@
-- depends_on: {{ ref('bronze__streamline_blocks') }}
{{ config (
materialized = "incremental",
unique_key = "block_number",
cluster_by = "ROUND(block_number, -3)",
post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION on equality(block_number)",
tags = ['streamline_core_complete']
) }}
{{ fsc_evm.streamline_core_complete(
model = 'blocks'
) }}

@ -0,0 +1,11 @@
-- depends_on: {{ ref('bronze__streamline_receipts') }}
{{ config (
materialized = "incremental",
unique_key = "block_number",
cluster_by = "ROUND(block_number, -3)",
post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION on equality(block_number)",
tags = ['streamline_core_complete']
) }}
{{ fsc_evm.streamline_core_complete(
model = 'receipts'
) }}

@ -0,0 +1,11 @@
-- depends_on: {{ ref('bronze__streamline_traces') }}
{{ config (
materialized = "incremental",
unique_key = "block_number",
cluster_by = "ROUND(block_number, -3)",
post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION on equality(block_number)",
tags = ['streamline_core_complete']
) }}
{{ fsc_evm.streamline_core_complete(
model = 'traces'
) }}

@ -0,0 +1,11 @@
-- depends_on: {{ ref('bronze__streamline_transactions') }}
{{ config (
materialized = "incremental",
unique_key = "block_number",
cluster_by = "ROUND(block_number, -3)",
post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION on equality(block_number)",
tags = ['streamline_core_complete']
) }}
{{ fsc_evm.streamline_core_complete(
model = 'transactions'
) }}

@ -0,0 +1,29 @@
{{ config(
materialized = "view",
post_hook = fsc_utils.if_data_call_function_v2(
func = "streamline.udf_bulk_rest_api_v2",
target = "{{this.schema}}.{{this.identifier}}",
params ={ "external_table": "blocks",
"sql_limit": "50000",
"producer_batch_size": "1000",
"worker_batch_size": "100",
"sql_source": "{{this.identifier}}",
"exploded_key": tojson(["data", "result.transactions"]) }
),
tags = ['streamline_core_realtime']
) }}
{{ fsc_evm.streamline_core_requests(
model_type = 'realtime',
model = 'blocks_transactions',
vault_secret_path = 'Vault/prod/core/ankr/mainnet',
query_limit = 3,
new_build = true,
testing_limit = 3
) }}
{#
TODO:
add back: quantum_state = 'streamline',
raise the query limit
add retry logic later
#}

@ -0,0 +1,29 @@
{{ config (
materialized = "view",
post_hook = fsc_utils.if_data_call_function_v2(
func = 'streamline.udf_bulk_rest_api_v2',
target = "{{this.schema}}.{{this.identifier}}",
params ={ "external_table": "receipts",
"sql_limit": "100000",
"producer_batch_size": "100",
"worker_batch_size": "10",
"sql_source": "{{this.identifier}}",
"exploded_key": tojson(["result"]) }
),
tags = ['streamline_core_realtime']
) }}
{{ fsc_evm.streamline_core_requests(
model_type = 'realtime',
model = 'receipts',
vault_secret_path = 'Vault/prod/core/ankr/mainnet',
query_limit = 3,
new_build = true,
testing_limit = 3
) }}
{#
TODO:
add back: quantum_state = 'streamline',
raise the query limit
add retry logic later
#}

@ -0,0 +1,29 @@
{{ config (
materialized = "view",
post_hook = fsc_utils.if_data_call_function_v2(
func = 'streamline.udf_bulk_rest_api_v2',
target = "{{this.schema}}.{{this.identifier}}",
params ={ "external_table": "traces",
"sql_limit": "100000",
"producer_batch_size": "100",
"worker_batch_size": "10",
"sql_source": "{{this.identifier}}",
"exploded_key": tojson(["result"]) }
),
tags = ['streamline_core_realtime']
) }}
{{ fsc_evm.streamline_core_requests(
model_type = 'realtime',
model = 'traces',
vault_secret_path = 'Vault/prod/core/ankr/mainnet',
query_limit = 3,
new_build = true,
testing_limit = 3
) }}
{#
TODO:
add back: quantum_state = 'streamline',
raise the query limit
add retry logic later
#}

@ -0,0 +1,5 @@
{{ config (
materialized = "view",
tags = ['streamline_core_complete']
) }}
{{ fsc_evm.block_sequence(min_block = 17962900) }}

@ -0,0 +1,7 @@
{{ config (
materialized = 'table',
tags = ['streamline_core_complete']
) }}
{{ fsc_evm.streamline_core_chainhead(
vault_secret_path = 'Vault/prod/core/ankr/mainnet'
) }}

package-lock.yml

@ -0,0 +1,18 @@
packages:
- package: calogica/dbt_expectations
version: 0.8.2
- package: dbt-labs/dbt_external_tables
version: 0.8.2
- package: dbt-labs/dbt_utils
version: 1.0.0
- git: https://github.com/FlipsideCrypto/fsc-utils.git
revision: eb33ac727af26ebc8a8cc9711d4a6ebc3790a107
- package: get-select/dbt_snowflake_query_tags
version: 2.5.0
- git: https://github.com/FlipsideCrypto/fsc-evm.git
revision: a700111699af1a355d9076dde9aa04e26efb0617
- package: calogica/dbt_date
version: 0.7.2
- git: https://github.com/FlipsideCrypto/livequery-models.git
revision: b024188be4e9c6bc00ed77797ebdc92d351d620e
sha1_hash: 3742a22068bc9996bdc221603634fe51fe62e11e

packages.yml

@ -0,0 +1,13 @@
packages:
- package: calogica/dbt_expectations
version: 0.8.2
- package: dbt-labs/dbt_external_tables
version: 0.8.2
- package: dbt-labs/dbt_utils
version: 1.0.0
- git: https://github.com/FlipsideCrypto/fsc-utils.git
revision: v1.29.0
- package: get-select/dbt_snowflake_query_tags
version: [">=2.0.0", "<3.0.0"]
- git: https://github.com/FlipsideCrypto/fsc-evm.git
revision: v1.9.0

profiles.yml

@ -0,0 +1,31 @@
core:
target: prod
outputs:
dev:
type: snowflake
account: "{{ env_var('ACCOUNT') }}"
role: "{{ env_var('ROLE') }}"
user: "{{ env_var('USER') }}"
password: "{{ env_var('PASSWORD') }}"
region: "{{ env_var('REGION') }}"
database: "{{ env_var('DATABASE') }}"
warehouse: "{{ env_var('WAREHOUSE') }}"
schema: SILVER
threads: 4
client_session_keep_alive: False
query_tag: core_curator
prod:
type: snowflake
account: "{{ env_var('ACCOUNT') }}"
role: "{{ env_var('ROLE') }}"
user: "{{ env_var('USER') }}"
password: "{{ env_var('PASSWORD') }}"
region: "{{ env_var('REGION') }}"
database: "{{ env_var('DATABASE') }}"
warehouse: "{{ env_var('WAREHOUSE') }}"
schema: SILVER
threads: 4
client_session_keep_alive: False
query_tag: core_curator
config:
send_anonymous_usage_stats: False

requirements.txt

@ -0,0 +1,2 @@
dbt-snowflake>=1.7,<1.8
protobuf==4.25.3

selectors.yml

@ -0,0 +1,7 @@
selectors:
- name: integration_tests
description: "Selector for integration tests"
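# Referenced by the dbt_run_integration_test workflow: dbt test --selector 'integration_tests'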
definition:
union:
- method: fqn
value: "livequery_models.deploy.core._utils"

snapshots/.gitkeep