Clojure的并发(四)Agent深入分析和Actor

简介:
Clojure 的并发(一) Ref和STM
Clojure 的并发(二)Write Skew分析
Clojure 的并发(三)Atom、缓存和性能
Clojure 的并发(四)Agent深入分析和Actor
Clojure 的并发(五)binding和let
Clojure的并发(六)Agent可以改进的地方
Clojure的并发(七)pmap、pvalues和pcalls
Clojure的并发(八)future、promise和线程

四、 Agent和Actor

   除了用于协调同步的Ref,独立同步的Ref,还有一类非常常见的需求:你可能希望状态的更新是异步,你通常不关心更新的结果,这时候你可以考虑下使用Agent。

1、创建agent:

user =>  (def counter (agent  0 ))
# 'user/counter

user
=>  counter
# <Agent@9444d1: 0>


通过agent函数你就可以创建一个agent,指向一个不可变的初始状态。

2、取agent的值,这跟Ref和Atom没啥两样,都是通过deref或者@宏:
user =>   @counter
0
user
=>  (deref counter)
0

3、更新agent,通过send或者send-off函数给agent发送任务去更新agent:
user =>  ( send  counter inc)
# <Agent@9444d1: 0>

  send返回agent对象,内部的值仍然是0,而非inc递增之后的1,这是因为send是异步发送,更新是在另一个线程执行,两个线程(REPL主线程和更新任务的线程)的执行顺序没有同步,显示什么取决于两者谁更快。更新肯定是发生了,查看counter的值:
user =>   @counter
1

   果然更新到了1了。send的方法签名:
( send  a f  &  args)

   其中f是更新的函数,它的定义如下:
(f state - of - agent  &  args)
   也就是它会在第一个参数接收当前agent的状态,而args是send附带的参数。

   还有个方法,send-off,它的作用于send类似:
user =>  ( send - off counter inc)
# <Agent@9444d1: 1>
user =>   @counter
2

   send和send-off的区别在于,send是将任务交给一个 固定大小的线程池执行
final public static ExecutorService pooledExecutor  =
        Executors
.newFixedThreadPool ( 2   +  Runtime . getRuntime() . availableProcessors());
   默认线程池大小是 CPU核数加上2。因此 send执行的任务最好不要有阻塞的操作。而send-off则使用没有大小限制(取决于内存)的线程池:

final public static ExecutorService soloExecutor  =  Executors .newCachedThreadPool ();
  
   因此, send-off比较适合任务有阻塞的操作,如IO读写之类。请注意, 所有的agent是共用这些线程池,这从这些线程池的定义看出来,都是静态变量。

4、异步转同步
,刚才提到send和send-off都是异步将任务提交给线程池去处理,如果你希望同步等待结果返回,那么可以使用await函数:
 ( do  ( send  counter inc) (await counter) (println  @counter ))

send一个任务之后,调用await等待agent所有派发的更新任务结束,然后打印agent的值。await是阻塞当前线程,直到至今为止所有任务派发执行完毕才返回。await没有超时,会一直等待直到条件满足,await-for则可以接受等待的超时时间,如果超过指定时间没有返回,则返回nil,否则返回结果。
 ( do  ( send  counter inc) (await - for   100  counter) (println  @counter ))

await-for接受的单位是毫秒。

5、错误处理


   agent也可以跟Ref和Atom一样设置validator,用于约束验证。由于agent的更新是异步的,你不知道更新的时候agent是否发生异常,只有等到你去取值或者更新的时候才能发现:
user =>  (def counter (agent  0  :validator number ? ))
#
' user/counter

user
=>  (send counter (fn[_]  " foo " ))
#
< clojure.lang.Agent@4de8ce62:  0 >

   强制要求counter的值是数值类型,第二个表达式我们给counter发送了一个更新任务,想将状态更新为字符串"foo",由于是异步更新,返回的结果可能没有显示异常,当你取值的时候,问题出现了:
user =>  @counter
java.lang.Exception: Agent has errors (NO_SOURCE_FILE:
0 )

  告诉你agent处于不正常的状态,如果你想获取详细信息,可以通过agent-errors函数:
user =>  (.printStackTrace (agent - errors counter))
java.lang.IllegalArgumentException: No matching field found: printStackTrace 
for   class  clojure.lang.PersistentList (NO_SOURCE_FILE: 0 )

   你可以恢复agent到前一个正常的状态,通过clear-agent-errors函数:
 
user =>  (clear - agent - errors counter)
nil
user
=>  @counter
0

6、加入事务

