Concurrency

Clojure’s concurrency model rests on immutable data structures and a strict separation of identity from value. Instead of mutating shared state in place, you move an identity from one immutable value to the next in atomic steps. Atoms, refs, agents, and futures/promises, plus the core.async library, cover almost every concurrent pattern.


The Core Idea

Values don’t change over time. Identities point to different values at different times.

A variable doesn’t get mutated — it’s atomically re-pointed to a new immutable value. This means:

  • No locks needed for readers (they always see a consistent snapshot)
  • No race conditions on data structures (they can’t change while you read them)
  • Composable operations via STM
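The identity/value split can be seen in a few lines. This is a minimal sketch; the names `account` and `snapshot` are illustrative and not part of the original text.

```clojure
(def account (atom {:balance 100}))

;; Deref returns an immutable value: a snapshot of the identity right now.
(def snapshot @account)

;; Re-point the identity at a new value; the old value is untouched.
(swap! account update :balance - 30)

(:balance snapshot)  ;; => 100 (the snapshot never changes)
(:balance @account)  ;; => 70  (the identity now refers to a new value)
```

A reader holding `snapshot` needs no lock, because nothing can change the map it already has.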

1. Atoms — Synchronous, Uncoordinated Updates

Use when a single identity changes independently.

(def counter (atom 0))

;; swap! applies a function atomically
(swap! counter inc)           ;; => 1
(swap! counter + 5)           ;; => 6

;; compare-and-set for conditional updates
(compare-and-set! counter 6 0) ;; true, now 0
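Because `swap!` is a compare-and-set retry loop, the update function may run more than once when several threads contend for the same atom. The sketch below makes that visible by counting invocations; `hits`, `calls`, and `counted-inc` are hypothetical names, and the counter side effect exists only to expose retries.

```clojure
(import 'java.util.concurrent.atomic.AtomicLong)

(def hits  (atom 0))
(def calls (AtomicLong. 0))   ;; how many times the update fn actually ran

(defn counted-inc [n]
  (.incrementAndGet calls)    ;; side effect, here only to expose retries
  (inc n))

;; 8 threads, 1000 increments each
(let [workers (doall (for [_ (range 8)]
                       (future (dotimes [_ 1000]
                                 (swap! hits counted-inc)))))]
  (run! deref workers))       ;; wait for every worker to finish

@hits          ;; => 8000, no update is lost
(.get calls)   ;; at least 8000; any excess is retried attempts
```

In real code the function passed to `swap!` should be pure, since a retry would repeat any side effect.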

Use case: Request counter & metrics

(def stats (atom {:requests 0 :errors 0 :latency []}))

;; Middleware records each request
(defn wrap-metrics [handler]
  (fn [req]
    (swap! stats update :requests inc)
    (let [start (System/nanoTime)
          resp  (handler req)
          ms    (/ (- (System/nanoTime) start) 1e6)]
      (swap! stats update :latency conj ms)
      resp)))

;; Periodic reporter — reads a consistent snapshot
(defn report-stats []
  (let [{:keys [requests errors latency]} @stats]
    (println (str "Requests: " requests
                  " | Errors: " errors
                  " | Avg Latency: " (when (seq latency) (/ (reduce + latency) (count latency))) "ms"))))

Use case: Hot-reloadable config / feature flags

(def config (atom {:rate-limit 100 :feature-x? false}))

;; Admin thread toggles a feature
(swap! config assoc :feature-x? true)

;; Worker reads — no lock needed, immutable snapshot
(let [cfg @config]
  (when (:feature-x? cfg)
    (enable-new-pathway)))

2. Refs + STM — Synchronous, Coordinated Transactions

Multiple identities that must change atomically together — Clojure’s Software Transactional Memory.

(def from-account (ref 1000))
(def to-account   (ref 500))

(defn transfer [amt]
  (dosync
    ;; both happen or neither — like a DB transaction
    (alter from-account - amt)
    (alter to-account   + amt)))

(transfer 200)
@from-account ;; => 800
@to-account   ;; => 700

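dosync retries the transaction automatically when it conflicts with another one, so the body may run more than once and should be free of side effects. Application code takes no locks of its own, so the lock-ordering deadlocks typical of manual locking do not arise.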
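To see the all-or-nothing behaviour under contention, the sketch below runs many transfers in both directions and checks that no money is created or lost. The refs `checking` and `savings` and the helper `move!` are illustrative names, not part of the original example.

```clojure
(def checking (ref 1000))
(def savings  (ref 500))

(defn move! [from to amt]
  (dosync
    (alter from - amt)
    (alter to   + amt)))

;; 100 concurrent transfers in each direction
(let [workers (doall
                (concat
                  (for [_ (range 100)] (future (move! checking savings 5)))
                  (for [_ (range 100)] (future (move! savings checking 5)))))]
  (run! deref workers))

;; Reading both refs inside one transaction yields a consistent pair.
(dosync (+ @checking @savings))  ;; => 1500
```

However the transactions interleave or retry, each one commits both `alter` calls or neither, so the total stays at 1500.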

Use case: Inventory + order management

(def inventory (ref {:widgets 10 :gadgets 5}))
(def orders    (ref []))

(defn place-order [item qty]
  (dosync
    (when (>= (get @inventory item 0) qty)
      (alter inventory update item - qty)
      (alter orders conj {:item item :qty qty :time (java.util.Date.)}))))

;; Thread 1: (place-order :widgets 3)
;; Thread 2: (place-order :widgets 8)
;; Only one of the two can succeed (3 + 8 > 10). If they conflict, the loser
;; retries, re-reads the reduced stock, fails the check, and returns nil.

Use case: Resource allocation

;; `servers` is assumed to be a collection defined elsewhere
(def pool (ref {:servers (vec (take 10 servers))}))

(defn acquire-server []
  (dosync
    ;; peek/pop keep :servers a vector, so release-server's conj stays consistent
    (when-let [s (peek (:servers @pool))]
      (alter pool update :servers pop)
      s)))

(defn release-server [s]
  (dosync
    (alter pool update :servers conj s)))

3. Agents — Asynchronous, Independent Updates

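Fire-and-forget state changes. send returns immediately, and the action runs later on a shared thread pool (send uses a fixed-size pool, send-off an expandable one meant for blocking work). Actions sent to the same agent run one at a time.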

(def log-buffer (agent []))

;; send returns immediately; the function runs later on a pool thread
(send log-buffer conj {:event "login" :user "alice"})
(send log-buffer conj {:event "logout" :user "alice"})

;; read current state (may lag behind sends)
@log-buffer
;; => [{:event "login", :user "alice"}
;;     {:event "logout", :user "alice"}]

;; by default a failed agent rejects new sends until restart-agent;
;; agent-error returns the exception, or nil if the agent is healthy
(agent-error log-buffer) ;; => nil
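The failure path can be walked through end to end. This is a small sketch under the default error mode (no `:error-handler` supplied); the agent name `tally` is illustrative.

```clojure
(def tally (agent 0))

(send tally + 10)
(await tally)              ;; block until the actions sent so far have run
@tally                     ;; => 10

;; An action that throws puts the agent into a failed state.
(send tally (fn [_] (throw (ex-info "boom" {}))))

;; Poll until the failure is recorded (await would block on a failed agent).
(while (nil? (agent-error tally))
  (Thread/sleep 10))

(.getMessage (agent-error tally))  ;; => "boom"

;; The state is still 10; restart-agent clears the error and sets a new state.
(restart-agent tally 0)
(send tally inc)
(await tally)
@tally                     ;; => 1
```

The caller that issued the failing `send` never sees the exception directly; it is stored on the agent until someone inspects or restarts it.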

Use case: Write-behind logging with batch flush

(def audit-log (agent [] :error-handler
                 (fn [ag ex]
                   (println "Log agent failed:" (.getMessage ex)))))

;; Fire 1000 events; send never blocks, and the agent applies them one at a time, in order
(dotimes [i 1000]
  (send audit-log conj {:id i :ts (System/currentTimeMillis)}))

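;; Watcher queues a flush once the buffer holds 100+ entries.
;; The flush runs behind already-queued actions, so a batch may exceed 100.
(add-watch audit-log :flusher
  (fn [_ _ old new]
    (when (>= (count new) 100)
      (send-off audit-log          ;; send-off: the flush does blocking I/O
        (fn [buffer]
          (when (seq buffer)       ;; skip redundant flushes of an empty buffer
            (db/bulk-insert! buffer))
          [])))))  ;; returns empty vector as new state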

Use case: Background index rebuild

(def search-index (agent {}))

(defn reindex! [db]
  ;; send-off rather than send: a rebuild is long-running and likely does I/O
  (send-off search-index
    (fn [_]
      (println "Rebuilding index...")
      (db/build-index db))))

4. Futures & Promises — One-Shot Results

Future — compute on another thread, deref when ready

(def f (future (Thread/sleep 2000) (+ 40 2)))

;; block until done
@f ;; => 42 after 2s

;; non-blocking check
(realized? f) ;; true

Use case: Parallel API calls

(let [users    (future (fetch "/api/users"))
      orders   (future (fetch "/api/orders"))
      products (future (fetch "/api/products"))]
  {:users   @users
   :orders  @orders
   :products @products})  ;; all 3 run in parallel; wall time is roughly that of the slowest call
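The wall-time claim can be checked with a stand-in for the network call. In this sketch `fake-fetch` is a hypothetical helper that just sleeps, and the 200 ms delays are arbitrary.

```clojure
(defn fake-fetch [path ms]     ;; hypothetical stand-in for a real HTTP call
  (Thread/sleep (long ms))
  {:path path})

(def start (System/nanoTime))

(def result
  (let [users    (future (fake-fetch "/api/users" 200))
        orders   (future (fake-fetch "/api/orders" 200))
        products (future (fake-fetch "/api/products" 200))]
    ;; Each deref blocks only until that future is done.
    {:users @users :orders @orders :products @products}))

(def elapsed-ms (/ (- (System/nanoTime) start) 1e6))
;; elapsed-ms lands near 200 (the slowest call), not 600 (the sum)
```

All three futures are created before the first deref, which is what lets the calls overlap.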

Use case: Timeout wrapper

(defn with-timeout [ms f]
  (let [result (future (f))]
    (try
      (deref result ms :timeout)
      (finally
        (future-cancel result)))))

(with-timeout 5000 #(slow-calc 42))
;; => result or :timeout

Promise — deliver a value from anywhere, any time

(def p (promise))
(future (Thread/sleep 1000) (deliver p "done!"))
@p ;; => "done!" (blocks until delivered)
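Two properties of promises are worth seeing directly: a deref can carry a timeout, and only the first `deliver` has any effect. A minimal sketch, with `answer` as an illustrative name:

```clojure
(def answer (promise))

(realized? answer)              ;; => false
(deref answer 50 :not-yet)      ;; => :not-yet (waits at most 50 ms)

(deliver answer 42)
(deliver answer 99)             ;; ignored: a promise is delivered at most once
@answer                         ;; => 42
(realized? answer)              ;; => true
```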

Use case: Bridge async callback → synchronous result

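(defn wait-for-click [button]
  (let [p (promise)]
    ;; a Clojure fn is not an ActionListener, so reify the interface
    (.addActionListener button
      (reify java.awt.event.ActionListener
        (actionPerformed [_ _] (deliver p true))))
    @p))  ;; blocks until the user clicks; do not call this on the UI thread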

Use case: Fan-out / fan-in

(let [n 10
      promises (vec (repeatedly n promise))]  ;; one promise per worker
  ;; Fan-out: spawn workers, each gets a promise to fill
  (dotimes [i n]
    (future
      (deliver (nth promises i) (do-work i))))
  ;; Fan-in: block until every promise has been delivered
  (mapv deref promises))

5. core.async — CSP Channels (Separate Dependency)

Go-style channels for communication between processes (lightweight threads). Add [org.clojure/core.async "1.7.xxx"] to deps.

(require '[clojure.core.async :refer [chan go <! >! <!! timeout close!]])

(def ch (chan))            ;; unbuffered channel

;; producer
(go (dotimes [i 5]
      (<! (timeout 1000))  ;; parks the go block for 1s without blocking a thread
      (>! ch i))           ;; puts on channel
    (close! ch))           ;; closing lets the consumer loop finish

;; consumer
(loop []
  (when-some [v (<!! ch)]  ;; <!! returns nil once the channel is closed and drained
    (println "Got:" v)
    (recur)))

Use case: Pipeline processing

(require '[clojure.core.async :as async])

(defn pipeline [input output]
  (async/pipeline 4 output (map (fn [x] (do-slow-work x))) input))

;; Wire up: raw-data → transform → save
(let [raw    (async/chan 1000)
      cooked (async/chan 1000)]
  (pipeline raw cooked)
  (async/go-loop []
    (when-let [item (async/<! cooked)]
      (db/save! item)
      (recur)))
  (doseq [x (range 1000)]
    (async/put! raw x)))  ;; non-blocking

Use case: Pub/Sub event bus

(require '[clojure.core.async :as async])

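(defn event-bus []
  (let [bus         (async/chan 100)
        ;; async/pub routes each message by its :topic, so every subscriber
        ;; of a topic gets a copy instead of competing for messages on bus
        publication (async/pub bus :topic)]
    {:pub (fn [topic data]
            (async/go
              (async/>! bus {:topic topic :data data})))
     :sub (fn [topic]
            ;; the (map :data) transducer unwraps the payload for consumers
            (let [out (async/chan 10 (map :data))]
              (async/sub publication topic out)
              out))}))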

Choosing the Right Tool

| Primitive  | Sync? | Coordinated? | Use for |
| ---------- | ----- | ------------ | ------- |
| atom       | Sync  | No           | Single independent value (config, counter, cache) |
| ref        | Sync  | Yes (STM)    | Multiple values that must change together (accounts, inventory) |
| agent      | Async | No           | Background state updates (logs, indexes) |
| future     | Async | n/a          | One-shot parallel computation (API calls, timeouts) |
| promise    | n/a   | n/a          | Bridge callbacks → synchronous result |
| core.async | Async | Channels     | Data pipelines, producer/consumer, event buses |

Key Takeaways

  • No mutexes, no synchronized, no data races on data structures. Immutability means shared state can be read freely — contention only happens at the identity level, not the data level.
  • STM transactions compose. dosync wraps multiple ref updates in a retry loop — if any piece conflicts, the whole thing restarts.
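  • swap! on atoms is lock-free: it runs a compare-and-set retry loop on top of the JVM’s CAS primitives. Under contention the update function may run more than once, so keep it free of side effects.
  • Agents decouple side effects from the main execution path. By default, an exception in an action puts the agent into a failed state rather than crashing the caller.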
  • core.async lets you write async code that reads synchronously. The go macro rewrites blocking-looking <! operations into state-machine continuations — no callback hell.
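The composition point can be illustrated with nested transactions: a `dosync` called inside another `dosync` joins the enclosing transaction rather than committing on its own. A minimal sketch with illustrative names (`a`, `b`, `c`, `move`, `split`):

```clojure
(def a (ref 10))
(def b (ref 0))
(def c (ref 0))

(defn move [from to n]
  (dosync
    (alter from - n)
    (alter to   + n)))

(defn split [from to1 to2 n]
  (dosync                  ;; the inner dosyncs in move join this transaction
    (move from to1 n)
    (move from to2 n)))

(split a b c 3)
[@a @b @c]  ;; => [4 3 3]
```

Both moves inside `split` commit together, so no other transaction can observe the state after the first move but before the second.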

For most day-to-day Clojure, atoms and futures cover the large majority of needs. Reach for refs when you need coordinated changes, agents for background workloads, and core.async for high-throughput pipelines.