💖10分钟认识RocketMQ!想进阿里连这个都不会?2️⃣💖

简介: 消费消息的步骤:​ 1. 创建消息消费者, 指定消费者所属的组名 2. 指定Nameserver地址 3. 指定消费者订阅的主题和标签 4. 设置回调函数,编写处理消息的方法 5. 启动消息消费者

三、入门使用


3.1、导入依赖


<dependency>
      <groupId>org.apache.rocketmq</groupId>
      <artifactId>rocketmq-client</artifactId>
      <version>4.4.0</version>
    </dependency>
复制代码


3.2、发送消息


   发送消息的步骤1:

  1. 创建消息生产者, 指定生产者所属的组名。
  2. 指定Nameserver地址。
  3. 启动生产者。
  4. 创建消息对象,指定主题、标签和消息体。
  5. 发送消息。
  6. 关闭生产者。


public class Producer {
  public static void main(String[] args) throws Exception {
  // 创建一个生产者对象,并且指定一个生产者组
  DefaultMQProducer producer = new DefaultMQProducer("xiaolin-producer");
  // 指定名字服务器地址
    producer.setNamesrvAddr("127.0.0.1:9876");
  // 启动生产者
    producer.start();
    // 创建一个消息,参数分别为:主题、标签、消息体
    Message message = new Message("hello_demo1","tag1","你好,我是消息1".getBytes("utf-8"));
    // 发送消息
    producer.send(message);
    // 关闭资源
    producer.shutdown();
  }
}
复制代码


3.3、消费消息(接收消息)


消费消息的步骤:


1. 创建消息消费者, 指定消费者所属的组名 2. 指定Nameserver地址 3. 指定消费者订阅的主题和标签 4. 设置回调函数,编写处理消息的方法 5. 启动消息消费者


public class ConsumerDemo {
  public static void main(String[] args) throws MQClientException {
    // 创建一个拉取消息对象,并指定所属组名
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer-demo");
      // 指定名字服务器地址
    producer.setNamesrvAddr("127.0.0.1:9876");
      // 指定消费者订阅的主题和标签,第二个参数用于过滤条件用于指定接收什么tag,什么都接收
      consumer.subscribe("hello_demo1", "*");
    // 注册一个消息监听器
    consumer.registerMessageListener(new MessageListenerConcurrently() {
      @Override
      public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
          ConsumeConcurrentlyContext consumeConcurrentlyContext) {
        // 可能有多个消息
        for (MessageExt msg : msgs) {
          // 打印消息
          System.out.println(new String(msg.getBody()));
        }
        // 返回发送成功
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
      }
    });
          // 启动消费者
    consumer.start();
  }
}
复制代码


四、消息的类型


4.1、普通消息


   RocketMQ提供三种方式来发送普通消息:可靠同步发送、可靠异步发送和单向发送。


4.1.1、可靠同步发送


   同步发送是指消息发送方发出数据后,会在收到接收方发回响应之后才发下一个数据包的通讯方式。这种方式应用场景非常广泛,例如重要通知邮件、报名短信通知、营销短信系统等。


public class SyncProducer {
  public static void main(String[] args) throws Exception {
      // 实例化消息生产者Producer
        DefaultMQProducer producer = new DefaultMQProducer("xiaolin-producer");
      // 设置NameServer的地址
    producer.setNamesrvAddr("127.0.0.1:9876");
      // 启动Producer实例
        producer.start();
      for (int i = 0; i < 100; i++) {
          // 创建消息,并指定Topic,Tag和消息体
          Message msg = new Message("04-producer-type" /* Topic */,
          "TagA" /* Tag */,
          ("这是一条同步消息 " + i).getBytes("utf-8") /* Message body */);
          //发送同步消息到一个Broker
            SendResult sendResult = producer.send(msg);
            // 通过sendResult返回消息是否成功送达
            System.out.println(JSON.toJSONString(sendResult));
      }
      // 如果不再发送消息,关闭Producer实例。
      producer.shutdown();
    }
}
复制代码


