From ec708c30e01cbd72c84c2fa0a8dcf52f9dc01182 Mon Sep 17 00:00:00 2001 From: xiuy001 Date: Thu, 29 Jun 2023 16:33:27 -0400 Subject: [PATCH] added the GHA --- .../workflows/dbt_run_streamline_realtime.yml | 34 ++++ dbt_project.yml | 8 +- macros/js_hextoint.sql | 6 + macros/python/udfs.sql | 21 ++ macros/streamline/api_integrations.sql | 17 ++ macros/streamline/get_base_table_udtf.sql | 22 +++ macros/streamline/models.sql | 179 ++++++++++++++++++ macros/streamline/streamline_udfs.sql | 23 +++ macros/utils.sql | 78 ++++++++ .../core/bronze__streamline_FR_blocks.sql | 11 ++ .../bronze__streamline_FR_transactions.sql | 11 ++ .../bronze/core/bronze__streamline_blocks.sql | 11 ++ .../core/bronze__streamline_transactions.sql | 11 ++ .../complete/streamline__complete_blocks.sql | 30 +++ .../streamline__complete_transactions.sql | 30 +++ .../realtime/streamline__blocks_realtime.sql | 55 ++++++ .../streamline__transactions_realtime.sql | 55 ++++++ .../silver/streamline/streamline__blocks.sql | 21 ++ models/sources.yml | 8 + 19 files changed, 629 insertions(+), 2 deletions(-) create mode 100644 .github/workflows/dbt_run_streamline_realtime.yml create mode 100644 macros/js_hextoint.sql create mode 100644 macros/python/udfs.sql create mode 100644 macros/streamline/api_integrations.sql create mode 100644 macros/streamline/get_base_table_udtf.sql create mode 100644 macros/streamline/models.sql create mode 100644 macros/streamline/streamline_udfs.sql create mode 100644 macros/utils.sql create mode 100644 models/bronze/core/bronze__streamline_FR_blocks.sql create mode 100644 models/bronze/core/bronze__streamline_FR_transactions.sql create mode 100644 models/bronze/core/bronze__streamline_blocks.sql create mode 100644 models/bronze/core/bronze__streamline_transactions.sql create mode 100644 models/silver/streamline/complete/streamline__complete_blocks.sql create mode 100644 models/silver/streamline/complete/streamline__complete_transactions.sql create mode 100644 
models/silver/streamline/realtime/streamline__blocks_realtime.sql create mode 100644 models/silver/streamline/realtime/streamline__transactions_realtime.sql create mode 100644 models/silver/streamline/streamline__blocks.sql diff --git a/.github/workflows/dbt_run_streamline_realtime.yml b/.github/workflows/dbt_run_streamline_realtime.yml new file mode 100644 index 0000000..3227803 --- /dev/null +++ b/.github/workflows/dbt_run_streamline_realtime.yml @@ -0,0 +1,34 @@ +name: dbt_run_streamline_realtime +run-name: dbt_run_streamline_realtime + +on: + workflow_dispatch: + schedule: + # Runs "every 2 hours" (see https://crontab.guru) + - cron: '0 */2 * * *' + +env: + USE_VARS: "${{ vars.USE_VARS }}" + DBT_PROFILES_DIR: "${{ vars.DBT_PROFILES_DIR }}" + DBT_VERSION: "${{ vars.DBT_VERSION }}" + ACCOUNT: "${{ vars.ACCOUNT }}" + ROLE: "${{ vars.ROLE }}" + USER: "${{ vars.USER }}" + PASSWORD: "${{ secrets.PASSWORD }}" + REGION: "${{ vars.REGION }}" + DATABASE: "${{ vars.DATABASE }}" + WAREHOUSE: "${{ vars.WAREHOUSE }}" + SCHEMA: "${{ vars.SCHEMA }}" + +concurrency: + group: ${{ github.workflow }} + +jobs: + called_workflow_template: + uses: FlipsideCrypto/analytics-workflow-templates/.github/workflows/dbt_run_template.yml@main + with: + dbt_command: > + dbt run --vars '{"STREAMLINE_INVOKE_STREAMS":True}' -m 1+models/silver/streamline/realtime/ + environment: workflow_prod + warehouse: ${{ vars.WAREHOUSE }} + secrets: inherit diff --git a/dbt_project.yml b/dbt_project.yml index 630cd9b..7200917 100644 --- a/dbt_project.yml +++ b/dbt_project.yml @@ -41,8 +41,12 @@ models: vars: "dbt_date:time_zone": GMT - "UPDATE_SNOWFLAKE_TAGS": TRUE - "UPDATE_UDFS_AND_SPS": FALSE + STREAMLINE_INVOKE_STREAMS: False + STREAMLINE_USE_DEV_FOR_EXTERNAL_TABLES: False + STREAMLINE_RUN_HISTORY: False + UPDATE_UDFS_AND_SPS: False + UPDATE_SNOWFLAKE_TAGS: True + WAIT: 0 tests: +store_failures: true # all tests diff --git a/macros/js_hextoint.sql b/macros/js_hextoint.sql new file mode 100644 index 
0000000..8742ac3 --- /dev/null +++ b/macros/js_hextoint.sql @@ -0,0 +1,6 @@ +{% macro create_js_hex_to_int() %} + CREATE + OR REPLACE FUNCTION {{ target.schema }}.js_hex_to_int ( + s STRING + ) returns DOUBLE LANGUAGE javascript AS 'if (S !== null) { yourNumber = parseInt(S, 16); } return yourNumber' +{% endmacro %} diff --git a/macros/python/udfs.sql b/macros/python/udfs.sql new file mode 100644 index 0000000..df0489e --- /dev/null +++ b/macros/python/udfs.sql @@ -0,0 +1,21 @@ +{% macro create_udf_hex_to_int(schema) %} +create or replace function {{ schema }}.udf_hex_to_int(hex string) +returns string +language python +runtime_version = '3.8' +handler = 'hex_to_int' +as +$$ +def hex_to_int(hex) -> str: + """ + Converts hex (of any size) to int (as a string). Snowflake and java script can only handle up to 64-bit (38 digits of precision) + select hex_to_int('200000000000000000000000000000211'); + >> 680564733841876926926749214863536423441 + select hex_to_int('0x200000000000000000000000000000211'); + >> 680564733841876926926749214863536423441 + select hex_to_int(NULL); + >> NULL + """ + return str(int(hex, 16)) if hex else None +$$; +{% endmacro %} \ No newline at end of file diff --git a/macros/streamline/api_integrations.sql b/macros/streamline/api_integrations.sql new file mode 100644 index 0000000..92f44a2 --- /dev/null +++ b/macros/streamline/api_integrations.sql @@ -0,0 +1,17 @@ +{% macro create_aws_aurora_api() %} + {% if target.name == "prod" %} + {% set sql %} + CREATE api integration IF NOT EXISTS aws_aurora_api api_provider = aws_api_gateway api_aws_role_arn = 'arn:aws:iam::490041342817:role/aurora-api-prod-rolesnowflakeudfsAF733095-3WVDCVO54NPX' api_allowed_prefixes = ( + 'https://sl2f5beopl.execute-api.us-east-1.amazonaws.com/prod/' + ) enabled = TRUE; + {% endset %} + {% do run_query(sql) %} + {% else %} + {% set sql %} + CREATE api integration IF NOT EXISTS aws_aurora_dev_api api_provider = aws_api_gateway api_aws_role_arn = 
'arn:aws:iam::490041342817:role/aurora-api-dev-rolesnowflakeudfsAF733095-1MX4LOX4UFE9M' api_allowed_prefixes = ( + 'https://66lx4fxkui.execute-api.us-east-1.amazonaws.com/dev/' + ) enabled = TRUE; + {% endset %} + {% do run_query(sql) %} + {% endif %} +{% endmacro %} \ No newline at end of file diff --git a/macros/streamline/get_base_table_udtf.sql b/macros/streamline/get_base_table_udtf.sql new file mode 100644 index 0000000..c4ee9ab --- /dev/null +++ b/macros/streamline/get_base_table_udtf.sql @@ -0,0 +1,22 @@ +{% macro create_udtf_get_base_table(schema) %} + CREATE + OR REPLACE FUNCTION {{ schema }}.udtf_get_base_table( + max_height INTEGER + ) returns TABLE ( + height NUMBER + ) AS $$ WITH base AS ( + SELECT + ROW_NUMBER() over ( + ORDER BY + SEQ4() + ) AS id + FROM + TABLE(GENERATOR(rowcount => 100000000)) + ) +SELECT + id AS height +FROM + base +WHERE + id <= max_height $$; +{% endmacro %} diff --git a/macros/streamline/models.sql b/macros/streamline/models.sql new file mode 100644 index 0000000..e17327d --- /dev/null +++ b/macros/streamline/models.sql @@ -0,0 +1,179 @@ +{% macro decode_logs_history( + start, + stop + ) %} + WITH look_back AS ( + SELECT + block_number + FROM + {{ ref("_max_block_by_date") }} + qualify ROW_NUMBER() over ( + ORDER BY + block_number DESC + ) = 1 + ) +SELECT + l.block_number, + l._log_id, + A.abi AS abi, + OBJECT_CONSTRUCT( + 'topics', + l.topics, + 'data', + l.data, + 'address', + l.contract_address + ) AS DATA +FROM + {{ ref("silver__logs") }} + l + INNER JOIN {{ ref("silver__complete_event_abis") }} A + ON A.parent_contract_address = l.contract_address + AND A.event_signature = l.topics[0]:: STRING + AND l.block_number BETWEEN A.start_block + AND A.end_block +WHERE + ( + l.block_number BETWEEN {{ start }} + AND {{ stop }} + ) + AND l.block_number <= ( + SELECT + block_number + FROM + look_back + ) + AND _log_id NOT IN ( + SELECT + _log_id + FROM + {{ ref("streamline__complete_decode_logs") }} + WHERE + ( + block_number BETWEEN 
{{ start }} + AND {{ stop }} + ) + AND block_number <= ( + SELECT + block_number + FROM + look_back + ) + ) +{% endmacro %} + +{% macro streamline_external_table_query( + model, + partition_function, + partition_name, + unique_key + ) %} + WITH meta AS ( + SELECT + job_created_time AS _inserted_timestamp, + file_name, + {{ partition_function }} AS {{ partition_name }} + FROM + TABLE( + information_schema.external_table_file_registration_history( + start_time => DATEADD('day', -7, CURRENT_TIMESTAMP()), + table_name => '{{ source( "bronze_streamline", model) }}') + ) A + ) + SELECT + {{ unique_key }}, + DATA, + _inserted_timestamp, + MD5( + CAST( + COALESCE(CAST({{ unique_key }} AS text), '' :: STRING) AS text + ) + ) AS id, + s.{{ partition_name }}, + s.value AS VALUE + FROM + {{ source( + "bronze_streamline", + model + ) }} + s + JOIN meta b + ON b.file_name = metadata$filename + AND b.{{ partition_name }} = s.{{ partition_name }} + WHERE + b.{{ partition_name }} = s.{{ partition_name }} + AND ( + DATA :error :code IS NULL + OR DATA :error :code NOT IN ( + '-32000', + '-32001', + '-32002', + '-32003', + '-32004', + '-32005', + '-32006', + '-32007', + '-32008', + '-32009', + '-32010' + ) + ) +{% endmacro %} + +{% macro streamline_external_table_FR_query( + model, + partition_function, + partition_name, + unique_key + ) %} + WITH meta AS ( + SELECT + registered_on AS _inserted_timestamp, + file_name, + {{ partition_function }} AS {{ partition_name }} + FROM + TABLE( + information_schema.external_table_files( + table_name => '{{ source( "bronze_streamline", model) }}' + ) + ) A + ) +SELECT + {{ unique_key }}, + DATA, + _inserted_timestamp, + MD5( + CAST( + COALESCE(CAST({{ unique_key }} AS text), '' :: STRING) AS text + ) + ) AS id, + s.{{ partition_name }}, + s.value AS VALUE +FROM + {{ source( + "bronze_streamline", + model + ) }} + s + JOIN meta b + ON b.file_name = metadata$filename + AND b.{{ partition_name }} = s.{{ partition_name }} +WHERE + b.{{ partition_name 
}} = s.{{ partition_name }} + AND ( + DATA :error :code IS NULL + OR DATA :error :code NOT IN ( + '-32000', + '-32001', + '-32002', + '-32003', + '-32004', + '-32005', + '-32006', + '-32007', + '-32008', + '-32009', + '-32010' + ) + ) +{% endmacro %} diff --git a/macros/streamline/streamline_udfs.sql b/macros/streamline/streamline_udfs.sql new file mode 100644 index 0000000..767ac10 --- /dev/null +++ b/macros/streamline/streamline_udfs.sql @@ -0,0 +1,23 @@ +{% macro create_udf_get_chainhead() %} + {% if target.name == "prod" %} + CREATE OR REPLACE EXTERNAL FUNCTION streamline.udf_get_chainhead() returns variant api_integration = aws_aurora_api AS + 'https://sl2f5beopl.execute-api.us-east-1.amazonaws.com/prod/get_chainhead' + {% else %} + CREATE OR REPLACE EXTERNAL FUNCTION streamline.udf_get_chainhead() returns variant api_integration = aws_aurora_dev_api AS + 'https://66lx4fxkui.execute-api.us-east-1.amazonaws.com/dev/get_chainhead' + {%- endif %}; +{% endmacro %} + +{% macro create_udf_json_rpc() %} + {% if target.name == "prod" %} + CREATE OR REPLACE EXTERNAL FUNCTION streamline.udf_json_rpc( + json OBJECT + ) returns ARRAY api_integration = aws_aurora_api AS + 'https://sl2f5beopl.execute-api.us-east-1.amazonaws.com/prod/bulk_get_json_rpc' + {% else %} + CREATE OR REPLACE EXTERNAL FUNCTION streamline.udf_json_rpc( + json OBJECT + ) returns ARRAY api_integration = aws_aurora_dev_api AS + 'https://66lx4fxkui.execute-api.us-east-1.amazonaws.com/dev/bulk_get_json_rpc' + {%- endif %}; +{% endmacro %} \ No newline at end of file diff --git a/macros/utils.sql b/macros/utils.sql new file mode 100644 index 0000000..0b800b8 --- /dev/null +++ b/macros/utils.sql @@ -0,0 +1,78 @@ +{% macro if_data_call_function( + func, + target + ) %} + {% if var( + "STREAMLINE_INVOKE_STREAMS" + ) %} + {% if execute %} + {{ log( + "Running macro `if_data_call_function`: Calling udf " ~ func ~ " on " ~ target, + True + ) }} + {% endif %} + SELECT + {{ func }} + WHERE + EXISTS( + SELECT + 1 + 
FROM + {{ target }} + LIMIT + 1 + ) + {% else %} + {% if execute %} + {{ log( + "Running macro `if_data_call_function`: NOOP", + False + ) }} + {% endif %} + SELECT + NULL + {% endif %} +{% endmacro %} + +{% macro if_data_call_wait() %} + {% if var( + "STREAMLINE_INVOKE_STREAMS" + ) %} + {% set query %} + SELECT + 1 + WHERE + EXISTS( + SELECT + 1 + FROM + {{ model.schema ~ "." ~ model.alias }} + LIMIT + 1 + ) {% endset %} + {% if execute %} + {% set results = run_query( + query + ) %} + {% if results %} + {{ log( + "Waiting...", + info = True + ) }} + + {% set wait_query %} + SELECT + system$wait( + {{ var( + "WAIT", + 600 + ) }} + ) {% endset %} + {% do run_query(wait_query) %} + {% else %} + SELECT + NULL; + {% endif %} + {% endif %} + {% endif %} +{% endmacro %} diff --git a/models/bronze/core/bronze__streamline_FR_blocks.sql b/models/bronze/core/bronze__streamline_FR_blocks.sql new file mode 100644 index 0000000..e0a16c6 --- /dev/null +++ b/models/bronze/core/bronze__streamline_FR_blocks.sql @@ -0,0 +1,11 @@ +{{ config ( + materialized = 'view' +) }} + +{% set model = this.identifier.split("_") [-1] %} +{{ streamline_external_table_FR_query( + model, + partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 4), '_', 1) AS INTEGER )", + partition_name = "_partition_by_block_id", + unique_key = "block_number" +) }} diff --git a/models/bronze/core/bronze__streamline_FR_transactions.sql b/models/bronze/core/bronze__streamline_FR_transactions.sql new file mode 100644 index 0000000..e0a16c6 --- /dev/null +++ b/models/bronze/core/bronze__streamline_FR_transactions.sql @@ -0,0 +1,11 @@ +{{ config ( + materialized = 'view' +) }} + +{% set model = this.identifier.split("_") [-1] %} +{{ streamline_external_table_FR_query( + model, + partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 4), '_', 1) AS INTEGER )", + partition_name = "_partition_by_block_id", + unique_key = "block_number" +) }} diff --git a/models/bronze/core/bronze__streamline_blocks.sql 
b/models/bronze/core/bronze__streamline_blocks.sql new file mode 100644 index 0000000..de33225 --- /dev/null +++ b/models/bronze/core/bronze__streamline_blocks.sql @@ -0,0 +1,11 @@ +{{ config ( + materialized = 'view' +) }} + +{% set model = this.identifier.split("_") [-1] %} +{{ streamline_external_table_query( + model, + partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 4), '_', 1) AS INTEGER )", + partition_name = "_partition_by_block_id", + unique_key = "block_number" +) }} diff --git a/models/bronze/core/bronze__streamline_transactions.sql b/models/bronze/core/bronze__streamline_transactions.sql new file mode 100644 index 0000000..de33225 --- /dev/null +++ b/models/bronze/core/bronze__streamline_transactions.sql @@ -0,0 +1,11 @@ +{{ config ( + materialized = 'view' +) }} + +{% set model = this.identifier.split("_") [-1] %} +{{ streamline_external_table_query( + model, + partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 4), '_', 1) AS INTEGER )", + partition_name = "_partition_by_block_id", + unique_key = "block_number" +) }} diff --git a/models/silver/streamline/complete/streamline__complete_blocks.sql b/models/silver/streamline/complete/streamline__complete_blocks.sql new file mode 100644 index 0000000..f66b0fc --- /dev/null +++ b/models/silver/streamline/complete/streamline__complete_blocks.sql @@ -0,0 +1,30 @@ +-- depends_on: {{ ref('bronze__streamline_blocks') }} +{{ config ( + materialized = "incremental", + unique_key = "id", + cluster_by = "ROUND(block_number, -3)", + post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION on equality(id)" +) }} + +SELECT + id, + block_number, + _inserted_timestamp +FROM + +{% if is_incremental() %} +{{ ref('bronze__streamline_blocks') }} +WHERE + _inserted_timestamp >= ( + SELECT + MAX(_inserted_timestamp) _inserted_timestamp + FROM + {{ this }} + ) +{% else %} + {{ ref('bronze__streamline_FR_blocks') }} +{% endif %} + +qualify(ROW_NUMBER() over (PARTITION BY id +ORDER BY + 
_inserted_timestamp DESC)) = 1 diff --git a/models/silver/streamline/realtime/streamline__blocks_realtime.sql b/models/silver/streamline/realtime/streamline__blocks_realtime.sql new file mode 100644 index 0000000..1763799 --- /dev/null +++ b/models/silver/streamline/realtime/streamline__blocks_realtime.sql @@ -0,0 +1,55 @@ +{{ config ( + materialized = "view", + post_hook = if_data_call_function( + func = "{{this.schema}}.udf_json_rpc(object_construct('sql_source', '{{this.identifier}}', 'external_table','blocks', 'producer_batch_size',500, 'producer_limit_size', 5000000, 'worker_batch_size',50))", + target = "{{this.schema}}.{{this.identifier}}" + ) +) }} + +WITH last_3_days AS ({% if var('STREAMLINE_RUN_HISTORY') %} + + SELECT + 0 AS block_number + {% else %} + SELECT + MAX(block_number) - 50000 AS block_number --approx 3 days + FROM + {{ ref("streamline__blocks") }} + {% endif %}), + tbl AS ( + SELECT + block_number, + block_number_hex + FROM + {{ ref("streamline__blocks") }} + WHERE + ( + block_number
>= ( + SELECT + block_number + FROM + last_3_days + ) + ) + AND block_number IS NOT NULL + EXCEPT + SELECT + block_number, + REPLACE( + concat_ws('', '0x', to_char(block_number, 'XXXXXXXX')), + ' ', + '' + ) AS block_number_hex + FROM + {{ ref("streamline__complete_blocks") }} + ) +SELECT + block_number, + 'eth_getBlockByNumber' AS method, + CONCAT( + block_number_hex, + '_-_', + 'false' + ) AS params +FROM + tbl diff --git a/models/silver/streamline/realtime/streamline__transactions_realtime.sql b/models/silver/streamline/realtime/streamline__transactions_realtime.sql new file mode 100644 index 0000000..d37b140 --- /dev/null +++ b/models/silver/streamline/realtime/streamline__transactions_realtime.sql @@ -0,0 +1,55 @@ +{{ config ( + materialized = "view", + post_hook = if_data_call_function( + func = "{{this.schema}}.udf_json_rpc(object_construct('sql_source', '{{this.identifier}}', 'external_table', 'transactions', 'exploded_key','[\"result\", \"transactions\"]', 'producer_batch_size',500, 'producer_limit_size', 500000, 'worker_batch_size',50))", + target = "{{this.schema}}.{{this.identifier}}" + ) +) }} + +WITH last_3_days AS ({% if var('STREAMLINE_RUN_HISTORY') %} + + SELECT + 0 AS block_number + {% else %} + SELECT + MAX(block_number) - 50000 AS block_number --approx 3 days + FROM + {{ ref("streamline__blocks") }} + {% endif %}), + tbl AS ( + SELECT + block_number, + block_number_hex + FROM + {{ ref("streamline__blocks") }} + WHERE + ( + block_number >= ( + SELECT + block_number + FROM + last_3_days + ) + ) + AND block_number IS NOT NULL + EXCEPT + SELECT + block_number, + REPLACE( + concat_ws('', '0x', to_char(block_number, 'XXXXXXXX')), + ' ', + '' + ) AS block_number_hex + FROM + {{ ref("streamline__complete_blocks") }} + ) +SELECT + block_number, + 'eth_getBlockByNumber' AS method, + CONCAT( + block_number_hex, + '_-_', + 'true' + ) AS params +FROM + tbl \ No newline at end of file diff --git a/models/silver/streamline/streamline__blocks.sql
b/models/silver/streamline/streamline__blocks.sql new file mode 100644 index 0000000..bad8e4d --- /dev/null +++ b/models/silver/streamline/streamline__blocks.sql @@ -0,0 +1,21 @@ +{{ config ( + materialized = "view", + tags = ['streamline_view'] +) }} + +{% if execute %} + {% set height = run_query('SELECT streamline.udf_get_chainhead()') %} + {% set block_height = height.columns [0].values() [0] %} +{% else %} + {% set block_height = 0 %} +{% endif %} + +SELECT + height AS block_number, + REPLACE( + concat_ws('', '0x', to_char(height, 'XXXXXXXX')), + ' ', + '' + ) AS block_number_hex +FROM + TABLE(streamline.udtf_get_base_table({{ block_height }})) diff --git a/models/sources.yml b/models/sources.yml index 9c8bd60..d8efef0 100644 --- a/models/sources.yml +++ b/models/sources.yml @@ -8,3 +8,11 @@ sources: - name: dim_date_hours - name: address_tags - name: dim_dates + - name: bronze_streamline + database: streamline + schema: | + {{ "AURORA_DEV" if var("STREAMLINE_USE_DEV_FOR_EXTERNAL_TABLES", False) else "AURORA" }} + tables: + - name: blocks + - name: transactions + - name: tx_receipts \ No newline at end of file