reorg/folder-tests (#206)

* initial structure and tags

* workflows and tags

* decoded_logs history range tags

* python versions and cache

* renamed job to chainhead

* removed legacy workflows

* decoder workflows

* dbt version rq

* add erc1155 value filter (#207)

* add nft transfers from operator and null bug fix (#209)

* initial structure and tags

* merge

* merge

---------

Co-authored-by: Sam <110511194+sam-xyz@users.noreply.github.com>
drethereum 2023-08-07 12:03:13 -06:00 committed by GitHub
parent 087dbf6fe5
commit bf46b004b1
1791 changed files with 11491 additions and 10612 deletions

View File

@@ -31,17 +31,18 @@ jobs:
steps:
- uses: actions/checkout@v3
- uses: actions/setup-python@v1
- uses: actions/setup-python@v4
with:
python-version: "3.7.x"
python-version: "3.10"
cache: "pip"
- name: install dependencies
run: |
pip3 install dbt-snowflake==${{ vars.DBT_VERSION }} cli_passthrough requests click
pip install -r requirements.txt
dbt deps
- name: Run DBT Jobs
run: |
dbt run -m models/silver/abis
dbt run -m tag:abis
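
Reassembled with its YAML indentation (which is flattened in this view), the updated steps block for this workflow reads roughly as follows; only the indentation is assumed, every line comes from the hunk above:

steps:
  - uses: actions/checkout@v3
  - uses: actions/setup-python@v4
    with:
      python-version: "3.10"
      cache: "pip"
  - name: install dependencies
    run: |
      pip install -r requirements.txt
      dbt deps
  - name: Run DBT Jobs
    run: |
      dbt run -m tag:abis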

View File

@@ -1,5 +1,5 @@
name: dbt_run_scheduled_abi
run-name: dbt_run_scheduled_abi
name: dbt_run_contract_abis
run-name: dbt_run_contract_abis
on:
workflow_dispatch:
@@ -31,14 +31,15 @@ jobs:
steps:
- uses: actions/checkout@v3
- uses: actions/setup-python@v1
- uses: actions/setup-python@v4
with:
python-version: "3.7.x"
python-version: "3.10"
cache: "pip"
- name: install dependencies
run: |
pip3 install dbt-snowflake==${{ vars.DBT_VERSION }} cli_passthrough requests click
pip install -r requirements.txt
dbt deps
- name: Run DBT Jobs
run: |
dbt run -m models/bronze/api_udf/bronze_api__contract_abis.sql
dbt run -m tag:contract_abis

View File

@@ -52,13 +52,14 @@ jobs:
steps:
- uses: actions/checkout@v3
- uses: actions/setup-python@v1
- uses: actions/setup-python@v4
with:
python-version: "3.7.x"
python-version: "3.10"
cache: "pip"
- name: install dependencies
run: |
pip3 install dbt-snowflake==${{ vars.DBT_VERSION }} cli_passthrough requests click
pip install -r requirements.txt
dbt deps
- name: Run DBT Jobs
run: |

View File

@@ -1,44 +0,0 @@
name: dbt_run_api_feeder_table
run-name: dbt_run_api_feeder_table
on:
workflow_dispatch:
schedule:
# Runs "at 8:00 UTC" (see https://crontab.guru)
- cron: '0 8 * * *'
env:
DBT_PROFILES_DIR: ./
ACCOUNT: "${{ vars.ACCOUNT }}"
ROLE: "${{ vars.ROLE }}"
USER: "${{ vars.USER }}"
PASSWORD: "${{ secrets.PASSWORD }}"
REGION: "${{ vars.REGION }}"
DATABASE: "${{ vars.DATABASE }}"
WAREHOUSE: "${{ vars.WAREHOUSE }}"
SCHEMA: "${{ vars.SCHEMA }}"
concurrency:
group: ${{ github.workflow }}
jobs:
run_dbt_jobs:
runs-on: ubuntu-latest
environment:
name: workflow_prod
steps:
- uses: actions/checkout@v3
- uses: actions/setup-python@v1
with:
python-version: "3.7.x"
- name: install dependencies
run: |
pip3 install dbt-snowflake==${{ vars.DBT_VERSION }} cli_passthrough requests click
dbt deps
- name: Run DBT Jobs
run: |
dbt run -m models/silver/API_udf/silver__relevant_abi_contracts.sql models/silver/API_udf/silver__relevant_contracts.sql

View File

@@ -31,13 +31,14 @@ jobs:
steps:
- uses: actions/checkout@v3
- uses: actions/setup-python@v1
- uses: actions/setup-python@v4
with:
python-version: "3.7.x"
python-version: "3.10"
cache: "pip"
- name: install dependencies
run: |
pip3 install dbt-snowflake==${{ vars.DBT_VERSION }} cli_passthrough requests click
pip install -r requirements.txt
dbt deps
- name: Run DBT Jobs
run: |

View File

@@ -31,17 +31,18 @@ jobs:
steps:
- uses: actions/checkout@v3
- uses: actions/setup-python@v1
- uses: actions/setup-python@v4
with:
python-version: "3.7.x"
python-version: "3.10"
cache: "pip"
- name: install dependencies
run: |
pip3 install dbt-snowflake==${{ vars.DBT_VERSION }} cli_passthrough requests click
pip install -r requirements.txt
dbt deps
- name: Run DBT Jobs
run: |
dbt run --vars '{"OBSERV_FULL_TEST":True}' -m models/silver/_observability
dbt run --vars '{"OBSERV_FULL_TEST":True}' -m tag:observability

View File

@@ -1,45 +0,0 @@
name: dbt_run_scheduled
run-name: dbt_run_scheduled
on:
workflow_dispatch:
schedule:
# Runs every "At minute 30.” (see https://crontab.guru)
- cron: '30 * * * *'
env:
DBT_PROFILES_DIR: ./
ACCOUNT: "${{ vars.ACCOUNT }}"
ROLE: "${{ vars.ROLE }}"
USER: "${{ vars.USER }}"
PASSWORD: "${{ secrets.PASSWORD }}"
REGION: "${{ vars.REGION }}"
DATABASE: "${{ vars.DATABASE }}"
WAREHOUSE: "${{ vars.WAREHOUSE }}"
SCHEMA: "${{ vars.SCHEMA }}"
concurrency:
group: ${{ github.workflow }}
jobs:
run_dbt_jobs:
runs-on: ubuntu-latest
environment:
name: workflow_prod
steps:
- uses: actions/checkout@v3
- uses: actions/setup-python@v1
with:
python-version: "3.7.x"
- name: install dependencies
run: |
pip3 install dbt-snowflake==${{ vars.DBT_VERSION }} cli_passthrough requests click
dbt deps
- name: Run DBT Jobs
run: |
dbt run --exclude models/silver/abis models/silver/API_udf models/silver/streamline/* models/silver/silver__decoded_logs.sql models/bronze/api_udf/bronze_api__contract_abis.sql models/silver/core/tests models/silver/silver__decoded_logs_legacy.sql models/silver/_observability
dbt run --vars '{"STREAMLINE_INVOKE_STREAMS":True}' -m models/silver/streamline/decoder/streamline__decode_logs_realtime.sql models/silver/streamline/decoder/streamline__complete_decode_logs.sql

View File

@@ -0,0 +1,45 @@
name: dbt_run_scheduled_non_realtime
run-name: dbt_run_scheduled_non_realtime
on:
workflow_dispatch:
schedule:
# Runs every "At minute 30.” (see https://crontab.guru)
- cron: '30 * * * *'
env:
DBT_PROFILES_DIR: ./
ACCOUNT: "${{ vars.ACCOUNT }}"
ROLE: "${{ vars.ROLE }}"
USER: "${{ vars.USER }}"
PASSWORD: "${{ secrets.PASSWORD }}"
REGION: "${{ vars.REGION }}"
DATABASE: "${{ vars.DATABASE }}"
WAREHOUSE: "${{ vars.WAREHOUSE }}"
SCHEMA: "${{ vars.SCHEMA }}"
concurrency:
group: ${{ github.workflow }}
jobs:
run_dbt_jobs:
runs-on: ubuntu-latest
environment:
name: workflow_prod
steps:
- uses: actions/checkout@v3
- uses: actions/setup-python@v4
with:
python-version: "3.10"
cache: "pip"
- name: install dependencies
run: |
pip install -r requirements.txt
dbt deps
- name: Run DBT Jobs
run: |
dbt run --vars '{"STREAMLINE_INVOKE_STREAMS":True}' -m tag:non_realtime 1+tag:streamline_decoded_logs_realtime

View File

@@ -0,0 +1,45 @@
name: dbt_run_streamline_chainhead
run-name: dbt_run_streamline_chainhead
on:
workflow_dispatch:
schedule:
# Runs “At every 15th and 45th minute.” (see https://crontab.guru)
- cron: '15,45 * * * *'
env:
DBT_PROFILES_DIR: ./
ACCOUNT: "${{ vars.ACCOUNT }}"
ROLE: "${{ vars.ROLE }}"
USER: "${{ vars.USER }}"
PASSWORD: "${{ secrets.PASSWORD }}"
REGION: "${{ vars.REGION }}"
DATABASE: "${{ vars.DATABASE }}"
WAREHOUSE: "${{ vars.WAREHOUSE }}"
SCHEMA: "${{ vars.SCHEMA }}"
concurrency:
group: ${{ github.workflow }}
jobs:
run_dbt_jobs:
runs-on: ubuntu-latest
environment:
name: workflow_prod
steps:
- uses: actions/checkout@v3
- uses: actions/setup-python@v4
with:
python-version: "3.10"
cache: "pip"
- name: install dependencies
run: |
pip install -r requirements.txt
dbt deps
- name: Run DBT Jobs
run: |
dbt run --vars '{"STREAMLINE_INVOKE_STREAMS":True}' -m 1+tag:streamline_core_realtime

View File

@@ -31,14 +31,15 @@ jobs:
steps:
- uses: actions/checkout@v3
- uses: actions/setup-python@v1
- uses: actions/setup-python@v4
with:
python-version: "3.7.x"
python-version: "3.10"
cache: "pip"
- name: install dependencies
run: |
pip3 install dbt-snowflake==${{ vars.DBT_VERSION }} cli_passthrough requests click
pip install -r requirements.txt
dbt deps
- name: Run DBT Jobs
run: |
dbt run -m models/silver/silver__decoded_logs.sql
dbt run -m tag:decoded_logs

View File

@@ -31,14 +31,15 @@ jobs:
steps:
- uses: actions/checkout@v3
- uses: actions/setup-python@v1
- uses: actions/setup-python@v4
with:
python-version: "3.7.x"
python-version: "3.10"
cache: "pip"
- name: install dependencies
run: |
pip3 install dbt-snowflake==${{ vars.DBT_VERSION }} cli_passthrough requests click
pip install -r requirements.txt
dbt deps
- name: Run DBT Jobs
run: |
dbt run --threads 4 --vars '{"STREAMLINE_INVOKE_STREAMS":True,"WAIT":600,"row_limit":7500000}' -m models/silver/streamline/decoder/history/range_1
dbt run --threads 4 --vars '{"STREAMLINE_INVOKE_STREAMS":True,"WAIT":600,"row_limit":7500000}' -m 1+tag:streamline_decoded_logs_history_range_1

View File

@@ -31,14 +31,15 @@ jobs:
steps:
- uses: actions/checkout@v3
- uses: actions/setup-python@v1
- uses: actions/setup-python@v4
with:
python-version: "3.7.x"
python-version: "3.10"
cache: "pip"
- name: install dependencies
run: |
pip3 install dbt-snowflake==${{ vars.DBT_VERSION }} cli_passthrough requests click
pip install -r requirements.txt
dbt deps
- name: Run DBT Jobs
run: |
dbt run --threads 8 --vars '{"STREAMLINE_INVOKE_STREAMS":True,"WAIT":120,"row_limit":2400000}' -m models/silver/streamline/decoder/history/range_2
dbt run --threads 8 --vars '{"STREAMLINE_INVOKE_STREAMS":True,"WAIT":120,"row_limit":2400000}' -m 1+tag:streamline_decoded_logs_history_range_2

View File

@@ -31,14 +31,15 @@ jobs:
steps:
- uses: actions/checkout@v3
- uses: actions/setup-python@v1
- uses: actions/setup-python@v4
with:
python-version: "3.7.x"
python-version: "3.10"
cache: "pip"
- name: install dependencies
run: |
pip3 install dbt-snowflake==${{ vars.DBT_VERSION }} cli_passthrough requests click
pip install -r requirements.txt
dbt deps
- name: Run DBT Jobs
run: |
dbt run --threads 8 --vars '{"STREAMLINE_INVOKE_STREAMS":True,"WAIT":120,"row_limit":2400000}' -m models/silver/streamline/decoder/history/range_3
dbt run --threads 8 --vars '{"STREAMLINE_INVOKE_STREAMS":True,"WAIT":120,"row_limit":2400000}' -m 1+tag:streamline_decoded_logs_history_range_3

View File

@@ -31,14 +31,15 @@ jobs:
steps:
- uses: actions/checkout@v3
- uses: actions/setup-python@v1
- uses: actions/setup-python@v4
with:
python-version: "3.7.x"
python-version: "3.10"
cache: "pip"
- name: install dependencies
run: |
pip3 install dbt-snowflake==${{ vars.DBT_VERSION }} cli_passthrough requests click
pip install -r requirements.txt
dbt deps
- name: Run DBT Jobs
run: |
dbt run --threads 8 --vars '{"STREAMLINE_INVOKE_STREAMS":True,"WAIT":120,"row_limit":2400000}' -m models/silver/streamline/decoder/history/range_4
dbt run --threads 8 --vars '{"STREAMLINE_INVOKE_STREAMS":True,"WAIT":120,"row_limit":2400000}' -m 1+tag:streamline_decoded_logs_history_range_4

View File

@@ -31,14 +31,15 @@ jobs:
steps:
- uses: actions/checkout@v3
- uses: actions/setup-python@v1
- uses: actions/setup-python@v4
with:
python-version: "3.7.x"
python-version: "3.10"
cache: "pip"
- name: install dependencies
run: |
pip3 install dbt-snowflake==${{ vars.DBT_VERSION }} cli_passthrough requests click
pip install -r requirements.txt
dbt deps
- name: Run DBT Jobs
run: |
dbt run --threads 8 --vars '{"STREAMLINE_INVOKE_STREAMS":True,"WAIT":120,"row_limit":2400000}' -m models/silver/streamline/decoder/history/range_5
dbt run --threads 8 --vars '{"STREAMLINE_INVOKE_STREAMS":True,"WAIT":120,"row_limit":2400000}' -m 1+tag:streamline_decoded_logs_history_range_5

View File

@@ -1,43 +0,0 @@
name: dbt_run_streamline_history
run-name: dbt_run_streamline_history
on:
workflow_dispatch:
branches:
- "main"
env:
DBT_PROFILES_DIR: ./
ACCOUNT: "${{ vars.ACCOUNT }}"
ROLE: "${{ vars.ROLE }}"
USER: "${{ vars.USER }}"
PASSWORD: "${{ secrets.PASSWORD }}"
REGION: "${{ vars.REGION }}"
DATABASE: "${{ vars.DATABASE }}"
WAREHOUSE: "${{ vars.WAREHOUSE }}"
SCHEMA: "${{ vars.SCHEMA }}"
concurrency:
group: ${{ github.workflow }}
jobs:
run_dbt_jobs:
runs-on: ubuntu-latest
environment:
name: workflow_prod
steps:
- uses: actions/checkout@v3
- uses: actions/setup-python@v1
with:
python-version: "3.7.x"
- name: install dependencies
run: |
pip3 install dbt-snowflake~=${{ vars.DBT_VERSION }} cli_passthrough requests click
dbt deps
- name: Run DBT Jobs
run: |
dbt run --vars '{"STREAMLINE_INVOKE_STREAMS":True}' -m 1+models/silver/streamline/core/history

View File

@@ -0,0 +1,69 @@
name: dbt_run_streamline_history_adhoc
run-name: dbt_run_streamline_history_adhoc
on:
workflow_dispatch:
branches:
- "main"
inputs:
environment:
type: choice
description: DBT Run Environment
required: true
options:
- dev
- prod
- prod_backfill
default: dev
warehouse:
type: choice
description: Snowflake warehouse
required: true
options:
- DBT
- DBT_CLOUD
- DBT_EMERGENCY
default: DBT
dbt_command:
type: choice
description: 'DBT Run Command'
required: true
options:
- dbt run --vars '{"STREAMLINE_INVOKE_STREAMS":True}' -m 1+tag:streamline_core_history
env:
DBT_PROFILES_DIR: ./
ACCOUNT: "${{ vars.ACCOUNT }}"
ROLE: "${{ vars.ROLE }}"
USER: "${{ vars.USER }}"
PASSWORD: "${{ secrets.PASSWORD }}"
REGION: "${{ vars.REGION }}"
DATABASE: "${{ vars.DATABASE }}"
WAREHOUSE: "${{ inputs.warehouse }}"
SCHEMA: "${{ vars.SCHEMA }}"
concurrency:
group: ${{ github.workflow }}
jobs:
run_dbt_jobs:
runs-on: ubuntu-latest
environment:
name: workflow_${{ inputs.environment }}
steps:
- uses: actions/checkout@v3
- uses: actions/setup-python@v4
with:
python-version: "3.10"
cache: "pip"
- name: install dependencies
run: |
pip install -r requirements.txt
dbt deps
- name: Run DBT Jobs
run: |
${{ inputs.dbt_command }}
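
Put differently, the three workflow_dispatch inputs above map onto the job as follows (a comment-only summary, not part of the diff):

# environment (dev | prod | prod_backfill)      -> job runs in GitHub environment "workflow_<environment>"
# warehouse   (DBT | DBT_CLOUD | DBT_EMERGENCY) -> exported as the WAREHOUSE env var used by the dbt profile
# dbt_command (one predefined choice)           -> executed verbatim in the "Run DBT Jobs" step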

View File

@@ -1,44 +0,0 @@
name: dbt_run_streamline_realtime
run-name: dbt_run_streamline_realtime
on:
workflow_dispatch:
schedule:
# Runs “At every 15th and 45th minute.” (see https://crontab.guru)
- cron: '15,45 * * * *'
env:
DBT_PROFILES_DIR: ./
ACCOUNT: "${{ vars.ACCOUNT }}"
ROLE: "${{ vars.ROLE }}"
USER: "${{ vars.USER }}"
PASSWORD: "${{ secrets.PASSWORD }}"
REGION: "${{ vars.REGION }}"
DATABASE: "${{ vars.DATABASE }}"
WAREHOUSE: "${{ vars.WAREHOUSE }}"
SCHEMA: "${{ vars.SCHEMA }}"
concurrency:
group: ${{ github.workflow }}
jobs:
run_dbt_jobs:
runs-on: ubuntu-latest
environment:
name: workflow_prod
steps:
- uses: actions/checkout@v3
- uses: actions/setup-python@v1
with:
python-version: "3.7.x"
- name: install dependencies
run: |
pip3 install dbt-snowflake~=${{ vars.DBT_VERSION }} cli_passthrough requests click
dbt deps
- name: Run DBT Jobs
run: |
dbt run --vars '{"STREAMLINE_INVOKE_STREAMS":True}' -m 1+models/silver/streamline/core/realtime

View File

@@ -31,13 +31,14 @@ jobs:
steps:
- uses: actions/checkout@v3
- uses: actions/setup-python@v1
- uses: actions/setup-python@v4
with:
python-version: "3.7.x"
python-version: "3.10"
cache: "pip"
- name: install dependencies
run: |
pip3 install dbt-snowflake==${{ vars.DBT_VERSION }} cli_passthrough requests click
pip install -r requirements.txt
dbt deps
- name: Run DBT Jobs
run: |

View File

@@ -31,15 +31,16 @@ jobs:
steps:
- uses: actions/checkout@v3
- uses: actions/setup-python@v1
- uses: actions/setup-python@v4
with:
python-version: "3.7.x"
python-version: "3.10"
cache: "pip"
- name: install dependencies
run: |
pip3 install dbt-snowflake==${{ vars.DBT_VERSION }} cli_passthrough requests click
pip install -r requirements.txt
dbt deps
- name: Run DBT Jobs
run: |
dbt run -m models/silver/_observability
dbt run -m tag:observability
dbt test -m tag:recent_test

View File

@@ -31,14 +31,15 @@ jobs:
steps:
- uses: actions/checkout@v3
- uses: actions/setup-python@v1
- uses: actions/setup-python@v4
with:
python-version: "3.7.x"
python-version: "3.10"
cache: "pip"
- name: install dependencies
run: |
pip3 install dbt-snowflake==${{ vars.DBT_VERSION }} cli_passthrough requests click
pip install -r requirements.txt
dbt deps
- name: Run DBT Jobs
run: |
dbt test --select tag:full_test
dbt test -m tag:full_test

View File

@@ -1,7 +1,8 @@
{{ config(
materialized = 'incremental',
unique_key = "contract_address",
full_refresh = false
full_refresh = false,
tags = ['contract_abis']
) }}
WITH api_keys AS (
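
Applied, this model's config block becomes (reassembled from the hunk, indentation assumed):

{{ config(
    materialized = 'incremental',
    unique_key = "contract_address",
    full_refresh = false,
    tags = ['contract_abis']
) }}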

View File

@@ -1,7 +1,8 @@
{{ config(
materialized = 'incremental',
unique_key = "contract_address",
full_refresh = false
full_refresh = false,
tags = ['non_realtime']
) }}
WITH base AS (

View File

@@ -2,7 +2,7 @@
materialized = 'view',
persist_docs ={ "relation": true,
"columns": true },
meta ={ 'database_tags':{ 'table':{ 'PURPOSE': 'NFT' }} }
meta ={ 'database_tags':{ 'table':{ 'PURPOSE': 'NFT' } } }
) }}
SELECT

View File

@@ -2,7 +2,7 @@
materialized = 'view',
persist_docs ={ "relation": true,
"columns": true },
meta ={ 'database_tags':{ 'table':{ 'PURPOSE': 'NFT' }} }
meta ={ 'database_tags':{ 'table':{ 'PURPOSE': 'NFT' } } }
) }}
WITH base AS (

View File

@@ -2,7 +2,7 @@
materialized = 'view',
persist_docs ={ "relation": true,
"columns": true },
meta ={ 'database_tags':{ 'table':{ 'PURPOSE': 'NFT' }} }
meta ={ 'database_tags':{ 'table':{ 'PURPOSE': 'NFT' } } }
) }}
SELECT

View File

@@ -0,0 +1,97 @@
{{ config(
materialized = 'incremental',
incremental_strategy = 'delete+insert',
unique_key = 'block_number',
cluster_by = ['block_timestamp::DATE'],
post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION",
tags = ['non_realtime']
) }}
WITH matic_base AS (
SELECT
tx_hash,
block_number,
block_timestamp,
identifier,
from_address,
to_address,
matic_value,
_call_id,
_inserted_timestamp
FROM
{{ ref('silver__traces') }}
WHERE
matic_value > 0
AND tx_status = 'SUCCESS'
AND trace_status = 'SUCCESS'
{% if is_incremental() %}
AND _inserted_timestamp >= (
SELECT
MAX(_inserted_timestamp) - INTERVAL '72 hours'
FROM
{{ this }}
)
{% endif %}
),
matic_price AS (
SELECT
HOUR,
price AS matic_price
FROM
{{ ref('silver__prices') }}
WHERE
token_address = LOWER('0x0d500B1d8E8eF31E21C99d1Db9A6444d3ADf1270')
),
tx_table AS (
SELECT
tx_hash,
from_address AS origin_from_address,
to_address AS origin_to_address,
origin_function_signature
FROM
{{ ref('silver__transactions') }}
WHERE
tx_hash IN (
SELECT
DISTINCT tx_hash
FROM
matic_base
)
{% if is_incremental() %}
AND _inserted_timestamp >= (
SELECT
MAX(_inserted_timestamp) - INTERVAL '72 hours'
FROM
{{ this }}
)
{% endif %}
)
SELECT
A.tx_hash AS tx_hash,
A.block_number AS block_number,
A.block_timestamp AS block_timestamp,
A.identifier AS identifier,
origin_from_address,
origin_to_address,
origin_function_signature,
A.from_address AS matic_from_address,
A.to_address AS matic_to_address,
A.matic_value AS amount,
ROUND(
A.matic_value * matic_price,
2
) AS amount_usd,
_call_id,
_inserted_timestamp
FROM
matic_base A
LEFT JOIN matic_price
ON DATE_TRUNC(
'hour',
block_timestamp
) = HOUR
LEFT JOIN tx_table
ON A.tx_hash = tx_table.tx_hash
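
In short, the new model prices each native transfer as follows (comments only, summarizing the joins above):

-- every qualifying trace is joined to the hourly MATIC/USD price on
--   DATE_TRUNC('hour', block_timestamp) = matic_price.HOUR
-- amount_usd = ROUND(matic_value * matic_price, 2)
-- both joins are LEFT JOINs, so a transfer is kept even when no price row
--   or origin-transaction row exists for its hour / tx_hash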

View File

@@ -1,96 +0,0 @@
{{ config(
materialized = 'incremental',
incremental_strategy = 'delete+insert',
unique_key = 'block_number',
cluster_by = ['block_timestamp::DATE'],
post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION"
) }}
WITH matic_base AS (
SELECT
tx_hash,
block_number,
block_timestamp,
identifier,
from_address,
to_address,
matic_value,
_call_id,
_inserted_timestamp
FROM
{{ ref('silver__traces') }}
WHERE
matic_value > 0
AND tx_status = 'SUCCESS'
AND trace_status = 'SUCCESS'
{% if is_incremental() %}
AND _inserted_timestamp >= (
SELECT
MAX(_inserted_timestamp) - INTERVAL '72 hours'
FROM
{{ this }}
)
{% endif %}
),
matic_price AS (
SELECT
HOUR,
price AS matic_price
FROM
{{ ref('silver__prices') }}
WHERE
token_address = LOWER('0x0d500B1d8E8eF31E21C99d1Db9A6444d3ADf1270')
),
tx_table AS (
SELECT
tx_hash,
from_address AS origin_from_address,
to_address AS origin_to_address,
origin_function_signature
FROM
{{ ref('silver__transactions') }}
WHERE
tx_hash IN (
SELECT
DISTINCT tx_hash
FROM
matic_base
)
{% if is_incremental() %}
AND _inserted_timestamp >= (
SELECT
MAX(_inserted_timestamp) - INTERVAL '72 hours'
FROM
{{ this }}
)
{% endif %}
)
SELECT
A.tx_hash AS tx_hash,
A.block_number AS block_number,
A.block_timestamp AS block_timestamp,
A.identifier AS identifier,
origin_from_address,
origin_to_address,
origin_function_signature,
A.from_address AS matic_from_address,
A.to_address AS matic_to_address,
A.matic_value AS amount,
ROUND(
A.matic_value * matic_price,
2
) AS amount_usd,
_call_id,
_inserted_timestamp
FROM
matic_base A
LEFT JOIN matic_price
ON DATE_TRUNC(
'hour',
block_timestamp
) = HOUR
LEFT JOIN tx_table
ON A.tx_hash = tx_table.tx_hash

View File

@@ -1,28 +0,0 @@
{{ config(
materialized = 'table',
unique_key = "contract_address"
) }}
WITH base AS (
SELECT
contract_address
FROM
{{ ref('silver__relevant_contracts') }}
),
proxies AS (
SELECT
proxy_address
FROM
{{ ref('silver__proxies') }}
JOIN base USING (contract_address)
)
SELECT
contract_address
FROM
base
UNION
SELECT
proxy_address AS contract_address
FROM
proxies

View File

@@ -1,19 +0,0 @@
{{ config(
materialized = 'table',
unique_key = "contract_address"
) }}
SELECT
contract_address,
'polygon' AS blockchain,
COUNT(*) AS transfers,
MAX(block_number) AS latest_block
FROM
{{ ref('silver__logs') }}
WHERE
tx_status = 'SUCCESS'
GROUP BY
1,
2
HAVING
COUNT(*) > 25

View File

@@ -0,0 +1,324 @@
{{ config(
materialized = 'incremental',
unique_key = '_log_id',
cluster_by = ['block_timestamp::DATE', '_inserted_timestamp::DATE'],
merge_update_columns = ["_log_id"],
post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION on equality(contract_address, tx_hash)",
tags = ['non_realtime']
) }}
WITH base AS (
SELECT
_log_id,
block_number,
tx_hash,
block_timestamp,
event_index,
contract_address,
topics,
DATA,
regexp_substr_all(SUBSTR(DATA, 3, len(DATA)), '.{64}') AS segmented_data,
TO_TIMESTAMP_NTZ(_inserted_timestamp) AS _inserted_timestamp
FROM
{{ ref('silver__logs') }}
WHERE
tx_status = 'SUCCESS'
AND (
(
topics [0] :: STRING = '0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef'
AND DATA = '0x'
AND topics [3] IS NOT NULL
) --erc721s
OR (
topics [0] :: STRING = '0xc3d58168c5ae7397731d063d5bbf3d657854427343f4c083240f7aacaa2d0f62'
) --erc1155s
OR (
topics [0] :: STRING = '0x4a39dc06d4c0dbc64b70af90fd698a233a518aa5d07e595d983b8c0526c8f7fb'
) --erc1155s TransferBatch event
)
{% if is_incremental() %}
AND TO_TIMESTAMP_NTZ(_inserted_timestamp) >= (
SELECT
MAX(
_inserted_timestamp
)
FROM
{{ this }}
)
{% endif %}
),
erc721s AS (
SELECT
_log_id,
block_number,
tx_hash,
block_timestamp,
contract_address,
CONCAT('0x', SUBSTR(topics [1] :: STRING, 27, 40)) AS from_address,
CONCAT('0x', SUBSTR(topics [2] :: STRING, 27, 40)) AS to_address,
utils.udf_hex_to_int(
topics [3] :: STRING
) :: STRING AS token_id,
NULL AS erc1155_value,
TO_TIMESTAMP_NTZ(_inserted_timestamp) AS _inserted_timestamp,
event_index
FROM
base
WHERE
topics [0] :: STRING = '0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef'
AND DATA = '0x'
AND topics [3] IS NOT NULL
),
transfer_singles AS (
SELECT
_log_id,
block_number,
tx_hash,
block_timestamp,
contract_address,
CONCAT('0x', SUBSTR(topics [1] :: STRING, 27, 40)) AS operator_address,
CONCAT('0x', SUBSTR(topics [2] :: STRING, 27, 40)) AS from_address,
CONCAT('0x', SUBSTR(topics [3] :: STRING, 27, 40)) AS to_address,
utils.udf_hex_to_int(
segmented_data [0] :: STRING
) :: STRING AS token_id,
TRY_TO_NUMBER(
utils.udf_hex_to_int(
segmented_data [1] :: STRING
)
) AS erc1155_value,
TO_TIMESTAMP_NTZ(_inserted_timestamp) AS _inserted_timestamp,
event_index
FROM
base
WHERE
topics [0] :: STRING = '0xc3d58168c5ae7397731d063d5bbf3d657854427343f4c083240f7aacaa2d0f62'
),
transfer_batch_raw AS (
SELECT
block_number,
block_timestamp,
tx_hash,
event_index,
segmented_data,
CONCAT('0x', SUBSTR(topics [1] :: STRING, 27, 40)) AS operator_address,
CONCAT('0x', SUBSTR(topics [2] :: STRING, 27, 40)) AS from_address,
CONCAT('0x', SUBSTR(topics [3] :: STRING, 27, 40)) AS to_address,
contract_address,
utils.udf_hex_to_int(
segmented_data [2] :: STRING
) :: STRING AS tokenid_length,
tokenid_length AS quantity_length,
_log_id,
TO_TIMESTAMP_NTZ(_inserted_timestamp) AS _inserted_timestamp
FROM
base
WHERE
topics [0] :: STRING = '0x4a39dc06d4c0dbc64b70af90fd698a233a518aa5d07e595d983b8c0526c8f7fb'
),
flattened AS (
SELECT
block_number,
block_timestamp,
_log_id,
_inserted_timestamp,
tx_hash,
event_index,
operator_address,
from_address,
to_address,
contract_address,
INDEX,
VALUE,
tokenid_length,
quantity_length,
'2' + tokenid_length AS tokenid_indextag,
'4' + tokenid_length AS quantity_indextag_start,
'4' + tokenid_length + tokenid_length AS quantity_indextag_end,
CASE
WHEN INDEX BETWEEN 3
AND (
tokenid_indextag
) THEN 'tokenid'
WHEN INDEX BETWEEN (
quantity_indextag_start
)
AND (
quantity_indextag_end
) THEN 'quantity'
ELSE NULL
END AS label
FROM
transfer_batch_raw,
LATERAL FLATTEN (
input => segmented_data
)
),
tokenid_list AS (
SELECT
block_number,
block_timestamp,
_log_id,
_inserted_timestamp,
tx_hash,
event_index,
operator_address,
from_address,
to_address,
contract_address,
utils.udf_hex_to_int(
VALUE :: STRING
) :: STRING AS tokenId,
ROW_NUMBER() over (
PARTITION BY tx_hash,
event_index
ORDER BY
INDEX ASC
) AS tokenid_order
FROM
flattened
WHERE
label = 'tokenid'
),
quantity_list AS (
SELECT
tx_hash,
event_index,
TRY_TO_NUMBER (
utils.udf_hex_to_int(
VALUE :: STRING
)
) AS quantity,
ROW_NUMBER() over (
PARTITION BY tx_hash,
event_index
ORDER BY
INDEX ASC
) AS quantity_order
FROM
flattened
WHERE
label = 'quantity'
),
transfer_batch_final AS (
SELECT
block_number,
block_timestamp,
_log_id,
_inserted_timestamp,
t.tx_hash,
t.event_index,
operator_address,
from_address,
to_address,
contract_address,
t.tokenId AS token_id,
q.quantity AS erc1155_value,
tokenid_order AS intra_event_index
FROM
tokenid_list t
INNER JOIN quantity_list q
ON t.tx_hash = q.tx_hash
AND t.event_index = q.event_index
AND t.tokenid_order = q.quantity_order
),
all_transfers AS (
SELECT
block_number,
tx_hash,
block_timestamp,
contract_address,
from_address,
to_address,
token_id,
erc1155_value,
_inserted_timestamp,
event_index,
'erc721_Transfer' AS token_transfer_type,
CONCAT(
_log_id,
'-',
contract_address,
'-',
token_id
) AS _log_id
FROM
erc721s
UNION ALL
SELECT
block_number,
tx_hash,
block_timestamp,
contract_address,
from_address,
to_address,
token_id,
erc1155_value,
_inserted_timestamp,
event_index,
'erc1155_TransferSingle' AS token_transfer_type,
CONCAT(
_log_id,
'-',
contract_address,
'-',
token_id
) AS _log_id
FROM
transfer_singles
WHERE
erc1155_value > 0
UNION ALL
SELECT
block_number,
tx_hash,
block_timestamp,
contract_address,
from_address,
to_address,
token_id,
erc1155_value,
_inserted_timestamp,
event_index,
'erc1155_TransferBatch' AS token_transfer_type,
CONCAT(
_log_id,
'-',
contract_address,
'-',
token_id,
'-',
intra_event_index
) AS _log_id
FROM
transfer_batch_final
WHERE
erc1155_value > 0
)
SELECT
block_number,
block_timestamp,
tx_hash,
event_index,
contract_address,
from_address,
to_address,
token_id AS tokenid,
erc1155_value,
CASE
WHEN from_address = '0x0000000000000000000000000000000000000000' THEN 'mint'
ELSE 'other'
END AS event_type,
token_transfer_type,
_log_id,
_inserted_timestamp
FROM
all_transfers
WHERE
to_address IS NOT NULL qualify ROW_NUMBER() over (
PARTITION BY _log_id
ORDER BY
_inserted_timestamp DESC
) = 1
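
For orientation, the TransferBatch branch above walks the event's data payload in 32-byte (64 hex character) words; assuming the standard ABI encoding of the event's two dynamic uint256[] arrays, the layout it relies on is (N = number of token ids):

-- word 0            : offset of the ids[] array
-- word 1            : offset of the values[] array
-- word 2            : N, the length of ids[]         -> tokenid_length
-- words 3 .. 2+N    : the token ids                  -> labelled 'tokenid'
-- word 3+N          : N, the length of values[]
-- words 4+N .. 3+2N : the transferred quantities     -> labelled 'quantity'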

View File

@@ -1,7 +1,8 @@
{{ config(
materialized = 'incremental',
unique_key = 'nft_log_id',
cluster_by = ['block_timestamp::DATE']
cluster_by = ['block_timestamp::DATE'],
tags = ['non_realtime']
) }}
WITH seaport_fees_wallet AS (

View File

@@ -1,7 +1,8 @@
{{ config(
materialized = 'incremental',
unique_key = 'nft_log_id',
cluster_by = ['block_timestamp::DATE']
cluster_by = ['block_timestamp::DATE'],
tags = ['non_realtime']
) }}
WITH seaport_fees_wallet AS (

View File

@@ -1,7 +1,8 @@
{{ config(
materialized = 'incremental',
unique_key = 'nft_log_id',
cluster_by = ['block_timestamp::DATE']
cluster_by = ['block_timestamp::DATE'],
tags = ['non_realtime']
) }}
WITH seaport_fees_wallet AS (

View File

@@ -1,7 +1,8 @@
{{ config(
materialized = 'incremental',
unique_key = 'test_timestamp',
full_refresh = false
full_refresh = false,
tags = ['observability']
) }}
WITH summary_stats AS (

View File

@@ -1,5 +1,6 @@
{{ config(
materialized = 'view'
materialized = 'view',
tags = ['observability']
) }}
SELECT

View File

@@ -1,7 +1,8 @@
{{ config(
materialized = 'incremental',
unique_key = 'test_timestamp',
full_refresh = false
full_refresh = false,
tags = ['observability']
) }}
WITH summary_stats AS (

View File

@@ -1,7 +1,8 @@
{{ config(
materialized = 'incremental',
unique_key = 'test_timestamp',
full_refresh = false
full_refresh = false,
tags = ['observability']
) }}
WITH summary_stats AS (

View File

@@ -1,7 +1,8 @@
{{ config(
materialized = 'incremental',
unique_key = 'test_timestamp',
full_refresh = false
full_refresh = false,
tags = ['observability']
) }}
WITH summary_stats AS (

View File

@@ -1,7 +1,8 @@
{{ config(
materialized = 'incremental',
unique_key = 'test_timestamp',
full_refresh = false
full_refresh = false,
tags = ['observability']
) }}
WITH summary_stats AS (

View File

@@ -1,7 +1,8 @@
{{ config (
materialized = "incremental",
unique_key = "contract_address",
post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION on equality(contract_address)"
post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION on equality(contract_address)",
tags = ['abis']
) }}
WITH override_abis AS (

View File

@@ -1,6 +1,7 @@
{{ config (
materialized = "incremental",
unique_key = "abi_id"
unique_key = "abi_id",
tags = ['abis']
) }}
WITH bytecodes AS (

View File

@@ -1,5 +1,6 @@
{{ config (
materialized = 'table'
materialized = 'table',
tags = ['abis']
) }}
WITH abi_base AS (

View File

@@ -1,5 +1,6 @@
{{ config(
materialized = 'view'
materialized = 'view',
tags = ['abis']
) }}
SELECT

View File

@@ -1,5 +1,6 @@
{{ config (
materialized = "table"
materialized = "table",
tags = ['abis']
) }}
WITH base AS (

View File

@@ -0,0 +1,29 @@
{{ config(
materialized = 'table',
unique_key = "contract_address",
tags = ['abis']
) }}
WITH base AS (
SELECT
contract_address
FROM
{{ ref('silver__relevant_contracts') }}
),
proxies AS (
SELECT
proxy_address
FROM
{{ ref('silver__proxies') }}
JOIN base USING (contract_address)
)
SELECT
contract_address
FROM
base
UNION
SELECT
proxy_address AS contract_address
FROM
proxies

View File

@@ -1,7 +1,8 @@
{{ config (
materialized = "incremental",
unique_key = "id",
merge_update_columns = ["id"]
merge_update_columns = ["id"],
tags = ['abis']
) }}
WITH base AS (

View File

@@ -1,6 +1,7 @@
{{ config(
materialized = 'incremental',
unique_key = "contract_address"
unique_key = "contract_address",
tags = ['abis']
) }}
WITH base AS (

View File

@@ -2,7 +2,8 @@
{{ config(
materialized = 'incremental',
unique_key = "block_number",
cluster_by = "block_timestamp::date"
cluster_by = "block_timestamp::date",
tags = ['non_realtime']
) }}
SELECT

View File

@@ -3,7 +3,8 @@
materialized = 'incremental',
incremental_strategy = 'delete+insert',
unique_key = "block_number",
cluster_by = "round(block_number,-3)"
cluster_by = "round(block_number,-3)",
tags = ['non_realtime']
) }}
WITH base AS (

View File

@@ -0,0 +1,99 @@
{{ config(
materialized = 'incremental',
unique_key = 'contract_address',
tags = ['non_realtime']
) }}
WITH base_metadata AS (
SELECT
contract_address,
block_number,
function_sig AS function_signature,
read_result AS read_output,
_inserted_timestamp
FROM
{{ ref('bronze_api__token_reads') }}
WHERE
read_result IS NOT NULL
AND read_result <> '0x'
{% if is_incremental() %}
AND _inserted_timestamp >= (
SELECT
MAX(
_inserted_timestamp
)
FROM
{{ this }}
)
{% endif %}
),
token_names AS (
SELECT
contract_address,
block_number,
function_signature,
read_output,
utils.udf_hex_to_string(
SUBSTR(read_output,(64*2+3),LEN(read_output))) AS token_name
FROM
base_metadata
WHERE
function_signature = '0x06fdde03'
AND token_name IS NOT NULL
),
token_symbols AS (
SELECT
contract_address,
block_number,
function_signature,
read_output,
utils.udf_hex_to_string(
SUBSTR(read_output,(64*2+3),LEN(read_output))) AS token_symbol
FROM
base_metadata
WHERE
function_signature = '0x95d89b41'
AND token_symbol IS NOT NULL
),
token_decimals AS (
SELECT
contract_address,
utils.udf_hex_to_int(
read_output :: STRING
) AS token_decimals,
LENGTH(token_decimals) AS dec_length
FROM
base_metadata
WHERE
function_signature = '0x313ce567'
AND read_output IS NOT NULL
AND read_output <> '0x'
),
contracts AS (
SELECT
contract_address,
MAX(_inserted_timestamp) AS _inserted_timestamp
FROM
base_metadata
GROUP BY
1
)
SELECT
c1.contract_address :: STRING AS contract_address,
token_name,
TRY_TO_NUMBER(token_decimals) AS token_decimals,
token_symbol,
_inserted_timestamp
FROM
contracts c1
LEFT JOIN token_names
ON c1.contract_address = token_names.contract_address
LEFT JOIN token_symbols
ON c1.contract_address = token_symbols.contract_address
LEFT JOIN token_decimals
ON c1.contract_address = token_decimals.contract_address
AND dec_length < 3 qualify(ROW_NUMBER() over(PARTITION BY c1.contract_address
ORDER BY
_inserted_timestamp DESC)) = 1
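
The function signatures filtered on above are the standard ERC-20 read selectors, and the SUBSTR offset strips the ABI header of a returned string; a short summary, assuming standard ABI encoding of the read results:

-- 0x06fdde03 = name(), 0x95d89b41 = symbol(), 0x313ce567 = decimals()
-- an ABI-encoded string return is: 32-byte offset | 32-byte length | utf8 bytes
-- SUBSTR(read_output, 64*2 + 3, ...) skips '0x' plus those two 64-hex-character
--   header words, leaving the raw hex that utils.udf_hex_to_string decodes
-- decimals() returns a single uint256 word, decoded with utils.udf_hex_to_int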

View File

@@ -0,0 +1,36 @@
{{ config (
materialized = "incremental",
unique_key = "created_contract_address",
tags = ['non_realtime']
) }}
SELECT
block_number,
block_timestamp,
tx_hash,
to_address AS created_contract_address,
from_address AS creator_address,
input AS created_contract_input,
_inserted_timestamp
FROM
{{ ref('silver__traces') }}
WHERE
TYPE ILIKE 'create%'
AND to_address IS NOT NULL
AND input IS NOT NULL
AND input != '0x'
AND tx_status = 'SUCCESS'
AND trace_status = 'SUCCESS'
{% if is_incremental() %}
AND _inserted_timestamp >= (
SELECT
MAX(_inserted_timestamp) - INTERVAL '24 hours'
FROM
{{ this }}
)
{% endif %}
qualify(ROW_NUMBER() over(PARTITION BY created_contract_address
ORDER BY
_inserted_timestamp DESC)) = 1

View File

@@ -0,0 +1,224 @@
-- depends_on: {{ ref('bronze__decoded_logs') }}
{{ config (
materialized = "incremental",
unique_key = ['block_number', 'event_index'],
cluster_by = "block_timestamp::date",
incremental_predicates = ["dynamic_range", "block_number"],
post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION",
full_refresh = false,
tags = ['decoded_logs']
) }}
WITH base_data AS (
SELECT
block_number :: INTEGER AS block_number,
SPLIT(
id,
'-'
) [0] :: STRING AS tx_hash,
SPLIT(
id,
'-'
) [1] :: INTEGER AS event_index,
DATA :name :: STRING AS event_name,
LOWER(
DATA :address :: STRING
) :: STRING AS contract_address,
DATA AS decoded_data,
id :: STRING AS _log_id,
TO_TIMESTAMP_NTZ(_inserted_timestamp) AS _inserted_timestamp
FROM
{% if is_incremental() %}
{{ ref('bronze__decoded_logs') }}
WHERE
TO_TIMESTAMP_NTZ(_inserted_timestamp) >= (
SELECT
MAX(_inserted_timestamp)
FROM
{{ this }}
)
{% else %}
{{ ref('bronze__fr_decoded_logs') }}
WHERE
_partition_by_block_number <= 2500000
{% endif %}
qualify(ROW_NUMBER() over (PARTITION BY block_number, event_index
ORDER BY
_inserted_timestamp DESC)) = 1
),
transformed_logs AS (
SELECT
block_number,
tx_hash,
event_index,
contract_address,
event_name,
decoded_data,
_inserted_timestamp,
_log_id,
ethereum.silver.udf_transform_logs(decoded_data) AS transformed
FROM
base_data
),
FINAL AS (
SELECT
b.tx_hash,
b.block_number,
b.event_index,
b.event_name,
b.contract_address,
b.decoded_data,
transformed,
b._log_id,
b._inserted_timestamp,
OBJECT_AGG(
DISTINCT CASE
WHEN v.value :name = '' THEN CONCAT(
'anonymous_',
v.index
)
ELSE v.value :name
END,
v.value :value
) AS decoded_flat
FROM
transformed_logs b,
LATERAL FLATTEN(
input => transformed :data
) v
GROUP BY
b.tx_hash,
b.block_number,
b.event_index,
b.event_name,
b.contract_address,
b.decoded_data,
transformed,
b._log_id,
b._inserted_timestamp
),
new_records AS (
SELECT
b.tx_hash,
b.block_number,
b.event_index,
b.event_name,
b.contract_address,
b.decoded_data,
b.transformed,
b._log_id,
b._inserted_timestamp,
b.decoded_flat,
block_timestamp,
origin_function_signature,
origin_from_address,
origin_to_address,
topics,
DATA,
event_removed :: STRING AS event_removed,
tx_status,
CASE
WHEN block_timestamp IS NULL THEN TRUE
ELSE FALSE
END AS is_pending
FROM
FINAL b
LEFT JOIN {{ ref('silver__logs') }} USING (
block_number,
_log_id
)
)
{% if is_incremental() %},
missing_data AS (
SELECT
t.tx_hash,
t.block_number,
t.event_index,
t.event_name,
t.contract_address,
t.decoded_data,
t.transformed,
t._log_id,
GREATEST(
TO_TIMESTAMP_NTZ(
t._inserted_timestamp
),
TO_TIMESTAMP_NTZ(
l._inserted_timestamp
)
) AS _inserted_timestamp,
t.decoded_flat,
l.block_timestamp,
l.origin_function_signature,
l.origin_from_address,
l.origin_to_address,
l.topics,
l.data,
l.event_removed :: STRING AS event_removed,
l.tx_status,
FALSE AS is_pending
FROM
{{ this }}
t
INNER JOIN {{ ref('silver__logs') }}
l USING (
block_number,
_log_id
)
WHERE
t.is_pending
AND l.block_timestamp IS NOT NULL
)
{% endif %}
SELECT
tx_hash,
block_number,
event_index,
event_name,
contract_address,
decoded_data,
transformed,
_log_id,
_inserted_timestamp,
decoded_flat,
block_timestamp,
origin_function_signature,
origin_from_address,
origin_to_address,
topics,
DATA,
event_removed,
tx_status,
is_pending
FROM
new_records
{% if is_incremental() %}
UNION
SELECT
tx_hash,
block_number,
event_index,
event_name,
contract_address,
decoded_data,
transformed,
_log_id,
_inserted_timestamp,
decoded_flat,
block_timestamp,
origin_function_signature,
origin_from_address,
origin_to_address,
topics,
DATA,
event_removed,
tx_status,
is_pending
FROM
missing_data
{% endif %}
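
Schematically, the OBJECT_AGG step turns the decoder output into one flat object keyed by argument name; the values below are hypothetical and only illustrate the shape:

-- transformed:data (input): [ {"name": "from", "value": "0xabc..."},
--                             {"name": "to",   "value": "0xdef..."},
--                             {"name": "",     "value": "42"} ]
-- decoded_flat (output):    { "from": "0xabc...", "to": "0xdef...", "anonymous_2": "42" }
-- unnamed arguments are keyed as anonymous_<index>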

View File

@@ -4,7 +4,8 @@
unique_key = "block_number",
cluster_by = "block_timestamp::date, _inserted_timestamp::date",
post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION",
full_refresh = False
full_refresh = false,
tags = ['non_realtime']
) }}
WITH base AS (

View File

@@ -5,7 +5,8 @@
unique_key = "block_number",
cluster_by = "ROUND(block_number, -3)",
post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION on equality(tx_hash)",
full_refresh = False
full_refresh = false,
tags = ['non_realtime']
) }}
WITH base AS (

View File

@@ -0,0 +1,20 @@
{{ config(
materialized = 'table',
unique_key = "contract_address",
tags = ['non_realtime']
) }}
SELECT
contract_address,
'polygon' AS blockchain,
COUNT(*) AS transfers,
MAX(block_number) AS latest_block
FROM
{{ ref('silver__logs') }}
WHERE
tx_status = 'SUCCESS'
GROUP BY
1,
2
HAVING
COUNT(*) > 25

View File

@@ -5,7 +5,8 @@
unique_key = "block_number",
cluster_by = "block_timestamp::date, _inserted_timestamp::date",
post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION",
full_refresh = False
full_refresh = false,
tags = ['non_realtime']
) }}
WITH traces_txs AS (

View File

@@ -5,7 +5,8 @@
unique_key = "block_number",
cluster_by = "block_timestamp::date, _inserted_timestamp::date",
post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION",
full_refresh = False
full_refresh = false,
tags = ['non_realtime']
) }}
WITH base AS (

View File

@@ -0,0 +1,62 @@
{{ config(
materialized = 'incremental',
unique_key = '_log_id',
cluster_by = ['block_timestamp::DATE', '_inserted_timestamp::DATE'],
tags = ['non_realtime']
) }}
WITH logs AS (
SELECT
_log_id,
block_number,
tx_hash,
block_timestamp,
origin_function_signature,
origin_from_address,
origin_to_address,
contract_address :: STRING AS contract_address,
CONCAT('0x', SUBSTR(topics [1], 27, 40)) :: STRING AS from_address,
CONCAT('0x', SUBSTR(topics [2], 27, 40)) :: STRING AS to_address,
utils.udf_hex_to_int(SUBSTR(DATA, 3, 64)) :: FLOAT AS raw_amount,
event_index,
_inserted_timestamp
FROM
{{ ref('silver__logs') }}
WHERE
topics [0] :: STRING = '0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef'
AND tx_status = 'SUCCESS'
{% if is_incremental() %}
AND _inserted_timestamp >= (
SELECT
MAX(
_inserted_timestamp
)
FROM
{{ this }}
)
{% endif %}
)
SELECT
_log_id,
block_number,
tx_hash,
origin_function_signature,
origin_from_address,
origin_to_address,
block_timestamp,
contract_address,
from_address,
to_address,
raw_amount,
_inserted_timestamp,
event_index
FROM
logs
WHERE
raw_amount IS NOT NULL
AND to_address IS NOT NULL
AND from_address IS NOT NULL qualify(ROW_NUMBER() over(PARTITION BY _log_id
ORDER BY
_inserted_timestamp DESC)) = 1
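
The topic filter and substring arithmetic above follow the standard ERC-20 Transfer event layout (a summary in comments):

-- topics[0] = keccak256("Transfer(address,address,uint256)")
--           = 0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef
-- topics[1] / topics[2] hold the indexed from / to addresses; SUBSTR(..., 27, 40)
--   keeps the last 20 bytes of each 32-byte topic
-- SUBSTR(DATA, 3, 64) is the first 32-byte data word, i.e. the raw token amount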

View File

@@ -1,6 +1,7 @@
{{ config(
materialized = 'incremental',
unique_key = "block_number"
unique_key = "block_number",
tags = ['non_realtime']
) }}
SELECT

View File

@@ -1,7 +1,8 @@
{{ config(
materialized = 'incremental',
unique_key = "pool_address",
full_refresh = false
full_refresh = false,
tags = ['non_realtime']
) }}
WITH pool_creation AS (

View File

@@ -1,7 +1,8 @@
{{ config(
materialized = 'incremental',
unique_key = '_log_id',
cluster_by = ['block_timestamp::DATE']
cluster_by = ['block_timestamp::DATE'],
tags = ['non_realtime']
) }}
WITH pool_name AS (

View File

@@ -1,7 +1,8 @@
{{ config(
materialized = 'incremental',
unique_key = "pool_id",
full_refresh = false
full_refresh = false,
tags = ['non_realtime']
) }}
WITH contract_deployments AS (

View File

@@ -1,7 +1,8 @@
{{ config(
materialized = 'incremental',
unique_key = "_log_id",
cluster_by = ['block_timestamp::DATE']
cluster_by = ['block_timestamp::DATE'],
tags = ['non_realtime']
) }}
WITH pool_meta AS (

View File

@@ -1,7 +1,8 @@
{{ config(
materialized = 'incremental',
unique_key = '_log_id',
cluster_by = ['block_timestamp::DATE']
cluster_by = ['block_timestamp::DATE'],
tags = ['non_realtime']
) }}
WITH pools AS (

Some files were not shown because too many files have changed in this diff.