Java, divide incoming work uniformly via hashing in multithreaded evnironments

It sounds like you need FIFO ordering per dispatch id, so the ideal would be to have dispatch queues as the abstraction. That would explain your concern about hashing as not providing uniform distribution, as some dispatch queues may be more active than others and unfairly balanced among workers. By separating the queue from the worker, you retain FIFO semantics and evenly spread out the work.

An inactive library that provides this abstraction is HawtDispatch. It is Java 6 compatible.

A very simple Java 8 approach is to use CompletableFuture as a queuing mechanism, ConcurrentHashMap for registration, and an Executor (e.g. ForkJoinPool) for computing. See EventDispatcher for an implementation of this idea, where registration is explicit. If your dispatchers are more dynamic then you may need to periodically prune the map. The basic idea is as follows.

ConcurrentMap<String, CompletableFuture<Void>> dispatchQueues = ...

public CompletableFuture<Void> dispatch(String queueName, Runnable task) {
  return dispatchQueues.compute(queueName, (k, queue) -> {
    return (queue == null)
        ? CompletableFuture.runAsync(task)
        : queue.thenRunAsync(task);
  });
}

Update (JDK7)

A backport of the above idea would be translated with Guava into something like,

ListeningExecutorService executor = ...
Striped<Lock> locks = Striped.lock(256);
ConcurrentMap<String, ListenableFuture<?>> dispatchQueues = ...

public ListenableFuture<?> dispatch(String queueName, final Runnable task) {
  Lock lock = locks.get(queueName);
  lock.lock();
  try {
    ListenableFuture<?> future = dispatchQueues.get(queueName);
    if (future == null) {
      future = executor.submit(task);
    } else {
      final SettableFuture<Void> next = SettableFuture.create();
      future.addListener(new Runnable() {
        try {
          task.run();
        } finally {
          next.set(null);
        }
      }, executor);
      future = next;
    }
    dispatchQueues.put(queueName, future);
  } finally {
    lock.unlock();
  }
}

Leave a Comment