Clojure STM 笔记-中篇

简介:
继续 上篇继续完成" Software Transactional Memory"的笔记, 本文关注Clojure处理并发的四种引用类型.
 
 
  Clojure中除了引用类型(Reference Type)之外所有的变量都是immutable.本质上" Reference types are mutable references to immutable data". Clojure有四种引用对象:Var Atom Agent Ref.
 
 

Vars

 
    Root Value全局共享,可以重新定义thread-specific值;常用作constant,不建议修改变量值.可以修改Var变量值的函数有def set! binding 有一种场景修改Var变量值是可以接受的:修改配置变量,比如开启反射调用警告 (set! *warn-on-reflection* true)  
 

Atoms

 
   单一值在所有线程共享,原子访问,线程并发读写安全.可以修改Atom值的function有: reset! compare-and-set! swap! 解析Atom使用@Var-name.
 
复制代码
user=> (def a (atom 101))
#'user/a
user=> @a
101
user=> (reset! a 23)
23
user=> @a
23
user=> (swap! a + 1024)
1047
user=>
复制代码

 

Agents

 
   单一值所有线程共享,Agent值在另外的线程调用异步方法(Action)修改.Action返回值作为Agent的新值.Agent值解析使用@var-name.
   发送到同一个Agent的Action会队列化,同一时刻只有一个Action修改Agent的值.Action排队的函数有send和send-off.由于Action是异步执行,所以这两个send方法调用会立即返回.它们的区别在于线程池的大小:一个是固定大小的,一个是大小可变的.我们调用send的时候还可以把一些附加参数带过去,比如 (send v * 10);await方法挂起当前线程直到指定Agents集上的所有的action都已经执行完成.await-for功能类似,提供超时处理.Actions如果发送到的Agent在事务中,就会等待直到事务执行完毕.这个特征用来在代码中附加一些有副作用的逻辑.
 
 
复制代码
user=> (def v (agent 123))
#'user/v
user=> @v
123
user=> (send v inc)
#<Agent@2e21712e: 124>
user=> (send v * 10)
#<Agent@2e21712e: 1240>
user=> (await v)
nil
user=> (await-for 1024 v)
true
user=> @v
1240
user=>
 
user=> (source await)
(defn await
  "Blocks the current thread (indefinitely!) until all actions
  dispatched thus far, from this thread or agent, to the agent(s) have
  occurred.  Will block on failed agents.  Will never return if
  a failed agent is restarted with :clear-actions true."
  {:added "1.0"
   :static true}
  [& agents]
  (io! "await in transaction"
    (when *agent*
      (throw (new Exception "Can't await in agent action")))
    (let [latch (new java.util.concurrent.CountDownLatch (count agents))
          count-down (fn [agent] (. latch (countDown)) agent)]
      (doseq [agent agents]
        (send agent count-down))
      (. latch (await)))))
nil
复制代码

 看一下send和send-off实现的差异:

复制代码
send & send-off

user=> (source send)
(defn send
  "Dispatch an action to an agent. Returns the agent immediately.
  Subsequently, in a thread from a thread pool, the state of the agent
  will be set to the value of:

  (apply action-fn state-of-agent args)"
  {:added "1.0"
   :static true}
  [^clojure.lang.Agent a f & args]
  (.dispatch a (binding [*agent* a] (binding-conveyor-fn f)) args false))
nil
user=> (source send-off)
(defn send-off
  "Dispatch a potentially blocking action to an agent. Returns the
  agent immediately. Subsequently, in a separate thread, the state of
  the agent will be set to the value of:

  (apply action-fn state-of-agent args)"
  {:added "1.0"
   :static true}
  [^clojure.lang.Agent a f & args]
  (.dispatch a (binding [*agent* a] (binding-conveyor-fn f)) args true))
nil
user=>
复制代码

 

 

Refs

 
  Refs只可以在STM Transaction内部进行修改.修改Ref值的函数有ref-set alter commute.Ref的值通过@var-name解析.读一个变量的值不需要使用Transaction,但是如果需要获得 多个变量一致的快照就要使用transaction了.注意:Refs是唯一一个需要使用STM来协调的变量类型.
 
 

