wordcount设计与优化

简介:

原文档见:http://gitlab.alibaba-inc.com/middleware/coding4fun-3rd/blob/master/observer.hany/design.md

  • 淘宝中间件第三期编程比赛,题意概述:读入一个文件,统计其中最常出现的前 10 个单词。

系统设计

  • 按照题意,可设计如下简单拓扑图。

0-简单拓扑图

  • 图中方块表示计算节点箭头表示数据流动
    注意: Counter 和 Selector 之间需要设置一道栅栏 ,所有单词统计完毕后才能开始筛选单词。

优化1:同步 OR 异步

  • Reader 是 IO 集中型操作,其他计算节点都是 CPU 集中型操作。
    如果先读完文件再操作,读文件的这段时间 CPU 就白白空闲着浪费掉了。

  • 简单的优化就是异步读文件。
    增加一个后台 Task 线程,Reader 每读取一小块文件数据(Chunk),
    就交给 Task 线程处理,Reader 继续读下一个 Chunk 的时候,Task 已经跑起来了,
    一个占用 IO,一个占用 CPU,充分利用计算机资源。

1-异步读文件

  • 通过异步读文件,Reader 和 Task 能够并发处理数据,提高性能。

实现细节

  • ChunkedTextReader 实现按 Chunk 分块读文件。
    为了避免 Chunk 边界意外将一个单词拆成两半,
    除最后一个 Chunk 外,每个 Chunk 都将末尾的最后一个单词切开,
    拼接到下一个 Chunk 的前面,让下一个 Chunk 处理。
  • Reader 和 Task 之间通过 BlockingQueue 传输数据,
    这是一个线程安全的 "生产者-消费者" 队列。
  • 经测试,Chunk 分块太小队列操作过于频繁,性能下降。
    分块太大读文件阻塞太久,达不到异步读的目的,
    因此默认限制 Chunk 最小 1MB,最大 8MB。

优化2:并发 OR 并发

  • 读文件的速度比处理文件的速度快的多,一个线程 CPU 跑到 100% 也是远远处理不过来。
    测试机有 16 个核,可创建多个并发的 Task 线程,将每个核都利用起来。
    由于 Task 是高度 CPU 密集型操作,默认取 Task 线程数等于 CPU 核数。

2-并发处理

  • 栅栏控制所有数据处理完成才能开始按词频选择单词。

实现细节

  • ConcurrentBlockingQueueExecutor 管理所有 Task 线程,
    executor 在每个线程上等待线程结束,实现栅栏同步。
  • ConcurrentBlockingQueueTask 实现 Task 线程处理流程。
  • Reader 读完文件后在 executor 上设置 done 标识位,
    Task 发现 queue 为空且 executor 设置了 done 标志位,
    则说明文件已经读完并处理完,task 结束。
  • ConcurrentTrieNode 实现了线程安全的 Trie 树。

语言细节

  • 在 Java 实现中,
    ConcurrentBlockingQueueTask 
    ConcurrentBlockingQueueExecutor 互相依赖,
    但 C++ 不能处理互相依赖,
    所以将 task 对 executor 的依赖剥离到
    ConcurrentBlockingQueueExecutorSupport 中,
    避免互相依赖的问题。
    C++程序员通常使用前置声明、分离实现等办法解决互相依赖问题。
  • 程序先用 Java 设计开发完成,再逐个类翻译成 C++。
    编码尽量遵守 Java 约定,
    每个类放到独立的文件,方法实现直接写在头文件的类声明中,".cpp" 文件基本都是空的。
    排除 ".cpp" 文件,文件数量就少一半了,嘿嘿~~
    简单场景还能用 C++ 模拟一下Java,复杂场景就只能用 Java 了。

优化3:双保险模式避免加锁

  • DANGEROUS双保险模式已经被证明是不可靠的,禁止在生产代码中使用。
  • UPDATE@齐楠 @宏江 指出, jdk 1.5 之后加上 volatile 关键字双保险模式是可用的。早期版本不行。
