From dc15263263d1d26e53e29b0d1ad65ab42a5bc2c3 Mon Sep 17 00:00:00 2001 From: Austin Date: Tue, 19 Dec 2023 16:43:25 -0500 Subject: [PATCH 1/3] traces models --- macros/create_udfs.sql | 1 - .../core/bronze__streamline_FR_traces.sql | 57 +++++++++++++++++++ .../bronze/core/bronze__streamline_traces.sql | 57 +++++++++++++++++++ .../complete/streamline__complete_traces.sql | 31 ++++++++++ .../history/streamline__traces_history.sql | 38 +++++++++++++ models/sources.yml | 1 + 6 files changed, 184 insertions(+), 1 deletion(-) create mode 100644 models/bronze/core/bronze__streamline_FR_traces.sql create mode 100644 models/bronze/core/bronze__streamline_traces.sql create mode 100644 models/silver/streamline/complete/streamline__complete_traces.sql create mode 100644 models/silver/streamline/history/streamline__traces_history.sql diff --git a/macros/create_udfs.sql b/macros/create_udfs.sql index ae881f8..f816180 100644 --- a/macros/create_udfs.sql +++ b/macros/create_udfs.sql @@ -7,6 +7,5 @@ {% endset %} {% do run_query(sql) %} {{- fsc_utils.create_udfs() -}} - {% endif %} {% endmacro %} diff --git a/models/bronze/core/bronze__streamline_FR_traces.sql b/models/bronze/core/bronze__streamline_FR_traces.sql new file mode 100644 index 0000000..89a7c27 --- /dev/null +++ b/models/bronze/core/bronze__streamline_FR_traces.sql @@ -0,0 +1,57 @@ +{{ config ( + materialized = 'view' +) }} + +WITH meta AS ( + + SELECT + registered_on AS _inserted_timestamp, + file_name, + CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 4), '_', 1) AS INTEGER) AS _partition_by_block_id + FROM + TABLE( + information_schema.external_table_files( + table_name => '{{ source( "bronze_streamline", "traces") }}' + ) + ) A +) +SELECT + block_number, + tx_hash, + DATA, + _inserted_timestamp, + MD5( + CAST( + COALESCE(CAST(tx_hash AS text), '' :: STRING) AS text + ) + ) AS id, + s._partition_by_block_id, + s.value AS VALUE +FROM + {{ source( + "bronze_streamline", + "traces" + ) }} + s + JOIN meta b + ON b.file_name = metadata$filename + AND b._partition_by_block_id = s._partition_by_block_id +WHERE + b._partition_by_block_id = s._partition_by_block_id + AND ( + DATA :error :code IS NULL + OR DATA :error :code NOT IN ( + '-32000', + '-32001', + '-32002', + '-32003', + '-32004', + '-32005', + '-32006', + '-32007', + '-32008', + '-32009', + '-32010', + '-32608' + ) + ) diff --git a/models/bronze/core/bronze__streamline_traces.sql b/models/bronze/core/bronze__streamline_traces.sql new file mode 100644 index 0000000..d64f9e0 --- /dev/null +++ b/models/bronze/core/bronze__streamline_traces.sql @@ -0,0 +1,57 @@ +{{ config ( + materialized = 'view' +) }} + +WITH meta AS ( + + SELECT + last_modified AS _inserted_timestamp, + file_name, + CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 4), '_', 1) AS INTEGER) AS _partition_by_block_id + FROM + TABLE( + information_schema.external_table_file_registration_history( + start_time => DATEADD('day', -3, CURRENT_TIMESTAMP()), + table_name => '{{ source( "bronze_streamline", "traces") }}') + ) A + ) + SELECT + block_number, + tx_hash, + DATA, + _inserted_timestamp, + MD5( + CAST( + COALESCE(CAST(tx_hash AS text), '' :: STRING) AS text + ) + ) AS id, + s._partition_by_block_id, + s.value AS VALUE + FROM + {{ source( + "bronze_streamline", + "traces" + ) }} + s + JOIN meta b + ON b.file_name = metadata$filename + AND b._partition_by_block_id = s._partition_by_block_id + WHERE + b._partition_by_block_id = s._partition_by_block_id + AND ( + DATA :error :code IS NULL + OR DATA :error :code NOT IN ( + '-32000', + '-32001', + '-32002', + '-32003', + '-32004', + '-32005', + '-32006', + '-32007', + '-32008', + '-32009', + '-32010', + '-32608' + ) + ) diff --git a/models/silver/streamline/complete/streamline__complete_traces.sql b/models/silver/streamline/complete/streamline__complete_traces.sql new file mode 100644 index 0000000..55bc355 --- /dev/null +++ b/models/silver/streamline/complete/streamline__complete_traces.sql @@ -0,0 +1,31 @@ +-- depends_on: {{ ref('bronze__streamline_transactions') }} +{{ config ( + materialized = "incremental", + unique_key = "id", + cluster_by = "ROUND(block_number, -3)", + post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION on equality(id)" +) }} + +SELECT + id, + block_number, + tx_hash, + _inserted_timestamp +FROM + +{% if is_incremental() %} +{{ ref('bronze__streamline_traces') }} +WHERE + _inserted_timestamp >= ( + SELECT + MAX(_inserted_timestamp) _inserted_timestamp + FROM + {{ this }} + ) +{% else %} + {{ ref('bronze__streamline_FR_traces') }} +{% endif %} + +qualify(ROW_NUMBER() over (PARTITION BY id +ORDER BY + _inserted_timestamp DESC)) = 1 diff --git a/models/silver/streamline/history/streamline__traces_history.sql b/models/silver/streamline/history/streamline__traces_history.sql new file mode 100644 index 0000000..9915832 --- /dev/null +++ b/models/silver/streamline/history/streamline__traces_history.sql @@ -0,0 +1,38 @@ +{{ config ( + materialized = "view", + post_hook = if_data_call_function( + func = "{{this.schema}}.udf_json_rpc(object_construct('sql_source', '{{this.identifier}}', 'external_table', 'traces', 'exploded_key','[\"result\"]', 'producer_batch_size',10000, 'producer_limit_size',2000000, 'worker_batch_size',100))", + target = "{{this.schema}}.{{this.identifier}}" + ) +) }} + +WITH tbl AS ( + + SELECT + block_number, + tx_hash + FROM + {{ ref("silver__transactions") }} + WHERE + block_number IS NOT NULL + AND tx_hash IS NOT NULL + EXCEPT + SELECT + block_number, + tx_hash + FROM + {{ ref("streamline__complete_traces") }} + WHERE + block_number IS NOT NULL + AND tx_hash IS NOT NULL +) +SELECT + block_number, + 'debug_traceTransaction' AS method, + tx_hash AS params +FROM + tbl +ORDER BY + block_number ASC +LIMIT + 50 diff --git a/models/sources.yml b/models/sources.yml index 83ab44d..fa42ca7 100644 --- a/models/sources.yml +++ b/models/sources.yml @@ -18,6 +18,7 @@ sources: - name: blocks - name: transactions - name: tx_receipts + - name: traces - name: crosschain_silver database: crosschain From 5d19ecfb2e23039fc78fd9614997eb5981c9cd1f Mon Sep 17 00:00:00 2001 From: Austin Date: Tue, 19 Dec 2023 16:45:25 -0500 Subject: [PATCH 2/3] traces --- .../silver/streamline/complete/streamline__complete_traces.sql | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/models/silver/streamline/complete/streamline__complete_traces.sql b/models/silver/streamline/complete/streamline__complete_traces.sql index 55bc355..d386fcd 100644 --- a/models/silver/streamline/complete/streamline__complete_traces.sql +++ b/models/silver/streamline/complete/streamline__complete_traces.sql @@ -1,4 +1,4 @@ --- depends_on: {{ ref('bronze__streamline_transactions') }} +-- depends_on: {{ ref('bronze__streamline_traces') }} {{ config ( materialized = "incremental", unique_key = "id", From db9e3b6886ed95fe4d6e593237785d8723b8edb5 Mon Sep 17 00:00:00 2001 From: Austin Date: Tue, 19 Dec 2023 20:52:10 -0500 Subject: [PATCH 3/3] history --- .../dbt_run_streamline_traces_history.yml | 34 +++++++++++++++++++ .../core/bronze__streamline_FR_traces.sql | 2 +- .../bronze/core/bronze__streamline_traces.sql | 2 +- .../complete/streamline__complete_traces.sql | 5 +-- .../history/streamline__traces_history.sql | 10 +++--- 5 files changed, 45 insertions(+), 8 deletions(-) create mode 100644 .github/workflows/dbt_run_streamline_traces_history.yml diff --git a/.github/workflows/dbt_run_streamline_traces_history.yml b/.github/workflows/dbt_run_streamline_traces_history.yml new file mode 100644 index 0000000..65f8547 --- /dev/null +++ b/.github/workflows/dbt_run_streamline_traces_history.yml @@ -0,0 +1,34 @@ +name: dbt_run_streamline_traces_history +run-name: dbt_run_streamline_traces_history + +on: + workflow_dispatch: + schedule: + # Run every 2 hours + - cron: "0 */2 * * *" + +env: + USE_VARS: "${{ vars.USE_VARS }}" + DBT_PROFILES_DIR: "${{ vars.DBT_PROFILES_DIR }}" + DBT_VERSION: "${{ vars.DBT_VERSION }}" + ACCOUNT: "${{ vars.ACCOUNT }}" + ROLE: "${{ vars.ROLE }}" + USER: "${{ vars.USER }}" + PASSWORD: "${{ secrets.PASSWORD }}" + REGION: "${{ vars.REGION }}" + DATABASE: "${{ vars.DATABASE }}" + WAREHOUSE: "${{ vars.WAREHOUSE }}" + SCHEMA: "${{ vars.SCHEMA }}" + +concurrency: + group: ${{ github.workflow }} + +jobs: + called_workflow_template: + uses: FlipsideCrypto/analytics-workflow-templates/.github/workflows/dbt_run_template.yml@main + with: + dbt_command: > + dbt run --vars '{"STREAMLINE_INVOKE_STREAMS":True}' -m 1+models/silver/streamline/history/streamline__traces_history.sql + environment: workflow_prod + warehouse: ${{ vars.WAREHOUSE }} + secrets: inherit diff --git a/models/bronze/core/bronze__streamline_FR_traces.sql b/models/bronze/core/bronze__streamline_FR_traces.sql index 89a7c27..2cc38e5 100644 --- a/models/bronze/core/bronze__streamline_FR_traces.sql +++ b/models/bronze/core/bronze__streamline_FR_traces.sql @@ -17,7 +17,7 @@ WITH meta AS ( ) SELECT block_number, - tx_hash, + s.value :metadata :request :params [0] :: STRING AS tx_hash, DATA, _inserted_timestamp, MD5( diff --git a/models/bronze/core/bronze__streamline_traces.sql b/models/bronze/core/bronze__streamline_traces.sql index d64f9e0..b8060de 100644 --- a/models/bronze/core/bronze__streamline_traces.sql +++ b/models/bronze/core/bronze__streamline_traces.sql @@ -17,7 +17,7 @@ WITH meta AS ( ) SELECT block_number, - tx_hash, + s.value :metadata :request :params [0] :: STRING AS tx_hash, DATA, _inserted_timestamp, MD5( diff --git a/models/silver/streamline/complete/streamline__complete_traces.sql b/models/silver/streamline/complete/streamline__complete_traces.sql index d386fcd..8348091 100644 --- a/models/silver/streamline/complete/streamline__complete_traces.sql +++ b/models/silver/streamline/complete/streamline__complete_traces.sql @@ -16,9 +16,10 @@ FROM {% if is_incremental() %} {{ ref('bronze__streamline_traces') }} WHERE - _inserted_timestamp >= ( + _inserted_timestamp >= + ( SELECT - MAX(_inserted_timestamp) _inserted_timestamp + ifnull(MAX(_inserted_timestamp),'1900-01-01' :: timestamp_ntz ) _inserted_timestamp FROM {{ this }} ) diff --git a/models/silver/streamline/history/streamline__traces_history.sql b/models/silver/streamline/history/streamline__traces_history.sql index 9915832..33cdcc8 100644 --- a/models/silver/streamline/history/streamline__traces_history.sql +++ b/models/silver/streamline/history/streamline__traces_history.sql @@ -1,7 +1,7 @@ {{ config ( materialized = "view", post_hook = if_data_call_function( - func = "{{this.schema}}.udf_json_rpc(object_construct('sql_source', '{{this.identifier}}', 'external_table', 'traces', 'exploded_key','[\"result\"]', 'producer_batch_size',10000, 'producer_limit_size',2000000, 'worker_batch_size',100))", + func = "{{this.schema}}.udf_json_rpc(object_construct('sql_source', '{{this.identifier}}', 'external_table', 'traces', 'producer_batch_size',10000, 'producer_limit_size',1000000, 'worker_batch_size',100))", target = "{{this.schema}}.{{this.identifier}}" ) ) }} @@ -29,10 +29,12 @@ WITH tbl AS ( SELECT block_number, 'debug_traceTransaction' AS method, - tx_hash AS params + CONCAT( + tx_hash, + '_-_', + '{"tracer": "callTracer","timeout": "30s"}' + ) AS params FROM tbl ORDER BY block_number ASC -LIMIT - 50