core models (#73)

Austin 2023-05-23 15:28:57 -04:00 committed by GitHub
parent 6f00db61ff
commit dd79a92bad
11 changed files with 1058 additions and 4 deletions


@@ -41,4 +41,4 @@ jobs:
dbt deps
- name: Run DBT Jobs
run: |
-        dbt run --exclude models/silver/abis models/silver/api_udf models/silver/streamline models/silver/silver__decoded_logs.sql
+        dbt run --exclude models/silver/abis models/silver/api_udf models/silver/streamline models/silver/silver__decoded_logs.sql models/silver/core


@@ -3,9 +3,8 @@ run-name: dbt_run_streamline_history
on:
  workflow_dispatch:
  schedule:
    # Runs "every 6 hours" (see https://crontab.guru)
    - cron: '0 1-23/6 * * *'
  branches:
    - "main"
env:
  DBT_PROFILES_DIR: ./


@@ -0,0 +1,44 @@
name: dbt_run_temp_backfill
run-name: dbt_run_temp_backfill
on:
  workflow_dispatch:
  schedule:
    # Runs "every 2 hours at minute 20" (see https://crontab.guru)
    - cron: '20 */2 * * *'
env:
  DBT_PROFILES_DIR: ./
  ACCOUNT: "${{ vars.ACCOUNT }}"
  ROLE: "${{ vars.ROLE }}"
  USER: "${{ vars.USER }}"
  PASSWORD: "${{ secrets.PASSWORD }}"
  REGION: "${{ vars.REGION }}"
  DATABASE: "${{ vars.DATABASE }}"
  WAREHOUSE: "${{ vars.WAREHOUSE }}"
  SCHEMA: "${{ vars.SCHEMA }}"
concurrency:
  group: ${{ github.workflow }}
jobs:
  run_dbt_jobs:
    runs-on: ubuntu-latest
    environment:
      name: workflow_prod_backfill
    steps:
      - uses: actions/checkout@v3
      - uses: actions/setup-python@v1
        with:
          python-version: "3.7.x"
      - name: install dependencies
        run: |
          pip3 install dbt-snowflake==${{ vars.DBT_VERSION }} cli_passthrough requests click
          dbt deps
      - name: Run DBT Jobs
        run: |
          dbt run -m models/silver/core


@@ -0,0 +1,44 @@
{% macro get_merge_sql(
target,
source,
unique_key,
dest_columns,
incremental_predicates
) -%}
{% set predicate_override = "" %}
{% if incremental_predicates [0] == "dynamic_range" %}
-- run some queries to dynamically determine the min + max of this 'input_column' in the new data
{% set input_column = incremental_predicates [1] %}
{% set get_limits_query %}
SELECT
MIN(
{{ input_column }}
) AS lower_limit,
MAX(
{{ input_column }}
) AS upper_limit
FROM
{{ source }}
{% endset %}
{% set limits = run_query(get_limits_query) [0] %}
{% set lower_limit,
upper_limit = limits [0],
limits [1] %}
-- use those calculated min + max values to limit 'target' scan, to only the days with new data
{% set predicate_override %}
dbt_internal_dest.{{ input_column }} BETWEEN '{{ lower_limit }}'
AND '{{ upper_limit }}' {% endset %}
{% endif %}
{% set predicates = [predicate_override] if predicate_override else incremental_predicates %}
-- standard merge from here
{% set merge_sql = dbt.get_merge_sql(
target,
source,
unique_key,
dest_columns,
predicates
) %}
{{ return(merge_sql) }}
{% endmacro %}
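Note: the override above only activates when a model passes incremental_predicates = ["dynamic_range", "<column>"] in its config, as the core models in this commit do; otherwise the predicates are handed through to dbt.get_merge_sql unchanged. A minimal sketch of the opt-in and of the predicate the macro injects (column name and bounds are illustrative):

{{ config(
    materialized = 'incremental',
    unique_key = 'block_number',
    incremental_predicates = ["dynamic_range", "block_number"]
) }}
-- the merge then limits its scan of the target with a predicate of the form:
-- dbt_internal_dest.block_number BETWEEN '<min block_number in new data>' AND '<max block_number in new data>'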


@@ -0,0 +1,8 @@
{% macro dbt_snowflake_get_tmp_relation_type(
strategy,
unique_key,
language
) %}
-- always materialize the incremental temp relation as a table (never a view)
{{ return('table') }}
{% endmacro %}

macros/lookback.sql

@@ -0,0 +1,12 @@
{% macro lookback() %}
{% if execute and is_incremental() %}
{% set query %}
SELECT
MAX(_inserted_timestamp) :: DATE - 3
FROM
{{ this }};
{% endset %}
{% set lookback_date = run_query(query).columns [0] [0] %}
{% do return(lookback_date) %}
{% endif %}
{% endmacro %}
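Note: lookback() is meant to be interpolated into an incremental filter; it returns the max _inserted_timestamp date in the target table minus 3 days. A minimal usage sketch, mirroring silver__transactions2 below:

{% if is_incremental() %}
WHERE
    r._inserted_timestamp >= '{{ lookback() }}'
{% endif %}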


@@ -0,0 +1,66 @@
-- depends_on: {{ ref('bronze__streamline_blocks') }}
{{ config(
materialized = 'incremental',
unique_key = "block_number",
cluster_by = "block_timestamp::date",
tags = ['core']
) }}
SELECT
block_number,
PUBLIC.udf_hex_to_int(
DATA :result :difficulty :: STRING
) :: INT AS difficulty,
DATA :result :extraData :: STRING AS extra_data,
PUBLIC.udf_hex_to_int(
DATA :result :gasLimit :: STRING
) :: INT AS gas_limit,
PUBLIC.udf_hex_to_int(
DATA :result :gasUsed :: STRING
) :: INT AS gas_used,
DATA :result :hash :: STRING AS HASH,
DATA :result :logsBloom :: STRING AS logs_bloom,
DATA :result :miner :: STRING AS miner,
PUBLIC.udf_hex_to_int(
DATA :result :nonce :: STRING
) :: INT AS nonce,
PUBLIC.udf_hex_to_int(
DATA :result :number :: STRING
) :: INT AS NUMBER,
DATA :result :parentHash :: STRING AS parent_hash,
DATA :result :receiptsRoot :: STRING AS receipts_root,
DATA :result :sha3Uncles :: STRING AS sha3_uncles,
PUBLIC.udf_hex_to_int(
DATA :result :size :: STRING
) :: INT AS SIZE,
DATA :result :stateRoot :: STRING AS state_root,
PUBLIC.udf_hex_to_int(
DATA :result :timestamp :: STRING
) :: TIMESTAMP AS block_timestamp,
PUBLIC.udf_hex_to_int(
DATA :result :totalDifficulty :: STRING
) :: INT AS total_difficulty,
ARRAY_SIZE(
DATA :result :transactions
) AS tx_count,
DATA :result :transactionsRoot :: STRING AS transactions_root,
DATA :result :uncles AS uncles,
_inserted_timestamp
FROM
{% if is_incremental() %}
{{ ref('bronze__streamline_blocks') }}
WHERE
_inserted_timestamp >= (
SELECT
MAX(_inserted_timestamp) _inserted_timestamp
FROM
{{ this }}
)
{% else %}
{{ ref('bronze__streamline_FR_blocks') }}
{% endif %}
qualify(ROW_NUMBER() over (PARTITION BY block_number
ORDER BY
_inserted_timestamp DESC)) = 1
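
Note: the models in this commit rely on PUBLIC.udf_hex_to_int to decode 0x-prefixed hex strings into integers (the UDF itself is not part of this commit). Assuming that behavior, a hypothetical sanity check:

SELECT
    PUBLIC.udf_hex_to_int('0x1b4') AS as_int; -- expected: 436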


@@ -0,0 +1,177 @@
{{ config(
materialized = 'incremental',
unique_key = ['block_number', 'event_index'],
incremental_predicates = ["dynamic_range", "block_number"],
cluster_by = "block_timestamp::date, _inserted_timestamp::date",
post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION",
tags = ['core']
) }}
-- add back full_refresh = false,
WITH base AS (
SELECT
block_number,
tx_hash,
from_address AS origin_from_address,
to_address AS origin_to_address,
tx_status,
logs,
_inserted_timestamp
FROM
{{ ref('silver__receipts') }}
WHERE
ARRAY_SIZE(logs) > 0
{% if is_incremental() %}
AND block_number >= (
SELECT
ROUND(MAX(block_number), -4)
FROM
{{ this }})
{% endif %}
),
flat_logs AS (
SELECT
block_number,
tx_hash,
origin_from_address,
origin_to_address,
tx_status,
VALUE :address :: STRING AS contract_address,
VALUE :blockHash :: STRING AS block_hash,
VALUE :data :: STRING AS DATA,
PUBLIC.udf_hex_to_int(
VALUE :logIndex :: STRING
) :: INT AS event_index,
VALUE :removed :: BOOLEAN AS event_removed,
VALUE :topics AS topics,
_inserted_timestamp
FROM
base,
LATERAL FLATTEN(
input => logs
)
),
new_records AS (
SELECT
l.block_number,
txs.block_timestamp,
l.tx_hash,
l.origin_from_address,
l.origin_to_address,
txs.origin_function_signature,
l.tx_status,
l.contract_address,
l.block_hash,
l.data,
l.event_index,
l.event_removed,
l.topics,
l._inserted_timestamp,
CASE
WHEN txs.block_timestamp IS NULL
OR txs.origin_function_signature IS NULL THEN TRUE
ELSE FALSE
END AS is_pending,
CONCAT(
l.tx_hash :: STRING,
'-',
l.event_index :: STRING
) AS _log_id
FROM
flat_logs l
LEFT OUTER JOIN {{ ref('silver__transactions2') }}
txs USING (
block_number,
tx_hash
) -- add back after backfill
-- {% if is_incremental() %}
-- WHERE
-- txs._INSERTED_TIMESTAMP >= '{{ lookback() }}'
-- {% endif %}
)
{% if is_incremental() %},
missing_data AS (
SELECT
t.block_number,
txs.block_timestamp,
t.tx_hash,
t.origin_from_address,
t.origin_to_address,
txs.origin_function_signature,
t.tx_status,
t.contract_address,
t.block_hash,
t.data,
t.event_index,
t.event_removed,
t.topics,
GREATEST(
t._inserted_timestamp,
txs._inserted_timestamp
) AS _inserted_timestamp,
_log_id,
FALSE AS is_pending
FROM
{{ this }}
t
INNER JOIN {{ ref('silver__transactions2') }}
txs USING (
block_number,
tx_hash
)
WHERE
t.is_pending
)
{% endif %},
FINAL AS (
SELECT
block_number,
block_timestamp,
tx_hash,
origin_from_address,
origin_to_address,
origin_function_signature,
tx_status,
contract_address,
block_hash,
DATA,
event_index,
event_removed,
topics,
_inserted_timestamp,
_log_id,
is_pending
FROM
new_records
{% if is_incremental() %}
UNION
SELECT
block_number,
block_timestamp,
tx_hash,
origin_from_address,
origin_to_address,
origin_function_signature,
tx_status,
contract_address,
block_hash,
DATA,
event_index,
event_removed,
topics,
_inserted_timestamp,
_log_id,
is_pending
FROM
missing_data
{% endif %}
)
SELECT
*
FROM
FINAL qualify(ROW_NUMBER() over (PARTITION BY block_number, event_index
ORDER BY
_inserted_timestamp DESC, is_pending ASC)) = 1


@@ -0,0 +1,96 @@
-- depends_on: {{ ref('bronze__streamline_receipts') }}
{{ config(
materialized = 'incremental',
unique_key = "tx_hash",
incremental_predicates = ["dynamic_range", "block_number"],
cluster_by = "ROUND(block_number, -3)",
post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION on equality(tx_hash)",
tags = ['core']
) }}
-- add back full_refresh = false,
WITH base AS (
SELECT
block_number,
DATA,
_inserted_timestamp
FROM
{% if is_incremental() %}
{{ ref('bronze__streamline_FR_receipts') }}
WHERE
    -- temp backfill: advance through the source roughly 500,000 blocks per run, starting from the current max block_number in the target
_partition_by_block_id BETWEEN (
SELECT
ROUND(MAX(block_number), -4)
FROM
{{ this }})
AND (
SELECT
ROUND(MAX(block_number), -4) + 500000
FROM
{{ this }})
AND IS_OBJECT(DATA)
{% else %}
{{ ref('bronze__streamline_FR_receipts') }}
WHERE
_partition_by_block_id <= 2500000
AND IS_OBJECT(DATA)
{% endif %}
),
FINAL AS (
SELECT
block_number,
DATA :blockHash :: STRING AS block_hash,
PUBLIC.udf_hex_to_int(
DATA :blockNumber :: STRING
) :: INT AS blockNumber,
PUBLIC.udf_hex_to_int(
DATA :cumulativeGasUsed :: STRING
) :: INT AS cumulative_gas_used,
PUBLIC.udf_hex_to_int(
DATA :effectiveGasPrice :: STRING
) :: INT / pow(
10,
9
) AS effective_gas_price,
DATA :from :: STRING AS from_address,
PUBLIC.udf_hex_to_int(
DATA :gasUsed :: STRING
) :: INT AS gas_used,
DATA :logs AS logs,
DATA :logsBloom :: STRING AS logs_bloom,
PUBLIC.udf_hex_to_int(
DATA :status :: STRING
) :: INT AS status,
CASE
WHEN status = 1 THEN TRUE
ELSE FALSE
END AS tx_success,
CASE
WHEN status = 1 THEN 'SUCCESS'
ELSE 'FAIL'
END AS tx_status,
DATA :to :: STRING AS to_address1,
CASE
WHEN to_address1 = '' THEN NULL
ELSE to_address1
END AS to_address,
DATA :transactionHash :: STRING AS tx_hash,
PUBLIC.udf_hex_to_int(
DATA :transactionIndex :: STRING
) :: INT AS POSITION,
PUBLIC.udf_hex_to_int(
DATA :type :: STRING
) :: INT AS TYPE,
_inserted_timestamp
FROM
base
)
SELECT
*
FROM
FINAL
WHERE
tx_hash IS NOT NULL qualify(ROW_NUMBER() over (PARTITION BY tx_hash
ORDER BY
_inserted_timestamp DESC)) = 1


@@ -0,0 +1,367 @@
-- depends_on: {{ ref('bronze__streamline_traces') }}
{{ config (
materialized = "incremental",
unique_key = ['block_number', 'tx_position', 'trace_index'],
incremental_predicates = ["dynamic_range", "block_number"],
cluster_by = "block_timestamp::date, _inserted_timestamp::date",
post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION",
tags = ['core']
) }}
-- add back full_refresh = false,
WITH traces_txs AS (
SELECT
block_number,
VALUE :array_index :: INT AS tx_position,
DATA :result AS full_traces,
_inserted_timestamp
FROM
{% if is_incremental() %}
{{ ref('bronze__streamline_FR_traces') }}
WHERE
_partition_by_block_id BETWEEN (
SELECT
ROUND(MAX(block_number), -4)
FROM
{{ this }})
AND (
SELECT
ROUND(MAX(block_number), -4) + 500000
FROM
{{ this }})
{% else %}
{{ ref('bronze__streamline_FR_traces') }}
WHERE
_partition_by_block_id <= 2500000
{% endif %}
qualify(ROW_NUMBER() over (PARTITION BY block_number, tx_position
ORDER BY
_inserted_timestamp DESC)) = 1
),
base_table AS (
    -- recursively flatten the trace JSON and rebuild one object per call frame, keyed by its path in the call tree
SELECT
CASE
WHEN POSITION(
'.',
path :: STRING
) > 0 THEN REPLACE(
REPLACE(
path :: STRING,
SUBSTR(path :: STRING, len(path :: STRING) - POSITION('.', REVERSE(path :: STRING)) + 1, POSITION('.', REVERSE(path :: STRING))),
''
),
'.',
'__'
)
ELSE '__'
END AS id,
OBJECT_AGG(
DISTINCT key,
VALUE
) AS DATA,
txs.tx_position AS tx_position,
txs.block_number AS block_number,
txs._inserted_timestamp AS _inserted_timestamp
FROM
traces_txs txs,
TABLE(
FLATTEN(
input => PARSE_JSON(
txs.full_traces
),
recursive => TRUE
)
) f
WHERE
f.index IS NULL
AND f.key != 'calls'
GROUP BY
tx_position,
id,
block_number,
_inserted_timestamp
),
flattened_traces AS (
SELECT
DATA :from :: STRING AS from_address,
PUBLIC.udf_hex_to_int(
DATA :gas :: STRING
) AS gas,
PUBLIC.udf_hex_to_int(
DATA :gasUsed :: STRING
) AS gas_used,
DATA :input :: STRING AS input,
DATA :output :: STRING AS output,
DATA :error :: STRING AS error_reason,
DATA :to :: STRING AS to_address,
DATA :type :: STRING AS TYPE,
CASE
WHEN DATA :type :: STRING = 'CALL' THEN PUBLIC.udf_hex_to_int(
DATA :value :: STRING
) / pow(
10,
18
)
ELSE 0
END AS bnb_value,
CASE
WHEN id = '__' THEN CONCAT(
DATA :type :: STRING,
'_ORIGIN'
)
ELSE CONCAT(
DATA :type :: STRING,
'_',
REPLACE(
REPLACE(REPLACE(REPLACE(id, 'calls', ''), '[', ''), ']', ''),
'__',
'_'
)
)
END AS identifier,
concat_ws(
'-',
block_number,
tx_position,
identifier
) AS _call_id,
SPLIT(
identifier,
'_'
) AS id_split,
ARRAY_SLICE(id_split, 1, ARRAY_SIZE(id_split)) AS levels,
ARRAY_TO_STRING(
levels,
'_'
) AS LEVEL,
CASE
WHEN ARRAY_SIZE(levels) = 1
AND levels [0] :: STRING = 'ORIGIN' THEN NULL
WHEN ARRAY_SIZE(levels) = 1 THEN 'ORIGIN'
ELSE ARRAY_TO_STRING(ARRAY_SLICE(levels, 0, ARRAY_SIZE(levels) -1), '_')
END AS parent_level,
COUNT(parent_level) over (
PARTITION BY block_number,
tx_position,
parent_level
) AS sub_traces,
*
FROM
base_table
),
group_sub_traces AS (
SELECT
tx_position,
block_number,
parent_level,
sub_traces
FROM
flattened_traces
GROUP BY
tx_position,
block_number,
parent_level,
sub_traces
),
add_sub_traces AS (
SELECT
flattened_traces.tx_position AS tx_position,
flattened_traces.block_number :: INTEGER AS block_number,
flattened_traces.error_reason AS error_reason,
flattened_traces.from_address AS from_address,
flattened_traces.to_address AS to_address,
flattened_traces.bnb_value :: FLOAT AS bnb_value,
flattened_traces.gas :: FLOAT AS gas,
flattened_traces.gas_used :: FLOAT AS gas_used,
flattened_traces.input AS input,
flattened_traces.output AS output,
flattened_traces.type AS TYPE,
flattened_traces.identifier AS identifier,
flattened_traces._call_id AS _call_id,
flattened_traces.data AS DATA,
group_sub_traces.sub_traces AS sub_traces,
ROW_NUMBER() over(
PARTITION BY flattened_traces.block_number,
flattened_traces.tx_position
ORDER BY
flattened_traces.gas :: FLOAT DESC,
flattened_traces.bnb_value :: FLOAT ASC,
flattened_traces.to_address
) AS trace_index,
flattened_traces._inserted_timestamp AS _inserted_timestamp
FROM
flattened_traces
LEFT OUTER JOIN group_sub_traces
ON flattened_traces.tx_position = group_sub_traces.tx_position
AND flattened_traces.level = group_sub_traces.parent_level
AND flattened_traces.block_number = group_sub_traces.block_number
),
final_traces AS (
SELECT
tx_position,
trace_index,
block_number,
error_reason,
from_address,
to_address,
bnb_value,
gas,
gas_used,
input,
output,
TYPE,
identifier,
_call_id,
_inserted_timestamp,
DATA,
sub_traces
FROM
add_sub_traces
WHERE
identifier IS NOT NULL
),
new_records AS (
SELECT
f.block_number,
t.tx_hash,
t.block_timestamp,
t.tx_status,
f.tx_position,
f.trace_index,
f.from_address,
f.to_address,
f.bnb_value,
f.gas,
f.gas_used,
f.input,
f.output,
f.type,
f.identifier,
f.sub_traces,
f.error_reason,
CASE
WHEN f.error_reason IS NULL THEN 'SUCCESS'
ELSE 'FAIL'
END AS trace_status,
f.data,
CASE
WHEN t.tx_hash IS NULL
OR t.block_timestamp IS NULL
OR t.tx_status IS NULL THEN TRUE
ELSE FALSE
END AS is_pending,
f._call_id,
f._inserted_timestamp
FROM
final_traces f
LEFT OUTER JOIN {{ ref('silver__transactions2') }}
t
ON f.tx_position = t.position
AND f.block_number = t.block_number -- add back after backfill
-- {% if is_incremental() %}
-- WHERE
-- txs._INSERTED_TIMESTAMP >= '{{ lookback() }}'
-- {% endif %}
)
{% if is_incremental() %},
missing_data AS (
SELECT
t.block_number,
txs.tx_hash,
txs.block_timestamp,
txs.tx_status,
t.tx_position,
t.trace_index,
t.from_address,
t.to_address,
t.bnb_value,
t.gas,
t.gas_used,
t.input,
t.output,
t.type,
t.identifier,
t.sub_traces,
t.error_reason,
t.trace_status,
t.data,
FALSE AS is_pending,
t._call_id,
GREATEST(
t._inserted_timestamp,
txs._inserted_timestamp
) AS _inserted_timestamp
FROM
{{ this }}
t
INNER JOIN {{ ref('silver__transactions2') }}
txs
ON t.tx_position = txs.position
AND t.block_number = txs.block_number
WHERE
t.is_pending
)
{% endif %},
FINAL AS (
SELECT
block_number,
tx_hash,
block_timestamp,
tx_status,
tx_position,
trace_index,
from_address,
to_address,
bnb_value,
gas,
gas_used,
input,
output,
TYPE,
identifier,
sub_traces,
error_reason,
trace_status,
DATA,
is_pending,
_call_id,
_inserted_timestamp
FROM
new_records
{% if is_incremental() %}
UNION
SELECT
block_number,
tx_hash,
block_timestamp,
tx_status,
tx_position,
trace_index,
from_address,
to_address,
bnb_value,
gas,
gas_used,
input,
output,
TYPE,
identifier,
sub_traces,
error_reason,
trace_status,
DATA,
is_pending,
_call_id,
_inserted_timestamp
FROM
missing_data
{% endif %}
)
SELECT
*
FROM
FINAL qualify(ROW_NUMBER() over(PARTITION BY block_number, tx_position, trace_index
ORDER BY
_inserted_timestamp DESC, is_pending ASC)) = 1


@@ -0,0 +1,241 @@
-- depends_on: {{ ref('bronze__streamline_transactions') }}
{{ config(
materialized = 'incremental',
unique_key = "tx_hash",
cluster_by = "block_timestamp::date, _inserted_timestamp::date",
incremental_predicates = ["dynamic_range", "block_number"],
post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION",
tags = ['core']
) }}
WITH base AS (
SELECT
block_number,
DATA,
_inserted_timestamp
FROM
{% if is_incremental() %}
{{ ref('bronze__streamline_FR_transactions') }}
WHERE
_partition_by_block_id BETWEEN (
SELECT
ROUND(MAX(block_number), -4)
FROM
{{ this }})
AND (
SELECT
ROUND(MAX(block_number), -4) + 500000
FROM
{{ this }})
AND IS_OBJECT(DATA)
{% else %}
{{ ref('bronze__streamline_FR_transactions') }}
WHERE
_partition_by_block_id <= 2500000
AND IS_OBJECT(DATA)
{% endif %}
),
new_records AS (
SELECT
A.block_number AS block_number,
A.data :blockHash :: STRING AS block_hash,
PUBLIC.udf_hex_to_int(
A.data :blockNumber :: STRING
) :: INT AS blockNumber,
A.data :from :: STRING AS from_address,
PUBLIC.udf_hex_to_int(
A.data :gas :: STRING
) :: INT AS gas,
PUBLIC.udf_hex_to_int(
A.data :gasPrice :: STRING
) :: INT / pow(
10,
9
) AS gas_price,
A.data :hash :: STRING AS tx_hash,
A.data :input :: STRING AS input_data,
SUBSTR(
input_data,
1,
10
) AS origin_function_signature,
PUBLIC.udf_hex_to_int(
A.data :nonce :: STRING
) :: INT AS nonce,
A.data :r :: STRING AS r,
A.data :s :: STRING AS s,
A.data :to :: STRING AS to_address1,
CASE
WHEN to_address1 = '' THEN NULL
ELSE to_address1
END AS to_address,
PUBLIC.udf_hex_to_int(
A.data :transactionIndex :: STRING
) :: INT AS POSITION,
A.data :type :: STRING AS TYPE,
A.data :v :: STRING AS v,
PUBLIC.udf_hex_to_int(
A.data :value :: STRING
) / pow(
10,
18
) :: FLOAT AS VALUE,
block_timestamp,
CASE
WHEN block_timestamp IS NULL
OR tx_status IS NULL THEN TRUE
ELSE FALSE
END AS is_pending,
r.gas_used,
tx_success,
tx_status,
cumulative_gas_used,
effective_gas_price,
(
gas_price * r.gas_used
) / pow(
10,
9
) AS tx_fee,
r.type AS tx_type,
A._INSERTED_TIMESTAMP
FROM
base A
LEFT OUTER JOIN {{ ref('silver__receipts') }}
r
ON A.block_number = r.block_number
AND A.data :hash :: STRING = r.tx_hash
LEFT OUTER JOIN {{ ref('silver__blocks2') }}
b
ON A.block_number = b.block_number
{% if is_incremental() %}
WHERE
r._INSERTED_TIMESTAMP >= '{{ lookback() }}'
{% endif %}
)
{% if is_incremental() %},
missing_data AS (
SELECT
t.block_number,
t.block_hash,
t.from_address,
t.gas,
t.gas_price,
t.tx_hash,
t.input_data,
t.origin_function_signature,
t.nonce,
t.r,
t.s,
t.to_address,
t.position,
t.type,
t.v,
t.value,
b.block_timestamp,
FALSE AS is_pending,
r.gas_used,
r.tx_success,
r.tx_status,
r.cumulative_gas_used,
r.effective_gas_price,
(
t.gas_price * r.gas_used
) / pow(
10,
9
) AS tx_fee,
r.type AS tx_type,
GREATEST(
t._inserted_timestamp,
b._inserted_timestamp,
r._inserted_timestamp
) AS _inserted_timestamp
FROM
{{ this }}
t
INNER JOIN {{ ref('silver__blocks2') }}
b
ON t.block_number = b.block_number
INNER JOIN {{ ref('silver__receipts') }}
r
ON t.tx_hash = r.tx_hash
AND t.block_number = r.block_number
WHERE
t.is_pending
)
{% endif %},
FINAL AS (
SELECT
block_number,
block_hash,
from_address,
gas,
gas_price,
tx_hash,
input_data,
origin_function_signature,
nonce,
r,
s,
to_address,
POSITION,
TYPE,
v,
VALUE,
block_timestamp,
is_pending,
gas_used,
tx_success,
tx_status,
cumulative_gas_used,
effective_gas_price,
tx_fee,
tx_type,
_inserted_timestamp
FROM
new_records
{% if is_incremental() %}
UNION
SELECT
block_number,
block_hash,
from_address,
gas,
gas_price,
tx_hash,
input_data,
origin_function_signature,
nonce,
r,
s,
to_address,
POSITION,
TYPE,
v,
VALUE,
block_timestamp,
is_pending,
gas_used,
tx_success,
tx_status,
cumulative_gas_used,
effective_gas_price,
tx_fee,
tx_type,
_inserted_timestamp
FROM
missing_data
{% endif %}
)
SELECT
*
FROM
FINAL qualify(ROW_NUMBER() over (PARTITION BY tx_hash
ORDER BY
_inserted_timestamp DESC, is_pending ASC)) = 1