Concurrency
Clojure’s concurrency model is built on immutable data structures and a clear distinction between identity and value. Instead of mutating shared state, you move an identity through a succession of immutable values, each transition atomic. Four primitives plus core.async cover almost every concurrent pattern.
The Core Idea
Values don’t change over time. Identities point to different values at different times.
A variable doesn’t get mutated — it’s atomically re-pointed to a new immutable value. This means:
- No locks needed for readers (they always see a consistent snapshot)
- No race conditions on data structures (they can’t change while you read them)
- Composable operations via STM
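A minimal sketch of the identity/value split, using an atom (covered in the next section) as the identity. The names `state` and `snapshot` are made up for illustration. A dereferenced value is an ordinary immutable map, so a later update to the identity leaves it untouched:

```clojure
;; Hypothetical names, chosen only for this illustration.
(def state (atom {:count 0}))

;; Dereferencing yields the current immutable value.
(def snapshot @state)

;; Re-point the identity at a new value; the old value is not modified.
(swap! state update :count inc)

(:count snapshot) ;; => 0
(:count @state)   ;; => 1
```

This is why readers need no locks: whatever they dereferenced stays consistent for as long as they hold it.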
1. Atoms — Synchronous, Uncoordinated Updates
Use when a single identity changes independently.
(def counter (atom 0))
;; swap! applies a function atomically
(swap! counter inc) ;; => 1
(swap! counter + 5) ;; => 6
;; compare-and-set for conditional updates
(compare-and-set! counter 6 0) ;; => true, counter is now 0

Use case: Request counter & metrics
(def stats (atom {:requests 0 :errors 0 :latency []}))
;; Middleware records each request
(defn wrap-metrics [handler]
  (fn [req]
    (swap! stats update :requests inc)
    (let [start (System/nanoTime)
          resp  (handler req)
          ms    (/ (- (System/nanoTime) start) 1e6)]
      (swap! stats update :latency conj ms)
      resp)))
;; Periodic reporter — reads a consistent snapshot
(defn report-stats []
  (let [{:keys [requests errors latency]} @stats]
    (println (str "Requests: " requests
                  " | Errors: " errors
                  " | Avg Latency: " (when (seq latency) (/ (reduce + latency) (count latency))) "ms"))))

Use case: Hot-reloadable config / feature flags
(def config (atom {:rate-limit 100 :feature-x? false}))
;; Admin thread toggles a feature
(swap! config assoc :feature-x? true)
;; Worker reads — no lock needed, immutable snapshot
(let [cfg @config]
  (when (:feature-x? cfg)
    (enable-new-pathway)))

2. Refs + STM — Synchronous, Coordinated Transactions
Use when multiple identities must change together atomically. This is Clojure’s Software Transactional Memory (STM).
(def from-account (ref 1000))
(def to-account (ref 500))
(defn transfer [amt]
  (dosync
    ;; both happen or neither, like a DB transaction
    (alter from-account - amt)
    (alter to-account + amt)))
(transfer 200)
@from-account ;; => 800
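@to-account ;; => 700

dosync retries the whole transaction automatically when it conflicts with another one, so the body should be free of side effects. Because you never acquire locks yourself, lock-ordering deadlocks cannot occur.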
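To see the all-or-nothing guarantee under real contention, here is a small sketch that runs 100 transfers concurrently in both directions. The refs `acct-a` and `acct-b` and the helper `move!` are hypothetical names, kept separate from the accounts above. It does not show how many retries happened, only that the combined balance is preserved:

```clojure
;; Hypothetical accounts and helper, for illustration only.
(def acct-a (ref 1000))
(def acct-b (ref 500))

(defn move! [from to amt]
  (dosync
    (alter from - amt)
    (alter to + amt)))

;; 100 concurrent transfers: even i moves a->b, odd i moves b->a.
(let [tasks (doall
              (for [i (range 100)]
                (future
                  (if (even? i)
                    (move! acct-a acct-b 10)
                    (move! acct-b acct-a 10)))))]
  (run! deref tasks)) ;; wait for every transfer to commit

;; Read both refs inside one transaction for a consistent view.
(def total (dosync (+ @acct-a @acct-b)))
total ;; => 1500
```

Since the 50 transfers in each direction cancel out, both accounts also end at their starting balances.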
Use case: Inventory + order management
(def inventory (ref {:widgets 10 :gadgets 5}))
(def orders (ref []))
(defn place-order [item qty]
  (dosync
    (when (>= (get @inventory item 0) qty)
      (alter inventory update item - qty)
      (alter orders conj {:item item :qty qty :time (java.util.Date.)}))))
;; Thread 1: (place-order :widgets 3)
;; Thread 2: (place-order :widgets 8)
;; If both run at once, one commits and the other retries against the updated
;; inventory, finds too little stock, and returns nil. Stock never goes negative.

Use case: Resource allocation
;; `servers` is assumed to be a collection defined elsewhere
(def pool (ref {:servers (vec (take 10 servers))}))
(defn acquire-server []
  (dosync
    (let [servers (:servers @pool)]
      (when (seq servers)
        ;; keep the pool a vector so release-server appends at the end
        (alter pool update :servers #(vec (rest %)))
        (first servers)))))
(defn release-server [s]
  (dosync
    (alter pool update :servers conj s)))

3. Agents — Asynchronous, Independent Updates
Fire-and-forget state changes. send returns immediately, and the action runs later on a shared thread pool; actions sent to the same agent run one at a time.
(def log-buffer (agent []))
;; send returns immediately; the function runs later on a pool thread
(send log-buffer conj {:event "login" :user "alice"})
(send log-buffer conj {:event "logout" :user "alice"})
;; read current state (may lag behind sends)
@log-buffer
;; => [{:event "login", :user "alice"}
;; {:event "logout", :user "alice"}]
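;; agent-error returns the exception if an action threw (the agent then
;; rejects new sends until restart-agent), or nil if the agent is healthy
(agent-error log-buffer) ;; => nil when healthy

Use case: Write-behind logging with batch flush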
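(def audit-log
  (agent []
         :error-handler (fn [ag ex]
                          (println "Log agent failed:" (.getMessage ex)))))

;; Register the watch before sending so no threshold crossing is missed.
;; It queues one flush each time the buffer grows to 100 entries.
(add-watch audit-log :flusher
  (fn [_ _ old-state new-state]
    (when (and (< (count old-state) 100)
               (>= (count new-state) 100))
      ;; send-off, because the flush does blocking I/O
      (send-off audit-log
                (fn [buffer]
                  (db/bulk-insert! buffer) ;; db/bulk-insert! is assumed to exist
                  []))))) ;; the empty vector becomes the new state

;; Fire 1000 events: send never blocks, and actions on one agent run one at a time
(dotimes [i 1000]
  (send audit-log conj {:id i :ts (System/currentTimeMillis)}))

Use case: Background index rebuild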
(def search-index (agent {}))
(defn reindex! [db]
  ;; send-off is meant for long or blocking actions like this rebuild
  (send-off search-index
            (fn [_]
              (println "Rebuilding index...")
              (db/build-index db))))

4. Futures & Promises — One-Shot Results
Future — compute on another thread, deref when ready
(def f (future (Thread/sleep 2000) (+ 40 2)))
;; block until done
@f ;; => 42 after 2s
;; non-blocking check
(realized? f) ;; => true once the computation has finished

Use case: Parallel API calls
(let [users    (future (fetch "/api/users"))
      orders   (future (fetch "/api/orders"))
      products (future (fetch "/api/products"))]
  {:users    @users
   :orders   @orders
   :products @products}) ;; all 3 run in parallel; wall time is roughly the slowest call

Use case: Timeout wrapper
(defn with-timeout [ms f]
  (let [result (future (f))]
    (try
      (deref result ms :timeout)
      (finally
        ;; no-op if already done; otherwise interrupts the worker thread
        (future-cancel result)))))
(with-timeout 5000 #(slow-calc 42))
;; => result or :timeout

Promise — deliver a value from anywhere, any time
(def p (promise))
(future (Thread/sleep 1000) (deliver p "done!"))
@p ;; => "done!" (blocks until delivered)

Use case: Bridge async callback → synchronous result
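(defn wait-for-click [button]
  (let [p (promise)]
    ;; reify the interface explicitly; a plain fn does not implement ActionListener
    (.addActionListener button
      (reify java.awt.event.ActionListener
        (actionPerformed [_ _] (deliver p true))))
    @p)) ;; blocks until the user clicks; do not call this on the UI event thread

Use case: Fan-out / fan-in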
(let [n 10
      ;; vec realizes all n promises up front so workers can index into them
      promises (vec (repeatedly n promise))]
  ;; Fan-out: spawn workers, each gets a promise to fill
  (dotimes [i n]
    (future
      (let [data (do-work i)]
        (deliver (nth promises i) data))))
  ;; Fan-in: collect all results, in order
  (mapv deref promises))

5. core.async — CSP Channels (Separate Dependency)
Go-style channels for communication between processes (lightweight threads). Add [org.clojure/core.async "1.7.xxx"] to deps.
(require '[clojure.core.async :refer [chan go <! >! <!! close! timeout]])
(def ch (chan)) ;; unbuffered channel
;; producer
(go
  (dotimes [i 5]
    (<! (timeout 1000)) ;; parks for 1s without blocking a thread
    (>! ch i))          ;; puts on channel
  (close! ch))          ;; lets the consumer loop finish
;; consumer
(loop []
  (when-some [v (<!! ch)] ;; nil means the channel is closed
    (println "Got:" v)
    (recur)))

Use case: Pipeline processing
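(require '[clojure.core.async :as async])
(defn pipeline [input output]
  ;; 4 parallel workers; prefer async/pipeline-blocking if do-slow-work blocks on I/O
  (async/pipeline 4 output (map (fn [x] (do-slow-work x))) input))
;; Wire up: raw-data → transform → save
(let [raw    (async/chan 1000)
      cooked (async/chan 1000)]
  (pipeline raw cooked)
  ;; db/save! blocks, so consume on a real thread rather than in a go block
  (async/thread
    (loop []
      (when-some [item (async/<!! cooked)]
        (db/save! item)
        (recur))))
  (doseq [x (range 1000)]
    (async/put! raw x)) ;; non-blocking
  (async/close! raw))   ;; pipeline closes `cooked` once `raw` is drained

Use case: Pub/Sub event bus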
(require '[clojure.core.async :as async])
(defn event-bus []
  (let [bus (async/chan 100)
        ;; async/pub routes each message to every subscriber of its :topic
        publication (async/pub bus :topic)]
    {:pub (fn [topic data]
            (async/put! bus {:topic topic :data data}))
     :sub (fn [topic]
            ;; the transducer unwraps :data for the subscriber
            (let [out (async/chan 10 (map :data))]
              (async/sub publication topic out)
              out))}))

Choosing the Right Tool
| Primitive | Sync? | Coordinated? | Use for |
|---|---|---|---|
| atom | Sync | No | Single independent value (config, counter, cache) |
| ref | Sync | Yes (STM) | Multiple values that must change together (accounts, inventory) |
| agent | Async | No | Background state updates (logs, indexes) |
| future | Async | — | One-shot parallel computation (API calls, timeouts) |
| promise | — | — | Bridge callbacks → synchronous result |
| core.async | Async | Channels | Data pipelines, producer/consumer, event buses |
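The Sync? column can be checked at the REPL. In this sketch (`hits` and `events` are illustrative names), the atom update is visible as soon as swap! returns, while the agent update only becomes certain after waiting for the queued action with await:

```clojure
;; Illustrative names, not taken from the examples above.
(def hits (atom 0))
(def events (agent []))

;; Synchronous: swap! returns the new value, already visible to every reader.
(swap! hits inc) ;; => 1

;; Asynchronous: send returns the agent right away; the action runs later.
(send events conj :started)

;; await blocks until the actions sent so far from this thread have run.
(await events)
@events ;; => [:started]
```

Without the await, a deref right after send may still see the old value.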
Key Takeaways
- No mutexes, no synchronized, no data races on data structures. Immutability means shared state can be read freely; contention only happens at the identity level, not the data level.
- STM transactions compose. dosync wraps multiple ref updates in a retry loop: if any piece conflicts, the whole thing restarts.
- swap! on atoms is lock-free, built on the JVM’s compare-and-set (CAS) primitives, so uncontended updates are cheap. Under contention the update function may run more than once, so keep it pure.
- Agents decouple side effects from the main execution path. By default, an error puts the agent into a failed state rather than crashing the caller.
- core.async lets you write async code that reads synchronously. The go macro rewrites blocking-looking <! operations into state-machine continuations, so there is no callback hell.
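As a rough illustration of the lock-free swap! point, the sketch below has eight threads hammer one atom with no locking at all. The name `hit-count` and the thread and iteration counts are arbitrary choices for the example. Every increment lands, because swap! re-runs the update function whenever its compare-and-set loses a race:

```clojure
;; Arbitrary illustrative name and sizes.
(def hit-count (atom 0))

;; 8 threads, 10,000 increments each, all contending on one atom.
(let [workers (doall
                (repeatedly 8
                  #(future (dotimes [_ 10000]
                             (swap! hit-count inc)))))]
  (run! deref workers)) ;; wait for all workers to finish

@hit-count ;; => 80000
```

The same retry behaviour is the reason the function passed to sw! should be pure: under contention it can be called several times for a single logical update.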
For most day-to-day Clojure, atoms and futures cover the large majority of needs. Reach for refs when you need coordinated changes, agents for background workloads, and core.async for high-throughput pipelines.