如何选择alter ref-set ?

 
 你为什么要使用ref-set而不是alter?Clojure惯用法往往是这样的:
 
  •  我已经知道了Ref变量的值,所以我把它传给ref-set
  •  我知道Ref变量的值是函数调用的结果,所以我直接把函数调用和参数交给alter

 

看实现代码的差异,alter:

复制代码
 ;;;;;;;;;  alter ;;;;;;;;;;;;;;;;;;
(defn alter
  "Must be called in a transaction. Sets the in-transaction-value of
  ref to:

  (apply fun in-transaction-value-of-ref args)

  and returns the in-transaction-value of ref."
  {:added "1.0"
   :static true}
  [^clojure.lang.Ref ref fun & args]
    (. ref (alter fun args)))
    
public Object alter(IFn fn, ISeq args) {
     LockingTransaction t = LockingTransaction.getEx();
     return t.doSet(this, fn.applyTo(RT.cons(t.doGet(this), args)));
}

Object doSet(Ref ref, Object val){
     if(!info.running())
          throw retryex;
     if(commutes.containsKey(ref))
          throw new IllegalStateException("Can't set after commute");
     if(!sets.contains(ref))
          {
          sets.add(ref);
          lock(ref); //If another transaction has modified the same Ref then this transaction may retry (see the lock method).
          }
     vals.put(ref, val);
     return val;
}
复制代码

 

ref-set:
复制代码
;;;;;;;;;  ref-set ;;;;;;;;;;;;;;;;;;
(defn ref-set
  "Must be called in a transaction. Sets the value of ref.
  Returns val."
  {:added "1.0"
   :static true}
  [^clojure.lang.Ref ref val]
    (. ref (set val)))  


public Object set(Object val){
    return LockingTransaction.getEx().doSet(this, val);
}
复制代码

 

 
  不要害怕把函数传递给函数在函数式编程里面这是很平常的事情
 
复制代码
Clojure 1.4.0
user=> (source ref-set)
(defn ref-set
  "Must be called in a transaction. Sets the value of ref.
  Returns val."
  {:added "1.0"
   :static true}
  [^clojure.lang.Ref ref val]
    (. ref (set val)))
nil
user=> (source alter)
(defn alter
  "Must be called in a transaction. Sets the in-transaction-value of
  ref to:

  (apply fun in-transaction-value-of-ref args)

  and returns the in-transaction-value of ref."
  {:added "1.0"
   :static true}
  [^clojure.lang.Ref ref fun & args]
    (. ref (alter fun args)))
nil
user=>
复制代码

 

 

commute的适用场景?

 
     通常情况下都鼓励使用alter而非commute,commute适用的场景是:Ref变动的事务执行顺序不重要的情况,或者说所有的操作是符合数学交换律的.判断是否选择commute大概是这样一个过程:我想修改Ref值 但我不关心是否有其它的事务也在修改它,Ref值出现冲突的时候使用最新的值也是正确的;
    有些情况下commute特别有用比如累计Collection对象,计算一些统计数据 min max average 如果这个collection是被Ref持有,如果两个并发的事务都是添加对象到Collection 没有必要纠结哪个先添加哪一个后添加.使用ref-set/alter,如果另外一个transaction提交了对Ref的修改,当前的事务会重试,使用Commute当前的事务不会重试会继续执行.规避了重试会得到较好的性能,但是要搞清楚,你的逻辑 真的顺序无关吗?
     在一个事务中传递给commute的函数和参数保存在sorted map. Map结果的key是Ref对象,Value是函数和参数的lists, Map按照Ref的创建时间排序.当事务提交的时候,Map中的每一个Ref都会按照排序加写锁, 所有涉及到的commute function都会再次调用,决定最终Ref是什么.按顺序获得write lock避免了死锁.Ref的Value取决与是当前事务开始那一刻之后否有事务又提交新值,如果有新值就会使用新值.否则就会使用最初进入事务时变量的值;这就意味着变量传送到commute function的值,可能和最初传入的值不一样.
    事务中,在Ref上调用了commute之后就不能再使用ref-set/alter重新赋值了.有些场景下需要阻止其它事务读取当前事务要修改的值(比如避免write shew),这是使用ensure来保证的.保证所有没有其它事务可以修改Ref,甚至不能保证当前事务能修改Ref,这是因为其它事务可能也在调用ensure.
