Vars
Atoms
user=> (def a (atom 101)) #'user/a user=> @a 101 user=> (reset! a 23) 23 user=> @a 23 user=> (swap! a + 1024) 1047 user=>
Agents
user=> (def v (agent 123)) #'user/v user=> @v 123 user=> (send v inc) #<Agent@2e21712e: 124> user=> (send v * 10) #<Agent@2e21712e: 1240> user=> (await v) nil user=> (await-for 1024 v) true user=> @v 1240 user=> user=> (source await) (defn await "Blocks the current thread (indefinitely!) until all actions dispatched thus far, from this thread or agent, to the agent(s) have occurred. Will block on failed agents. Will never return if a failed agent is restarted with :clear-actions true." {:added "1.0" :static true} [& agents] (io! "await in transaction" (when *agent* (throw (new Exception "Can't await in agent action"))) (let [latch (new java.util.concurrent.CountDownLatch (count agents)) count-down (fn [agent] (. latch (countDown)) agent)] (doseq [agent agents] (send agent count-down)) (. latch (await))))) nil
看一下send和send-off实现的差异:
send & send-off user=> (source send) (defn send "Dispatch an action to an agent. Returns the agent immediately. Subsequently, in a thread from a thread pool, the state of the agent will be set to the value of: (apply action-fn state-of-agent args)" {:added "1.0" :static true} [^clojure.lang.Agent a f & args] (.dispatch a (binding [*agent* a] (binding-conveyor-fn f)) args false)) nil user=> (source send-off) (defn send-off "Dispatch a potentially blocking action to an agent. Returns the agent immediately. Subsequently, in a separate thread, the state of the agent will be set to the value of: (apply action-fn state-of-agent args)" {:added "1.0" :static true} [^clojure.lang.Agent a f & args] (.dispatch a (binding [*agent* a] (binding-conveyor-fn f)) args true)) nil user=>
Refs
如何选择alter ref-set ?
- 我已经知道了Ref变量的值,所以我把它传给ref-set
- 我知道Ref变量的值是函数调用的结果,所以我直接把函数调用和参数交给alter
看实现代码的差异,alter:
;;;;;;;;; alter ;;;;;;;;;;;;;;;;;; (defn alter "Must be called in a transaction. Sets the in-transaction-value of ref to: (apply fun in-transaction-value-of-ref args) and returns the in-transaction-value of ref." {:added "1.0" :static true} [^clojure.lang.Ref ref fun & args] (. ref (alter fun args))) public Object alter(IFn fn, ISeq args) { LockingTransaction t = LockingTransaction.getEx(); return t.doSet(this, fn.applyTo(RT.cons(t.doGet(this), args))); } Object doSet(Ref ref, Object val){ if(!info.running()) throw retryex; if(commutes.containsKey(ref)) throw new IllegalStateException("Can't set after commute"); if(!sets.contains(ref)) { sets.add(ref); lock(ref); //If another transaction has modified the same Ref then this transaction may retry (see the lock method). } vals.put(ref, val); return val; }
;;;;;;;;; ref-set ;;;;;;;;;;;;;;;;;; (defn ref-set "Must be called in a transaction. Sets the value of ref. Returns val." {:added "1.0" :static true} [^clojure.lang.Ref ref val] (. ref (set val))) public Object set(Object val){ return LockingTransaction.getEx().doSet(this, val); }
Clojure 1.4.0 user=> (source ref-set) (defn ref-set "Must be called in a transaction. Sets the value of ref. Returns val." {:added "1.0" :static true} [^clojure.lang.Ref ref val] (. ref (set val))) nil user=> (source alter) (defn alter "Must be called in a transaction. Sets the in-transaction-value of ref to: (apply fun in-transaction-value-of-ref args) and returns the in-transaction-value of ref." {:added "1.0" :static true} [^clojure.lang.Ref ref fun & args] (. ref (alter fun args))) nil user=>
commute的适用场景?
;;;;; commute ;;;;;;; (defn commute "Must be called in a transaction. Sets the in-transaction-value of ref to: (apply fun in-transaction-value-of-ref args) and returns the in-transaction-value of ref. At the commit point of the transaction, sets the value of ref to be: (apply fun most-recently-committed-value-of-ref args) Thus fun should be commutative, or, failing that, you must accept last-one-in-wins behavior. commute allows for more concurrency than ref-set." {:added "1.0" :static true} [^clojure.lang.Ref ref fun & args] (. ref (commute fun args))) public Object commute(IFn fn, ISeq args) { return LockingTransaction.getEx().doCommute(this, fn, args); } Object doCommute(Ref ref, IFn fn, ISeq args) { if(!info.running()) throw retryex; if(!vals.containsKey(ref)) { Object val = null; try { ref.lock.readLock().lock(); val = ref.tvals == null ? null : ref.tvals.val; } finally { ref.lock.readLock().unlock(); } vals.put(ref, val); } ArrayList<CFn> fns = commutes.get(ref); if(fns == null) commutes.put(ref, fns = new ArrayList<CFn>()); fns.add(new CFn(fn, args)); Object ret = fn.applyTo(RT.cons(vals.get(ref), args)); vals.put(ref, ret); return ret; }
Uncoordinated | Coordinated | |
---|---|---|
Synchronous | Atom | Ref |
Asynchronous | Agent | none |
Clojure Validators and Watchers
user=> (def my-ref (ref 100 :validator (fn [x] (> x 50)))) #'user/my-ref user=> (dosync (ref-set my-ref 123)) 123 user=> (dosync (ref-set my-ref 23)) IllegalStateException Invalid reference state clojure.lang.ARef.validate (ARef. java:33) user=>
user=> (def a (ref 123)) #'user/a user=> (defn w-1 [key id old new ] (println "w-1" "key" key "id" id "old" old "n ew" new)) #'user/w-1 user=> (defn w-2 [key id old new ] (println "w-2" "key" key "id" id "old" old "n ew" new)) #'user/w-2 user=> (add-watch a "watch-1" w-1) #<Ref@41f1f35b: 123> user=> (add-watch a "watch-2" w-2) #<Ref@41f1f35b: 123> user=> (dosync (alter a inc)) w-1 key watch-1 id #<Ref@41f1f35b: 124> old 123 new 124 w-2 key watch-2 id #<Ref@41f1f35b: 124> old 123 new 124 124 user=>
user=> (source add-watch) (defn add-watch "Alpha - subject to change. Adds a watch function to an agent/atom/var/ref reference. The watch fn must be a fn of 4 args: a key, the reference, its old-state, its new-state. Whenever the reference's state might have been changed, any registered watches will have their functions called. The watch fn will be called synchronously, on the agent's thread if an agent, before any pending sends if agent or ref. Note that an atom's or ref's state may have changed again prior to the fn call, so use old/new-state rather than derefing the reference. Note also that watch fns may be called from multiple threads simultaneously. Var watchers are triggered only by root binding changes, not thread-local set!s. Keys must be unique per reference, and can be used to remove the watch with remove-watch, but are otherwise considered opaque by the watch mechanism." {:added "1.0" :static true} [^clojure.lang.IRef reference key fn] (.addWatch reference key fn)) nil user=>
Clojure STM - High Level
"MVCC uses timestamps or increasing transaction IDs to achieve serializability. MVCC ensures a transaction never has to wait for a [database] object by maintaining several versions of an object. Each version would have a write timestamp and it would let a transaction read the most recent version of an object which precedes the transaction timestamp."
"If a transaction (Ti) wants to write to an object, and if there is another transaction (Tk) (that also wants to write it), the timestamp of Ti must precede the timestamp of Tk for the object write operation to succeed. Which is to say a write cannot complete if there are outstanding transactions with an earlier timestamp."
"Every object would also have a read timestamp, and if a transaction Ti wanted to write to object P, and the timestamp of that transaction is earlier than the object's read timestamp, the transaction Ti is aborted and restarted. Otherwise, Ti creates a new version of P and sets the read/write timestamps of P to the timestamp of the transaction." (The Clojure STM implementation does not use read timestamps.)
"The obvious drawback to this system is the cost of storing multiple versions of objects [in the database]. On the other hand reads are never blocked, which can be important for workloads mostly involving reading values [from the database]. MVCC is particularly adept at implementing true snapshot isolation, something which other methods of concurrency control frequently do either incompletely or with high performance costs."
"A transaction executing under snapshot isolation appears to operate on a personal snapshot [of the database], taken at the start of the transaction. When the transaction concludes, it will successfully commit only if the values updated by the transaction have not been changed externally since the snapshot was taken."