4.1.2、异步消息


   异步发送是指发送方发出数据后,不等接收方发回响应,接着发送下个数据包的通讯方式。发送方通过回调接口接收服务器响应,并对响应结果进行处理。


   异步发送一般用于链路耗时较长,对 RT 响应时间较为敏感的业务场景,例如用户视频上传后通知启动转码服务,转码完成后通知推送转码结果等。


public class ASyncProducer {
  public static void main(String[] args) throws Exception {
      // 实例化消息生产者Producer
        DefaultMQProducer producer = new DefaultMQProducer("xiaolin-producer");
      // 设置NameServer的地址
    producer.setNamesrvAddr("127.0.0.1:9876");
      // 启动Producer实例
        producer.start();
    for (int i = 0; i < 100; i++) {
          // 创建消息,并指定Topic,Tag和消息体
          Message msg = new Message("04-producer-type" /* Topic */,
          "TagA" /* Tag */,
          ("我是异步消息" + i).getBytes("utf-8") /* Message body */
          );
          //发送同步消息到一个Broker
            producer.send(msg, new SendCallback() {
        @Override
        public void onSuccess(SendResult sendResult) {
          System.out.println("消息发送成功");
          System.out.println(JSON.toJSONString(sendResult));
        }
        @Override
        public void onException(Throwable e) {
          System.out.println("消息发送失败"+e.getMessage());
          System.out.println("处理失败消息");
        }
      });
      }
        // 让线程不要终止,否则会报错
    Thread.sleep(30000000);
      // 如果不再发送消息,关闭Producer实例。
      producer.shutdown();
    }
}
复制代码


4.1.3、单向消息


   单向发送是指发送方只负责发送消息,不等待服务器回应且没有回调函数触发,即只发送请求不等待应答。适用于某些耗时非常短,但对可靠性要求并不高的场景,例如日志收集。


public class OneWayProducer {
    public static void main(String[] args) throws Exception {
        // 实例化消息生产者Producer
        DefaultMQProducer producer = new DefaultMQProducer("xiaolin-producer");
        // 设置NameServer的地址
        producer.setNamesrvAddr("127.0.0.1:9876");
        // 启动Producer实例
        producer.start();
        for (int i = 0; i < 100; i++) {
            // 创建消息,并指定Topic,Tag和消息体
            Message msg = new Message("04-producer-type" /* Topic */,
                    "TagA" /* Tag */,
                    ("我是单向消息" + i).getBytes("utf-8") /* Message body */
            );
            //发送单向消息到一个Broker
            producer.sendOneway(msg);
        }
        // 如果不再发送消息,关闭Producer实例。
        producer.shutdown();
    }
}
复制代码


4.1.4、三种发送方式的对比


发送方式 发送的时间 发送反馈结果 是否丢失数据
同步发送 不丢失
异步发送 不丢失
单向消息 较快 可能丢失


4.2、顺序消息


   虽然RocketMQ的数据结构是队列,看起来天生支持顺序消息,当只有一个队列的时候,他就天生支持顺序消息,但是Brocket内部有多个队列,发送多条消息的时候,Broker会按照轮询的方式将多个消息放在不同的队列,消费者采用多线程的方式去消费消息,所以无法保证消费消息的方式和发送消息的方式一样的。解决方式是将消息全部发送到一个队列里面。


   比如一个订单的流程是:创建、付款、推送、完成。订单号相同的

   顺序消息是消息队列提供的一种严格按照顺序来发布和消费的消息类型。


1.JPG


/**
 * 订单构建者
 */