复制代码
 ;;;;; commute ;;;;;;;
(defn commute
  "Must be called in a transaction. Sets the in-transaction-value of
  ref to:

  (apply fun in-transaction-value-of-ref args)

  and returns the in-transaction-value of ref.

  At the commit point of the transaction, sets the value of ref to be:

  (apply fun most-recently-committed-value-of-ref args)

  Thus fun should be commutative, or, failing that, you must accept
  last-one-in-wins behavior.  commute allows for more concurrency than
  ref-set."
  {:added "1.0"
   :static true}

  [^clojure.lang.Ref ref fun & args]
    (. ref (commute fun args)))

public Object commute(IFn fn, ISeq args) {
     return LockingTransaction.getEx().doCommute(this, fn, args);
}

Object doCommute(Ref ref, IFn fn, ISeq args) {
     if(!info.running())
          throw retryex;
     if(!vals.containsKey(ref))
          {
          Object val = null;
          try
               {
               ref.lock.readLock().lock();
               val = ref.tvals == null ? null : ref.tvals.val;
               }
          finally
               {
               ref.lock.readLock().unlock();
               }
          vals.put(ref, val);
          }
     ArrayList<CFn> fns = commutes.get(ref);
     if(fns == null)
          commutes.put(ref, fns = new ArrayList<CFn>());
     fns.add(new CFn(fn, args));
     Object ret = fn.applyTo(RT.cons(vals.get(ref), args));
     vals.put(ref, ret);
     return ret;
}
复制代码

 

 
 
 
总结性的表格:
 

  Uncoordinated Coordinated
Synchronous Atom Ref
Asynchronous Agent none
 

Clojure Validators and Watchers

 
    四种引用类型都支持Validator,Watcher.
 
    每一次引用对象的值被修改的时候就会调用Validator.如果function发现变动不是有效的,就会返回false或者异常来阻止变动发生.每一个引用类型都只能有一个Validator. set-validator! 将一个validator function指定给一个引用类型.
    
 
复制代码
user=> (def my-ref (ref 100 :validator (fn [x] (> x 50))))
#'user/my-ref
user=> (dosync (ref-set my-ref 123))
123
user=> (dosync (ref-set my-ref 23))
IllegalStateException Invalid reference state  clojure.lang.ARef.validate (ARef.
java:33)
user=>
复制代码

 

   有两种机制实现引用类型值的变动通知:Watch function 和Watch Agents,区别在于订阅的消息不同,或者说关注的事件不同.
 
