How to chain a Celery task that returns a list into a group?

You can get this behavior with an intermediate task. The demonstration below defines a “map”-like task, dmap, that applies a callback to every item in the list returned by the previous task, which is what you suggested.

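from celery import shared_task, signature, group

# Celery 5 removed `from celery import task`; shared_task is used here
# instead, and signature is the current name for subtask.

@shared_task
def get_list(amount):
    return list(range(amount))

@shared_task
def process_item(item):
    # do stuff
    pass

@shared_task
def dmap(it, callback):
    # Map a callback over an iterable and launch the calls as a group.
    # The callback may arrive as a plain dict after serialization,
    # so rebuild the signature before cloning it.
    callback = signature(callback)
    return group(callback.clone([arg]) for arg in it)()

# Runs process_item for each item in the return value of get_list.
# Building the chain does not execute it; call process_list.delay() to start it.
process_list = (get_list.s(10) | dmap.s(process_item.s()))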
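
To see why the intermediate task is needed, here is a rough plain-Python model of the same flow. It does not use Celery at all: REGISTRY, register, make_sig, clone_with, run and run_chain are hypothetical helpers invented for this sketch, and process_item doubles its input only so there is something to observe. The point it tries to show is that the list does not exist until get_list has run, so the fan-out has to happen inside a step that receives that list along with a description of the callback.

```python
# Toy model of the dmap pattern; no Celery involved, all helper names are illustrative.
REGISTRY = {}

def register(fn):
    """Record a function under its name so it can be looked up from plain data."""
    REGISTRY[fn.__name__] = fn
    return fn

def make_sig(fn, *args):
    """Describe a call as plain data, loosely like a serialized signature."""
    return {"task": fn.__name__, "args": list(args)}

def clone_with(sig, extra_args):
    """Copy a signature and put extra_args in front of its stored arguments."""
    return {"task": sig["task"], "args": list(extra_args) + sig["args"]}

def run(sig, *prepended):
    """Execute a signature, passing any prepended values as leading arguments."""
    return REGISTRY[sig["task"]](*prepended, *sig["args"])

@register
def get_list(amount):
    return list(range(amount))

@register
def process_item(item):
    # Stand-in for real work (an assumption for this sketch).
    return item * 2

@register
def dmap(items, callback):
    # The list only exists at this point, so the fan-out has to happen here.
    return [run(clone_with(callback, [item])) for item in items]

def run_chain(*sigs):
    """Run signatures in order, feeding each result into the next one."""
    first, *rest = sigs
    result = run(first)
    for sig in rest:
        result = run(sig, result)
    return result

results = run_chain(make_sig(get_list, 4), make_sig(dmap, make_sig(process_item)))
print(results)  # [0, 2, 4, 6]
```

In the real version, the list comprehension inside dmap is replaced by a Celery group, so the per-item calls are sent to workers rather than executed inline.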

Credit to Ask Solem for giving me this suggestion when I asked him for help on a similar issue.
