diff --git a/.github/workflows/dbt_run_abi_refresh.yml b/.github/workflows/dbt_run_abi_refresh.yml new file mode 100644 index 0000000..e89628f --- /dev/null +++ b/.github/workflows/dbt_run_abi_refresh.yml @@ -0,0 +1,47 @@ +name: dbt_run_abi_refresh +run-name: dbt_run_abi_refresh + +on: + workflow_dispatch: + schedule: + # Runs “At minute 0 past every 12th hour.” (see https://crontab.guru) + - cron: '0 */12 * * *' + +env: + DBT_PROFILES_DIR: ./ + + ACCOUNT: "${{ vars.ACCOUNT }}" + ROLE: "${{ vars.ROLE }}" + USER: "${{ vars.USER }}" + PASSWORD: "${{ secrets.PASSWORD }}" + REGION: "${{ vars.REGION }}" + DATABASE: "${{ vars.DATABASE }}" + WAREHOUSE: "${{ vars.WAREHOUSE }}" + SCHEMA: "${{ vars.SCHEMA }}" + +concurrency: + group: ${{ github.workflow }} + +jobs: + run_dbt_jobs: + runs-on: ubuntu-latest + environment: + name: workflow_prod + + steps: + - uses: actions/checkout@v3 + + - uses: actions/setup-python@v1 + with: + python-version: "3.7.x" + + - name: install dependencies + run: | + pip3 install dbt-snowflake==${{ vars.DBT_VERSION }} cli_passthrough requests click + dbt deps + - name: Run DBT Jobs + run: | + dbt run -m models/silver/abis + + + diff --git a/.github/workflows/dbt_run_incremental.yml b/.github/workflows/dbt_run_incremental.yml index a6b112a..9777f05 100644 --- a/.github/workflows/dbt_run_incremental.yml +++ b/.github/workflows/dbt_run_incremental.yml @@ -41,4 +41,4 @@ jobs: dbt deps - name: Run DBT Jobs run: | - dbt run --exclude models/silver/API_udf models/silver/streamline/* models/silver/silver__decoded_logs.sql models/silver/silver__decoded_logs_full.sql models/bronze/api_udf/bronze_api__contract_abis.sql models/silver/core/tests + dbt run --exclude models/silver/abis models/silver/API_udf models/silver/streamline/* models/silver/silver__decoded_logs.sql models/silver/silver__decoded_logs_full.sql models/bronze/api_udf/bronze_api__contract_abis.sql models/silver/core/tests diff --git a/macros/create_udfs.sql b/macros/create_udfs.sql index dc1e052..62c1cd5 100644 --- a/macros/create_udfs.sql +++ b/macros/create_udfs.sql @@ -9,6 +9,12 @@ {{ create_udtf_get_base_table( schema = "streamline" ) }} + {{ create_udf_keccak( + schema = 'silver' + ) }} + {{ create_udf_simple_event_names( + schema = 'silver' + ) }} {% endset %} {% do run_query(sql) %} diff --git a/macros/python/keccak_udf.sql b/macros/python/keccak_udf.sql new file mode 100644 index 0000000..c5b55ab --- /dev/null +++ b/macros/python/keccak_udf.sql @@ -0,0 +1,18 @@ +{% macro create_udf_keccak(schema) %} +create or replace function {{ schema }}.udf_keccak(event_name VARCHAR(255)) +RETURNS string +LANGUAGE python +runtime_version = 3.8 +packages = ('pycryptodome==3.15.0') +HANDLER = 'udf_encode' +AS +$$ +from Crypto.Hash import keccak + +def udf_encode(event_name): + keccak_hash = keccak.new(digest_bits=256) + keccak_hash.update(event_name.encode('utf-8')) + return '0x' + keccak_hash.hexdigest() +$$; + +{% endmacro %} \ No newline at end of file diff --git a/macros/python/simple_event_name_udf.sql b/macros/python/simple_event_name_udf.sql new file mode 100644 index 0000000..50ed09b --- /dev/null +++ b/macros/python/simple_event_name_udf.sql @@ -0,0 +1,31 @@ +{% macro create_udf_simple_event_names(schema) %} +create or replace function {{ schema }}.udf_simple_event_name(abi VARIANT) +RETURNS STRING +LANGUAGE PYTHON +RUNTIME_VERSION = '3.8' +HANDLER = 'get_simplified_signature' +AS +$$ +def get_simplified_signature(abi): + def generate_signature(inputs): + signature_parts = [] + for input_data in inputs: + if 'components' in input_data: + component_signature_parts = [] + components = input_data['components'] + component_signature_parts.extend(generate_signature(components)) + component_signature_parts[-1] = component_signature_parts[-1].rstrip(",") + if input_data['type'].endswith('[]'): + signature_parts.append("(" + "".join(component_signature_parts) + ")[],") + else: + signature_parts.append("(" + "".join(component_signature_parts) + "),") + else: + signature_parts.append(input_data['type'].replace('enum ', '').replace(' payable', '') + ",") + return signature_parts + + signature_parts = [abi['name'] + "("] + signature_parts.extend(generate_signature(abi['inputs'])) + signature_parts[-1] = signature_parts[-1].rstrip(",") + ")" + return "".join(signature_parts) +$$; +{% endmacro %} \ No newline at end of file diff --git a/macros/streamline/models.sql b/macros/streamline/models.sql index 0ed8180..a0a55a2 100644 --- a/macros/streamline/models.sql +++ b/macros/streamline/models.sql @@ -15,14 +15,23 @@ SELECT l.block_number, l._log_id, - abi.data AS abi, - l.data + A.abi AS abi, + OBJECT_CONSTRUCT( + 'topics', + l.topics, + 'data', + l.data, + 'address', + l.contract_address + ) AS DATA FROM - {{ ref("streamline__decode_logs") }} + {{ ref("silver__logs") }} l - INNER JOIN {{ ref("silver__abis") }} - abi - ON l.abi_address = abi.contract_address + INNER JOIN {{ ref("silver__complete_event_abis") }} A + ON A.parent_contract_address = l.contract_address + AND A.event_signature = l.topics [0] :: STRING + AND l.block_number BETWEEN A.start_block + AND A.end_block WHERE ( l.block_number BETWEEN {{ start }} diff --git a/models/silver/abis/silver__complete_event_abis.sql b/models/silver/abis/silver__complete_event_abis.sql new file mode 100644 index 0000000..fda74bd --- /dev/null +++ b/models/silver/abis/silver__complete_event_abis.sql @@ -0,0 +1,177 @@ +{{ config ( + materialized = 'table' +) }} + +WITH abi_base AS ( + + SELECT + contract_address, + DATA + FROM + {{ ref('silver__abis') }} +), +flat_abi AS ( + SELECT + contract_address, + DATA, + VALUE :inputs AS inputs, + VALUE :payable :: BOOLEAN AS payable, + VALUE :stateMutability :: STRING AS stateMutability, + VALUE :type :: STRING AS TYPE, + VALUE :anonymous :: BOOLEAN AS anonymous, + VALUE :name :: STRING AS NAME + FROM + abi_base, + LATERAL FLATTEN ( + input => DATA + ) + WHERE + TYPE = 'event' +), +event_types AS ( + SELECT + contract_address, + inputs, + anonymous, + NAME, + ARRAY_AGG( + VALUE :type :: STRING + ) AS event_type + FROM + flat_abi, + LATERAL FLATTEN ( + input => inputs + ) + GROUP BY + contract_address, + inputs, + anonymous, + NAME +), +proxy_base AS ( + SELECT + C.created_contract_address AS contract_address, + p.proxy_address, + p.start_block, + C.block_number AS created_block + FROM + {{ ref('silver__created_contracts') }} C + INNER JOIN {{ ref('silver__proxies') }} + p + ON C.created_contract_address = p.contract_address + AND p.proxy_address <> '0x0000000000000000000000000000000000000000' +), +stacked AS ( + SELECT + ea.contract_address, + ea.inputs, + ea.anonymous, + ea.name, + ea.event_type, + pb.start_block, + pb.contract_address AS base_contract_address, + 1 AS priority + FROM + event_types ea + INNER JOIN proxy_base pb + ON ea.contract_address = pb.proxy_address + UNION ALL + SELECT + eab.contract_address, + eab.inputs, + eab.anonymous, + eab.name, + eab.event_type, + pbb.created_block AS start_block, + pbb.contract_address AS base_contract_address, + 2 AS priority + FROM + event_types eab + INNER JOIN ( + SELECT + DISTINCT contract_address, + created_block + FROM + proxy_base + ) pbb + ON eab.contract_address = pbb.contract_address + UNION ALL + SELECT + eac.contract_address, + eac.inputs, + eac.anonymous, + eac.name, + eac.event_type, + 0 AS start_block, + eac.contract_address AS base_contract_address, + 3 AS priority + FROM + event_types eac + WHERE + contract_address NOT IN ( + SELECT + DISTINCT contract_address + FROM + proxy_base + ) +), +apply_udfs AS ( + SELECT + contract_address AS source_contract_address, + base_contract_address AS parent_contract_address, + NAME AS event_name, + PARSE_JSON( + OBJECT_CONSTRUCT( + 'anonymous', + anonymous, + 'inputs', + inputs, + 'name', + NAME, + 'type', + 'event' + ) :: STRING + ) AS abi, + start_block, + silver.udf_simple_event_name(abi) AS simple_event_name, + silver.udf_keccak(simple_event_name) AS event_signature, + priority, + NAME, + inputs, + event_type + FROM + stacked +), +FINAL AS ( + SELECT + parent_contract_address, + event_name, + abi, + start_block, + simple_event_name, + event_signature, + NAME, + inputs, + event_type + FROM + apply_udfs qualify ROW_NUMBER() over ( + PARTITION BY parent_contract_address, + NAME, + event_type, + start_block + ORDER BY + priority ASC + ) = 1 +) +SELECT + parent_contract_address, + event_name, + abi, + start_block, + simple_event_name, + event_signature, + IFNULL(LEAD(start_block) over (PARTITION BY parent_contract_address, event_signature +ORDER BY + start_block) -1, 1e18) AS end_block +FROM + FINAL diff --git a/models/silver/abis/silver__complete_event_abis.yml b/models/silver/abis/silver__complete_event_abis.yml new file mode 100644 index 0000000..9004009 --- /dev/null +++ b/models/silver/abis/silver__complete_event_abis.yml @@ -0,0 +1,9 @@ +version: 2 +models: + - name: silver__complete_event_abis + tests: + - dbt_utils.unique_combination_of_columns: + combination_of_columns: + - PARENT_CONTRACT_ADDRESS + - EVENT_SIGNATURE + - START_BLOCK \ No newline at end of file diff --git a/models/silver/abis/silver__proxies.sql b/models/silver/abis/silver__proxies.sql index e8a7a32..6ad81c7 100644 --- a/models/silver/abis/silver__proxies.sql +++ b/models/silver/abis/silver__proxies.sql @@ -7,14 +7,15 @@ WITH base AS ( SELECT from_address, to_address, - MIN(block_number) AS start_block + MIN(block_number) AS start_block, + MAX(_inserted_timestamp) AS _inserted_timestamp FROM {{ ref('silver__traces') }} WHERE TYPE = 'DELEGATECALL' + AND trace_status = 'SUCCESS' AND tx_status = 'SUCCESS' AND from_address != to_address -- exclude self-calls - AND output :: STRING = '0x' GROUP BY from_address, to_address @@ -23,16 +24,11 @@ SELECT from_address AS contract_address, to_address AS proxy_address, start_block, - COALESCE( - (LAG(start_block) over(PARTITION BY from_address - ORDER BY - start_block DESC)) - 1, - 100000000000 - ) AS end_block, CONCAT( from_address, '-', to_address - ) AS _id + ) AS _id, + _inserted_timestamp FROM - base \ No newline at end of file + base diff --git a/models/silver/abis/silver__user_verified_abis.sql b/models/silver/abis/silver__user_verified_abis.sql index 61ee75a..0e482f5 100644 --- a/models/silver/abis/silver__user_verified_abis.sql +++ b/models/silver/abis/silver__user_verified_abis.sql @@ -44,35 +44,112 @@ ORDER BY _inserted_timestamp ASC LIMIT 10 -), max_blocks AS ( +), contracts AS ( SELECT - abi_address, - abi, - MAX(block_number) AS max_block, - max_block - 100000 AS min_block + contract_address FROM - {{ ref('streamline__decode_logs') }} - l - JOIN base C - ON C.contract_address = l.abi_address + {{ ref('silver__proxies') }} + WHERE + contract_address IN ( + SELECT + contract_address + FROM + base + ) +), +proxies AS ( + SELECT + proxy_address, + contract_address + FROM + {{ ref('silver__proxies') }} + WHERE + proxy_address IN ( + SELECT + contract_address + FROM + base + ) +), +final_groupings AS ( + SELECT + b.contract_address AS address, + C.contract_address, + proxy_address, + CASE + WHEN C.contract_address IS NOT NULL + AND proxy_address IS NOT NULL THEN 'contract' + WHEN C.contract_address IS NOT NULL THEN 'contract' + WHEN proxy_address IS NOT NULL THEN 'proxy' + WHEN C.contract_address IS NULL + AND proxy_address IS NULL THEN 'contract' + END AS TYPE, + p.contract_address AS proxy_parent, + CASE + WHEN TYPE = 'contract' THEN address + ELSE proxy_parent + END AS final_address + FROM + base b + LEFT JOIN ( + SELECT + DISTINCT contract_address + FROM + contracts + ) C + ON b.contract_address = C.contract_address + LEFT JOIN ( + SELECT + DISTINCT proxy_address, + contract_address + FROM + proxies + ) p + ON b.contract_address = proxy_address +), +identified_addresses AS ( + SELECT + DISTINCT address AS base_address, + final_address AS contract_address + FROM + final_groupings +), +ranges AS ( + SELECT + contract_address, + base_address, + MIN(block_number) AS min_block, + min_block + 100000 AS max_block + FROM + {{ ref('silver__logs') }} + JOIN identified_addresses USING (contract_address) GROUP BY - 1, - 2 + contract_address, + base_address ), logs AS ( SELECT l.block_number, l.contract_address, - l.data AS logs_data, - C.abi, - l.abi_address + OBJECT_CONSTRUCT( + 'topics', + l.topics, + 'data', + l.data, + 'address', + l.contract_address + ) AS logs_data, + b.abi, + base_address AS abi_address FROM - {{ ref('streamline__decode_logs') }} + {{ ref('silver__logs') }} l - JOIN max_blocks C - ON C.abi_address = l.abi_address + JOIN ranges C + ON C.contract_address = l.contract_address AND l.block_number BETWEEN C.min_block AND C.max_block + JOIN base b + ON b.contract_address = C.base_address ), recent_logs AS ( SELECT diff --git a/models/silver/silver__contract_abis.sql b/models/silver/silver__contract_abis.sql deleted file mode 100644 index 62fdfbb..0000000 --- a/models/silver/silver__contract_abis.sql +++ /dev/null @@ -1,40 +0,0 @@ -{{ config ( - materialized = "incremental", - unique_key = "contract_address", - cluster_by = "_inserted_timestamp::date", - merge_update_columns = ["contract_address"] -) }} - -WITH base AS ( - - SELECT - contract_address, - abi_data AS full_data, - abi_data :data :result AS abi, - _inserted_timestamp - FROM - {{ ref('bronze_api__contract_abis') }} - -{% if is_incremental() %} -WHERE - _inserted_timestamp >= ( - SELECT - MAX( - _inserted_timestamp - ) - FROM - {{ this }} - ) -{% endif %} -) -SELECT - contract_address, - full_data, - abi, - _inserted_timestamp -FROM - base -WHERE - abi :: STRING <> 'Contract source code not verified' qualify(ROW_NUMBER() over(PARTITION BY contract_address -ORDER BY - _inserted_timestamp DESC)) = 1 diff --git a/models/silver/silver__contract_abis.yml b/models/silver/silver__contract_abis.yml deleted file mode 100644 index 9d962dc..0000000 --- a/models/silver/silver__contract_abis.yml +++ /dev/null @@ -1,7 +0,0 @@ -version: 2 -models: - - name: silver__contract_abis - tests: - - dbt_utils.unique_combination_of_columns: - combination_of_columns: - - CONTRACT_ADDRESS \ No newline at end of file diff --git a/models/silver/streamline/core/history/streamline__debug_traceBlockByNumber_history.sql b/models/silver/streamline/core/history/streamline__debug_traceBlockByNumber_history.sql index baf61d1..2a4cda9 100644 --- a/models/silver/streamline/core/history/streamline__debug_traceBlockByNumber_history.sql +++ b/models/silver/streamline/core/history/streamline__debug_traceBlockByNumber_history.sql @@ -6,10 +6,20 @@ ) ) }} -{% for item in range(40) %} +{% for item in range(45) %} ( - WITH blocks AS ( + WITH last_3_days AS ( + SELECT + block_number + FROM + {{ ref("_max_block_by_date") }} + qualify ROW_NUMBER() over ( + ORDER BY + block_number DESC + ) = 3 + ), + blocks AS ( SELECT block_number FROM @@ -19,6 +29,12 @@ AND {{( item + 1 ) * 1000000 }} + AND block_number <= ( + SELECT + block_number + FROM + last_3_days + ) EXCEPT SELECT block_number @@ -29,6 +45,12 @@ AND {{( item + 1 ) * 1000000 }} + AND block_number <= ( + SELECT + block_number + FROM + last_3_days + ) ) SELECT PARSE_JSON( diff --git a/models/silver/streamline/core/history/streamline__qn_getBlockWithReceipts_history.sql b/models/silver/streamline/core/history/streamline__qn_getBlockWithReceipts_history.sql index 5fc12a0..4f4839f 100644 --- a/models/silver/streamline/core/history/streamline__qn_getBlockWithReceipts_history.sql +++ b/models/silver/streamline/core/history/streamline__qn_getBlockWithReceipts_history.sql @@ -6,10 +6,20 @@ ) ) }} -{% for item in range(40) %} +{% for item in range(45) %} ( - WITH blocks AS ( + WITH last_3_days AS ( + SELECT + block_number + FROM + {{ ref("_max_block_by_date") }} + qualify ROW_NUMBER() over ( + ORDER BY + block_number DESC + ) = 3 + ), + blocks AS ( SELECT block_number FROM @@ -19,6 +29,12 @@ AND {{( item + 1 ) * 1000000 }} + AND block_number <= ( + SELECT + block_number + FROM + last_3_days + ) EXCEPT SELECT block_number @@ -29,6 +45,12 @@ AND {{( item + 1 ) * 1000000 }} + AND block_number <= ( + SELECT + block_number + FROM + last_3_days + ) ) SELECT PARSE_JSON( diff --git a/models/silver/streamline/decoder/streamline__decode_logs.sql b/models/silver/streamline/decoder/streamline__decode_logs.sql deleted file mode 100644 index a39d8ea..0000000 --- a/models/silver/streamline/decoder/streamline__decode_logs.sql +++ /dev/null @@ -1,60 +0,0 @@ -{{ config ( - materialized = "incremental", - unique_key = "_log_id", - cluster_by = "round(block_number,-3)", - merge_update_columns = ["_log_id"], - post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION on equality(abi_address, _log_id)" -) }} - -WITH base AS ( - - SELECT - l.block_number, - l.tx_hash, - l.event_index, - l.contract_address, - l.topics, - l.data, - l._log_id, - p.proxy_address, - l._INSERTED_TIMESTAMP - FROM - {{ ref('silver__logs') }} - l - LEFT JOIN {{ ref('silver__proxies') }} - p - ON p.contract_address = l.contract_address - AND l.block_number BETWEEN p.start_block - AND p.end_block - WHERE - 1 = 1 - -{% if is_incremental() %} -AND l._inserted_timestamp >= ( - SELECT - MAX(_inserted_timestamp) - FROM - {{ this }} -) -{% endif %} -) -SELECT - b.block_number, - b._log_id, - contract_address, - proxy_address, - COALESCE( - proxy_address, - contract_address - ) AS abi_address, - OBJECT_CONSTRUCT( - 'topics', - b.topics, - 'data', - b.data, - 'address', - b.contract_address - ) AS DATA, - _inserted_timestamp -FROM - base b diff --git a/models/silver/streamline/decoder/streamline__decode_logs_realtime.sql b/models/silver/streamline/decoder/streamline__decode_logs_realtime.sql index c269cd6..0b332b5 100644 --- a/models/silver/streamline/decoder/streamline__decode_logs_realtime.sql +++ b/models/silver/streamline/decoder/streamline__decode_logs_realtime.sql @@ -17,13 +17,23 @@ WITH look_back AS ( SELECT l.block_number, l._log_id, - A.data AS abi, - l.data + A.abi AS abi, + OBJECT_CONSTRUCT( + 'topics', + l.topics, + 'data', + l.data, + 'address', + l.contract_address + ) AS DATA FROM - {{ ref("streamline__decode_logs") }} + {{ ref("silver__logs") }} l - INNER JOIN {{ ref("silver__abis") }} A - ON l.abi_address = A.contract_address + INNER JOIN {{ ref("silver__complete_event_abis") }} A + ON A.parent_contract_address = l.contract_address + AND A.event_signature = l.topics[0]:: STRING + AND l.block_number BETWEEN A.start_block + AND A.end_block WHERE ( l.block_number >= (