decode traces (#813)

* decode traces

* fixes

* workflow

* exclude

* functions

* decode traces

* fix hooks

* traces workflow
This commit is contained in:
Austin 2024-02-13 14:59:03 -05:00 committed by GitHub
parent 91610ce304
commit bf4849edcf
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
479 changed files with 241 additions and 0 deletions

View File

@ -0,0 +1,45 @@
# Workflow that runs the dbt models tagged for the streamline decoded-traces
# pipeline, either on demand or once per hour.
name: dbt_run_streamline_traces_decoder
run-name: dbt_run_streamline_traces_decoder

on:
  # Manual trigger.
  workflow_dispatch:
  schedule:
    # At minute 10. (see https://crontab.guru)
    - cron: '10 * * * *'

# Environment variables available to every step. All values come from
# repository/environment variables except PASSWORD, which is read from secrets.
# NOTE(review): presumably consumed by the dbt profile found via
# DBT_PROFILES_DIR -- confirm against profiles.yml.
env:
  DBT_PROFILES_DIR: ./
  ACCOUNT: "${{ vars.ACCOUNT }}"
  ROLE: "${{ vars.ROLE }}"
  USER: "${{ vars.USER }}"
  PASSWORD: "${{ secrets.PASSWORD }}"
  REGION: "${{ vars.REGION }}"
  DATABASE: "${{ vars.DATABASE }}"
  WAREHOUSE: "${{ vars.WAREHOUSE }}"
  SCHEMA: "${{ vars.SCHEMA }}"

# Every run of this workflow shares a single concurrency group named after
# the workflow itself.
concurrency:
  group: ${{ github.workflow }}

jobs:
  run_dbt_jobs:
    runs-on: ubuntu-latest
    environment:
      name: workflow_prod

    steps:
      - uses: actions/checkout@v3

      - uses: actions/setup-python@v4
        with:
          python-version: "3.10"
          cache: "pip"

      - name: install dependencies
        run: |
          pip install -r requirements.txt
          dbt deps

      - name: Run DBT Jobs
        run: |
          dbt run --vars '{"STREAMLINE_INVOKE_STREAMS":True}' -m "ethereum_models,tag:streamline_decoded_traces_complete" "ethereum_models,tag:streamline_decoded_traces_realtime"

View File

@ -169,6 +169,17 @@
{%- endif %};
{% endmacro %}
{% macro create_udf_bulk_decode_traces() %}
    {#-
        Emits the DDL that creates (or replaces) the external function
        streamline.udf_bulk_decode_traces, which takes one OBJECT argument
        named json and returns an ARRAY through the aws_ethereum_api
        integration. The "prod" target points at the prod endpoint URL; every
        other target points at the dev endpoint URL.
    -#}
    {%- set endpoint_url = (
        'https://e03pt6v501.execute-api.us-east-1.amazonaws.com/prod/bulk_decode_traces'
        if target.name == "prod"
        else 'https://mryeusnrob.execute-api.us-east-1.amazonaws.com/dev/bulk_decode_traces'
    ) %}
    CREATE OR REPLACE EXTERNAL FUNCTION streamline.udf_bulk_decode_traces(json OBJECT)
    RETURNS ARRAY
    api_integration = aws_ethereum_api
    AS '{{ endpoint_url }}';
{% endmacro %}
{% macro create_udf_rest_api() %}
CREATE
OR REPLACE EXTERNAL FUNCTION streamline.udf_rest_api(

View File

@ -62,6 +62,7 @@ sources:
- name: receipts
- name: traces
- name: confirm_blocks
- name: decoded_traces
- name: crosschain_silver
database: "{{ 'crosschain' if target.database == 'ETHEREUM' else 'crosschain_dev' }}"
schema: silver

View File

@ -0,0 +1,41 @@
{{ config(
    materialized = 'view'
) }}

-- Bronze view over the decoded_traces external table, limited to recently
-- registered files. Each external-table row is matched to the registration
-- record of the file it was read from so the file's last_modified value can
-- be exposed as _inserted_timestamp.
WITH recent_files AS (
    -- Files registered on the external table within the last 3 days.
    -- Partition values are parsed out of the file path: the leading
    -- '_'-separated token of path segment 6 is the block-number partition,
    -- and segments 3, 4 and 5 joined with '-' form the created-date partition
    -- (presumably year/month/day -- confirm against the stage layout).
    SELECT
        registered.last_modified AS _inserted_timestamp,
        registered.file_name,
        CAST(
            SPLIT_PART(SPLIT_PART(registered.file_name, '/', 6), '_', 1) AS INTEGER
        ) AS _partition_by_block_number,
        TO_DATE(
            CONCAT_WS(
                '-',
                SPLIT_PART(registered.file_name, '/', 3),
                SPLIT_PART(registered.file_name, '/', 4),
                SPLIT_PART(registered.file_name, '/', 5)
            )
        ) AS _partition_by_created_date
    FROM
        TABLE(
            information_schema.external_table_file_registration_history(
                start_time => DATEADD('day', -3, CURRENT_TIMESTAMP()),
                table_name => '{{ source("bronze_streamline", "decoded_traces") }}'
            )
        ) AS registered
)
SELECT
    ext.block_number,
    ext.id :: STRING AS id,
    ext.DATA,
    files._inserted_timestamp,
    ext._partition_by_block_number AS _partition_by_block_number,
    ext._partition_by_created_date AS _partition_by_created_date
FROM
    {{ source("bronze_streamline", "decoded_traces") }} AS ext
    INNER JOIN recent_files AS files
    ON files.file_name = metadata$filename
    AND files._partition_by_block_number = ext._partition_by_block_number
    AND files._partition_by_created_date = ext._partition_by_created_date
WHERE
    -- The partition equalities from the join are repeated here as filters.
    -- NOTE(review): presumably kept to help partition pruning on the
    -- external table -- confirm before removing.
    files._partition_by_block_number = ext._partition_by_block_number
    AND files._partition_by_created_date = ext._partition_by_created_date
    -- Only read partitions created within the last 2 days.
    AND ext._partition_by_created_date >= DATEADD('day', -2, CURRENT_TIMESTAMP())

View File

@ -0,0 +1,41 @@
{{ config(
    materialized = 'view'
) }}

-- Full-history bronze view over the decoded_traces external table. Each
-- external-table row is matched to the record of the file it was read from
-- so the file's registered_on value can be exposed as _inserted_timestamp.
-- No time window is applied here.
WITH all_files AS (
    -- Files listed for the external table. Partition values are parsed out
    -- of the file path: the leading '_'-separated token of path segment 6 is
    -- the block-number partition, and segments 3, 4 and 5 joined with '-'
    -- form the created-date partition (presumably year/month/day -- confirm
    -- against the stage layout).
    SELECT
        listed.registered_on AS _inserted_timestamp,
        listed.file_name,
        CAST(
            SPLIT_PART(SPLIT_PART(listed.file_name, '/', 6), '_', 1) AS INTEGER
        ) AS _partition_by_block_number,
        TO_DATE(
            CONCAT_WS(
                '-',
                SPLIT_PART(listed.file_name, '/', 3),
                SPLIT_PART(listed.file_name, '/', 4),
                SPLIT_PART(listed.file_name, '/', 5)
            )
        ) AS _partition_by_created_date
    FROM
        TABLE(
            information_schema.external_table_files(
                table_name => '{{ source("bronze_streamline", "decoded_traces") }}'
            )
        ) AS listed
)
SELECT
    ext.block_number,
    ext.id :: STRING AS id,
    ext.DATA,
    files._inserted_timestamp,
    ext._partition_by_block_number AS _partition_by_block_number,
    ext._partition_by_created_date AS _partition_by_created_date
FROM
    {{ source("bronze_streamline", "decoded_traces") }} AS ext
    INNER JOIN all_files AS files
    ON files.file_name = metadata$filename
    AND files._partition_by_block_number = ext._partition_by_block_number
    AND files._partition_by_created_date = ext._partition_by_created_date
WHERE
    -- The partition equalities from the join are repeated here as filters.
    -- NOTE(review): presumably kept to help partition pruning on the
    -- external table -- confirm before removing.
    files._partition_by_block_number = ext._partition_by_block_number
    AND files._partition_by_created_date = ext._partition_by_created_date

View File

@ -0,0 +1,33 @@
-- depends_on: {{ ref('bronze__decoded_traces') }}
-- The line above declares the dependency on the incremental bronze view
-- explicitly, because that ref only appears inside the is_incremental branch
-- of the query below.
--
-- Purpose: keeps one row per decoded trace id (stored as _call_id) together
-- with its block_number and the _inserted_timestamp of the file it came from.
--   * First build / full refresh: reads the full-history bronze view.
--   * Incremental run: reads the recent-files bronze view and keeps rows at
--     or after the newest _inserted_timestamp already present in this table.
-- Rows are merged on _call_id, and on a match only _call_id is rewritten.
-- NOTE(review): incremental_predicates here does not look like a plain SQL
-- predicate list -- presumably it is interpreted by a project-level macro;
-- confirm.
{{ config (
    materialized = "incremental",
    unique_key = "_call_id",
    cluster_by = "ROUND(block_number, -3)",
    incremental_predicates = ["dynamic_range", "block_number"],
    merge_update_columns = ["_call_id"],
    post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION on equality(_call_id)",
    tags = ['streamline_decoded_traces_complete']
) }}

SELECT
    block_number,
    id AS _call_id,
    _inserted_timestamp
FROM

{% if is_incremental() %}
{{ ref('bronze__decoded_traces') }}
WHERE
    TO_TIMESTAMP_NTZ(_inserted_timestamp) >= (
        -- MAX over an empty table is NULL, which would make this comparison
        -- reject every row and leave an empty target permanently empty.
        -- Fall back to the epoch so an empty target loads everything the
        -- bronze view offers.
        SELECT
            COALESCE(MAX(_inserted_timestamp), '1970-01-01' :: TIMESTAMP) AS _inserted_timestamp
        FROM
            {{ this }}
    )
{% else %}
    {{ ref('bronze__fr_decoded_traces') }}
{% endif %}

-- Keep only the most recently inserted row for each id.
qualify(ROW_NUMBER() over (PARTITION BY id
ORDER BY
    _inserted_timestamp DESC)) = 1

Some files were not shown because too many files have changed in this diff Show More