agent跟atom不一样,agent可以加入事务,在事务里调用send发送一个任务, 当事务成功的时候该任务将只会被发送一次,最多最少都一次。利用这个特性,我们可以实现在事务操作的时候写文件,达到ACID中的D——持久性的目的:
(def backup - agent (agent  " output/messages-backup.clj "  ))
(def messages (ref []))
(use 
' [clojure.contrib.duck-streams :only (spit)])
(defn add - message - with - backup [msg]
       (dosync
           (let [snapshot (commute messages conj msg)]
                (send
- off backup - agent (fn [filename]
                                        (spit filename snapshot)
                                        filename))
           snapshot)))

定义了一个backup-agent用于保存消息, add - message - with - backup函数首先将状态保存到messages,这是个普通的Ref,然后调用send-off给backup-agent一个任务:
 (fn [filename]
          (spit filename snapshot)
         filename)
这个任务是一个匿名函数,它利用spit打开文件,写入当前的快照,并且关闭文件,文件名来自backup-agent的状态值。注意到,我们是用send-off,send-off利用cache线程池,哪怕阻塞也没关系。

利用事务加上一个backup-agent可以实现类似数据库的ACID,但是还是不同的,主要区别在于 backup-agent的更新是异步,并不保证一定写入文件,因此持久性也没办法得到保证。

7、关闭线程池:


前面提到agent的更新都是交给线程池去处理,在系统关闭的时候你需要关闭这两个线程吃,通过shutdown-agents方法,你再添加任务将被拒绝:
user =>  (shutdown - agents)
nil
user
=>  (send counter inc)
java.util.concurrent.RejectedExecutionException (NO_SOURCE_FILE:
0 )
user
=>  (def counter (agent  0 ))
#
' user/counter
user =>  (send counter inc)    
java.util.concurrent.RejectedExecutionException (NO_SOURCE_FILE:
0 )

哪怕我重新创建了counter,提交任务仍然被拒绝,进一步证明这些 线程池是全局共享的。

8、原理浅析

前文其实已经将agent的实现原理大体都说了,agent本身只是个普通的java对象,它的内部维持一个状态和一个队列:
    volatile  Object state;
    AtomicReference
< IPersistentStack >  q  =   new  AtomicReference(PersistentQueue.EMPTY);


任务提交的时候,是封装成Action对象,添加到此队列

    
public  Object dispatch(IFn fn, ISeq args,  boolean  solo) {
        
if  (errors  !=   null ) {
            
throw   new  RuntimeException( " Agent has errors " , (Exception) RT.first(errors));
        }
        
// 封装成action对象
        Action action  =   new  Action( this , fn, args, solo);
        dispatchAction(action);

        
return   this ;
    }


    
static   void  dispatchAction(Action action) {
        LockingTransaction trans 
=  LockingTransaction.getRunning();
        
//  有事务,加入事务
         if  (trans  !=   null )
            trans.enqueue(action);
        
else   if  (nested.get()  !=   null ) {
            nested.set(nested.get().cons(action));
        }
        
else  {
            
//  入队
            action.agent.enqueue(action);
        }
    }

send和send-off都是调用Agent的dispatch方法,只是两者的参数不一样,dispatch的第二个参数 solo决定了是使用哪个线程池处理action:
(defn send
  [#
^ clojure.lang.Agent a f  &  args]
    (. a (dispatch f args 
false )))

(defn send
- off
  [#
^ clojure.lang.Agent a f  &  args]
    (. a (dispatch f args 
true )))

send-off将solo设置为true,当为true的时候使用cache线程池:

   
final   public   static  ExecutorService soloExecutor  =  Executors.newCachedThreadPool();

    
final   static  ThreadLocal < IPersistentVector >  nested  =   new  ThreadLocal < IPersistentVector > ();

        
void  execute() {
            
if  (solo)
                soloExecutor.execute(
this );
            
else
                pooledExecutor.execute(
this );
        }

执行的时候调用更新函数并设置新的状态:

try  {
                    Object oldval 
=  action.agent.state;
                    Object newval 
=  action.fn.applyTo(RT.cons(action.agent.state, action.args));
                    action.agent.setState(newval);
                    action.agent.notifyWatches(oldval, newval);
                }
                
catch  (Throwable e) {
                    
//  todo report/callback
                    action.agent.errors  =  RT.cons(e, action.agent.errors);
                    hadError 
=   true ;
                }

9、跟actor的比较:

Agent跟Actor有一个显著的不同,agent的action来自于别人发送的任务附带的更新函数,而actor的action则是自身逻辑的一部分。因此,如果想用agent实现actor模型还是相当困难的,下面是我的一个尝试:

(ns actor)

(defn receive [
&  args]
   (apply hash
- map args))
(defn self [] 
* agent * )

(defn spawn [recv
- map]
    (agent recv
- map))

(defn 
!  [actor msg]
    (send actor #(apply (get 
% 1   % 2 )  (vector  % 2 )) msg))
;;启动一个actor
(def actor (spawn 
             (receive :hello #(println 
" receive  " % ))))
;;发送消息 hello
(
!  actor :hello)


   利用spawn启动一个actor,其实本质上是一个agent,而发送通过感叹号!,给agent发送一个更新任务,它从recv-map中查找消息对应的处理函数并将消息作为参数来执行。难点在于消息匹配,匹配这种简单类型的消息没有问题,但是如果匹配用到变量,暂时没有想到好的思路实现,例如实现两个actor的ping/pong。

