persist-queue

persist-queue implements file-based and SQLite3-based persistent queues for Python. It provides thread-safe, disk-based queue implementations that survive process crashes and restarts.
By default, persist-queue uses the pickle module for object serialization, so most built-in types (int, dict, list, and so on) can be persisted directly. To persist custom objects, refer to "Pickling and unpickling extension types" (Python 2) and "Pickling Class Instances" (Python 3) in the Python documentation.
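For example, instances of ordinary module-level classes work out of the box with the default pickle serializer. A minimal sketch (the Job class and queue path are illustrative, not part of the library):

from persistqueue import Queue

# Job is a hypothetical application class; pickle can serialize
# instances of any class defined at module level.
class Job:
    def __init__(self, job_id, payload):
        self.job_id = job_id
        self.payload = payload

q = Queue("job_queue_path")
q.put(Job(1, {"action": "resize"}))  # serialized with pickle
job = q.get()
print(job.job_id)  # 1
q.task_done()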
This project builds on the work of python-pqueue and queuelib.

Features:
- Disk-based: Each queued item is stored on disk to survive crashes
- Thread-safe: Supports multi-threaded producers and consumers
- Recoverable: Items can be read after process restart
- Green-compatible: Works with greenlet or eventlet environments
- Multiple serialization: Supports pickle (default), msgpack, cbor, and json
- Async support: Provides async versions of all queue types (v1.1.0+)
File-based Queues:
- Queue - Basic file-based FIFO queue
- AsyncQueue - Async file-based queue (v1.1.0+)

SQLite-based Queues:
- SQLiteQueue / FIFOSQLiteQueue - FIFO SQLite queue
- FILOSQLiteQueue - FILO SQLite queue
- UniqueQ - Unique-items-only queue
- PriorityQueue - Priority-based queue
- SQLiteAckQueue - Acknowledgment-based queue
- AsyncSQLiteQueue - Async SQLite queue (v1.1.0+)

Other:
- PDict - Persistent dictionary
- MySQLQueue - MySQL-based queue (requires extra dependencies)
Installation:

pip install persist-queue
# For msgpack, cbor, and MySQL support
pip install "persist-queue[extra]"
# For async support (requires Python 3.7+)
pip install "persist-queue[async]"
# For all features
pip install "persist-queue[extra,async]"
Or install from source:

git clone https://github.com/peter-wangxu/persist-queue
cd persist-queue
python setup.py install
Requirements:
- Python 3.5 or newer (Python 2 support was dropped in v1.0.0)
- Full support for Linux, macOS, and Windows
- For async features: Python 3.7+ with aiofiles and aiosqlite
- For MySQL queues: DBUtils and PyMySQL
Quick start with a file-based queue:

from persistqueue import Queue
# Create a queue
q = Queue("my_queue_path")
# Add items
q.put("item1")
q.put("item2")
# Get items
item = q.get()
print(item) # "item1"
# Mark as done
q.task_done()
SQLite-based queue:

import persistqueue
# Create SQLite queue
q = persistqueue.SQLiteQueue('my_queue.db', auto_commit=True)
# Add items
q.put('data1')
q.put('data2')
# Get items
item = q.get()
print(item) # "data1"
MySQL-based queue:

import persistqueue
# Create MySQL queue
q = persistqueue.MySQLQueue(
host='localhost',
port=3306,
user='username',
password='password',
database='testdb',
table_name='my_queue'
)
# Add items
q.put('data1')
q.put('data2')
# Get items
item = q.get()
print(item) # "data1"
# Mark as done
q.task_done()
Async queue (v1.1.0+):

import asyncio
from persistqueue import AsyncQueue

async def main():
    async with AsyncQueue("/path/to/queue") as queue:
        await queue.put("async item")
        item = await queue.get()
        await queue.task_done()

asyncio.run(main())
Example usage with a file-based queue:

>>> from persistqueue import Queue
>>> q = Queue("mypath")
>>> q.put('a')
>>> q.put('b')
>>> q.put('c')
>>> q.get()
'a'
>>> q.task_done()
Example usage with a SQLite-based queue:

>>> import persistqueue
>>> q = persistqueue.SQLiteQueue('mypath', auto_commit=True)
>>> q.put('str1')
>>> q.put('str2')
>>> q.put('str3')
>>> q.get()
'str1'
>>> del q
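The FILO variant behaves the same way but returns the most recently added item first. A minimal sketch (the path name is illustrative):

>>> import persistqueue
>>> q = persistqueue.FILOSQLiteQueue('filopath', auto_commit=True)
>>> q.put('a')
>>> q.put('b')
>>> q.put('c')
>>> q.get()
'c'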
Example usage with a priority queue:

>>> import persistqueue
>>> q = persistqueue.PriorityQueue('mypath')
>>> q.put('low', priority=10)
>>> q.put('high', priority=1)
>>> q.put('mid', priority=5)
>>> q.get()
'high'
>>> q.get()
'mid'
>>> q.get()
'low'
Example usage with a unique queue:

>>> import persistqueue
>>> q = persistqueue.UniqueQ('mypath')
>>> q.put('str1')
>>> q.put('str1') # Duplicate ignored
>>> q.size
1
>>> q.put('str2')
>>> q.size
2
Example usage with an acknowledgment queue:

>>> import persistqueue
>>> ackq = persistqueue.SQLiteAckQueue('path')
>>> ackq.put('str1')
>>> item = ackq.get()
>>> # Process the item
>>> ackq.ack(item) # Mark as completed
>>> # Or, if processing failed, either:
>>> ackq.nack(item) # Mark for retry
>>> # or:
>>> ackq.ack_failed(item) # Mark as permanently failed
Example usage with a MySQL queue:

>>> import persistqueue
>>> q = persistqueue.MySQLQueue(
... host='localhost',
... port=3306,
... user='testuser',
... password='testpass',
... database='testdb',
... table_name='test_queue'
... )
>>> q.put('item1')
>>> q.put('item2')
>>> q.put('item3')
>>> q.get()
'item1'
>>> q.task_done()
>>> q.get()
'item2'
>>> q.task_done()
>>> q.size
1
Example usage with async queues:

import asyncio
from persistqueue import AsyncQueue, AsyncSQLiteQueue

async def example():
    # File-based async queue
    async with AsyncQueue("/path/to/queue") as queue:
        await queue.put("data item")
        item = await queue.get()
        await queue.task_done()

    # SQLite-based async queue
    async with AsyncSQLiteQueue("/path/to/queue.db") as queue:
        item_id = await queue.put({"key": "value"})
        item = await queue.get()
        await queue.update({"key": "new_value"}, item_id)
        await queue.task_done()

asyncio.run(example())
Example usage with PDict (persistent dictionary):

>>> from persistqueue import PDict
>>> q = PDict("testpath", "testname")
>>> q['key1'] = 123
>>> q['key2'] = 321
>>> q['key1']
123
>>> len(q)
2
>>> del q['key1']
>>> q['key1']
KeyError: 'Key: key1 not exists.'
Multi-thread usage with a SQLite-based queue:

from persistqueue import FIFOSQLiteQueue
from threading import Thread

q = FIFOSQLiteQueue(path="./test", multithreading=True)

def worker():
    while True:
        item = q.get()
        do_work(item)  # do_work() is application-defined

for i in range(num_worker_threads):
    t = Thread(target=worker)
    t.daemon = True
    t.start()

for item in source():  # source() yields the items to enqueue
    q.put(item)

q.join()  # Block until all tasks are done
Multi-thread usage with a file-based queue:

from persistqueue import Queue
from threading import Thread

q = Queue("mypath")  # a path is required for the file-based queue

def worker():
    while True:
        item = q.get()
        do_work(item)
        q.task_done()

for i in range(num_worker_threads):
    t = Thread(target=worker)
    t.daemon = True
    t.start()

for item in source():
    q.put(item)

q.join()  # Block until all tasks are done
Multi-thread usage with a MySQL queue:

from persistqueue import MySQLQueue
from threading import Thread

q = MySQLQueue(
    host='localhost',
    port=3306,
    user='username',
    password='password',
    database='testdb',
    table_name='my_queue'
)

def worker():
    while True:
        item = q.get()
        do_work(item)
        q.task_done()

for i in range(num_worker_threads):
    t = Thread(target=worker)
    t.daemon = True
    t.start()

for item in source():
    q.put(item)

q.join()  # Block until all tasks are done
persist-queue supports multiple serialization protocols:
>>> from persistqueue import Queue
>>> from persistqueue import serializers
# Pickle (default)
>>> q = Queue('mypath', serializer=serializers.pickle)
# MessagePack
>>> q = Queue('mypath', serializer=serializers.msgpack)
# CBOR
>>> q = Queue('mypath', serializer=serializers.cbor2)
# JSON
>>> q = Queue('mypath', serializer=serializers.json)
Windows (Windows 10, SATA3 SSD, 16GB RAM), times in seconds:

| Queue Type    | Write  | Write/Read(1 task_done) | Write/Read(many task_done) |
|---------------|--------|-------------------------|----------------------------|
| SQLite3 Queue | 1.8880 | 2.0290                  | 3.5940                     |
| File Queue    | 4.9520 | 5.0560                  | 8.4900                     |
You can easily benchmark the performance of all queue types (including async) using the built-in tool:
Run with tox:
tox -e bench -- rst
Or run directly:
python benchmark/run_benchmark.py 1000 rst
- The first argument is the number of items to test (default: 1000)
- The second argument is the output format: rst (for reStructuredText table), console, or json
Example output (rst):
+------------------+--------+-------------------------+----------------------------+
| Queue Type       | Write  | Write/Read(1 task_done) | Write/Read(many task_done) |
+------------------+--------+-------------------------+----------------------------+
| File Queue       | 0.0481 | 0.0299                  | 0.0833                     |
| AsyncSQLiteQueue | 0.2664 | 0.5353                  | 0.5508                     |
| AsyncFileQueue   | 0.1333 | 0.1500                  | 0.2337                     |
+------------------+--------+-------------------------+----------------------------+
This makes it easy to compare the performance of sync and async queues on your platform.
Performance tips:
- WAL Mode: SQLite3 queues use write-ahead logging (WAL) mode by default for better performance
- auto_commit=False: Use for batch operations and call task_done() to persist changes (see the sketch after this list)
- Protocol Selection: The optimal pickle protocol is selected automatically
- Windows: File queue performance has improved 3-4x since v0.4.1
- MySQL Connection Pooling: MySQL queues use connection pooling for better performance
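A minimal sketch of the auto_commit=False tip: buffer many puts and persist them with a single task_done() call (the path and item values are illustrative):

import persistqueue

q = persistqueue.SQLiteQueue('batch_queue.db', auto_commit=False)
for i in range(100):
    q.put('item-%d' % i)
q.task_done()  # a single commit persists the whole batch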
Run tests using tox:
# Run tests for specific Python version
tox -e py312
# Run code style checks
tox -e pep8
# Generate coverage report
tox -e cover
Install development dependencies:
pip install -r test-requirements.txt
pip install -r extra-requirements.txt
Run benchmarks:
python benchmark/run_benchmark.py 1000
For detailed information about recent changes and updates, see:
- Release Notes for v1.1 - Major update with async queue enhancements and pytest migration
Known issues:
- Windows File Queue: Atomic operations are experimental; critical data may become unreadable if task_done() fails
- MySQL Tests: Require a local MySQL service; otherwise they are skipped automatically
- Async Features: Require Python 3.7+ and asyncio support
If you get sqlite3.OperationalError: database is locked:
- Increase the timeout parameter when creating the queue (see the example below)
- Ensure you're using multithreading=True for multi-threaded access
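A sketch applying both tips (the path and values are illustrative; timeout is how long, in seconds, to wait for the database lock):

import persistqueue

q = persistqueue.SQLiteQueue(
    'shared_queue.db',
    multithreading=True,  # safe access from multiple threads
    timeout=30,           # wait up to 30 seconds for the database lock
)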
If you get MySQL connection errors:
- Verify the MySQL server is running and accessible
- Check the connection parameters (host, port, user, password)
- Ensure the database exists and the user has proper permissions
- For connection pool issues, try increasing the max_connections parameter (see the sketch below)
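A sketch of the connection-pool tip, reusing the connection parameters from the earlier examples (all values are placeholders, and max_connections is the pool-size parameter referenced above):

import persistqueue

q = persistqueue.MySQLQueue(
    host='localhost',
    port=3306,
    user='username',
    password='password',
    database='testdb',
    table_name='my_queue',
    max_connections=20,  # larger pool for many concurrent workers
)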
- Make sure to set multithreading=True when initializing SQLite queues
- SQLite3 queues are thoroughly tested in multi-threaded environments
- MySQL queues are thread-safe by default
- For async features: install with pip install "persist-queue[async]"
- For MySQL support: install with pip install "persist-queue[extra]"
Community and links:
- Slack: Join the persist-queue channel
- GitHub: Repository
- PyPI: Package
Contributing:
- Fork the repository
- Create a feature branch
- Make your changes
- Add tests to cover your changes
- Submit a pull request with a clear title and description