Storm-源码分析-LocalState (backtype.storm.utils)

LocalState

A simple, durable, atomic K/V database. *Very inefficient*, should only be used for occasional reads/writes. Every read/write hits disk.

基于map实现, 每次读写都需要从磁盘上将数据读出, 并反序列化成map, 这个过程称为snapshot. 所以说是比较简单和低效的, 只能用于读取配置或参数, 这种偶尔读取的场景.

    public synchronized Map<Object, Object> snapshot() throws IOException {
        int attempts = 0;
        while(true) {
            String latestPath = _vs.mostRecentVersionPath();
            if(latestPath==null) return new HashMap<Object, Object>();
            try {
                return (Map<Object, Object>) Utils.deserialize(FileUtils.readFileToByteArray(new File(latestPath)));
            } catch(IOException e) {
                attempts++;
                if(attempts >= 10) {
                    throw e;
                }
            }
        }

读写操作都是基于map的操作, get和put, 但是put需要做persist操作. 
这里使用synchronized来做对象的线程间同步, 对于一个LocalState对象, 所有synchronized标有的函数只能被串行操作.

    public Object get(Object key) throws IOException {
        return snapshot().get(key);
    }
    public synchronized void put(Object key, Object val, boolean cleanup) throws IOException {
        Map<Object, Object> curr = snapshot();
        curr.put(key, val);
        persist(curr, cleanup);
    }

当然不止这么简单, 为了达到atomic, 还使用了VersionedStore, 参考下一章 
persist不会去update现有的文件, 而是不断的产生递增version的文件, 故每一批更新都会产生一个新的文件

把需要写入的数据序列化 
创建新的versionfile的path 
把数据写入versionfile 
调用succeedVersion, 创建tokenfile以标志versionfile的写入完成 
清除旧版本, 只保留4个版本

    private void persist(Map<Object, Object> val, boolean cleanup) throws IOException {
        byte[] toWrite = Utils.serialize(val);
        String newPath = _vs.createVersion();
        FileUtils.writeByteArrayToFile(new File(newPath), toWrite);
        _vs.succeedVersion(newPath);
        if(cleanup) _vs.cleanup(4);
    }

 

VersionedStore

    public VersionedStore(String path) throws IOException {
      _root = path;
      mkdirs(_root);
    }

这个store, 其实就是_root目录下的一堆文件 
文件分两种, 
VersionFile, _root + version, 真正的数据存储文件 
TokenFile, _root + version + “.version”, 标志位文件, 标志version文件是否完成写操作, 以避免读到正在更新的文件 

getAllVersions就是读出所有_root目录下的所有完成写操作的文件, 读出version, 并做从大到小的排序

    public List<Long> getAllVersions() throws IOException {
        List<Long> ret = new ArrayList<Long>();
        for(String s: listDir(_root)) {
            if(s.endsWith(FINISHED_VERSION_SUFFIX)) {
                ret.add(validateAndGetVersion(s));
            }
        }
        Collections.sort(ret);
        Collections.reverse(ret);
        return ret;
    }

 

找到最新的版本文件

    public Long mostRecentVersion() throws IOException {
        List<Long> all = getAllVersions();
        if(all.size()==0) return null;
        return all.get(0);

 

创建新版本号, 用当前时间作为version

    public String createVersion() throws IOException {
        Long mostRecent = mostRecentVersion();
        long version = Time.currentTimeMillis();
        if(mostRecent!=null && version <= mostRecent) {
            version = mostRecent + 1;
        }
        return createVersion(version);
    }

    public String createVersion(long version) throws IOException {
        String ret = versionPath(version);
        if(getAllVersions().contains(version))
            throw new RuntimeException("Version already exists or data already exists");
        else
            return ret;
    }
 

创建tokenfile, 以标记versionfile写完成

    public void succeedVersion(String path) throws IOException {
        long version = validateAndGetVersion(path);
        // should rewrite this to do a file move
        createNewFile(tokenPath(version));
    }
 

清除旧的版本, 只保留versionsToKeep个, 清除操作就是删除versionfile和tokenfile

    public void cleanup(int versionsToKeep) throws IOException {
        List<Long> versions = getAllVersions();
        if(versionsToKeep >= 0) {
            versions = versions.subList(0, Math.min(versions.size(), versionsToKeep));
        }
        HashSet<Long> keepers = new HashSet<Long>(versions);

        for(String p: listDir(_root)) {
            Long v = parseVersion(p);
            if(v!=null && !keepers.contains(v)) {
                deleteVersion(v);
            }
        }
    }

本文章摘自博客园,原文发布日期:2013-07-11
上一篇:oracle创建用户、表空间、授权


下一篇:Storm源码浅析之topology的提交