Asynchronous Programming with soildb
soildb is built on Python’s asyncio for high-performance, concurrent data access. This guide explains how to use async features effectively.
Getting Started
Asynchronous programming allows you to write concurrent code that can handle multiple tasks at once. In soildb, all functions that interact with the Soil Data Access API are async and must be awaited.
Here is a simple example of an async function that retrieves soil map units for a given area:
import asyncio
import soildb

async def get_soil_data():
    # All soildb functions are async
    mapunits = await soildb.get_mapunit_by_areasymbol("IA109")
    df = mapunits.to_pandas()
    return df

# To run this function, you need an event loop.
# The "Running Async Code" section below explains how to run this code in different environments.
Why Async?
- Performance: Concurrent queries usually finish sooner than the same queries run one after another, because the time spent waiting on the network overlaps
- Scalability: Handle many requests without blocking
- Integration: Works seamlessly with async web frameworks (FastAPI, aiohttp, etc.)
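The performance point can be illustrated without touching the network. The following is a minimal sketch, not soildb code: `fake_query` is a hypothetical stand-in that simulates a network-bound call with `asyncio.sleep`, so the timings only show how waits overlap under `asyncio.gather`.

```python
import asyncio
import time

async def fake_query(area: str, delay: float = 0.1) -> str:
    # Hypothetical stand-in for a network-bound soildb call
    await asyncio.sleep(delay)
    return area

async def run_sequential(areas):
    # Each query waits for the previous one to finish
    return [await fake_query(a) for a in areas]

async def run_concurrent(areas):
    # All queries wait at the same time
    return await asyncio.gather(*(fake_query(a) for a in areas))

def timed(coro_fn, areas):
    start = time.perf_counter()
    results = asyncio.run(coro_fn(areas))
    return results, time.perf_counter() - start

areas = ["IA109", "IA113", "IA117"]
seq_results, seq_time = timed(run_sequential, areas)
con_results, con_time = timed(run_concurrent, areas)
print(f"sequential: {seq_time:.2f}s, concurrent: {con_time:.2f}s")
```

With three simulated queries of equal length, the sequential run takes roughly three times as long as the concurrent one. Real speedups depend on the server and the network.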
Concurrent Queries
Asynchronous functions allow us to execute multiple queries as simultaneous tasks:
import asyncio
import soildb

async def concurrent_queries():
    areas = ["IA109", "IA113", "IA117"]
    # Create tasks for concurrent execution
    tasks = [
        soildb.get_mapunit_by_areasymbol(area)
        for area in areas
    ]
    # Wait for all to complete
    results = await asyncio.gather(*tasks)
    # Process results
    for area, result in zip(areas, results):
        df = result.to_pandas()
        print(f"{area}: {len(df)} map units")

asyncio.run(concurrent_queries())

Using the SDAClient for Multiple Queries
For scenarios involving multiple queries, it is more efficient to create a single SDAClient instance and reuse it. This avoids the overhead of establishing a new connection for each query. The SDAClient can be used as an async context manager.
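To make the reuse pattern concrete, here is a minimal sketch of how an async context manager sets up once, serves several calls, and tears down once. `FakeClient` is a hypothetical stand-in written for this illustration; it does not reproduce the SDAClient implementation.

```python
import asyncio

class FakeClient:
    """Hypothetical stand-in for a reusable client; not the soildb API."""

    def __init__(self):
        self.opened = 0  # number of times setup ran
        self.closed = 0  # number of times teardown ran
        self.calls = 0   # number of queries executed

    async def __aenter__(self):
        self.opened += 1
        return self

    async def __aexit__(self, exc_type, exc, tb):
        self.closed += 1
        return False  # do not suppress exceptions

    async def execute(self, query: str) -> str:
        self.calls += 1
        await asyncio.sleep(0)  # yield control, as a real request would
        return f"result for {query}"

async def main():
    client = FakeClient()
    async with client:
        results = [await client.execute(q) for q in ("q1", "q2", "q3")]
    return client, results

client, results = asyncio.run(main())
print(client.opened, client.calls, client.closed)
```

Setup and teardown each run once, however many queries are executed inside the `async with` block.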
Sequential Queries
import asyncio
import soildb

async def sequential_queries():
    async with soildb.SDAClient() as client:
        for area in ["IA109", "IA113", "IA117"]:
            query = soildb.query_templates.query_mapunits_by_legend(area)
            result = await client.execute(query)
            print(f"{area}: {len(result.to_pandas())} map units")

asyncio.run(sequential_queries())

Concurrent Queries with a Client
You can also use the client to execute multiple queries concurrently with asyncio.gather:
import asyncio
import soildb

async def concurrent_queries_with_client():
    async with soildb.SDAClient() as client:
        queries = [
            soildb.query_templates.query_mapunits_by_legend(area)
            for area in ["IA109", "IA113", "IA117"]
        ]
        results = await asyncio.gather(*[client.execute(q) for q in queries])
        for result in results:
            print(f"Got {len(result.to_pandas())} map units")

asyncio.run(concurrent_queries_with_client())

Bulk Data Fetching
Async is particularly helpful with larger datasets (say, multiple soil survey areas):
import asyncio
from soildb import fetch_by_keys, get_mukey_by_areasymbol

async def bulk_fetch():
    # Get many mukeys
    mukeys = await get_mukey_by_areasymbol(["IA109", "IA113"])
    # Fetch data in chunks automatically
    response = await fetch_by_keys(
        mukeys,
        "mapunit",
        columns=["mukey", "muname", "mukind"]
    )
    return response.to_pandas()

df = asyncio.run(bulk_fetch())

Streaming Large Datasets
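Streaming here means handling each result as soon as it is available instead of collecting everything first. As a library-independent sketch, the standard `asyncio.as_completed` helper yields results in completion order. `fake_fetch` and its delays are hypothetical stand-ins for real queries of differing duration.

```python
import asyncio

async def fake_fetch(area: str, delay: float) -> str:
    # Hypothetical stand-in for a query that takes `delay` seconds
    await asyncio.sleep(delay)
    return area

async def stream_as_completed(jobs):
    arrived = []
    pending = [fake_fetch(area, delay) for area, delay in jobs]
    # as_completed yields awaitables in the order they finish
    for next_done in asyncio.as_completed(pending):
        area = await next_done
        arrived.append(area)  # process each result as it arrives
    return arrived

jobs = [("IA109", 0.3), ("IA113", 0.1), ("IA117", 0.2)]
arrival_order = asyncio.run(stream_as_completed(jobs))
print(arrival_order)
```

The shortest simulated query is handled first, regardless of the order in which the jobs were submitted.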
Async Generator for Streaming
import asyncio
import soildb

async def stream_areas(areas):
    for area in areas:
        result = await soildb.get_mapunit_by_areasymbol(area)
        yield result

async def process_stream():
    areas = ["IA109", "IA113", "IA117"]
    async for result in stream_areas(areas):
        df = result.to_pandas()
        # Process each result as it arrives
        print(f"Processed {len(df)} map units")

asyncio.run(process_stream())

Error Handling
Several kinds of errors can occur during a query. They can be handled cleanly with try/except blocks:
import asyncio
import soildb
from soildb import SDAConnectionError, SDAMaintenanceError

async def robust_query():
    try:
        result = await soildb.get_mapunit_by_areasymbol("IA109")
        return result.to_pandas()
    except SDAConnectionError:
        print("Network error - retry later")
        return None
    except SDAMaintenanceError:
        print("SDA service under maintenance")
        return None
    except Exception as e:
        print(f"Unexpected error: {e}")
        return None

result = asyncio.run(robust_query())

Integration with Async Frameworks
Because soildb functions are async, they can be awaited directly inside the handlers of async frameworks, so custom soil data functions do not block the framework's event loop.
FastAPI Example
Here we demonstrate how to create a custom API endpoint with FastAPI to obtain mapunits that occur in the specified soil survey area:
from fastapi import FastAPI
import soildb

app = FastAPI()

@app.get("/soil/{areasymbol}")
async def get_soil_data(areasymbol: str):
    mapunits = await soildb.get_mapunit_by_areasymbol(areasymbol)
    return mapunits.to_pandas().to_dict('records')

Running Async Code
There are two main ways to run async code with soildb:
1. In a Regular Python Script
When running a .py file from your terminal, you can use asyncio.run() to execute the top-level async function. This creates a new event loop, runs the function, and closes the loop.
import asyncio
import soildb

async def main():
    mapunits = await soildb.get_mapunit_by_areasymbol("IA109")
    print(mapunits.to_pandas())

if __name__ == "__main__":
    asyncio.run(main())

2. In an Environment with a Running Event Loop
Environments like Jupyter Notebooks, VS Code Notebooks, or other async applications already have a running asyncio event loop. Calling asyncio.run() in these environments will raise a RuntimeError.
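The error can be reproduced in a plain script by calling `asyncio.run()` from inside a coroutine that is itself being run by an event loop. This is a minimal sketch using only the standard library; `inner` and `outer` are illustrative names.

```python
import asyncio

async def inner():
    return "done"

async def outer():
    # At this point an event loop is already running (started below)
    coro = inner()
    try:
        asyncio.run(coro)  # not allowed while a loop is running
    except RuntimeError as exc:
        coro.close()  # avoid a "coroutine was never awaited" warning
        return str(exc)
    return "no error"

message = asyncio.run(outer())
print(message)
```

The printed message is the same RuntimeError text a notebook user sees when calling `asyncio.run()` in a cell.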
To work around this, you can use nest_asyncio, which patches asyncio so that a running event loop can be re-entered:
import asyncio
import nest_asyncio
import soildb

# Allow nested event loops
nest_asyncio.apply()

async def get_soil_data():
    mapunits = await soildb.get_mapunit_by_areasymbol("IA109")
    return mapunits.to_pandas()

# Get the current event loop and run the function
loop = asyncio.get_event_loop()
df = loop.run_until_complete(get_soil_data())
df.head()

Troubleshooting
Event Loop Issues
Problem: RuntimeError: asyncio.run() cannot be called from a running event loop
Solution: Use nest_asyncio or check for existing loops:
try:
    loop = asyncio.get_running_loop()
    # Use existing loop
except RuntimeError:
    # Create new loop
    asyncio.run(main())

Connection Timeouts
Problem: Queries time out with large datasets
Solution: Increase the timeout and use chunking:
# Run inside an async function
client = soildb.SDAClient(timeout=120.0)  # 2 minutes
response = await soildb.fetch_by_keys(large_key_list, "mapunit", chunk_size=500)

Memory Issues
Problem: Large datasets consume too much memory
Solution: Process in chunks and use polars for efficiency:
# Process in smaller chunks (run inside an async function)
for i in range(0, len(mukeys), 1000):
    chunk = mukeys[i:i+1000]
    response = await soildb.fetch_by_keys(chunk, "mapunit")
    # Process chunk immediately
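The chunked loop above can be sketched end to end without network access. In this illustration `fake_fetch_by_keys` is a hypothetical stand-in for the real fetch, and `chunked` is a small helper introduced here. The point is that only one chunk of rows is held in memory at a time while a running total is kept.

```python
import asyncio

def chunked(keys, size):
    # Yield successive slices of at most `size` keys
    for start in range(0, len(keys), size):
        yield keys[start:start + size]

async def fake_fetch_by_keys(keys):
    # Hypothetical stand-in: returns one row per key
    await asyncio.sleep(0)
    return [{"mukey": k} for k in keys]

async def count_rows(keys, size):
    total = 0
    largest = 0
    for chunk in chunked(keys, size):
        rows = await fake_fetch_by_keys(chunk)
        # Reduce the chunk to a summary, then let `rows` be replaced
        total += len(rows)
        largest = max(largest, len(rows))
    return total, largest

mukeys = list(range(2500))
total_rows, largest_chunk = asyncio.run(count_rows(mukeys, 1000))
print(total_rows, largest_chunk)
```

Here 2500 keys are processed in chunks of at most 1000 rows, so peak memory is bounded by the chunk size, not by the full key list.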