Python SDK
Programmatic access to the most comprehensive blockchain data in Web3, for free. 🥳
GM frens, you've found yourself at the Python SDK for ShroomDK.
💾 Install the SDK
pip install shroomdk
🦾 Getting Started
from shroomdk import ShroomDK
# Initialize `ShroomDK` with your API Key
sdk = ShroomDK(
    "<YOUR_API_KEY>",
    "https://api.flipsidecrypto.com"
)
# Parameters can be passed into SQL statements
# via native string interpolation
my_address = "0x...."
sql = f"""
SELECT
nft_address,
mint_price_eth,
mint_price_usd
FROM ethereum.core.ez_nft_mints
WHERE nft_to_address = LOWER('{my_address}')
"""
# Run the query against Flipside's query engine
# and await the results
query_result_set = sdk.query(sql)
# Iterate over the results
for record in query_result_set.records:
    nft_address = record['nft_address']
    mint_price_eth = record['mint_price_eth']
    mint_price_usd = record['mint_price_usd']
    print(f"{nft_address} minted for {mint_price_eth} ETH ({mint_price_usd} USD)")
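Because the address is interpolated directly into the SQL string, it is worth validating it first. The snippet below is a minimal sketch, not part of the SDK: `safe_address` is a hypothetical helper, and it assumes the address is a plain hex-encoded EVM address (`0x` followed by 40 hex digits).

```python
import re

# Assumption: a hex-encoded EVM address, "0x" followed by exactly 40 hex digits.
_ADDRESS_RE = re.compile(r"0x[0-9a-fA-F]{40}")


def safe_address(address: str) -> str:
    """Hypothetical helper: return the lowercased address or raise ValueError."""
    if not _ADDRESS_RE.fullmatch(address):
        raise ValueError(f"not a valid address: {address!r}")
    return address.lower()


# Placeholder address used only for illustration.
my_address = safe_address("0x" + "Ab" * 20)

sql = f"""
SELECT
    nft_address,
    mint_price_eth,
    mint_price_usd
FROM ethereum.core.ez_nft_mints
WHERE nft_to_address = '{my_address}'
"""
```

Rejecting anything that is not a well-formed address keeps stray quotes or SQL fragments out of the interpolated statement.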
The Details
Executing a Query
When executing a query, the following parameters can be passed to the query method on the ShroomDK object to override their defaults:
| Argument | Description | Default Value |
|---|---|---|
| sql | The sql string to execute | None (required) |
| ttl_minutes | The number of minutes to cache the query results | 60 |
| cached | An override on the query result cache. A value of False will re-execute the query. | True |
| timeout_minutes | The number of minutes until your query run times out | 20 |
| retry_interval_seconds | The number of seconds to wait between polls to the server | 1 |
| page_size | The number of rows/records to return | 100,000 |
| page_number | The page number to return (starts at 1) | 1 |
Let's create a query to retrieve all NFTs minted by an address:
my_address = "0x...."
sql = f"""
SELECT
nft_address,
mint_price_eth,
mint_price_usd
FROM ethereum.core.ez_nft_mints
WHERE nft_to_address = LOWER('{my_address}')
LIMIT 100
"""
Now let's execute the query and retrieve the first 5 rows of the result set. Note we will set page_size to 5 and page_number to 1 to retrieve those first 5 rows.
query_result_set = sdk.query(
    sql,
    ttl_minutes=60,
    cached=True,
    timeout_minutes=20,
    retry_interval_seconds=1,
    page_size=5,
    page_number=1
)
The results of this query will be cached for 60 minutes, given the ttl_minutes parameter is set to 60. Let's examine the query result object that's returned.
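To walk through more than one page, page_number can be incremented until a page comes back short or empty. The following is a rough sketch rather than an SDK feature: `iter_pages` and `max_pages` are hypothetical names, `run_query` stands in for `sdk.query`, and the stop condition assumes that a page past the end of the result set has empty records.

```python
from typing import Any, Callable, Iterator, List


def iter_pages(
    run_query: Callable[..., Any],
    sql: str,
    page_size: int = 5,
    max_pages: int = 100,
) -> Iterator[List[Any]]:
    """Yield one list of records per page until a short or empty page appears."""
    for page_number in range(1, max_pages + 1):
        result = run_query(sql, page_size=page_size, page_number=page_number)
        records = result.records or []
        if not records:
            # Assumption: an empty page means we are past the last row.
            return
        yield records
        if len(records) < page_size:
            # A short page is treated as the final page.
            return
```

With a real client this could be called as `for page in iter_pages(sdk.query, sql, page_size=5): ...`. The `max_pages` cap is only a safety net against looping forever.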
The QueryResultSet Object
After executing a query the results are stored in a QueryResultSet object:
class QueryResultSet(BaseModel):
    query_id: Union[str, None] = Field(None, description="The server id of the query")
    status: str = Field(False, description="The status of the query (`PENDING`, `FINISHED`, `ERROR`)")
    columns: Union[List[str], None] = Field(None, description="The names of the columns in the result set")
    column_types: Union[List[str], None] = Field(None, description="The type of the columns in the result set")
    rows: Union[List[Any], None] = Field(None, description="The results of the query")
    run_stats: Union[QueryRunStats, None] = Field(
        None,
        description="Summary stats on the query run (i.e. the number of rows returned, the elapsed time, etc)",
    )
    records: Union[List[Any], None] = Field(None, description="The results of the query transformed as an array of objects")
    error: Any
Let's iterate over the results from our query above.
Our query selected nft_address, mint_price_eth, and mint_price_usd. We can access the returned data via the records attribute. The column names in our query are used as the keys of each record object.
for record in query_result_set.records:
    nft_address = record['nft_address']
    mint_price_eth = record['mint_price_eth']
    mint_price_usd = record['mint_price_usd']
    print(f"{nft_address} minted for {mint_price_eth} ETH ({mint_price_usd} USD)")
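To make the relationship between columns, rows, and records concrete, here is a small sketch of how a list of rows can be turned into keyed records. It illustrates the idea only and is not the SDK's actual implementation: `rows_to_records` is a hypothetical helper, and the sample values are made up.

```python
from typing import Any, Dict, List


def rows_to_records(columns: List[str], rows: List[List[Any]]) -> List[Dict[str, Any]]:
    """Pair each row's values with the column names, one dict per row."""
    return [dict(zip(columns, row)) for row in rows]


# Made-up sample data shaped like the query above.
columns = ["nft_address", "mint_price_eth", "mint_price_usd"]
rows = [
    ["0xabc", 0.08, 120.5],
    ["0xdef", 0.0, 0.0],
]

records = rows_to_records(columns, rows)
print(records[0]["mint_price_eth"])
```

Keyed records are convenient for readable code, while rows plus columns are more compact when handing the data to a tabular library.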
Other useful information can be accessed on the query result set object such as run stats, i.e. how long the query took to execute:
started_at = query_result_set.run_stats.started_at
ended_at = query_result_set.run_stats.ended_at
elapsed_seconds = query_result_set.run_stats.elapsed_seconds
record_count = query_result_set.run_stats.record_count
print(f"This query took {elapsed_seconds} seconds to run and returned {record_count} records from the database.")
Rate Limits
Every API key is subject to a rate limit over a moving 5-minute window, as well as an aggregate daily limit.
If the limit is reached within a 5-minute period, the SDK will back off exponentially and retry the query, up to the limit set by the timeout_minutes parameter.
This feature is quite useful if leveraging the SDK client side and your web application sees a large spike in traffic. Rather than using up your daily limit all at once, requests will be smoothed out over the day.
Rate limits can be adjusted per key or use case.
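For readers unfamiliar with exponential backoff, the general pattern looks roughly like the sketch below. This is a generic illustration, not the SDK's internal code: `RateLimited`, `call_with_backoff`, `base_delay`, and `max_delay` are hypothetical names, and the doubling schedule is an assumption.

```python
import time
from typing import Callable, TypeVar

T = TypeVar("T")


class RateLimited(Exception):
    """Hypothetical error standing in for a rate-limit response."""


def call_with_backoff(
    fn: Callable[[], T],
    timeout_seconds: float,
    base_delay: float = 1.0,
    max_delay: float = 60.0,
    sleep: Callable[[float], None] = time.sleep,
    clock: Callable[[], float] = time.monotonic,
) -> T:
    """Retry fn, doubling the wait after each rate-limit error, until the timeout."""
    deadline = clock() + timeout_seconds
    delay = base_delay
    while True:
        try:
            return fn()
        except RateLimited:
            # Give up if the next wait would run past the deadline.
            if clock() + delay > deadline:
                raise
            sleep(delay)
            delay = min(delay * 2, max_delay)
```

The `sleep` and `clock` parameters exist only so the behaviour can be exercised without real waiting. In normal use the defaults apply.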