public class OrderStep {
    private long orderId;
    private String desc;
    public long getOrderId() {
        return orderId;
    }
    public void setOrderId(long orderId) {
        this.orderId = orderId;
    }
    public String getDesc() {
        return desc;
    }
    public void setDesc(String desc) {
        this.desc = desc;
    }
    @Override
    public String toString() {
        return "OrderStep{" +
                "orderId=" + orderId +
                ", desc='" + desc + '\'' +
                '}';
    }
    public static List<OrderStep> buildOrders() {
        //  1039L   : 创建    付款 推送 完成
        //  1065L   : 创建   付款
        //  7235L   :创建    付款
        List<OrderStep> orderList = new ArrayList<OrderStep>();
        OrderStep orderDemo = new OrderStep();
        orderDemo.setOrderId(1039L);
        orderDemo.setDesc("创建");
        orderList.add(orderDemo);
        orderDemo = new OrderStep();
        orderDemo.setOrderId(1065L);
        orderDemo.setDesc("创建");
        orderList.add(orderDemo);
        orderDemo = new OrderStep();
        orderDemo.setOrderId(1039L);
        orderDemo.setDesc("付款");
        orderList.add(orderDemo);
        orderDemo = new OrderStep();
        orderDemo.setOrderId(7235L);
        orderDemo.setDesc("创建");
        orderList.add(orderDemo);
        orderDemo = new OrderStep();
        orderDemo.setOrderId(1065L);
        orderDemo.setDesc("付款");
        orderList.add(orderDemo);
        orderDemo = new OrderStep();
        orderDemo.setOrderId(7235L);
        orderDemo.setDesc("付款");
        orderList.add(orderDemo);
        orderDemo = new OrderStep();
        orderDemo.setOrderId(1065L);
        orderDemo.setDesc("完成");
        orderList.add(orderDemo);
        orderDemo = new OrderStep();
        orderDemo.setOrderId(1039L);
        orderDemo.setDesc("推送");
        orderList.add(orderDemo);
        orderDemo = new OrderStep();
        orderDemo.setOrderId(7235L);
        orderDemo.setDesc("完成");
        orderList.add(orderDemo);
        orderDemo = new OrderStep();
        orderDemo.setOrderId(1039L);
        orderDemo.setDesc("完成");
        orderList.add(orderDemo);
        return orderList;
    }
}
复制代码


public class Producer {
    public static void main(String[] args) throws Exception {
        //1.创建消息生产者producer,并制定生产者组名
        DefaultMQProducer producer = new DefaultMQProducer("group1");
        //2.指定Nameserver地址
        producer.setNamesrvAddr("127.0.0.1:9876");
        //3.启动producer
        producer.start();
        //构建消息集合
        List<OrderStep> orderSteps = OrderStep.buildOrders();
        //发送消息
        for (int i = 0; i < orderSteps.size(); i++) {
            String body = orderSteps.get(i) + "";
            Message message = new Message("OrderTopic", "Order", "i" + i, body.getBytes());
            /**
             * 参数一:消息对象
             * 参数二:消息队列的选择器
             * 参数三:选择队列的业务标识(订单ID)
             */
            SendResult sendResult = producer.send(message, new MessageQueueSelector() {
                /**
                 *
                 * @param mqs:队列集合
                 * @param msg:消息对象
                 * @param arg:业务标识的参数
                 * @return
                 */
                @Override
                public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                    long orderId = (long) arg;
                    long index = orderId % mqs.size();
                    return mqs.get((int) index);
                }
            }, orderSteps.get(i).getOrderId());
            System.out.println("发送结果:" + sendResult);
        }
        producer.shutdown();
    }
}
复制代码


public class Consumer {
    public static void main(String[] args) throws MQClientException {
        //1.创建消费者Consumer,制定消费者组名
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");
        //2.指定Nameserver地址
        consumer.setNamesrvAddr("192.168.25.135:9876;192.168.25.138:9876");
        //3.订阅主题Topic和Tag
        consumer.subscribe("OrderTopic", "*");
        //4.注册消息监听器
        consumer.registerMessageListener(new MessageListenerOrderly() {
            @Override
            public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
                for (MessageExt msg : msgs) {
                    System.out.println("线程名称:【" + Thread.currentThread().getName() + "】:" + new String(msg.getBody()));
                }
                return ConsumeOrderlyStatus.SUCCESS;
            }
        });
        //5.启动消费者
        consumer.start();
        System.out.println("消费者启动");
    }
}
复制代码