文章转自庄周梦蝶  ,原文发布时间2010-07-19

目录
相关文章
|
5月前
|
存储 消息中间件 缓存
【Flume】Flume Agent的内部原理分析
【4月更文挑战第4天】【Flume】Flume Agent的内部原理分析
|
14天前
|
SQL 分布式计算 Hadoop
Hadoop-19 Flume Agent批量采集数据到HDFS集群 监听Hive的日志 操作则把记录写入到HDFS 方便后续分析
Hadoop-19 Flume Agent批量采集数据到HDFS集群 监听Hive的日志 操作则把记录写入到HDFS 方便后续分析
36 2
|
2月前
|
数据采集 存储 Java
Flume Agent 的内部原理分析:深入探讨 Flume 的架构与实现机制
【8月更文挑战第24天】Apache Flume是一款专为大规模日志数据的收集、聚合及传输而设计的分布式、可靠且高可用系统。本文深入解析Flume Agent的核心机制并提供实际配置与使用示例。Flume Agent由三大组件构成:Source(数据源)、Channel(数据缓存)与Sink(数据目的地)。工作流程包括数据采集、暂存及传输。通过示例配置文件和Java代码片段展示了如何设置这些组件以实现日志数据的有效管理。Flume的强大功能与灵活性使其成为大数据处理及实时数据分析领域的优选工具。
85 1
|
5月前
|
Kubernetes 安全 Go
对于阿里开源混沌工程工具chaosblade-box-agent心跳报错问题的分析与解决
摘要: 本文记录了一个由chaosblade-box平台后台发现的偶发的chaosblade-box-agent不发送心跳的问题,从报错日志入手,结合chaosblade-box-agent源码进行分析,最终解决问题并修复打包的过程。
395 7
|
5月前
|
人工智能 测试技术 API
【AIGC】LangChain Agent(代理)技术分析与实践
【5月更文挑战第12天】 LangChain代理是利用大语言模型和推理引擎执行一系列操作以完成任务的工具,适用于从简单响应到复杂交互的各种场景。它能整合多种服务,如Google搜索、Wikipedia和LLM。代理通过选择合适的工具按顺序执行任务,不同于链的固定路径。代理的优势在于可以根据上下文动态选择工具和执行策略。适用场景包括网络搜索、嵌入式搜索和API集成。代理由工具组成,每个工具负责单一任务,如Web搜索或数据库查询。工具包则包含预定义的工具集合。创建代理需要定义工具、初始化执行器和设置提示词。LangChain提供了一个从简单到复杂的AI解决方案框架。
638 3
|
5月前
|
人工智能 自然语言处理 搜索推荐
【AGI】智能体简介及场景分析
【4月更文挑战第14天】AI时代,智能体的意义,使用场景及对未来的意义
171 1
|
存储 机器学习/深度学习 人工智能
大模型自主智能体爆火,OpenAI也在暗中观察、发力,这是内部人的分析博客(1)
大模型自主智能体爆火,OpenAI也在暗中观察、发力,这是内部人的分析博客
190 1
|
机器学习/深度学习 人工智能 自然语言处理
大模型自主智能体爆火,OpenAI也在暗中观察、发力,这是内部人的分析博客(2)
大模型自主智能体爆火,OpenAI也在暗中观察、发力,这是内部人的分析博客
232 0
|
人工智能 监控 搜索推荐
使用LangChain的自定义Tool+Agent, 构建全新的AIOps故障分析流程?
如果能够利用LangChain的Agent对问题的推理、任务的编排能力, 再进一步结合自定义的检查脚本工具, 是否就能够更好的实现故障分析的流程化智能编排和执行。
5116 0
|
Java 应用服务中间件 API
Agent内存马的自动分析与查杀(三)
Agent内存马的自动分析与查杀
346 0
Agent内存马的自动分析与查杀(三)