Sharing python objects across multiple workers

It is not possible to share a python object between different processes straightforwardly.
The facilities included in the multiprocessing module (like managers or shared memory) are not suitable for sharing resources between workers, since they require a master process creating the resources and do not have the durability property. Also server processes can be run on different machines.

The most preferred means for sharing resources between workers:

  • Databases – in the case of a persistent nature of resources that require reliable storage and scalability. Examples: PostgreSQL, MariaDB, MongoDB, and many others.
  • Caches (key/value) – in the case of a temporary nature of the data, faster than databases, but not having such scalability and often not ACID compliant. Examples: Redis, Memcached and etc.

Below I will present two very simple examples of how one could use both approaches to share data in FastAPI application between workers. As an example, I took the aiocache library with Redis as backend and Tortoise ORM library with PostgreSQL as backend. Since FastAPI is the asynchronous framework I chose asyncio-based libraries.

The structure of the test project is as follows:

.
├── app_cache.py
├── app_db.py
├── docker-compose.yml
├── __init__.py

Docker-compose file:

For experiments, you can use the following docker-compose file exposing 5432 (Postgres) and 6379 (Redis) ports to localhost.

version: '3'

services:
  database:
    image: postgres:12-alpine
    ports:
      - "5432:5432"
    environment:
      POSTGRES_PASSWORD: test_pass
      POSTGRES_USER: test_user
      POSTGRES_DB: test_db
  redis:
    image: redis:6-alpine
    ports:
      - "6379:6379"

Starting:

docker-compose up -d

Cache (aiocache)

Aiocache provides 3 main entities:

  • backends: Allow you specify which backend you want to use for your cache. Currently supporting: SimpleMemoryCache, RedisCache using aioredis and MemCache using aiomcache.
  • serializers: Serialize and deserialize the data between your code and the backends. This allows you to save any Python object into your cache. Currently supporting: StringSerializer, PickleSerializer, JsonSerializer, and MsgPackSerializer. But you can also build custom ones.
  • plugins: Implement a hooks system that allows to execute extra behavior before and after of each command.

Starting:

uvicorn app_cache:app --host localhost --port 8000 --workers 5
# app_cache.py
import os
from aiocache import Cache
from fastapi import FastAPI, status


app = FastAPI()
cache = Cache(Cache.REDIS, endpoint="localhost", port=6379, namespace="main")


class Meta:
    def __init__(self):
        pass

    async def get_count(self) -> int:
        return await cache.get("count", default=0)

    async def set_count(self, value: int) -> None:
        await cache.set("count", value)

    async def increment_count(self) -> None:
        await cache.increment("count", 1)


meta = Meta()


# increases the count variable in the meta object by 1
@app.post("/increment")
async def increment():
    await meta.increment_count()
    return status.HTTP_200_OK


# returns a json containing the current count from the meta object
@app.get("/report")
async def report():
    count = await meta.get_count()
    return {'count': count, "current_process_id": os.getpid()}


# resets the count in the meta object to 0
@app.post("/reset")
async def reset():
    await meta.set_count(0)
    return status.HTTP_200_OK

Database (Tortoise ORM + PostgreSQL)

Starting:
For the sake of simplicity, we first run one worker to create a schema in the database:

uvicorn app_db:app --host localhost --port 8000 --workers 1
[Ctrl-C] 
uvicorn app_db:app --host localhost --port 8000 --workers 5
# app_db.py
from fastapi import FastAPI, status
from tortoise import Model, fields
from tortoise.contrib.fastapi import register_tortoise


class MetaModel(Model):
    count = fields.IntField(default=0)


app = FastAPI()


# increases the count variable in the meta object by 1
@app.post("/increment")
async def increment():
    meta, is_created = await MetaModel.get_or_create(id=1)
    meta.count += 1  # it's better do it in transaction
    await meta.save()
    return status.HTTP_200_OK


# returns a json containing the current count from the meta object
@app.get("/report")
async def report():
    meta, is_created = await MetaModel.get_or_create(id=1)
    return {'count': meta.count}


# resets the count in the meta object to 0
@app.post("/reset")
async def reset():
    meta, is_created = await MetaModel.get_or_create(id=1)
    meta.count = 0
    await meta.save()
    return status.HTTP_200_OK

register_tortoise(
    app,
    db_url="postgres://test_user:test_pass@localhost:5432/test_db",  # Don't expose login/pass in src, use environment variables
    modules={"models": ["app_db"]},
    generate_schemas=True,
    add_exception_handlers=True,
)

Leave a Comment