4.3、事务消息


   RocketMQ提供了事务消息,通过事务消息就能达到分布式事务的最终一致。事务消息交互的过程如下:


2.JPG

 

事务消息的基本概念:


  • 半事务消息:暂不能投递的消息,发送方已经成功地将消息发送到了RocketMQ服务端,但是服务端未收到生产者对该消息的二次确认,此时该消息被标记成“暂不能投递”状态,处于该种状态下的消息即半事务消息。
  • 消息回查:由于网络闪断、生产者应用重启等原因,导致某条事务消息的二次确认丢失,RocketMQ服务端通过扫描发现某条消息长期处于“半事务消息”时,需要主动向消息生产者询问该消息的最终状态(Commit 或是 Rollback),该询问过程即消息回查。


   事务消息的发送步骤:


  1. 发送方将半事务消息发送到RocketMQ服务端。
  2. RocketMQ服务端将消息持久化后,向发送方返回确认消息已经发送成功,此时消息为半事务消息。
  3. 发送方开始执行本地事务逻辑。
  4. 发送方根据本地事务执行结果向服务端提交二次确认(Commit或者是Rollback),服务端收到Commit状态则将事务消息标记为可投递,订阅方最终将收到该消息。服务端如果收到的是Rollback状态则删除半事务消息,订阅方将不会接收该消息。

   事务消息回查步骤:

  1. 在断网或者是应用重启的特殊情况下,上述步骤4提交的二次确认最终未到达服务端,经过固定时间后服务端将对该消息发起消息回查。
  2. 发送方收到消息回查后,需要检查对应消息的本地事务执行的最终结果。
  3. 发送方根据检查得到的本地事务的最终状态再次提交二次确认,服务端仍按照步骤4对半事务消息进行操作。


4.4、延时消息


   比如电商里,提交了一个订单就可以发送一个延时消息,1h后去检查这个订单的状态,如果还是未付款就取消订单释放库存。我们就可以使用延时消息来完成这个功能。


   延时消息的使用限制,现在RocketMq并不支持任意时间的延时,需要设置几个固定的延时等级,从1s到2h分别对应着等级1到18。


private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
复制代码

 

 消息消费失败会进入延时消息队列,消息发送时间与设置的延时等级和重试次数有关。


4.4.1、生产者


public class ScheduledMessageProducer {
    public static void main(String[] args) throws Exception {
        // 实例化一个生产者来产生延时消息
        DefaultMQProducer producer = new DefaultMQProducer("wolfcode-producer");
        producer.setNamesrvAddr("127.0.0.1:9876");
        // 启动生产者
        producer.start();
        Message message = new Message("06-delay", ("delay message").getBytes());
        // 设置延时等级3,这个消息将在10s之后发送(现在只支持固定的几个时间,详看delayTimeLevel)
        message.setDelayTimeLevel(3);
        // 发送消息
        producer.send(message);
        // 关闭生产者
        producer.shutdown();
    }
}
复制代码


4.4.2、消费者


public class ScheduledMessageConsumer {
   public static void main(String[] args) throws Exception {
       DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("wolfcode_consumer");
       consumer.setNamesrvAddr("127.0.0.1:9876");
       // 订阅Topics
       consumer.subscribe("06-delay", "*");
      // 注册消息监听者
      consumer.registerMessageListener(new MessageListenerConcurrently() {
          @Override
          public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> messages, ConsumeConcurrentlyContext context) {
              for (MessageExt message : messages) {
                  System.out.println("Receive message[msgId=" + message.getMsgId() + "] " );
              }
              return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
          }
      });
      // 启动消费者
      consumer.start();
  }
}
复制代码


