Sleeping Man

Some thoughts of when I am not sleeping.

Clojure Concurrency


I like to think that Clojure's concurrency support is a happy “giveaway” from the nature of the language (and the functional paradigm), and specifically from the way Clojure handles state mutation. Whenever you have to mutate state, it feels dirty and messy (as it should), so the natural way of composing your solution is over immutable data.

In this context, structuring your application as a concurrent one becomes easier, since the need to coordinate state mutation across several processing units is minimised.

In this post I will discuss some of the types that Clojure provides, out of the box, to promote concurrent solutions.

Properties

Below are some of the properties against which each concurrency type is classified. Depending on the combination of these properties, each construct shines in a different set of scenarios.

Coordinated / Independent

When a type is coordinated, the reads and writes performed in a transaction execute atomically: either all of them succeed or all of them fail.

When a type is independent, no such guarantee is given: the outcome of one read or write does not influence the execution of another.

Synchronous / Asynchronous

Synchronous requests are blocking: the caller has to wait for the requested computation to finish before it can continue processing on its own thread.

On the other hand, an asynchronous request is queued and processed later. The call returns immediately, so the caller may continue processing on its own thread. At some point in time, the result will be available for the caller to use.
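As a rough illustration of the difference, the sketch below uses an Atom for the synchronous case and an Agent for the asynchronous one (both types are covered later in this post). The names gate, sync-state and async-state are made up for this example, and the promise is only there to keep the asynchronous action waiting so the intermediate state can be observed.

```clojure
(def gate (promise))          ; hypothetical latch that keeps the async action waiting
(def sync-state (atom 0))
(def async-state (agent 0))

(swap! sync-state inc)        ; => 1, the caller waited for the result

;; send-off returns the agent at once; the action itself waits on the gate.
(send-off async-state (fn [n] @gate (inc n)))

@async-state                  ; => 0, the action has not finished yet

(deliver gate :go)            ; let the queued action proceed
(await async-state)           ; block until the action has run

@async-state                  ; => 1
```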

Retriable / Not Retriable

Retriable means that the work done on a given type may have to be repeated, for example because a transaction failed. It is really important to keep in mind that retriable work should be free of side effects (or at least idempotent), since there are no guarantees on how many times the work will be retried.

Thread-local / Cross-threads

A type is thread-local when the state changes made within a thread are visible only to that thread. Cross-threads changes are shared amongst several threads.

Types for Concurrency

All of the types below can be labeled as reference types (not to be confused with the Refs type). All of them can be read through the deref function, which returns the value stored within the reference. Since this is such a common operation, Clojure provides the @ reader macro, which achieves the same effect.
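A small sketch of that equivalence, using throwaway names (my-atom, my-ref, my-agent and my-value are chosen only for this example):

```clojure
(def my-atom  (atom 1))
(def my-ref   (ref 2))
(def my-agent (agent 3))
(def my-value 4)          ; def creates a Var

(deref my-atom)           ; => 1
@my-atom                  ; => 1, same as the deref call above
@my-ref                   ; => 2
@my-agent                 ; => 3
@(var my-value)           ; => 4
```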

Refs

Also known as Transactional References, this type should be used to perform coordinated changes across multiple states at once. It can be classified as:

  • Coordinated
  • Synchronous
  • Retriable
  • Cross-threads

It relies on the STM (Software Transactional Memory), which guarantees the ACI properties (Atomicity, Consistency and Isolation) but not Durability, since the changes are stored in memory. Refs can be defined with an initial value as follows:

(def first-ref (ref 0))
(def second-ref (ref 100))

The values of Refs can be changed inside dosync, which starts a transaction. Within this transaction, alter can be used to apply a function to the current state of a Ref, producing the new state to be stored.

(dosync
  (alter first-ref + 10)
  (alter second-ref + 20))

Upon success, the resulting states of the Refs will be:

@first-ref  ; => 10
@second-ref ; => 120

If any of the alter calls fails, the whole transaction is aborted and the Refs keep the state they had before the dosync.

@first-ref  ; => 10
@second-ref ; => 120

(defn bang [value]
  (throw (Exception. "Something went wrong")))

(dosync
  (alter first-ref + 10)
  (alter second-ref bang)) ; => throws Exception, the transaction is aborted

@first-ref  ; => 10
@second-ref ; => 120

It is also possible to set the value of a Ref directly, using the ref-set function.

@first-ref ; => 10

(dosync
  (ref-set first-ref 9999))

@first-ref ; => 9999  
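To see the coordinated property under contention, here is a minimal sketch of the classic transfer scenario. The names account-a, account-b and transfer! are hypothetical; the point is only that both alter calls commit together, however many threads run the transaction at once.

```clojure
(def account-a (ref 100))
(def account-b (ref 0))

(defn transfer!
  "Moves amount from one Ref to another inside a single transaction."
  [from to amount]
  (dosync
    (alter from - amount)
    (alter to + amount)))

;; Run 50 transfers of 1 unit from separate threads, then wait for all of them.
(let [workers (doall (repeatedly 50 #(future (transfer! account-a account-b 1))))]
  (doseq [w workers] @w))

@account-a ; => 50
@account-b ; => 50
```

Transactions that conflict are retried by the STM, so the total across both accounts stays at 100 throughout.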

Agents

An Agent is meant for asynchronous operations over a single state. They can be classified as:

  • Independent
  • Asynchronous
  • Retriable
  • Cross-threads

An Agent is created with a given value that represents its initial state. The Agent holds and manages that state itself.

(def my-agent (agent 0))

@my-agent ; => 0

Agents are completely reactive, so there is no message loop that polls for changes. To change the state of an Agent, one of the functions send or send-off should be used. These functions receive another function, plus optional arguments, which is applied to the current state of the Agent to generate the next state.

(def my-agent (agent 0))

(send my-agent + 10)
(await my-agent) ; wait for the queued action to run

@my-agent ; => 10

(send-off my-agent + 10)
(await my-agent)

@my-agent ; => 20

When called, send and send-off dispatch the computation to be executed on a different thread and return immediately. Dereferencing an Agent also returns immediately, with the Agent's state at the time of the call.

(def my-agent (agent 0))

(send my-agent (fn [_] (Thread/sleep 10000))) ; => returns immediately

@my-agent ; => 0 (also returns immediately)

The main difference between the send and send-off functions is the way they handle the scheduling of the Agent computations.

The first one (send) uses a fixed-size thread pool. Whenever all the threads in the pool are busy, further actions are queued until a thread becomes available; the caller of send is not blocked.

The second one (send-off) uses an unbounded cached thread pool. Threads are created on demand and, whenever they finish their work, they are returned to the cached pool to be re-used.

Due to these differences, it is recommended to use send for CPU-bound operations and send-off for blocking operations, like IO.
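The recommendation could be sketched as follows. The names totals and journal are hypothetical, and Thread/sleep merely stands in for a blocking IO call.

```clojure
(def totals  (agent 0))
(def journal (agent []))

;; CPU-bound work goes through send (fixed-size pool).
(send totals (fn [n] (reduce + n (range 1000))))

;; Blocking work goes through send-off (cached pool).
(send-off journal (fn [entries]
                    (Thread/sleep 100) ; stand-in for a slow write
                    (conj entries "written")))

;; Block until both agents have processed the actions sent so far.
(await totals journal)

@totals  ; => 499500
@journal ; => ["written"]
```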

Whenever an error occurs on an Agent, it enters an error state. Queued actions are not processed and new actions are rejected until the error is cleared.

(def my-agent (agent 123))

(defn bang [_] (throw (Exception. "something went wrong")))

(send my-agent bang) ; returns immediately; the action fails on the pool thread

@my-agent ; => 123

; agent-errors takes the Agent itself, not its dereferenced value
(agent-errors my-agent) ; => (#<Exception java.lang.Exception: something went wrong>)

(send my-agent + 10) ; => throws, since the Agent is in an error state

(clear-agent-errors my-agent) ; => 123

(agent-errors my-agent) ; => nil

(send my-agent + 10)
(await my-agent) ; wait for the queued action to run

@my-agent ; => 133

On the other hand, it is possible to change the error handling mode of the Agents, so that errors can be ignored. This can be achieved on the Agent’s creation or through the function set-error-mode!.
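A minimal sketch of the creation-time variant, assuming we want to record failures instead of halting the Agent. The names handled and tolerant-agent are made up for this example; :error-mode and :error-handler are options accepted by the agent function.

```clojure
(def handled (atom [])) ; collects the messages of ignored errors

(def tolerant-agent
  (agent 0
         :error-mode :continue
         :error-handler (fn [_agent ^Throwable ex]
                          (swap! handled conj (.getMessage ex)))))

(send tolerant-agent (fn [_] (throw (Exception. "something went wrong"))))
(send tolerant-agent + 10) ; still accepted, the Agent never entered the error state
(await tolerant-agent)

@tolerant-agent              ; => 10
@handled                     ; => ["something went wrong"]
(agent-error tolerant-agent) ; => nil
```

For an Agent that already exists, (set-error-mode! tolerant-agent :fail) would switch back to the halting behaviour.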

Agents can also be used within an STM transaction. Actions sent from inside a transaction are held until it commits. It is important to notice that Agents are independent, meaning that a failing action will not affect the rest of the transaction.

(def my-agent (agent 0))
(def my-ref (ref 0))

(defn bang [_] (throw (Exception. "something went wrong")))

(dosync
  (send my-agent bang)
  (alter my-ref + 10))

@my-ref ; => 10

(agent-errors my-agent) ; => (#<Exception java.lang.Exception: something went wrong>)

@my-agent ; => 0
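The interaction also works in the other direction: an action sent from inside a transaction is only dispatched if that transaction commits. The sketch below, with the hypothetical names audit and balance, first aborts a transaction and then commits one.

```clojure
(def audit   (agent []))
(def balance (ref 0))

;; The transaction throws, so the alter is rolled back and the send is discarded.
(try
  (dosync
    (alter balance + 10)
    (send audit conj :deposit)
    (throw (Exception. "abort")))
  (catch Exception _ :aborted))

(await audit)
@balance ; => 0
@audit   ; => []

;; The same work in a transaction that commits.
(dosync
  (alter balance + 10)
  (send audit conj :deposit))

(await audit)
@balance ; => 10
@audit   ; => [:deposit]
```

This makes an Agent a convenient place for side effects that should happen once per committed transaction, even if the transaction body is retried.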

Atoms

Atoms are similar to Refs, although they are not to be used to perform coordinated changes. An Atom should be used for managing a single state synchronously. They are classified as:

  • Independent
  • Synchronous
  • Retriable
  • Cross-threads

An Atom holds an initial state on creation, which can be modified throughout its lifecycle with the functions swap! and the blunter reset!. These functions are synchronous: the caller waits for the operation to finish.

(def my-atom (atom 0))

@my-atom ; => 0

(swap! my-atom + 10)

@my-atom ; => 10

(reset! my-atom 999)

@my-atom ; => 999

Whenever two threads are simultaneously changing an Atom’s state, the first one that succeeds will cause the other one to retry. The one that retries will take in consideration the Atom’s new state.
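The retry behaviour can be made visible with a function that counts how often it is called. This is only a sketch: counter, attempts and counted-inc are hypothetical names, and the side effect inside counted-inc is exactly the kind of thing real update functions should avoid.

```clojure
(def counter  (atom 0))
(def attempts (atom 0)) ; counts every call of the update function, retries included

(defn counted-inc [n]
  (swap! attempts inc)  ; side effect, runs again on every retry
  (inc n))

;; Eight threads, each incrementing the counter 1000 times.
(let [workers (doall (repeatedly 8 #(future (dotimes [_ 1000]
                                              (swap! counter counted-inc)))))]
  (doseq [w workers] @w))

@counter            ; => 8000
(>= @attempts 8000) ; => true
```

The counter always ends at 8000, while attempts may be higher by however many retries the contention caused on a given run.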

Atoms are frequently used to store state in single-threaded applications as well. One common usage is memoization.

; Example taken from clojure.org

(defn memoize [f]
  (let [mem (atom {})]
    (fn [& args]
      (if-let [e (find @mem args)]
        (val e)
        (let [ret (apply f args)]
          (swap! mem assoc args ret)
          ret)))))

(defn fib [n]
  (if (<= n 1)
    n
    (+ (fib (dec n)) (fib (- n 2)))))

(def fib (memoize fib))

Vars

Vars are the most commonly used reference type in Clojure. A Var is usually named and interned in a namespace, which provides easy access. Vars can also be used to give each thread its own local value. They are:

  • Independent
  • Synchronous
  • Not Retriable
  • Cross-threads or Thread-local (binding-local)

One can define Vars using def or any macro that expands to it, like defn.

(def my-def 10)

(defn my-defn [] 10) ; => expands to (def my-defn (clojure.core/fn ([] 10)))

It is important to notice that the name refers to the Var, and the Var in turn holds the value. The Var itself can be accessed using the var special form or the #' reader macro.

(ns my-namespace)

(def my-def 10)

(var my-def) ; => #'my-namespace/my-def

#'my-def ; => #'my-namespace/my-def

The content of a Var can be accessed either directly through its name (which is more convenient when the Var is named) or by dereferencing the Var itself.

(def my-def 10)

my-def ; => 10

@(var my-def) ; => 10

@#'my-def ; => 10

All of the defined Vars can be accessed by multiple threads, however they can be rebound locally by each thread. This is why Vars are considered to be Cross-threads and Thread-local. The rebinding of Vars can be achieved with the binding and with-local-vars macros.

When using binding, it is necessary to declare your Var with the dynamic metadata. A nice convention to indicate that your variable is dynamically bindable is to enclose its name in earmuffs, as displayed below.

(def ^:dynamic *x* 10)
(def ^:dynamic *y* 20)

(+ *x* *y*) ; => 30

(binding [*x* 99
          *y* 101]
  (+ *x* *y*)) ; => 200
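To check that a binding really is local to the thread that established it, the sketch below reads the Var from a second thread while the binding is active. The names *level*, seen-elsewhere and seen-here are hypothetical. A plain java.lang.Thread is used on purpose, since future carries the caller's bindings over to the worker thread.

```clojure
(def ^:dynamic *level* :info)
(def seen-elsewhere (promise)) ; receives the value observed by the other thread

(def seen-here
  (binding [*level* :debug]
    (let [t (Thread. ^Runnable (fn [] (deliver seen-elsewhere *level*)))]
      (.start t)
      (.join t))
    *level*))

seen-here       ; => :debug, the binding is visible in the binding thread
@seen-elsewhere ; => :info, the other thread still sees the root value
```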

With the with-local-vars macro, it is not necessary to mark your Vars as dynamic. The macro creates new, unnamed Vars that shadow the given names within its scope. In order to get their value, one must use the deref function or the @ reader macro.

(def x 10)

x ; => 10

(with-local-vars [x 999] x)  ; => #<Var: --unnamed-->

(with-local-vars [x 999] @x) ; => 999
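Local Vars can also be reassigned with var-set, which gives a small, contained form of mutation. A minimal sketch, where sum-to and total are names invented for this example:

```clojure
(defn sum-to
  "Sums the integers from 0 to n using a local, mutable Var."
  [n]
  (with-local-vars [total 0]
    (dotimes [i (inc n)]
      (var-set total (+ @total i)))
    @total)) ; deref before leaving the scope, the Var is not usable outside it

(sum-to 10) ; => 55
```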
