Playing with Agents

Clojure is a programming language with a rich set of features. The rationale behind Clojure and its features can be summarized in these four points:

  • functional programming (which favors immutable data structures)
  • polymorphism, with global mutable references, dynamic typing and multimethod dispatch
  • software transactional memory, to see a consistent snapshot of the world
  • advanced concurrency mechanisms at the language level (CAS, agents, watchers)

I already discussed the TM part a bit in my previous post. Here I move on to watchers and agents, a mechanism to run asynchronous tasks in separate threads. Agents are like workers in the thread pool pattern: each agent executes the tasks (actions) sent to it sequentially, one after the other, on threads taken from a pool. A task is submitted with send (or send-off for tasks that block). This does not have to happen in a transacted section (dosync), but when it does, the send is held until the transaction commits and is dropped if the transaction retries. Waiting for the tasks sent to an agent to complete is done with (await agent-name). Watchers are special agents: a task is sent to the watcher each time a watched reference changes. The task to run is specified when the watcher is registered on the reference.
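To make the agent vocabulary above concrete, here is a minimal sketch assuming a recent Clojure version; `counter` and `balance` are illustrative names, not part of the original test. It shows `send`, `await`, and a send made inside `dosync` being held until the transaction commits.

```clojure
;; Illustrative names only: counter and balance are not from the original test.
(def counter (agent 0))

;; send queues the function; it later runs on a pool thread and receives
;; the agent's current state as its first argument: (+ 0 5), then (inc 5).
(send counter + 5)
(send counter inc)

;; await blocks until the actions dispatched so far from this thread have run.
(await counter)
(println @counter) ; prints 6

;; Inside a transaction the send is held until the commit, so a retried
;; transaction does not dispatch it more than once.
(def balance (ref 100))
(dosync
  (alter balance - 10)
  (send counter inc))
(await counter)
(println @balance @counter) ; prints 90 7
```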

Watchers

Here is a version of my test where each update in the shared map triggers the watcher. When 100 occurrences have been counted, the watcher prints a message.

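(ns org.ewe.sharedMapWatcher (:gen-class))

;; Watcher agent: its state counts the notifications it has received.
;; add-watcher is the early Clojure watcher API; later versions replaced it
;; with add-watch.
(def my-watcher (agent 0))

;; Action sent to the watcher agent each time a watched ref changes;
;; it receives the agent's current state and the changed reference.
(defn my-watcher-action [current-value reference]
  (let [new-value (inc current-value)]
    (when (= new-value 100)
      (println "100 words have been counted"))
    new-value))

;; Shared map: hash -> ref holding the number of occurrences of that hash.
(def store-map-ref (ref (sorted-map)))

(defn store-it [hash]
  (dosync
    (let [store-map @store-map-ref]
      (if (contains? store-map hash)
        ;; Known hash: increment its counter (this change notifies the watcher).
        (let [count-ref (store-map hash)]
          (ref-set count-ref (inc @count-ref)))
        ;; New hash: create a counter, register the watcher on it, then set
        ;; it to 1 so that this first occurrence is reported as well.
        (let [new-ref (ref 0)]
          (add-watcher new-ref :send-off my-watcher my-watcher-action)
          (ref-set new-ref 1)
          (ref-set store-map-ref (assoc store-map hash new-ref)))))))

;; Build n threads; thread i stores the hash (mod i 4), so with n = 100
;; each of the hashes 0 to 3 is stored 25 times.
(defn build-list [n]
  (vec (for [i (range 1 (inc n))]
         (Thread. #(store-it (mod i 4))))))

(defn -main []
  (time
    (let [NUM 100
          threads (build-list NUM)]
      (doseq [t threads] (.start t))
      (doseq [t threads] (.join t))

      ;; Wait until the watcher agent has processed every notification.
      (await my-watcher)

      (println "count = " (count @store-map-ref))
      (doseq [[hash count-ref] @store-map-ref]
        (println hash " -> " @count-ref)))))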
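In later Clojure versions `add-watcher` was replaced by `add-watch`, which registers a plain function of four arguments (key, reference, old state, new state) instead of an agent. A rough equivalent of the test above might look like the following sketch, assuming Clojure 1.7 or later (for `update`); `update-count`, `count-update`, `counts` and this `store-it` are hypothetical names, and the per-hash counters are plain map values rather than nested refs.

```clojure
;; Hypothetical names; a sketch of the same test using add-watch.
(def update-count (atom 0))

;; Watch function: called after each committed change of the watched ref.
(defn count-update [_key _ref _old _new]
  ;; swap! returns the new value, so exactly one call sees 100.
  (when (= 100 (swap! update-count inc))
    (println "100 words have been counted")))

;; hash -> number of occurrences
(def counts (ref (sorted-map)))
(add-watch counts :counter count-update)

(defn store-it [hash]
  (dosync (alter counts update hash (fnil inc 0))))

(let [threads (vec (for [i (range 1 101)]
                     (Thread. #(store-it (mod i 4)))))]
  (doseq [t threads] (.start t))
  (doseq [t threads] (.join t)))

(println @counts)
```

Unlike the agent-based watcher, the watch function runs synchronously on the thread that committed the change, so no `await` is needed before reading the totals.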

Message passing

Agents are asynchronous by nature. We may be tempted to think of them like actors in the actor model, but they aren’t. Quoting another post, “an actor is a process with a mailbox which dequeues and processes messages individually. The actor process contains all the logic for processing messages. […] Clojure’s agent inverts ownership of message handling logic: agents are submitted functions which are stored in a mailbox and then executed in order.”
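The inversion can be illustrated with a small sketch: since an agent only ever receives functions, an actor-style mailbox of data messages has to be emulated by sending one dispatching function along with each message. `account` and `handle-message` are hypothetical names.

```clojure
;; Hypothetical names: an agent emulating an actor that handles data messages.
(def account (agent 0))

;; The "message handling logic" lives in the function we send, not in the agent.
(defn handle-message [balance [kind amount]]
  (case kind
    :deposit  (+ balance amount)
    :withdraw (- balance amount)
    balance)) ; unknown messages leave the state unchanged

(send account handle-message [:deposit 100])
(send account handle-message [:withdraw 30])
(await account)
(println @account) ; prints 70
```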

Agents can be used to create a fully asynchronous processing model. An agent can be passed to another agent's action function to create a chain of message passing. It is, for instance, possible to send a function to an agent and specify the sending agent itself as the callback agent (the *agent* variable refers to the currently executing agent).

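(ns org.ewe.asyncAgent2 (:gen-class))
(import '(java.util Date))

(def agent-1 (agent 0))
(def agent-2 (agent nil))

(defn print-time [s]
  (println (str (Date.)) s))

;; Action sent back to agent-1: replace its state with the result.
(defn callback [current-value result]
  result)

;; Runs on agent-2: simulate a slow computation, then send the result to
;; the callback agent. agent-2's own state becomes nil.
(defn compute-and-callback [current-value callback-agent]
  (let [result 1234]
    (Thread/sleep 10000)
    (send callback-agent callback result))
  nil)

;; Runs on agent-1: delegate the work to agent-2, passing agent-1 itself
;; (*agent*) as the callback agent. agent-1's state is unchanged for now.
(defn async-compute [current-value]
  ;; send-off, because compute-and-callback blocks (Thread/sleep)
  (send-off agent-2 compute-and-callback *agent*)
  current-value)

(defn perform []
  ;; A send inside dosync is held until the transaction commits;
  ;; outside a transaction it would be dispatched immediately.
  (dosync (send agent-1 async-compute))

  ;; Wait for agent-1 first: once async-compute has finished, its send to
  ;; agent-2 has been dispatched, so the next await really waits for it.
  (await agent-1)
  (await agent-2)
  (print-time "agent-2 is over")

  ;; agent-2 has now sent the callback to agent-1.
  (await agent-1)
  (print-time "agent-1 is over")

  (println "agent-1:" @agent-1)
  (println "agent-2:" @agent-2))

(defn -main []
  (time (perform))
  ;; let the JVM exit without waiting on the agent thread pools
  (shutdown-agents))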

Producer-consumer

Producer-consumer is a popular approach to scheduling tasks concurrently. The shared list of items then acts as the coordinator between producers and consumers. Producer-consumer comes in two flavors: fully or partially transactional.

In fully transactional mode, the consumption of a message is entirely transacted: if the consumption fails (e.g. an error during processing), the item remains in the queue. Traditional lock-based implementations don't enforce this. The shared list is correctly synchronized, but once an item has been popped, it is removed for good. Messaging systems (JMS, etc.), on the other hand, do offer a fully transactional model.
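With Clojure refs, the fully transactional variant can be sketched as follows. The queue is a `clojure.lang.PersistentQueue` held in a ref, and the item is removed and processed inside the same transaction, so an exception during processing rolls the removal back. `produce!`, `consume!`, `process-item` and `results` are hypothetical names, and the processing step must be free of side effects because the transaction may be retried.

```clojure
;; Hypothetical names: a fully transactional consumer built on refs.
(def queue (ref clojure.lang.PersistentQueue/EMPTY))
(def results (ref []))

(defn produce! [item]
  (dosync (alter queue conj item)))

;; Stand-in for real (side-effect free) work: negative items fail.
(defn process-item [item]
  (if (neg? item)
    (throw (ex-info "cannot process" {:item item}))
    (* item 10)))

;; Pop and process in one transaction: if processing throws, the pop
;; is discarded together with the rest of the transaction.
(defn consume! []
  (dosync
    (when-let [item (peek @queue)]
      (alter results conj (process-item item))
      (alter queue pop)
      item)))

(produce! 1)
(produce! -1)
(consume!)                                    ; processes 1
(try (consume!) (catch Exception _ nil))      ; fails on -1
(println (peek @queue) @results)              ; prints -1 [10]
```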

A fully transactional producer-consumer implementation becomes easy with a TM. The problem, however (in my understanding), is that no two items can be fetched and processed concurrently, as that would result in a concurrent modification of the shared list of items. The performance of such a system would then be the same as that of a completely sequential system. Of course, agents could be used to decouple pushing and popping items from their actual treatment (by means of asynchronous processing), but then the system isn't fully transactional anymore: if the agent fails, the item is lost.
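The agent-based decoupling, and the way it loses items, might look like the sketch below. The transaction only pops the item and sends it to a worker agent; by the time the agent runs, the pop has already been committed. `jobs`, `worker`, `lost`, `process-job` and `consume-async!` are hypothetical names, and the agent is created with `:error-mode :continue` so that a failing item is reported to the error handler instead of stopping the agent.

```clojure
;; Hypothetical names: pop in a transaction, process asynchronously.
(def jobs (ref (into clojure.lang.PersistentQueue/EMPTY [1 -1])))
(def lost (atom []))   ; items whose processing failed

(def worker
  (agent [] :error-mode :continue
         :error-handler (fn [_agent e] (swap! lost conj (:item (ex-data e))))))

;; Agent action: accumulate processed results; negative items fail.
(defn process-job [done item]
  (if (neg? item)
    (throw (ex-info "cannot process" {:item item}))
    (conj done (* item 10))))

(defn consume-async! []
  (dosync
    (when-let [item (peek @jobs)]
      (alter jobs pop)
      ;; held until commit, dropped if the transaction retries
      (send worker process-job item)
      item)))

(consume-async!)
(consume-async!)
(await worker)
(println (seq @jobs) @worker @lost) ; prints nil [10] [-1]
```

The queue is empty although -1 was never processed; only the error handler knows about it.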

Conclusion

“All concurrency issues boil down to coordinating access to mutable state. The less mutable state, the easier it is to ensure thread safety.”

— Java Concurrency in Practice (from Bill Clementson’s blog)

Transactional memory and asynchronous mechanisms are means to coordinate access to shared data, but on their own they still don't solve the problem of shared data. I will need to do more experiments to figure out exactly what kinds of problems are really simplified by the agent, the actor and the TM models.