4.5、消息的过滤


   在消费消息的时候,我们可以指定消费哪些消息,这个时候就需要用到消息的过滤,他分为1两种过滤:


  1. 通过标签过滤。
  2. 通过SQL语句的方式过滤。


4.5.1、通过标签过滤


4.5.1.1、生产者


public class Producer {
    public static void main(String[] args) throws Exception {
        //1.创建消息生产者producer,并制定生产者组名
        DefaultMQProducer producer = new DefaultMQProducer("group1");
        //2.指定Nameserver地址
        producer.setNamesrvAddr("127.0.0.1:9876");
        //3.启动producer
        producer.start();
        for (int i = 0; i < 3; i++) {
            //4.创建消息对象,指定主题Topic、Tag和消息体
            /**
             * 参数一:消息主题Topic
             * 参数二:消息Tag
             * 参数三:消息内容
             */
            Message msg = new Message("FilterTagTopic", "Tag2", ("消息的过滤" + i).getBytes());
            //5.发送消息
            SendResult result = producer.send(msg);
            //发送状态
            SendStatus status = result.getSendStatus();
            System.out.println("发送结果:" + result);
            //线程睡1秒
            TimeUnit.SECONDS.sleep(1);
        }
       //6.关闭生产者producer
        producer.shutdown();
    }
}
复制代码


4.5.1.2、消费者


public class Consumer {
    public static void main(String[] args) throws Exception {
        //1.创建消费者Consumer,制定消费者组名
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");
        //2.指定Nameserver地址
        consumer.setNamesrvAddr("127.0.0.1:9876");
        //3.订阅主题Topic和Tag
        consumer.subscribe("FilterTagTopic", "Tag1 || Tag2 ");
        //4.设置回调函数,处理消息
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            //接受消息内容
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                for (MessageExt msg : msgs) {
                    System.out.println("consumeThread=" + Thread.currentThread().getName() + "," + new String(msg.getBody()));
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        //5.启动消费者consumer
        consumer.start();
        System.out.println("消费者启动");
    }
}
复制代码


4.5.2、通过SQL语句过滤


   RocketMQ只定义了一些基本语法来支持这个特性。你也可以很容易地扩展它。


  • 数值比较,比如:>,>=,<,<=,BETWEEN,=
  • 字符比较,比如:=,<>,IN
  • IS NULL 或者 IS NOT NULL
  • 逻辑符号 AND,OR,NOT

   常量支持类型为:

  • 数值,比如:123,3.1415
  • 字符,比如:'abc',必须用单引号包裹起来
  • NULL,特殊的常量
  • 布尔值,TRUEFALSE


4.5.2.1、消息生产者


   发送消息时,你能通过putUserProperty来设置消息的属性。


DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
producer.start();
Message msg = new Message("TopicTest",
   tag,
   ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)
);
// 设置一些属性
msg.putUserProperty("a", String.valueOf(i));
SendResult sendResult = producer.send(msg);
producer.shutdown();
复制代码


4.5.2.2、消息消费者


   用MessageSelector.bySql来使用sql筛选消息


DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4");
// 只有订阅的消息有这个属性a, a >=0 and a <= 3
consumer.subscribe("TopicTest", MessageSelector.bySql("a between 0 and 3");
consumer.registerMessageListener(new MessageListenerConcurrently() {
   @Override
   public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
       return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
   }
});
consumer.start();



