Storm-源码分析-LocalState (backtype.storm.utils)

简介:

LocalState

A simple, durable, atomic K/V database. *Very inefficient*, should only be used for occasional reads/writes. Every read/write hits disk.

基于map实现, 每次读写都需要从磁盘上将数据读出, 并反序列化成map, 这个过程称为snapshot. 所以说是比较简单和低效的, 只能用于读取配置或参数, 这种偶尔读取的场景.

    public synchronized Map<Object, Object> snapshot() throws IOException {
        int attempts = 0;
        while(true) {
            String latestPath = _vs.mostRecentVersionPath();
            if(latestPath==null) return new HashMap<Object, Object>();
            try {
                return (Map<Object, Object>) Utils.deserialize(FileUtils.readFileToByteArray(new File(latestPath)));
            } catch(IOException e) {
                attempts++;
                if(attempts >= 10) {
                    throw e;
                }
            }
        }

读写操作都是基于map的操作, get和put, 但是put需要做persist操作. 
这里使用synchronized来做对象的线程间同步, 对于一个LocalState对象, 所有synchronized标有的函数只能被串行操作.

    public Object get(Object key) throws IOException {
        return snapshot().get(key);
    }
    public synchronized void put(Object key, Object val, boolean cleanup) throws IOException {
        Map<Object, Object> curr = snapshot();
        curr.put(key, val);
        persist(curr, cleanup);
    }

当然不止这么简单, 为了达到atomic, 还使用了VersionedStore, 参考下一章 
persist不会去update现有的文件, 而是不断的产生递增version的文件, 故每一批更新都会产生一个新的文件

把需要写入的数据序列化 
创建新的versionfile的path 
把数据写入versionfile 
调用succeedVersion, 创建tokenfile以标志versionfile的写入完成 
清除旧版本, 只保留4个版本

    private void persist(Map<Object, Object> val, boolean cleanup) throws IOException {
        byte[] toWrite = Utils.serialize(val);
        String newPath = _vs.createVersion();
        FileUtils.writeByteArrayToFile(new File(newPath), toWrite);
        _vs.succeedVersion(newPath);
        if(cleanup) _vs.cleanup(4);
    }

 

VersionedStore

    public VersionedStore(String path) throws IOException {
      _root = path;
      mkdirs(_root);
    }

这个store, 其实就是_root目录下的一堆文件 
文件分两种, 
VersionFile, _root + version, 真正的数据存储文件 
TokenFile, _root + version + “.version”, 标志位文件, 标志version文件是否完成写操作, 以避免读到正在更新的文件 

getAllVersions就是读出所有_root目录下的所有完成写操作的文件, 读出version, 并做从大到小的排序

    public List<Long> getAllVersions() throws IOException {
        List<Long> ret = new ArrayList<Long>();
        for(String s: listDir(_root)) {
            if(s.endsWith(FINISHED_VERSION_SUFFIX)) {
                ret.add(validateAndGetVersion(s));
            }
        }
        Collections.sort(ret);
        Collections.reverse(ret);
        return ret;
    }

 

找到最新的版本文件

    public Long mostRecentVersion() throws IOException {
        List<Long> all = getAllVersions();
        if(all.size()==0) return null;
        return all.get(0);

 

创建新版本号, 用当前时间作为version

    public String createVersion() throws IOException {
        Long mostRecent = mostRecentVersion();
        long version = Time.currentTimeMillis();
        if(mostRecent!=null && version <= mostRecent) {
            version = mostRecent + 1;
        }
        return createVersion(version);
    }

    public String createVersion(long version) throws IOException {
        String ret = versionPath(version);
        if(getAllVersions().contains(version))
            throw new RuntimeException("Version already exists or data already exists");
        else
            return ret;
    }
 

创建tokenfile, 以标记versionfile写完成

    public void succeedVersion(String path) throws IOException {
        long version = validateAndGetVersion(path);
        // should rewrite this to do a file move
        createNewFile(tokenPath(version));
    }
 

清除旧的版本, 只保留versionsToKeep个, 清除操作就是删除versionfile和tokenfile

    public void cleanup(int versionsToKeep) throws IOException {
        List<Long> versions = getAllVersions();
        if(versionsToKeep >= 0) {
            versions = versions.subList(0, Math.min(versions.size(), versionsToKeep));
        }
        HashSet<Long> keepers = new HashSet<Long>(versions);

        for(String p: listDir(_root)) {
            Long v = parseVersion(p);
            if(v!=null && !keepers.contains(v)) {
                deleteVersion(v);
            }
        }
    }

本文章摘自博客园,原文发布日期:2013-07-11
目录
相关文章
|
前端开发 JavaScript API
Axure实战22:使用Axure和CSS实现渐变色背景
Axure实战22:使用Axure和CSS实现渐变色背景
954 0
Axure实战22:使用Axure和CSS实现渐变色背景
|
JSON 前端开发 JavaScript
【面试题】 面试官:你如何实现大文件上传
【面试题】 面试官:你如何实现大文件上传
337 0
|
3月前
|
监控 JavaScript Java
2025版基于springboot的校园打印社管理系统
本系统旨在解决传统校园打印社管理效率低、排队时间长、耗材管理难等问题,集成订单管理、设备监控、耗材统计、线上预约、自助打印与在线支付等功能,提升运营效率与用户体验,助力校园信息化建设。
|
4月前
|
运维 Kubernetes Cloud Native
云原生运维也能很稳:Kubernetes 运维避坑指南
云原生运维也能很稳:Kubernetes 运维避坑指南
151 1
|
9月前
|
druid Java 数据库连接
【YashanDB 知识库】druid 连接池做断网测试,无法自动重新连接
【YashanDB 知识库】druid 连接池做断网测试,无法自动重新连接
|
机器学习/深度学习 安全 网络安全
【计算巢】数字取证:追踪和分析网络犯罪的方法
【6月更文挑战第4天】本文探讨了数字取证在网络安全中的关键作用,通过Python编程展示如何分析网络日志以发现线索。数字取证利用科学方法收集、分析电子数据,以应对黑客入侵、数据泄露等网络犯罪。文中提供的Python代码示例演示了从服务器日志中提取IP地址并统计访问次数,以识别异常行为。此外,实际的数字取证还包括数据恢复、恶意软件分析等复杂技术,并需遵循法律程序和伦理标准。随着技术发展,数字取证将更有效地保障网络空间的和平与秩序。
351 5
|
XML 存储 Java
Apache POI 实现用Java操作Excel完成读写操作
Apache POI 实现用Java操作Excel完成读写操作
uniapp 登录
uniapp 登录
215 0
|
机器学习/深度学习 数据挖掘 数据处理
强化学习在数据分析中的应用:使用Python制定决策策略
【4月更文挑战第12天】本文介绍了使用Python进行强化学习以制定数据分析决策策略的方法。强化学习是通过智能体与环境交互获取奖励来制定决策的技术。Python在强化学习中有丰富库支持(如TensorFlow、PyTorch、Keras)、强大的数据处理能力和丰富的生态系统。基本流程包括环境构建(使用OpenAI Gym)、模型选择(如神经网络)、策略选择(Q-Learning等)、训练模型及评估模型。
414 3
|
SQL Java 关系型数据库
hive中 sql执行过程
hive中 sql执行过程
389 0