How to Build Applications With Asyncpg and PostgreSQL
Efficiency, consistency, and agility are core concerns for application developers today. The choice of a database and its interaction with the system play vital roles in achieving these requirements.
Due to its robustness, PostgreSQL has been one of the go-to databases among developers. When integrated with the Python library asyncpg, PostgreSQL can significantly enhance application performance.
Asyncpg fully supports asynchronous programming, allowing non-blocking code execution. This capability benefits workloads with many concurrent database operations, such as web applications, real-time analytics systems, and microservices architectures. With asyncpg, it's possible to service several requests at once, lowering latencies and boosting overall efficiency.
In this tutorial, I'll walk you through creating applications with asyncpg and PostgreSQL. I'll start with the configuration basics and gradually build a simple app for financial data management. By the end, you will have learned how to make applications not only efficient but also scalable with asyncpg and PostgreSQL. But first, let's describe the tools we'll use in this tutorial.
What’s Asyncpg?
Asyncpg is a high-performance asynchronous PostgreSQL client library for Python. It allows developers to execute database operations without blocking other code's execution. This capability is crucial for building responsive applications, especially in demanding scenarios with high concurrency and low latency requirements.
Asyncpg features
- High performance: asyncpg is designed to be fast and efficient, and it ranks among the fastest PostgreSQL drivers available for Python.
- Asynchronous capabilities: asyncpg is built on Python's asyncio library, which allows database operations to execute without blocking, enhancing the application's overall responsiveness.
- Supports the latest PostgreSQL: asyncpg fully supports the latest PostgreSQL features, such as JSONB, arrays, composite types, and many others.
- Prepared statements: asyncpg supports prepared statements, which speed up repeated queries by reusing already parsed and optimized execution plans (see the example after this list).
- Extensibility: asyncpg is easily customizable and extendable to be adapted for different use cases.
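To make the prepared-statements point concrete, here is a minimal sketch of asyncpg's prepare API; the connection string is a placeholder:

import asyncio
import asyncpg

async def main():
    conn = await asyncpg.connect("postgres://username:password@host:port/dbname")
    # The statement is parsed and planned once, then reused on every execution
    stmt = await conn.prepare("SELECT $1::int + $2::int")
    print(await stmt.fetchval(2, 3))  # prints 5
    await conn.close()

asyncio.run(main())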
Asyncpg architecture
Asyncpg is designed to be performant yet simple. Its architecture includes the following core components:
Event loop integration
Asyncpg is deeply integrated with Python's asyncio event loop, enabling the non-blocking database operations that are crucial for building scalable applications. The event loop schedules the execution of async tasks so that multiple operations run concurrently, without waiting for one operation to complete before another starts.
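As a minimal sketch of this, the snippet below runs two queries concurrently on the same event loop with asyncio.gather; the connection string and queries are placeholders:

import asyncio
import asyncpg

async def run_query(dsn, query):
    conn = await asyncpg.connect(dsn)
    try:
        # Control returns to the event loop while the query is in flight
        return await conn.fetch(query)
    finally:
        await conn.close()

async def main():
    dsn = "postgres://username:password@host:port/dbname"
    results = await asyncio.gather(
        run_query(dsn, "SELECT 1 AS one"),
        run_query(dsn, "SELECT 2 AS two"),
    )
    print(results)

asyncio.run(main())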
Connection pooling
Connection pooling is a crucial feature of asyncpg. Instead of opening and closing a new database connection for every operation, you reuse one from an existing pool. This lowers the overhead of creating connections and improves an application's performance. The pool administers a set of database connections, providing an efficient way to handle numerous simultaneous requests to a database.
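Here is a minimal sketch of asyncpg's built-in pooling; the connection string and pool sizes are placeholders you would tune for your workload:

import asyncio
import asyncpg

async def main():
    # Create the pool once at startup; connections are reused across requests
    pool = await asyncpg.create_pool(
        "postgres://username:password@host:port/dbname",
        min_size=1,
        max_size=10,
    )
    async with pool.acquire() as conn:
        print(await conn.fetchval("SELECT version()"))
    await pool.close()

asyncio.run(main())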
High-level query interface
Asyncpg provides a high-level query interface that abstracts away much of the complexity of interacting with the database. It lets developers run SQL queries and communicate with the database in an intuitive and effective manner, making database applications easier to develop and maintain.
Binary protocol support
One of the most exciting features of asyncpg is binary protocol support for PostgreSQL. This protocol achieves faster communication between the client and database server by reducing the data that needs to be transferred. Because of binary protocol support, database operation performance is significantly increased when dealing with large datasets.
Data caching
Asyncpg caches prepared statements and type introspection data, which reduces round trips to the database server and improves response times. By reusing this frequently needed information instead of re-querying the server for it, asyncpg improves overall application efficiency.
Robust error handling
Error handling is a fundamental aspect of the asyncpg architecture. It supports developers with full error reporting and management, allowing them to gracefully handle errors that may arise from the database. This ensures that applications will be stable and dependable even in the face of unexpected issues.
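For example, server-side errors surface as subclasses of asyncpg.PostgresError, which you can catch and handle gracefully; this sketch assumes a placeholder connection string:

import asyncio
import asyncpg

async def main():
    conn = await asyncpg.connect("postgres://username:password@host:port/dbname")
    try:
        await conn.execute("SELECT * FROM missing_table")
    except asyncpg.PostgresError as exc:
        # All errors reported by the server derive from asyncpg.PostgresError
        print(f"Database error: {exc}")
    finally:
        await conn.close()

asyncio.run(main())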
Asynchronous transaction management
Asyncpg supports running transactional operations asynchronously, so a developer can implement complex transactional flows without blocking the event loop. This is critical in applications that require correct and reliable data operations, such as financial systems or real-time analytics platforms.
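A minimal sketch of asyncpg's transaction API; the accounts table and transfer logic are purely illustrative:

import asyncio
import asyncpg

async def main():
    conn = await asyncpg.connect("postgres://username:password@host:port/dbname")
    # Both statements commit together, or roll back together on error
    async with conn.transaction():
        await conn.execute("UPDATE accounts SET balance = balance - 100 WHERE id = 1")
        await conn.execute("UPDATE accounts SET balance = balance + 100 WHERE id = 2")
    await conn.close()

asyncio.run(main())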
PostgreSQL Made Powerful: TimescaleDB
TimescaleDB is an open-source time-series database designed to efficiently manage large-scale time-series data. Developed as an extension to PostgreSQL, it brings advanced time-series capabilities to applications while building on PostgreSQL's robustness and reliability, delivering high performance, scalability, and ease of use.
If you want to focus on building your application, not your database, the fully managed Timescale Cloud will give you time to focus on your priorities. It provides automated data management policies, backup and restore mechanisms, and top-notch 24/7 support, among other benefits. If you want to take it out for a spin, sign up and try it for free for 30 days.
TimescaleDB features
TimescaleDB offers numerous features aimed primarily at enhancing time-series data management and analytics:
- Automatic partitioning: Hypertables simplify time-series data management. For users, they look like one big table, but TimescaleDB divides them into smaller pieces called chunks. This division is managed automatically by TimescaleDB to optimize queries and operations on the data structure for better performance. Check out our docs to learn how to create your hypertable.
- Time-based query optimization: TimescaleDB automatically optimizes various time-based queries, such as last-point queries, time aggregates, and downsampling.
- Data retention policies: Through automated data retention policies, older data will be compressed and removed according to user-defined rules, balancing query performance with storage needs.
- Compression: TimescaleDB has advanced compression mechanisms, which reduce storage costs and improve I/O performance on historical data.
- Continuous aggregates: These are materialized views that will refresh continually with newly ingested data, offering speedy responses to complex analytical questions.
Now that you know more about the tools we’ll use, it’s time to get started. The code for this tutorial is available on GitHub.
Integrating Asyncpg With TimescaleDB
Installing asyncpg library
To install the asyncpg library in Python, use the following code:
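pip install asyncpg

If you're working in a Jupyter notebook, prefix the command with an exclamation mark.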
Connecting and importing data from TimescaleDB through asyncpg in Python
You can use the following code, built up step by step below, to connect to TimescaleDB through asyncpg and fetch data in Python.
The libraries I used to connect to the database are:
- asyncpg: a package that allows you to interact with PostgreSQL databases asynchronously.
- pandas: a library for data manipulation and exploration.
- asyncio: a library for writing concurrent code with the async and await keywords.
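The snippets below assume these imports:

import asyncio
import asyncpg
import pandas as pd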
I used the TimescaleDB finance database, which contains open, high, and low price data for BTC/USD. Then, you need to provide the database URL.
# Database credentials
db_url = "paste your credentials"
If you want to use your own database, you can create it with the TimescaleDB extension enabled and import or insert your data. Replace the credentials with your username, password, host, port, and database name.
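The URL follows the standard PostgreSQL connection URI format; every value below is a placeholder:

db_url = "postgres://username:password@host:port/dbname"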
The next step is to define the asynchronous function to fetch the data. Here’s how you can do it:
async def fetch_data():
    # Connect to the database
    conn = await asyncpg.connect(db_url)
- fetch_data(): an asynchronous function that handles the database connection and data retrieval.
- conn: it establishes a connection to the database using the credentials provided in db_url.
Now let’s define the SQL query to extract the data from the table:
    # Define the query to fetch data for the past 1 week
    query = """
    SELECT bucket, open, high, low, close
    FROM one_min_candle
    WHERE symbol = 'BTC/USD'
    AND bucket >= NOW() - INTERVAL '1 week'
    ORDER BY bucket DESC;
    """
The next step is to fetch and process the data.
    # Execute the query and fetch the results
    rows = await conn.fetch(query)

    # Convert the results to a pandas DataFrame
    data = pd.DataFrame(rows, columns=['bucket', 'open', 'high', 'low', 'close'])

    # Save the DataFrame to a CSV file
    data.to_csv('data.csv', index=False)
The statement rows = await conn.fetch(query) runs the given SQL query asynchronously against the connected PostgreSQL database and retrieves the rows. The await keyword guarantees that the function waits for the query to finish before proceeding, with the results saved in the rows variable.
Now let’s close the connection and display the extracted data.
    # Close the connection
    await conn.close()

    # Return the DataFrame to display it in the notebook
    return data

# Using await directly in Jupyter to run the async function and display the DataFrame
display(await fetch_data())
The line await conn.close() asynchronously terminates the database connection. The return data statement then provides the DataFrame for additional usage. Lastly, display(await fetch_data()) invokes the async function fetch_data() in a Jupyter Notebook and displays the resulting DataFrame. This sequence guarantees proper connection management and data presentation.
Upon executing the code, we got the following result:
Now that the data is extracted and displayed, it’s time to execute some operations. Let’s visualize the data to explore the trend.
The plot is dynamic, updating every second with the most recent information. This real-time data visualization allows us to monitor financial trends and market movements continuously. The connection to TimescaleDB ensures that we are accessing high-resolution time-series data efficiently, enabling us to provide up-to-the-minute insights and analytics.
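The plotting code itself isn't shown here, but a minimal static sketch with matplotlib, assuming the data.csv file saved above, could look like this (for a live view, you could redraw on a timer, for example with matplotlib's FuncAnimation):

import pandas as pd
import matplotlib.pyplot as plt

# Load the data exported earlier and plot the closing price over time
data = pd.read_csv('data.csv', parse_dates=['bucket'])
plt.plot(data['bucket'], data['close'], label='BTC/USD close')
plt.xlabel('Time')
plt.ylabel('Price (USD)')
plt.legend()
plt.show()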
Building a Finance Application: CRUD Operations
In the previous section, we successfully connected to and fetched data from the TimescaleDB database. Now, let’s create our table and perform CRUD (create, read, update, and delete) operations to build a simple app for financial data.
First, you need to create your service in Timescale Cloud and copy the connection string. Here is the code you can use to create the hypertable and continuous aggregate once you have it:
async def create_hypertable():
    conn = await asyncpg.connect(CONNECTION)
    create_table_query = """
    CREATE TABLE IF NOT EXISTS ticks (
        time TIMESTAMPTZ NOT NULL,
        symbol TEXT NOT NULL,
        price DOUBLE PRECISION,
        volume DOUBLE PRECISION,
        PRIMARY KEY (time, symbol)
    );
    """
    await conn.execute(create_table_query)
    create_hypertable_query = "SELECT create_hypertable('ticks', 'time', if_not_exists => TRUE);"
    await conn.execute(create_hypertable_query)
    create_continuous_aggregate_query = """
    CREATE MATERIALIZED VIEW one_min_candle
    WITH (timescaledb.continuous) AS
    SELECT
        time_bucket('1 minute', time) AS bucket,
        symbol,
        first(price, time) AS open,
        max(price) AS high,
        min(price) AS low,
        last(price, time) AS close
    FROM
        ticks
    GROUP BY
        bucket, symbol
    WITH NO DATA;
    """
    await conn.execute(create_continuous_aggregate_query)
    print("Hypertable and continuous aggregate created successfully.")
    await conn.close()
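Note that the view is created WITH NO DATA, so it starts empty. Before reading from it, you can refresh it manually (or set up a refresh policy); a minimal sketch:

async def refresh_aggregate():
    conn = await asyncpg.connect(CONNECTION)
    # NULL bounds refresh the continuous aggregate over its entire time range
    await conn.execute("CALL refresh_continuous_aggregate('one_min_candle', NULL, NULL);")
    await conn.close()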
Create (Insert)
The first operation is to create and insert data into the table. Below is the code you can use to insert data into the hypertable:
async def insert_ticks(data):
    conn = await asyncpg.connect(CONNECTION)
    insert_query = """
    INSERT INTO ticks (time, symbol, price, volume)
    VALUES ($1, $2, $3, $4)
    """
    await conn.executemany(insert_query, data)
    print("Tick data inserted successfully.")
    await conn.close()
Read (Select)
The next operation is to read the data that we inserted earlier. Below is the code you can use to read both the raw ticks and the aggregated candles:
async def read_ticks():
    conn = await asyncpg.connect(CONNECTION)
    select_query = "SELECT * FROM ticks ORDER BY time DESC"
    rows = await conn.fetch(select_query)
    for row in rows:
        print(dict(row))
    await conn.close()
    return rows

async def read_aggregates():
    conn = await asyncpg.connect(CONNECTION)
    select_query = "SELECT * FROM one_min_candle ORDER BY bucket DESC"
    rows = await conn.fetch(select_query)
    for row in rows:
        print(dict(row))
    await conn.close()
    return rows
Update
The code for updating the data in the table is:
async def update_tick(time, symbol, new_values):
    conn = await asyncpg.connect(CONNECTION)
    update_query = """
    UPDATE ticks
    SET price = $3, volume = $4
    WHERE time = $1 AND symbol = $2
    """
    await conn.execute(update_query, time, symbol, new_values[0], new_values[1])
    print("Tick data updated successfully.")
    await conn.close()
Delete
To delete the data from the table, you can use the following function:
async def delete_tick(time, symbol):
    conn = await asyncpg.connect(CONNECTION)
    delete_query = "DELETE FROM ticks WHERE time = $1 AND symbol = $2"
    await conn.execute(delete_query, time, symbol)
    print("Tick data deleted successfully.")
    await conn.close()
The main function handles the table's creation, data insertion, reading, updating, and finally deletion, illustrating the full set of CRUD operations. The code for the main function is:
from datetime import datetime

async def main():
    # Create hypertable and continuous aggregate
    await create_hypertable()

    # Insert sample data
    sample_ticks = [
        (datetime.strptime('2024-07-03 06:25:00+00:00', '%Y-%m-%d %H:%M:%S%z'), 'BTC', 61057.0, 0.5),
        (datetime.strptime('2024-07-03 06:24:30+00:00', '%Y-%m-%d %H:%M:%S%z'), 'BTC', 61058.8, 0.2),
        (datetime.strptime('2024-07-03 06:23:45+00:00', '%Y-%m-%d %H:%M:%S%z'), 'BTC', 61061.4, 0.1),
        (datetime.strptime('2024-07-03 06:22:30+00:00', '%Y-%m-%d %H:%M:%S%z'), 'BTC', 61072.7, 0.3),
        (datetime.strptime('2024-07-03 06:21:15+00:00', '%Y-%m-%d %H:%M:%S%z'), 'BTC', 61026.4, 0.4)
    ]
    await insert_ticks(sample_ticks)

    # Read raw tick data
    print("Reading raw tick data:")
    await read_ticks()

    # Read aggregated candlestick data
    print("Reading aggregated candlestick data:")
    await read_aggregates()

    # Update a tick
    update_time = datetime.strptime('2024-07-03 06:25:00+00:00', '%Y-%m-%d %H:%M:%S%z')
    new_values = (62000.0, 0.6)
    await update_tick(update_time, 'BTC', new_values)
    print("After update:")
    await read_ticks()
    await read_aggregates()

    # Delete a tick
    delete_time = datetime.strptime('2024-07-03 06:25:00+00:00', '%Y-%m-%d %H:%M:%S%z')
    await delete_tick(delete_time, 'BTC')
    print("After delete:")
    await read_ticks()
    await read_aggregates()

# Top-level await works in Jupyter; in a script, use asyncio.run(main())
await main()
Upon running the code, you will get an output similar to the one given below, demonstrating the simple application performing CRUD operations using asyncpg with PostgreSQL.
Upon viewing the output, you'll see how to integrate asyncpg with PostgreSQL to create efficient apps. Using asyncpg's asynchronous capabilities, you can perform non-blocking CRUD operations and employ TimescaleDB’s sophisticated features like indexing and hypertables for optimal efficiency.
Tips for Handling Larger Datasets
- Batch processing: handle huge datasets in chunks to prevent memory overflows and keep transaction times short (see the sketch after this list).
- Indexes: use indexes on frequently used columns to accelerate read operations.
- Hypertables: use TimescaleDB's hypertables to optimize time-series data storage and performance of queries.
- Asynchronous operations: use async operations to manage I/O-bound activities without interrupting the application's primary workflow.
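As a sketch of the batch-processing tip above, the hypothetical helper below reuses the tutorial's insert query but sends the data in fixed-size chunks:

async def insert_in_batches(data, batch_size=1000):
    conn = await asyncpg.connect(CONNECTION)
    insert_query = """
    INSERT INTO ticks (time, symbol, price, volume)
    VALUES ($1, $2, $3, $4)
    """
    # Insert in chunks to keep individual transactions short
    for i in range(0, len(data), batch_size):
        await conn.executemany(insert_query, data[i:i + batch_size])
    await conn.close()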
Developing scalable and efficient applications requires following best practices and tuning performance, both when working with time-series databases like TimescaleDB and when writing asynchronous code with asyncpg and asyncio.
Repeatedly opening and closing database connections wastes resources. Connection pooling lets an application serve a potentially huge number of concurrent database queries with a limited set of reusable connections. It ensures better use of each connection and helps handle spiking loads more gracefully.
Optimizing Time-Series Queries for Performance Gains
Optimizing PostgreSQL queries is especially important for time-series data, which is characterized by large data volumes and frequent access.
- Indexing: Apply proper indexing methods. TimescaleDB indexes the time column automatically, so you only need to add indexes that match your query patterns (see the sketch after this list).
- Partitioning: TimescaleDB partitions data by time intervals into chunks, which improves query performance. Make sure the chunk interval is appropriate for the data it will hold.
- Query design: Write efficient queries. Avoid selecting unnecessary columns, provide proper join conditions, and use appropriate TimescaleDB aggregate functions.
- Caching: cache query results when accessing frequently queried results to minimize the load on the database.
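To illustrate the indexing point, a common pattern for the ticks table in this tutorial is a composite index on symbol and time, which speeds up per-symbol range queries; this is an optional, illustrative step:

async def create_symbol_index():
    conn = await asyncpg.connect(CONNECTION)
    # Speeds up queries that filter by symbol and order by time
    await conn.execute(
        "CREATE INDEX IF NOT EXISTS ix_ticks_symbol_time ON ticks (symbol, time DESC);"
    )
    await conn.close()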
Next Steps
In this tutorial, I walked you through building a financial data application using asyncpg and PostgreSQL. By now, you should feel better equipped to make your applications more efficient and scalable using these tools.
The async functionality of asyncpg perfectly complements TimescaleDB’s speed and power when managing time-series data. These technologies are essential for businesses relying on real-time analytics and IoT, both of which demand efficient time-series data management.
If this tutorial piqued your interest in Timescale, you can create a free account, and try it out today.
For more Python resources, check out this blog post on how to develop applications using psycopg3.