相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
8月前
|
物联网
MQTT常见问题之用单片机接入阿里MQTT实例失败如何解决
MQTT(Message Queuing Telemetry Transport)是一个轻量级的、基于发布/订阅模式的消息协议,广泛用于物联网(IoT)中设备间的通信。以下是MQTT使用过程中可能遇到的一些常见问题及其答案的汇总:
|
3月前
|
消息中间件 Java 数据库
新版 Seata 集成 RocketMQ事务消息,越来越 牛X 了!阿里的 Seata , yyds !
这里 借助 Seata 集成 RocketMQ 事务消息的 新功能,介绍一下一个新遇到的面试题:如果如何实现 **强弱一致性 结合**的分布式事务?
新版 Seata 集成 RocketMQ事务消息,越来越 牛X 了!阿里的 Seata , yyds !
|
3月前
|
消息中间件 存储 canal
阿里面试:canal+MQ,会有乱序的问题吗?
本文详细探讨了在阿里面试中常见的问题——“canal+MQ,会有乱序的问题吗?”以及如何保证RocketMQ消息有序。文章首先介绍了消息有序的基本概念,包括全局有序和局部有序,并分析了RocketMQ中实现消息有序的方法。接着,针对canal+MQ的场景,讨论了如何通过配置`canal.mq.partitionsNum`和`canal.mq.partitionHash`来保证数据同步的有序性。最后,提供了多个与MQ相关的面试题及解决方案,帮助读者更好地准备面试,提升技术水平。
阿里面试:canal+MQ,会有乱序的问题吗?
|
5月前
|
消息中间件 人工智能 监控
|
消息中间件 中间件 Kafka
限时开源!阿里内部消息中间件合集:MQ+Kafka+体系图+笔记
近好多小伙伴说在准备金三银四的面试突击了,但是遇到消息中间件不知道该怎么学了,问我有没有成体系的消息中间件的学习方式。 额,有点不知所措,于是乎小编就想着做一次消息中间件的专题,归类整理了一些纯手绘知识体系图、面试以及相关的学习笔记。
239 1
|
消息中间件 程序员 Apache
阿里RocketMQ创始人首次分享出这份RocketMQ技术内幕神级架构手册
RocketMQ的发展史? RocketMQ的开源正是源于对这种开源文化的认同,开放是为了更好的协同创新,并将这一技术推向新的高度。在经历了阿里巴巴集团内部多年“双11”交易核心链路工业级场景在验证,2016年11月,团队将RocketMQ捐献给全球享有盛誉的Apache软件基金会正式质为孵化项目。 至此,RocketMQ开启了迈向全球顶级开源软件的新征程。
|
消息中间件 存储 Java
阿里二面:RocketMQ 消费失败了,怎么处理?
阿里二面:RocketMQ 消费失败了,怎么处理?
539 2
阿里二面:RocketMQ 消费失败了,怎么处理?
|
消息中间件 缓存 算法
阿里二面:RocketMQ 消息积压了,增加消费者有用吗?
阿里二面:RocketMQ 消息积压了,增加消费者有用吗?
267 0
阿里二面:RocketMQ 消息积压了,增加消费者有用吗?
|
消息中间件 运维 Kubernetes
阿里的 RocketMQ 如何让双十一峰值之下 0 故障?
2020 年双十一交易峰值达到 58.3 W 笔/秒,消息中间件 RocketMQ 继续数年 0 故障丝般顺滑地完美支持了整个集团大促的各类业务平稳。
184 0
阿里的 RocketMQ 如何让双十一峰值之下 0 故障?
|
消息中间件 存储 负载均衡
阿里IM技术分享(九):深度揭密RocketMQ在钉钉IM系统中的应用实践
在钉钉的IM中,我们通过 RocketMQ实现了系统解耦、异步削峰填谷,还通过定时消息实现分布式定时任务等高级特性。同时与 RocketMQ 深入共创,不断优化解决了很多RocketMQ本身的问题,并且孵化出 POP 消费模式等新特性,使 RocketMQ 能够完美支持对性能稳定性和时延要求非常高的 IM 系统。本文将为你分享这些内容。
438 0
阿里IM技术分享(九):深度揭密RocketMQ在钉钉IM系统中的应用实践