Producer consumer with qualifications

Here’s my take on it. I made a point of only using Clojure data structures to see how that would work out. Note that it would have been perfectly usual and idiomatic to take a blocking queue from the Java toolbox and use it here; the code would be easy to adapt, I think. Update: I actually did adapt it to java.util.concurrent.LinkedBlockingQueue, see below.

clojure.lang.PersistentQueue

Call (pro-con) to start a test run; then have a look at the contents of output to see if anything happened and queue-lengths to see if they stayed within the given bound.
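
As a rough illustration of the queue-lengths check, here is a minimal sketch on a stand-in ref. The names demo-queue, demo-lengths and max-seen are made up for this example and are not part of the code below; an atom holds the log purely to keep the sketch short.

```clojure
;; Hypothetical stand-ins for queue and queue-lengths.
(def demo-queue (ref clojure.lang.PersistentQueue/EMPTY))
(def demo-lengths (atom []))

;; Record the length of the queue after every committed change.
(add-watch demo-queue :length-watch
           (fn [_key _ref _old-state new-state]
             (swap! demo-lengths conj (count new-state))))

(dosync (alter demo-queue conj :a))
(dosync (alter demo-queue conj :b))
(dosync (alter demo-queue pop))

;; The largest length ever recorded; compare it against the bound.
(def max-seen (apply max @demo-lengths))
```

After a real run, the analogous check would be something like (apply max @queue-lengths), which should not exceed 4.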

Update: To explain why I felt the need to use ensure below (I was asked about this on IRC): it is there to guard against write skew (see the Wikipedia article on Snapshot isolation for a definition). My concern was that, with @queue in place of (ensure queue), two or more producers could each check the length of the queue, find that it is less than 4, then each place an additional item on the queue and bring the total length above 4, breaking the constraint. Similarly, two consumers doing @queue could accept the same item for processing, then pop two items off the queue. ensure rules out both scenarios. Note that both transactions here also alter queue, and as far as I understand Clojure's STM, a conflicting commit to a ref that a transaction writes already forces a retry; so ensure is strictly required only when a transaction reads a ref it does not write, and here it mainly makes the dependency explicit.
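
The check-then-add step described above can be tried in isolation. This is only a sketch under assumptions: bounded-queue, max-len, try-enqueue! and results are hypothetical names, and the bound of 4 mirrors the one used below. Eight threads race to add one item each, and the length check and the conj happen inside a single transaction.

```clojure
;; Hypothetical names, for illustration only.
(def bounded-queue (ref clojure.lang.PersistentQueue/EMPTY))
(def max-len 4)

(defn try-enqueue!
  "Adds item to bounded-queue if there is room.
  Returns true on success and nil when the queue is full."
  [item]
  (dosync
    (when (< (count (ensure bounded-queue)) max-len)
      (alter bounded-queue conj item)
      true)))

;; Start eight futures that each try to enqueue one item,
;; then wait for all of them to finish.
(def results
  (let [attempts (doall (map #(future (try-enqueue! %)) (range 8)))]
    (mapv deref attempts)))

;; Only as many attempts succeed as the bound allows.
(def accepted-count (count (filter true? results)))
```

Which four items end up on the queue depends on scheduling, but the queue should never hold more than max-len items.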

(def go-on? (atom true))
(def queue (ref clojure.lang.PersistentQueue/EMPTY))
(def output (ref ()))
(def queue-lengths (ref ()))
(def *max-queue-length* 4)

(defn overseer
  ([] (overseer 20000))
  ([timeout]
     (Thread/sleep timeout)
     (swap! go-on? not)))

(defn queue-length-watch [_ _ _ new-queue-state]
  (dosync (alter queue-lengths conj (count new-queue-state))))

(add-watch queue :queue-length-watch queue-length-watch)

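(defn producer [tag]
  (future
   (while @go-on?
     ;; Check the bound and enqueue in one transaction; the dosync
     ;; returns true only if the item was actually added.
     (when (dosync (let [l (count (ensure queue))]
                     (when (< l *max-queue-length*)
                       (alter queue conj tag)
                       true)))
       ;; Rest after a successful put; if the queue was full, retry at once.
       (Thread/sleep (rand-int 2000))))))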

(defn consumer []
  (future
   (while @go-on?
     (Thread/sleep 100)       ; don't look at the queue too often
     (when-let [item (dosync (let [item (first (ensure queue))]
                               (alter queue pop)
                               item))]
       (Thread/sleep (rand-int 500))         ; do stuff
       (dosync (alter output conj item)))))) ; and let us know

(defn pro-con []
  (reset! go-on? true)
  (dotimes [tag 5] (producer tag))  ; five producers, tagged 0 to 4
  (dorun (repeatedly 2 consumer))
  (overseer))

java.util.concurrent.LinkedBlockingQueue

A version of the above written using LinkedBlockingQueue. Note how the general outline of the code is basically the same, with some details actually being slightly cleaner. I removed queue-lengths from this version, as LBQ takes care of that constraint for us.

(def go-on? (atom true))
(def *max-queue-length* 4)
(def queue (java.util.concurrent.LinkedBlockingQueue. *max-queue-length*))
(def output (ref ()))

(defn overseer
  ([] (overseer 20000))
  ([timeout]
     (Thread/sleep timeout)
     (swap! go-on? not)))

(defn producer [tag]
  (future
   (while @go-on?
     (.put queue tag)
     (Thread/sleep (rand-int 2000)))))

(defn consumer []
  (future
   (while @go-on?
     ;; I'm using .poll on the next line so as not to block
     ;; indefinitely if we're done; it returns nil right away when
     ;; the queue is empty. LinkedBlockingQueue rejects nulls, so
     ;; nil can never be an item here; there's a number of other
     ;; ways to go about this, see docs on LinkedBlockingQueue
     (when-let [item (.poll queue)]
       (Thread/sleep (rand-int 500)) ; do stuff
       (dosync (alter output conj item)))))) ; and let us know

(defn pro-con []
  (reset! go-on? true)
  (dotimes [tag 5] (producer tag))  ; five producers, tagged 0 to 4
  (dorun (repeatedly 2 consumer))
  (overseer))
