Creating and updating nested dictionaries and lists inside multiprocessing.Manager object

Your usage of the shared dictionary is equivalent to, and can be rewritten as, the following:

temp = shared_object['nested']
temp.update({'second_key': 'second_value'})

When you request the value of the nested key from the shared dictionary, a copy of it is placed in the current process's memory space. This means that any changes you make to this copy will not be reflected in the shared dictionary stored inside the manager, simply because the manager has no way of knowing what you did to the dictionary after handing out a copy of part of it. Therefore, you need to inform the manager that you have made changes to the nested dictionary. There are two ways you can do this:
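To see the problem end to end, here is a minimal sketch (the worker function and key names are purely illustrative) where an in-place update to the nested dictionary silently disappears:

from multiprocessing import Manager, Process

def worker(shared_object):
    shared_object['nested'] = {'first_key': 'first_value'}
    # This updates only the local copy fetched from the manager, so it is lost
    shared_object['nested'].update({'second_key': 'second_value'})

if __name__ == '__main__':
    with Manager() as manager:
        shared_object = manager.dict()
        p = Process(target=worker, args=(shared_object,))
        p.start()
        p.join()
        print(shared_object['nested'])  # {'first_key': 'first_value'} -- second_key is missing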

Explicitly call the manager object with the updated value

This is probably the simplest (and fastest, in terms of performance) fix here: after making any changes to the nested dictionary, explicitly set the key's value to the altered dictionary:

temp = shared_object['nested']
temp.update({'second_key': 'second_value'})
shared_object['nested'] = temp

Doing so will let the manager know that there has been a change in the managed object.
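For completeness, a minimal end-to-end sketch of this fix (worker and key names are again illustrative) could look like this:

from multiprocessing import Manager, Process

def worker(shared_object):
    shared_object['nested'] = {'first_key': 'first_value'}

    # Modify a local copy, then write it back so the manager picks up the change
    temp = shared_object['nested']
    temp.update({'second_key': 'second_value'})
    shared_object['nested'] = temp

if __name__ == '__main__':
    with Manager() as manager:
        shared_object = manager.dict()
        p = Process(target=worker, args=(shared_object,))
        p.start()
        p.join()
        print(shared_object['nested'])  # {'first_key': 'first_value', 'second_key': 'second_value'}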

Create another managed object for the nested dictionary (manually vs automatically)

One other way you can achieve this is by creating managed objects for the nested dictionaries and storing those instead. However, keep in mind that this can quickly become inconvenient if your objects are nested very deeply, because for each nested dictionary you will need to create another managed object. Moreover, this will not work if the child processes you create are daemonic (the default when using multiprocessing.Pool) and they need to add their own nested objects to the shared dictionary: creating a managed object requires starting a manager process, and daemonic processes cannot spawn their own processes.
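To illustrate the manual route, a sketch along these lines (names are illustrative) stores a managed dictionary as the nested value, so in-place updates made by a child process do propagate:

from multiprocessing import Manager, Process

def worker(shared_object):
    # Works in place because the nested value is itself a managed dict (a proxy)
    shared_object['nested'].update({'second_key': 'second_value'})

if __name__ == '__main__':
    with Manager() as manager:
        shared_object = manager.dict()
        # Manually create a managed dict for the nested level and store its proxy
        shared_object['nested'] = manager.dict({'first_key': 'first_value'})

        p = Process(target=worker, args=(shared_object,))
        p.start()
        p.join()
        print(shared_object['nested'])  # both keys are present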

However, both of these issues can be avoided if you make the shared dictionary itself responsible for automatically creating managed objects whenever someone attempts to store a nested dictionary within it. This removes the burden of doing so manually, and an example of how to make this possible is given below (it works for both lists and dictionaries):

from multiprocessing.managers import SyncManager, MakeProxyType, ListProxy, State
from multiprocessing import Process, Lock
from collections import UserDict
from collections import UserList


class ManagerList(UserList):

    def __check_state(self):
        global manager
        global lock

        # Managers are not thread-safe, protect starting one from within another with a lock
        with lock:
            if manager._state.value != State.STARTED:
                manager.start(initializer=init)

    def __setitem__(self, key, value):
        global manager
        self.__check_state()

        if isinstance(value, list):
            value = manager.list(value)
        elif isinstance(value, dict):
            value = manager.dict(value)
        return super().__setitem__(key, value)


class ManagerDict(UserDict):

    def __check_state(self):
        global manager
        global lock

        # Managers are not thread-safe, protect starting one from within another with a lock
        with lock:
            if manager._state.value != State.STARTED:
                manager.start(initializer=init)

    def __setitem__(self, key, value):
        global manager
        self.__check_state()

        if isinstance(value, list):
            value = manager.list(value)
        elif isinstance(value, dict):
            value = manager.dict(value)
        return super().__setitem__(key, value)


# Proxy type exposing the usual dict methods; '__iter__' must return an iterator
# proxy, hence the _method_to_typeid_ entry below
ManagerDictProxy = MakeProxyType('DictProxy', (
    '__contains__', '__delitem__', '__getitem__', '__iter__', '__len__',
    '__setitem__', 'clear', 'copy', 'get', 'items',
    'keys', 'pop', 'popitem', 'setdefault', 'update', 'values'
    ))
ManagerDictProxy._method_to_typeid_ = {
    '__iter__': 'Iterator',
    }

# Override the default 'list'/'dict' types so nested values get wrapped automatically
SyncManager.register('list', ManagerList, ListProxy)
SyncManager.register('dict', ManagerDict, ManagerDictProxy)


def init():
    global manager
    global lock

    # Runs in each manager's server process: prepare a (not yet started) nested
    # manager plus a lock so that __setitem__ can start it lazily when needed
    lock = Lock()
    manager = SyncManager()

def worker(shared_object):
    print(f'Before defining the nested dictionary: {shared_object}')
    shared_object['nested'] = {
        'first_key': 'first_value',
    }
    print(f'After defining the nested dictionary: {shared_object["nested"]}')
    shared_object['nested'].update(
        {
            'second_key': 'second_value'
        }
    )
    print(f'After updating the nested dictionary: {shared_object["nested"]}')


if __name__ == "__main__":
    manager = SyncManager()
    manager.start(initializer=init)

    d = manager.dict()  # It works with manager.list() too
    p = Process(target=worker, args=(d, ))
    p.start()
    p.join()

This uses the __setitem__ dunder method to check whether the value being assigned to the dictionary is another list or dictionary, and if it is, creates a managed object for it and stores that instead. Any changes made to these nested dictionaries/lists are then reflected in the original dictionary as well. This works for any number of nesting levels.
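For instance, replacing the worker in the example above with a sketch like the one below (key names chosen for illustration) would still propagate every level, since each nested assignment is wrapped on the way in:

def worker(shared_object):
    # Each nested dict/list assignment is intercepted by __setitem__ and wrapped
    # in its own managed object, so in-place changes propagate at every level
    shared_object['outer'] = {'inner': {'numbers': [1, 2]}}
    shared_object['outer']['inner']['numbers'].append(3)
    print(shared_object['outer']['inner']['numbers'])  # [1, 2, 3]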

However, keep in mind that this only creates managed objects if the datatype is list or dict (or a subclass of either). Additionally, it starts a separate manager process for each nesting level. Therefore, the first method will always be faster than this one, even though this one is more convenient.

Output

Before defining the nested dictionary: {}
After defining the nested dictionary: {'first_key': 'first_value'}
After updating the nested dictionary: {'first_key': 'first_value', 'second_key': 'second_value'}