复制代码
user=> (def a (ref 123))
#'user/a
user=> (defn w-1 [key id old new ] (println "w-1" "key" key "id" id "old" old "n
ew" new))
#'user/w-1
user=> (defn w-2 [key id old new ] (println "w-2" "key" key "id" id "old" old "n
ew" new))
#'user/w-2
user=> (add-watch a "watch-1" w-1)
#<Ref@41f1f35b: 123>
user=> (add-watch a "watch-2" w-2)
#<Ref@41f1f35b: 123>
user=> (dosync (alter a inc))
w-1 key watch-1 id #<Ref@41f1f35b: 124> old 123 new 124
w-2 key watch-2 id #<Ref@41f1f35b: 124> old 123 new 124
124
user=>
复制代码

 

  从上面的例子可以看到watcher可以是多个.下面是add-watch的源码,metadata有详细的说明:
 
复制代码
user=> (source add-watch)
(defn add-watch
  "Alpha - subject to change.
  Adds a watch function to an agent/atom/var/ref reference. The watch
  fn must be a fn of 4 args: a key, the reference, its old-state, its
  new-state. Whenever the reference's state might have been changed,
  any registered watches will have their functions called. The watch fn
  will be called synchronously, on the agent's thread if an agent,
  before any pending sends if agent or ref. Note that an atom's or
  ref's state may have changed again prior to the fn call, so use
  old/new-state rather than derefing the reference. Note also that watch
  fns may be called from multiple threads simultaneously. Var watchers
  are triggered only by root binding changes, not thread-local
  set!s. Keys must be unique per reference, and can be used to remove
  the watch with remove-watch, but are otherwise considered opaque by
  the watch mechanism."
  {:added "1.0"
   :static true}
  [^clojure.lang.IRef reference key fn] (.addWatch reference key fn))
nil
user=>
复制代码

 

 
  Watcher Agent订阅的事件是有action发送到Agent.注意:Action并没有携带old value.
 
 

Clojure STM - High Level

 
   目前Clojure实现是Clojure和java代码混搭,STM的几乎全部用java实现.
   Clojure STM的实现是基于MVCC(多版本并发控制)和 snapshot isolation(快照隔离);Clojure的STM实现和关系数据库的STM差异在于Clojure管理的是内存中的变量值而非数据库表,行,列. 
    
MVCC 使用时间戳或者增加事务ID来获得序列化,MVCC对象都会维护多个版本,因而可以让事务不需要等待.看下这份总结: http://wenku.baidu.com/view/8475271f866fb84ae45c8d0c.html 下面是维基百科的资料:
 
 
  

"MVCC uses timestamps or increasing transaction IDs to achieve serializability. MVCC ensures a transaction never has to wait for a [database] object by maintaining several versions of an object. Each version would have a write timestamp and it would let a transaction read the most recent version of an object which precedes the transaction timestamp."

"If a transaction (Ti) wants to write to an object, and if there is another transaction (Tk) (that also wants to write it), the timestamp of Ti must precede the timestamp of Tk for the object write operation to succeed. Which is to say a write cannot complete if there are outstanding transactions with an earlier timestamp."

"Every object would also have a read timestamp, and if a transaction Ti wanted to write to object P, and the timestamp of that transaction is earlier than the object's read timestamp, the transaction Ti is aborted and restarted. Otherwise, Ti creates a new version of P and sets the read/write timestamps of P to the timestamp of the transaction." (The Clojure STM implementation does not use read timestamps.)

"The obvious drawback to this system is the cost of storing multiple versions of objects [in the database]. On the other hand reads are never blocked, which can be important for workloads mostly involving reading values [from the database]. MVCC is particularly adept at implementing true snapshot isolation, something which other methods of concurrency control frequently do either incompletely or with high performance costs."

"A transaction executing under snapshot isolation appears to operate on a personal snapshot [of the database], taken at the start of the transaction. When the transaction concludes, it will successfully commit only if the values updated by the transaction have not been changed externally since the snapshot was taken." 

 
 
  就到这里,下一篇将继续对" Software Transactional Memory"的研读,关注Clojure语言处理并发的基础原语的实现.
  
  新年快乐,明天回家喽!
 
 
最后小图一张,一位豆瓣好友的头像,相当有春天的气息:
 
目录
相关文章
|
IDE 编译器 Linux
用GCC开发STM32入门一(使用官方库)
用GCC开发STM32入门一(使用官方库)
|
存储 人工智能 算法
C++ Primer Plus 第6版 读书笔记(7)第 7 章 函数——C++的编程模块
乐趣在于发现。仔细研究,读者将在函数中找到乐趣。C++自带了一个包含函数的大型库(标准 ANSI 库加上多个 C++类),但真正的编程乐趣在于编写自己的函数;另一方面,要提高编程效率,本章和第 8 章介绍如何定义函数、给函数传递信息以及从函数那里获得信息。
145 0
|
机器人 C++
基于stm32的嵌入式开发学习之--前言
基于stm32的嵌入式开发学习之--前言
|
分布式数据库 Windows