Asynchronous Programming with soildb

soildb is built on Python’s asyncio for high-performance, concurrent data access. This guide explains how to use async features effectively.

Getting Started

Asynchronous programming lets a single program make progress on several tasks while each one waits on I/O. In soildb, all functions that interact with the Soil Data Access API are async and must be awaited.

Here is a simple example of an async function that retrieves soil map units for a given area:

import asyncio
import soildb

async def get_soil_data():
    # soildb functions that query Soil Data Access are async
    mapunits = await soildb.get_mapunit_by_areasymbol("IA109")
    df = mapunits.to_pandas()
    return df

# To run this function, you need an event loop.
# The following sections explain how to run this code in different environments.

Why Async?

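  • Performance: Concurrent queries overlap their network wait time, so a batch usually finishes sooner than the same queries run one after another
  • Scalability: Handle many requests without blocking the rest of the program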
  • Integration: Works seamlessly with async web frameworks (FastAPI, aiohttp, etc.)
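
The performance point can be illustrated without touching the network. The sketch below uses a hypothetical fake_query coroutine that only sleeps, standing in for a real soildb call (an assumption for illustration), and compares a sequential loop against asyncio.gather:

```python
import asyncio
import time

async def fake_query(area, delay=0.1):
    # Hypothetical stand-in for a network-bound soildb call
    await asyncio.sleep(delay)
    return area

async def run_sequential(areas):
    # Each query starts only after the previous one has finished
    return [await fake_query(a) for a in areas]

async def run_concurrent(areas):
    # All queries wait at the same time
    return await asyncio.gather(*(fake_query(a) for a in areas))

def timed(coro):
    # Run a coroutine on a fresh event loop and measure wall-clock time
    start = time.perf_counter()
    result = asyncio.run(coro)
    return result, time.perf_counter() - start

if __name__ == "__main__":
    areas = ["IA109", "IA113", "IA117"]
    _, t_seq = timed(run_sequential(areas))
    _, t_con = timed(run_concurrent(areas))
    print(f"sequential: {t_seq:.2f}s, concurrent: {t_con:.2f}s")
```

With three simulated queries of equal length, the sequential run should take roughly three times as long as the concurrent one. Real speedups depend on the service and the network.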

Concurrent Queries

Asynchronous functions allow us to execute multiple queries as simultaneous tasks:

import asyncio
import soildb

async def concurrent_queries():
    areas = ["IA109", "IA113", "IA117"]
    
    # Build one coroutine per area; gather() below runs them concurrently
    tasks = [
        soildb.get_mapunit_by_areasymbol(area) 
        for area in areas
    ]
    
    # Wait for all to complete
    results = await asyncio.gather(*tasks)
    
    # Process results
    for area, result in zip(areas, results):
        df = result.to_pandas()
        print(f"{area}: {len(df)} map units")

asyncio.run(concurrent_queries())
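
By default, asyncio.gather raises the first exception it encounters, and the caller never sees the results of the queries that succeeded. A minimal sketch of collecting failures alongside successes with return_exceptions=True follows. The fake_query coroutine is a hypothetical stand-in, not a soildb function:

```python
import asyncio

async def fake_query(area):
    # Hypothetical stand-in: fails for one area to simulate a bad request
    await asyncio.sleep(0.01)
    if area == "BAD":
        raise ValueError(f"unknown area symbol: {area}")
    return f"{area}-data"

async def gather_with_failures(areas):
    results = await asyncio.gather(
        *(fake_query(a) for a in areas),
        return_exceptions=True,  # exceptions are returned, not raised
    )
    ok, failed = {}, {}
    # gather() preserves input order, so zip pairs each area with its outcome
    for area, result in zip(areas, results):
        if isinstance(result, Exception):
            failed[area] = result
        else:
            ok[area] = result
    return ok, failed
```

Calling asyncio.run(gather_with_failures(["IA109", "BAD", "IA113"])) returns the two successful results in one dictionary and the ValueError for "BAD" in the other.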

Using the SDAClient for Multiple Queries

For scenarios involving multiple queries, it is more efficient to create a single SDAClient instance and reuse it. This avoids the overhead of establishing a new connection for each query. The SDAClient can be used as an async context manager.

Sequential Queries

import asyncio
import soildb

async def sequential_queries():
    async with soildb.SDAClient() as client:
        for area in ["IA109", "IA113", "IA117"]:
            query = soildb.query_templates.query_mapunits_by_legend(area)
            result = await client.execute(query)
            print(f"{area}: {len(result.to_pandas())} map units")

asyncio.run(sequential_queries())

Concurrent Queries with a Client

You can also use the client to execute multiple queries concurrently with asyncio.gather:

import asyncio
import soildb

async def concurrent_queries_with_client():
    async with soildb.SDAClient() as client:
        queries = [
            soildb.query_templates.query_mapunits_by_legend(area)
            for area in ["IA109", "IA113", "IA117"]
        ]
        results = await asyncio.gather(*[client.execute(q) for q in queries])
        for result in results:
            print(f"Got {len(result.to_pandas())} map units")

asyncio.run(concurrent_queries_with_client())
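
Launching every query at once may put more load on the service than is appropriate. One common pattern is to cap the number of in-flight queries with asyncio.Semaphore. In this sketch, fake_execute is a hypothetical stand-in for client.execute, and the state dictionary exists only to record peak concurrency:

```python
import asyncio

async def fake_execute(query, state):
    # Hypothetical stand-in for client.execute(); tracks peak concurrency
    state["active"] += 1
    state["peak"] = max(state["peak"], state["active"])
    await asyncio.sleep(0.01)
    state["active"] -= 1
    return f"result:{query}"

async def run_limited(queries, max_concurrent=2):
    semaphore = asyncio.Semaphore(max_concurrent)
    state = {"active": 0, "peak": 0}

    async def guarded(query):
        # At most max_concurrent coroutines are inside this block at once
        async with semaphore:
            return await fake_execute(query, state)

    results = await asyncio.gather(*(guarded(q) for q in queries))
    return results, state["peak"]
```

A suitable value for max_concurrent depends on the service's limits, which are not stated here. Treat the default of 2 as a placeholder.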

Bulk Data Fetching

Async is particularly helpful with larger datasets (say, multiple soil survey areas):

import asyncio
from soildb import fetch_by_keys, get_mukey_by_areasymbol

async def bulk_fetch():
    # Get many mukeys
    mukeys = await get_mukey_by_areasymbol(["IA109", "IA113"])
    
    # Fetch data in chunks automatically
    response = await fetch_by_keys(
        mukeys, 
        "mapunit", 
        columns=["mukey", "muname", "mukind"]
    )
    
    return response.to_pandas()

df = asyncio.run(bulk_fetch())
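
The chunking that fetch_by_keys performs is internal to soildb and its implementation is not shown in this guide. As a rough illustration of the idea only, a key list can be split into fixed-size batches like this. The chunked helper is hypothetical and the key values are made up:

```python
def chunked(keys, chunk_size):
    """Yield successive slices of at most chunk_size keys."""
    if chunk_size < 1:
        raise ValueError("chunk_size must be at least 1")
    for start in range(0, len(keys), chunk_size):
        yield keys[start:start + chunk_size]

# Illustrative key values, not real query output
mukeys = [101, 102, 103, 104, 105]
batches = list(chunked(mukeys, 2))
print(batches)  # [[101, 102], [103, 104], [105]]
```

Each batch can then be sent as its own query, which keeps individual requests small.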

Streaming Large Datasets

Async Generator for Streaming

import asyncio
import soildb

async def stream_areas(areas):
    for area in areas:
        result = await soildb.get_mapunit_by_areasymbol(area)
        yield result

async def process_stream():
    areas = ["IA109", "IA113", "IA117"]
    async for result in stream_areas(areas):
        df = result.to_pandas()
        # Process each result as it arrives
        print(f"Processed {len(df)} map units")

asyncio.run(process_stream())
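
The generator above fetches one area at a time, so each request starts only after the previous result has been consumed. If the requests should run concurrently while results are still handled one by one, asyncio.as_completed is one option. A sketch with a hypothetical fake_query stand-in whose delay is set by the caller:

```python
import asyncio

async def fake_query(area, delay):
    # Hypothetical stand-in for a soildb call with variable latency
    await asyncio.sleep(delay)
    return area

async def process_as_completed(delays):
    # Start every query up front
    tasks = [asyncio.create_task(fake_query(a, d)) for a, d in delays.items()]
    finished = []
    # Handle each result as soon as it is ready, fastest first
    for next_done in asyncio.as_completed(tasks):
        area = await next_done
        finished.append(area)
    return finished
```

Results arrive in completion order rather than input order, so carry the area symbol with each result if the pairing matters.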

Error Handling

Queries can fail for several reasons, such as network problems or service maintenance. Handle these cases with try/except blocks:

import asyncio
import soildb
from soildb import SDAConnectionError, SDAMaintenanceError

async def robust_query():
    try:
        result = await soildb.get_mapunit_by_areasymbol("IA109")
        return result.to_pandas()
    except SDAConnectionError:
        print("Network error - retry later")
        return None
    except SDAMaintenanceError:
        print("SDA service under maintenance")
        return None
    except Exception as e:
        print(f"Unexpected error: {e}")
        return None

result = asyncio.run(robust_query())
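
Transient network errors are often worth retrying before giving up. The following is a minimal retry-with-backoff sketch. The names retry_async and flaky_query are hypothetical, and the built-in ConnectionError stands in for SDAConnectionError so that the example runs without soildb:

```python
import asyncio

async def retry_async(make_coro, attempts=3, base_delay=0.01,
                      retry_on=(ConnectionError,)):
    """Call make_coro() until it succeeds or attempts run out."""
    for attempt in range(1, attempts + 1):
        try:
            return await make_coro()
        except retry_on:
            if attempt == attempts:
                raise  # out of attempts: let the caller see the error
            # Wait 1x, 2x, 4x ... the base delay before trying again
            await asyncio.sleep(base_delay * 2 ** (attempt - 1))

calls = {"count": 0}

async def flaky_query():
    # Hypothetical stand-in: fails twice, then succeeds
    calls["count"] += 1
    if calls["count"] < 3:
        raise ConnectionError("simulated network failure")
    return "ok"
```

retry_async takes a function that creates the coroutine rather than a coroutine object, because a coroutine can only be awaited once. With soildb, one would presumably pass something like lambda: soildb.get_mapunit_by_areasymbol("IA109") and set retry_on to the relevant soildb exception types.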

Integration with Async Frameworks

Because soildb functions are coroutines, they can be awaited directly inside async web frameworks without blocking the server while a query is in flight.

FastAPI Example

Here we demonstrate how to create a custom API endpoint with FastAPI to obtain mapunits that occur in the specified soil survey area:

from fastapi import FastAPI
import soildb

app = FastAPI()

@app.get("/soil/{areasymbol}")
async def get_soil_data(areasymbol: str):
    mapunits = await soildb.get_mapunit_by_areasymbol(areasymbol)
    return mapunits.to_pandas().to_dict('records')

Running Async Code

There are two main ways to run async code with soildb:

1. In a Regular Python Script

When running a .py file from your terminal, you can use asyncio.run() to execute the top-level async function. This creates a new event loop, runs the function, and closes the loop.

import asyncio
import soildb

async def main():
    mapunits = await soildb.get_mapunit_by_areasymbol("IA109")
    print(mapunits.to_pandas())

if __name__ == "__main__":
    asyncio.run(main())

2. In an Environment with a Running Event Loop

Environments like Jupyter Notebooks, VSCode Notebooks, or other async applications already have a running asyncio event loop. Calling asyncio.run() in these environments will raise a RuntimeError.

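One workaround is nest_asyncio, which patches asyncio so that the event loop can be re-entered. In Jupyter and IPython you can also await a coroutine directly at the top level of a cell, with no extra package.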

import asyncio
import nest_asyncio
import soildb

# Allow nested event loops
nest_asyncio.apply()

async def get_soil_data():
    mapunits = await soildb.get_mapunit_by_areasymbol("IA109")
    return mapunits.to_pandas()

# Get the current event loop and run the function
loop = asyncio.get_event_loop()
df = loop.run_until_complete(get_soil_data())
df.head()

Troubleshooting

Event Loop Issues

Problem: RuntimeError: asyncio.run() cannot be called from a running event loop

Solution: Use nest_asyncio or check for existing loops:

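import asyncio

try:
    loop = asyncio.get_running_loop()
except RuntimeError:
    # No loop is running (regular script): start one
    asyncio.run(main())
else:
    # A loop is already running (e.g. Jupyter): schedule main() on it
    task = loop.create_task(main())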
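
Another option that avoids an extra dependency is to detect the running loop and, if there is one, run the coroutine on a separate thread with its own loop. The run_sync helper below is hypothetical and sketched for illustration. It is not part of soildb:

```python
import asyncio
import concurrent.futures

def run_sync(coro):
    """Run a coroutine to completion from synchronous code."""
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        # No loop in this thread: asyncio.run() is safe
        return asyncio.run(coro)
    # A loop is already running here, so run the coroutine on a fresh
    # loop in a worker thread and block until it finishes
    with concurrent.futures.ThreadPoolExecutor(max_workers=1) as pool:
        return pool.submit(asyncio.run, coro).result()

async def answer():
    # Trivial coroutine used to exercise run_sync
    await asyncio.sleep(0.01)
    return 42
```

This blocks the calling thread until the coroutine finishes, and objects tied to the outer loop should not be shared with the worker thread.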

Connection Timeouts

Problem: Queries time out with large datasets

Solution: Increase the timeout and use chunking:

client = soildb.SDAClient(timeout=120.0)  # 2 minutes
response = await soildb.fetch_by_keys(large_key_list, "mapunit", chunk_size=500)
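
Independently of the client's own timeout setting, any awaitable can be given an overall deadline with asyncio.wait_for. A sketch using a hypothetical slow_query stand-in:

```python
import asyncio

async def slow_query(delay):
    # Hypothetical stand-in for a long-running soildb call
    await asyncio.sleep(delay)
    return "done"

async def query_with_deadline(delay, deadline):
    try:
        return await asyncio.wait_for(slow_query(delay), timeout=deadline)
    except asyncio.TimeoutError:
        # wait_for cancels the inner coroutine before raising
        return None
```

Returning None on timeout is a choice made for this sketch. Re-raising or retrying may suit real code better.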

Memory Issues

Problem: Large datasets consume too much memory

Solution: Process in chunks and use polars for efficiency:

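import soildb

async def process_in_chunks(mukeys, chunk_size=1000):
    # Fetch and process one slice of keys at a time
    for i in range(0, len(mukeys), chunk_size):
        chunk = mukeys[i:i + chunk_size]
        response = await soildb.fetch_by_keys(chunk, "mapunit")
        # Process chunk immediately so only one chunk is held in memory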