ConcurrentTrieNode* getChild(char c) {
    int const index = c - 'a';
    if (children[index] == NULL) {
        synchronized: {
            Locker locker(childrenLock);
            if (children[index] == NULL) {
                children[index] = (ConcurrentTrieNode*) calloc(1, sizeof(ConcurrentTrieNode));
            }
        }
    }
    return children[index];
}

语言细节

  • ConcurrentTrieNode 是一个简单 struct, 不包含虚函数和复杂对象字段,
    其构造函数只是简单地将所有字段(包括Lock)初始化为 0。
    使用 calloc(1, sizeof(ConcurrentTrieNode)) 直接分配一块 0 初始化的内存,
    calloc 返回内存地址时,已经得到一个合法初始化的 ConcurrentTrieNode 对象,
    而不必调用构造函数。

优化4: 原子操作避免加锁

  • 并发统计 count 时,将 count 字段声明为 volatile(),
/**
 * Word 出现的次数.
 */
volatile int count;
  • 使用原子操作实现线程安全并避免加锁(),提高性能。
// atomic_inc
__asm__ __volatile__(
        "lock ; " "incl %0"
        :"=m" (node->count)
        :"m" (node->count));

优化5:统计单词结束后再过滤排除单词

  • 程序要求排除一些单词,在统计前排除,每个分词都要判断一次。
    统计结束再排除,相同单词已经合并,减少判断,性能更高。

总结

  • 优化过程中曾想过各种方案,
    比如并发 merge sort 排序再处理,每线程一个 Map 最后再合并等,
    结果发现使用 ConcurrentHashMap 不但编程复杂度明显简单,性能还更加理想。
    再一次证明, 最简单的方案往往就是最好的方案 
    不仅从开发维护的角度来看,有时从性能角度来看也是这样。
    Java 的 ConcurrentHashMap 性能相当赞,并发环境首选啊。

  • 程序开始是用 Java ConcurrentHashMap 实现的。
    为了提升性能翻译成 C++,过程可谓大费周折,相当痛苦,我会告诉你我大半夜还在调 segmental fault 吗?
    C++ 没有 ConcurrentHashMap,实现 ConcurrentTrie 相对简单,所以选择了 Trie。
    很多同学采用 Java 实现性能也非常好,相当赞!

相关文章
|
4月前
|
分布式计算
MapReduce中的Shuffle过程是什么?为什么它在性能上很关键?
MapReduce中的Shuffle过程是什么?为什么它在性能上很关键?
45 0
|
9月前
|
数据采集 分布式计算 搜索推荐
Hadoop学习---7、OutputFormat数据输出、MapReduce内核源码解析、Join应用、数据清洗、MapReduce开发总结(一)
Hadoop学习---7、OutputFormat数据输出、MapReduce内核源码解析、Join应用、数据清洗、MapReduce开发总结(一)
|
5月前
|
存储 分布式计算 大数据
【大数据技术Hadoop+Spark】Spark RDD设计、运行原理、运行流程、容错机制讲解(图文解释)
【大数据技术Hadoop+Spark】Spark RDD设计、运行原理、运行流程、容错机制讲解(图文解释)
81 0
|
5月前
|
分布式计算 数据处理 Spark
Spark【RDD编程(四)综合案例】
Spark【RDD编程(四)综合案例】
|
9月前
|
数据采集 缓存 分布式计算
Hadoop学习---7、OutputFormat数据输出、MapReduce内核源码解析、Join应用、数据清洗、MapReduce开发总结(二)
Hadoop学习---7、OutputFormat数据输出、MapReduce内核源码解析、Join应用、数据清洗、MapReduce开发总结(二)
|
10月前
|
分布式计算
MapReduce 的原理、流程【重要】
MapReduce 的原理、流程【重要】
88 0
|
分布式计算 Hadoop Java
Mapreduce实验之wordcount
利用hadoop函数,标准输出输出堆中的k个单词与频次。
Mapreduce实验之wordcount
|
分布式计算 资源调度 并行计算
|
分布式计算 资源调度 Hadoop
MapReduce 优化方法|学习笔记
快速学习 MapReduce 优化方法
175 0
|
分布式计算 Hadoop Java
WordCount 案例分析| 学习笔记
快速学习 WordCount 案例分析
107 0
WordCount 案例分析| 学习笔记