什么?CPU消耗要压降80% ——《事件序列化CPU开销压降》揭榜

简介: 本文为《事件CPU开销压降》揭榜报告,旨在解决风控系统间信息传递时事件体持续膨胀导致的序列化/反序列化CPU消耗过高的问题。

本文为《事件CPU开销压降》揭榜报告,同时也可以泛化为通用的SOFA RPC对复杂对象序列化性能优化方法:在确保反序列化结果正确性100%的前提下,通过前置自定义序列化,对高频对象进行业务字典压缩替换,并解决SOFA RPC默认的Hessian反序列化大byte[ ]性能问题,最终达到压缩序列化CPU消耗数倍的目的。



先上一张揭榜结果性能对比图:最终优化效果序列化CPU耗时相比默认Hessian降低至的20%左右,实现目标要求的5倍提升。


image.png


1.背景描述

风控各系统间均以事件方式进行信息传统,多年以来事件体持续膨胀,以交易事件为例,多个系统间传输平均大小为20K~50K,极端情况会出现>1M的情况。

目标要求:在保障风控引擎消费时最终事件信息不变和 RT 不增加的前提下,将事件的序列化/反序列 CPU 消耗降低80%。

好,让我们开始动手揭榜吧~


2.问题分析与优化方向调研

2.1 前置约束

由于本次揭榜问题是源于线上实践,解法需要最终能落地、能部署上线,所以有几个默认的约束:

  • 约束1:数据正确性需要100%:即反序列化(或解压缩)之后数据字段值、字段类型(含泛型签名等)、引用关系(含循环引用等)应该和原来RPC反序列化一致,如果正确性无法保证,则后续优化无意义。
  • 约束2:需要基于现有的SOFA协议:路由、限流切面、trace等都需要继续能用。
  • 约束3:对接入方的SOFA版本等不能有高版本要求:如果接入方需要必须升级到特定SOFA版本,则会对接入产生巨大障碍。例如:最新版SOFABoot支持Fury和Protobuf等(SOFARPC序列化配置),但是要求接入方都是最新SOFABoot版本,我们的安全服务对接的上游系统众多,如果对上游有高版本要求将会成为接入阻碍。
  • 约束4:接入、优化方式不能太复杂,需要能开箱即用。


2.2优化方向-要解决的几个问题:

2.2.1确定可以优化的腾挪空间

现有RPC序列化的链路很简单,流程示意:

image.png


能够腾挪的空间并不大,大致方向即:

  • 减少SOFARPC默认Hessian序列化/反序列化的消耗
  • 在此过程中的额外消耗增加要远小于前述减少的
  • 抵消后能形成较大正收益

2.2.2默认序列化是否能被替换、或还有优化空间?

  • 理论上可以替换,但是考虑到RPC兼容性,就有障碍了:除了接口参数,SOFA中的方法签名等等RPC参数也都走的Hessian,路由、限流等依赖这个,还要考虑上下游版本兼容性。


  • 不替换:也有优化空间,可以尝试换个其他框架序列化好后,拿序列化结果送给RPC让Hessian再搞一次。


2.2.3传输对象的内容是否还有压缩空间?

初步肉眼观察,感觉properties、extendData、baseInfoData这3个Map大字段及subEvents子事件字段较大,有一定的数据重复度。统计量化到两个比较重要的数据:


  • Map的Key重复度很大:Key的重复度97%


  • Map的Key长度占比很长:Key的长度比Value(只算String类的Value)还长很多,Key的长度占比接近60%。


个人觉得这也很好理解:CTU事件对接的上游尽管很多,但是每个上游需要传递的参数相对是固定的。Map中还有List和Map的嵌套,以及上游可能会把一些List<DTO>也转成Map再放到Map中。


一些考古发现:


属性的Key中,除了常用的英文驼峰命名的Key之外,还有很多是以数字组成的String,例如"118","155"。

image.png

目测是随着历史发展、接入方越来越多,前面大佬也注意到了Key的大小问题,并对常用的Key进行了字符串映射压缩优化,这已经是在EventDTO这个Map的泛型定义的基础上最好的优化了,在一定程度上遏制了事件属性变大的程度。


本次优化方向类似,但是不改变原对象,只在序列化中作额外动作。


2.3最终优化方案

在确保反序列化结果正确性100%的前提下:

  • 步骤1:在SOFA RPC默认序列化前置自定义序列化。
  • 步骤2:在步骤1过程中,对高频String压缩替换。
  • 步骤3:解决SOFA RPC默认的Hessian反序列化大byte[ ]性能问题。

image.png


3.优化详细步骤


3.1把复杂对象变为简单对象

主要思路:把复杂对象(DTO+Map)变为简单对象(byte[ ])。转换完成后,再将byte[ ]通过现有的RPC框架发送接收。


这一步骤的主要工作,就是选一个高效的序列化框架,然后接入进来,写个Util类。这里经过简单对比,选择了蚂蚁自己的Fury框架,封装一个简单的Util类,提供序列化和反序列化方法:

Fury Util

public class ZipFuryUtil {

    static final ThreadLocal<Fury> furyThreadLocal = new ThreadLocal<>();

    /**
     * 序列化操作
     *
     * @param obj
     * @return
     */
    public static byte[] serializeObjectToByteArray(Object obj) {
        byte[] serializedBytes = getFury().serialize(obj);
        return serializedBytes;
    }

    /**
     * 反序列化操作
     *
     * @param bytes
     * @return
     */
    public static EventDTO deserializeByteArrayToEvent(byte[] bytes) {
        EventDTO event = (EventDTO) getFury().deserialize(bytes);
        return event;
    }

    static ThreadSafeFury threadSafeFury = null;

    public static ThreadSafeFury getFury() {
        if (threadSafeFury == null) {
            synchronized (ZipFuryPlusUtil.class) {
                if (threadSafeFury == null) {
                    threadSafeFury = new ThreadPoolFury(
                            classLoader -> {
                                Fury fury = Fury.builder()
                                        .withRefTracking(true)
                                        .withCompatibleMode(CompatibleMode.SCHEMA_CONSISTENT)
                                        .build();

                                fury.registerSerializer(JSONArray.class, new ArrayListSerializer(fury));
                                fury.registerSerializer(JSONObject.class, new HashMapSerializer(fury));
                                fury.registerSerializer(Map.class, new HashMapSerializer(fury));
                                fury.registerSerializer(Collection.class, new ArrayListSerializer(fury));

                                fury.register(BaseDTO.class);
                                fury.register(EventDTO.class);
                                fury.register(FastEventDTO.class);
                                return fury;

                            },
                            100,
                            100,
                            300,
                            TimeUnit.SECONDS);
                }
            }
        }
        return threadSafeFury;
    }

}


技术选型备注:


  • 也测试过其他一些框架,例如Kryo框架:CPU比Fury消耗高30%左右,可以深度定制化改造Kryo源码,性能向Fury靠拢,但是这种自定义以后将难以维护(可能不兼容Kryo新版)。


  • 是不是可以自己再实现一个:实现一个新的序列化框架并不是本揭榜的核心,不如直接站在巨人肩膀上。


另外,有个前提假设:SOFA RPC对简单类型传输更加高效(备注:这个假设的方向没错,但是后来测试发现Hessian反序列化时有个坑,导致差点翻车,详见第3步优化)。

这步骤完成后,效果就有了:直接降低70%(不过别高兴的太早,有坑)


image.png


3.2对象内容进行业务压缩

主要方法:生成一份全局数据字典。在序列化过程中“注入”一个业务逻辑:遇到字典中的对象,替换为字典的Short索引。

说明:


  • 字典数据来源:目前只针对测试20条数据简单统计,真正上线需要人工校对一下并确定字典顺序。


  • 正确性100%:如果线上流量的key不在字典中,也可按原样正常序列化,无正确性问题。


  • 使用Short索引,较节省空间:仅用正数位支持32768个字典对象(前面问题分析统计不重复Key为591个,加上部分可枚举的Value值不超过2000个)。


  • 数据压缩替换除了对Key适用,也对Value适用:观察Value中无Short,不会冲突,如有,可取消对Value的压缩,影响不大。


  • 此项技术应用范围:其实不止适用于特定Fury框架,其他的如Kryo、Hessian,甚至FastJson等,此思路和方法都是适用的。


关键技术:不改变原Map对象,不生成新Map对象,只在byte[ ]结果生成之前“偷偷”修改。能够克服下面两种替换方式的缺点。

  • 如果修改原Map对象,缺点:① 序列化结束后还得修改回去,否则会引发严重数据正确性问题。② 对Map的修改,会产生多次map的put操作甚至resize操作,额外增加很多耗时。③ 修改原对象本身就是额外增加对所有Key遍历一遍,而且还需要处理好循环引用问题。
  • 生成新的Map对象来存储一个临时的压缩后结果,缺点基本同上,也需要进行多次put操作增加不必要耗时。

详细步骤如下:

3.2.1生成Key字典:

复用签名Key统计的代码,得到所有字典Key:

获取字典Key的代码


package com.alipay.securityservice.decision.util;

import com.alibaba.fastjson.JSON;
import com.alipay.ctu.service.event.model.EventDTO;
import org.junit.Test;

import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class ZipUtilTools {

    int  totalCount    = 0;
    int  countKey      = 0;
    int  countValue    = 0;
    long lengthKey     = 0;
    long lengthContent = 0;

    public static boolean isStringAscii(String str) {
        for (int i = 0; i < str.length(); i++) {
            if (str.charAt(i) > 127) {
                return false;
            }
        }
        return true;
    }

    private void addKeySet(Set<String> keySet, Map map, int depth, String path) {
        if (depth > 3) {
            return;
        }
        for (Object key : map.keySet()) {
            if (key instanceof String) {
                countKey++;
                String keyStr = (String) key;
                if (!(keyStr.length() == 16 && keyStr.startsWith("2088"))) {
                    if (isStringAscii(keyStr)) {
                        keySet.add(keyStr);
                    }
                }
                totalCount++;
                lengthKey += keyStr.length();
            }
            Object value = map.get(key);

            // 如果Value也进入压缩字典
            //if (value instanceof String) {
            //    String str = (String) value;
            //    if (str.length() <= 32 && (!str.startsWith("2088"))) {
            //        if (isStringAscii(str)) {
            //            keySet.add(str);
            //        }
            //    }
            //    totalCount++;
            //    lengthKey += str.length();
            //}

            if (value instanceof String) {
                countValue++;
                lengthContent += ((String) value).length();
            }
            if (value instanceof List) {
                List list = (List) value;
                for (Object o : list) {
                    if (o instanceof Map) {
                        addKeySet(keySet, (Map) o, depth + 1, path + "." + key);
                    }
                }
            }

            if (value instanceof Map) {
                addKeySet(keySet, (Map) map.get(key), depth + 1, path + "." + key);
            }
        }
    }

    @Test
    public void findMapKeys() {
        Set<String> keySet = new HashSet<>();

        for (String eventId : EventCatagory.eventCatagory.keySet()) {
            EventDTO event = EventCatagory.eventCatagory.get(eventId);
            addKeySet(keySet, event.getExtendData(), 0, "");
            addKeySet(keySet, event.getBaseInfoData(), 0, "");
            addKeySet(keySet, event.getEventProperties(), 0, "");
            for (EventDTO subEvent : event.getSubEvents()) {
                addKeySet(keySet, subEvent.getExtendData(), 0, "");
                addKeySet(keySet, subEvent.getBaseInfoData(), 0, "");
                addKeySet(keySet, subEvent.getEventProperties(), 0, "");
            }
        }
        System.out.println("不重复Key数量:         " + keySet.size());
        System.out.println("所有Key数量:          " + countKey);
        System.out.println("所有Key String长度:   " + lengthKey);
        System.out.println("所有Value String长度: " + lengthContent);

        System.out.println(JSON.toJSONString(keySet));
    }
}


根据结果,搞一个简单的固定的字典:

EventDict 字典类完整代码


/*
 * Ant Group
 * Copyright (c) 2004-2024 All Rights Reserved.
 */
package com.alipay.securityservice.decision.util;

import java.util.HashMap;
import java.util.Map;

public class EventDict {

    public static final Map<String, Short> DICT_STRING;

    private static final int MAX_KEY_SIZE = 32;

    static {
        DICT_STRING = new HashMap<>(eventDictStr.length * 2);
        for (short i = 0; i < eventDictStr.length; i++) {
            DICT_STRING.put(eventDictStr[i], i);
        }
    }

    /**
     * 压缩替换
     *
     * @param oldKey
     * @return
     */
    public static Object zip(Object oldKey) {
        if (oldKey instanceof String) {
            String str = (String) oldKey;
            if (str.length() >= MAX_KEY_SIZE) {
                return oldKey;
            }
            Short index = DICT_STRING.get(str);
            if (index != null) {
                return index;
            }
        }
        return oldKey;
    }

    /**
     * 解压缩替换
     *
     * @param key
     * @return
     */
    public static Object unZip(Object key) {
        if (key instanceof Short) {
            int index = (Short) key;
            if (index < 0 || index >= eventDictStr.length) {
                return key;
            }
            return eventDictStr[index];
        }
        return key;
    }



  
    /**
     * 顺序需要固定,后续有新的追加。生产环境可以搞成动态缓存名单
     */
    static final String[] eventDictStr = new String[] {"事件属性Key1","事件属性Key2"  ,……};
    

}


3.2.2序列化过程中进行字典替换

只在序列化过程中,在写入最终二进制流之前,进行字典查找与替换,不对原对象有任何修改。


这个操作过程中需要对map进行get操作,有一点点额外开销,对比收益来讲,还是很值得的(取决于字典的覆盖度,线上业务Key应该能覆盖统计90%以上,所以很好)。


一些小优化:map进行get操作查字典之前,先判断字符串长度(目前设置为32,超过则不进行字典判断:肯定不在,节省额外计算hashCode的消耗 )。



压缩替换示例:


原始数据Map:


{

"serviceMethodName":"invoke",

"other":"很长很长的字符串"

}


数据字典

1:serviceMethodName

2: invoke

3: other

压缩Map序列化二进制结构示意:


[Map对象,泛型<String,String>][i,1][i,2][i,3][S,很长很长的字符串]


(泛型描述不变)


对比默认Map序列化二进制结构示意:


[Map对象,泛型<String,String>][S,serviceMethodName][S,invoke][S,other][S,很长很长的字符串]


放个简单的测试感受一下:


image.png


主要代码:

序列化调用入口


private void generalJavaWrite(Fury fury, MemoryBuffer buffer, Map map) {
            ClassResolver classResolver = fury.getClassResolver();
            RefResolver refResolver = fury.getRefResolver();
            Set<Entry> entrySet = map.entrySet();
            for (Map.Entry entry : entrySet) {
                Object key = EventDict.zip(entry.getKey());
                Object value = EventDict.zip(entry.getValue());

                writeKeyJavaRefOptimized(
                        fury, classResolver, refResolver, buffer, key, keyClassInfoWriteCache);
                writeJavaRefOptimized(
                        fury, classResolver, refResolver, buffer, value, valueClassInfoWriteCache);
            }
        }


EventDict.zip 查找字典


 public static Object zip(Object oldKey) {
        if (oldKey instanceof String) {
            String str = (String) oldKey;
            if (str.length() >= MAX_KEY_SIZE) {
                return oldKey;
            }
            Short index = DICT_STRING.get(str);
            if (index != null) {
                return index;
            }
        }
        return oldKey;
    }

3.2.3反序列化过程,还原

说明:

自定义map反序列化器,遇到Short,即查询压缩字典,将查询结果放入真实Map,无二次转换。

小技巧:查询压缩字典的时候,根据Short可以作为ArrayList的index查询,相对Map.get更高效,几乎无性能额外开销。


这就是2.2的反过程,比较简单,请直接看代码即可:

调用字典入口


private void generalJavaRead(Fury fury, MemoryBuffer buffer, Map map, int size) {
            for (int i = 0; i < size; i++) {
                Object key = fury.readRef(buffer, keyClassInfoReadCache);
                Object value = fury.readRef(buffer, valueClassInfoReadCache);

                key = EventDict.unZip(key);
                value = EventDict.unZip(value);
                map.put(key, value);
            }
        }


EventDict.unZip 字典还原


public static Object unZip(Object key) {
        if (key instanceof Short) {
            int index = (Short) key;
            if (index < 0 || index >= eventDictStr.length) {
                return key;
            }
            return eventDictStr[index];
        }
        return key;
    }


此阶段完成后,效果更加明显了:效果较上一步骤再降低30%


image.png


3.3SOFA RPC:Hessian优化

完成上面两个步骤之后,一切看起来还算顺利,效果也很明显。


但是,总感觉少了点什么?是的,这没有在RPC环境下实际测试啊。


兴致冲冲的部署到联调环境,发现时间消耗有一点缩减,但并没有按预期的这个优化比例缩减,本着实事求是的态度,得研究研究这个耗时主要来源:


  • RPC Hessian包裹发送与接收byte[ ]真的如预期一样是零消耗吗?
  • RPC中还有大量SOFA中间件的切面/工具,如awatch、guardian等等,这个基本是跟RPC调用相关固定的,暂时不在我们本优化范围内。

对于第一个问题,自己搭建一个测试对比环境,通过火焰图发现:Hessian2的反序列化耗时占比有40%多,相当于原有优化耗时消耗要翻倍了,当看到这个结果时,第一感觉:天啦噜,前面的尝试都白费了。


image.png 

这意味着什么?意味着前面两个步骤的优化耗时,实际需要翻一倍。

image.png

怎么办?(各种心理活动暂且不表)通过研究火焰图及Hessian源码,发现:Hessian中对大byte[]读取性能有问题,会对流多次缓冲-读取-中断,并会产生额外的Stream对象作为中转。


解决方法:读Hessian源码、咨询RPC同学、调试Hessian源码了解机制等等,找到可能的突破点,改造它!


3.3.1改造Hessian反序列化


关键技术提升点:自定义Hessian反序列化,只读取一次缓冲直接生成byte[]。


具体方法:改动Hessian2Input的readObject( )方法,修改其对byte块('b分块'和'B终块'类型)的处理,处理好内存buffer和缓冲流指针。

直接上代码吧:


修改后的1821-1914行 readObject方法


case 'b':
            case 'B': {
                _isLastChunk = tag == 'B';
                _chunkLength = (read() << 8) + read();
                ByteArrayOutputStream bos=null;
                while (!_isLastChunk){
                    if (bos == null) {
                        bos = new ByteArrayOutputStream();
                    }
                    byte[] temp = new byte[_chunkLength];
                    int i = 0;
                    //处理完内存里的buffer
                    while (_offset < _length && i < _chunkLength) {
                        temp[i] = _buffer[_offset++];
                        i++;
                    }
                    int needRead = _chunkLength - i;
                    if (needRead > 0) {
                        _is.read(temp, i, needRead);
                    }
                    bos.write(temp);
                    //读下一个块
                    int code = read();
                    switch (code){
                        case 'b':
                            _isLastChunk = false;
                            _chunkLength = (read() << 8) + read();
                            break;
                        case 'B':
                            _isLastChunk = true;
                            _chunkLength = (read() << 8) + read();
                            break;
                        case 0x20: case 0x21: case 0x22: case 0x23:
                        case 0x24: case 0x25: case 0x26: case 0x27:
                        case 0x28: case 0x29: case 0x2a: case 0x2b:
                        case 0x2c: case 0x2d: case 0x2e: case 0x2f:
                            _isLastChunk = true;
                            _chunkLength = code - 0x20;
                            break;
                        default:
                            throw expect("byte[]", code);
                    }
                }
                byte[] res = new byte[_chunkLength];
                int i = 0;
                //处理完内存里的buffer
                while (_offset < _length && i < _chunkLength) {
                    res[i] = _buffer[_offset++];
                    i++;
                }
                int needRead = _chunkLength - i;
                if (needRead > 0) {
                    _is.read(res, i, needRead);
                }
                if (bos!=null){
                    bos.write(res);
                    res=bos.toByteArray();
                }
                for (i = 0; i < res.length; i++) {
                    res[i] = (byte) (res[i] & 0xff);
                }
                _chunkLength = 0;
                return res;
                // 优化前原始代码
                //_isLastChunk = tag == 'B';
                //_chunkLength = (read() << 8) + read();
                //
                //int data;
                //ByteArrayOutputStream bos = new ByteArrayOutputStream();
                //
                //while ((data = parseByte()) >= 0)
                //    bos.write(data);
                //
                //return bos.toByteArray();
            }

修改前原始代码


 case 'b':
    case 'B': {
      _isLastChunk = tag == 'B';
      _chunkLength = (read() << 8) + read();

      int data;
      ByteArrayOutputStream bos = new ByteArrayOutputStream();
      
      while ((data = parseByte()) >= 0)
        bos.write(data);

      return bos.toByteArray();
    }


此步骤效果:改造后,Hessian耗时占比几乎清零。整个流程中耗时压减50%。


3.3.2把自定义的改造注入到RPC中去

服务方在启动时,可以在afterPropertiesSet( ),注册自定义Hessian序列化器,确保高效。

注册自定义反序列化类

CustomHessianSerializerManager.addSerializer(SofaRequest.class, new FastSofaRequestHessianSerializer(serializerFactory, genericSerializerFactory));

serializerFactory, genericSerializerFactory这两个参数可通过反射获取,简单实现代码参考:

注册自定义反序列化类



@Override
    public void afterPropertiesSet() throws Exception {
        SofaRequestHessianSerializer serializer = (SofaRequestHessianSerializer) CustomHessianSerializerManager.getSerializer(
                SofaRequest.class);
        Class<?> clazz = serializer.getClass();
        Field[] fields = clazz.getDeclaredFields();

        SerializerFactory serializerFactory = null;
        SerializerFactory genericSerializerFactory = null;

        for (Field field : fields) {
            if (!field.isAccessible()) {
                field.setAccessible(true);
            }
            if ("serializerFactory".equals(field.getName())) {
                try {
                    serializerFactory = (SerializerFactory) field.get(serializer);
                } catch (IllegalAccessException e) {
                    throw new RuntimeException(e);
                }
            }
            if ("genericSerializerFactory".equals(field.getName())) {
                try {
                    genericSerializerFactory = (SerializerFactory) field.get(serializer);
                } catch (IllegalAccessException e) {
                    throw new RuntimeException(e);
                }
            }
        }

        CustomHessianSerializerManager.addSerializer(SofaRequest.class,
                new FastSofaRequestHessianSerializer(serializerFactory, genericSerializerFactory));
    }


注:这个对Hessian的性能优化后续计划会提交给SOFARPC的代码库,如果采纳合并的话,后续新版本就自动是byte[ ]读取高性能版本了,此步骤就可以省略了。



4.性能优化报告

4.1测试说明

用比较简单的测试方法:固定次数循环调用对比,单线程CPU跑满,所以可以直接以耗时对比(实际跑多次,多次之间肉眼取中位数成绩)。

几个方法说明:

测试名称

优化项目

说明

test01_Hessian

原始Hessian

用作基准对比

test11_Kryo

Kryo序列化

无RPC序列化包裹,仅测试Kryo框架

test12_Kryo_Hessian

Kryo序列化 + 原始Hessian


test21_Fury

Fury序列化

无RPC序列化包裹,仅测试Fury框架

test22_Fury_Hessian

Fury序列化 + 原始Hessian


test31_FuryPlus

Fury序列化 + 字典压缩

无RPC序列化包裹,测试Fury和字典

test32_FuryPlus_Hessian

Fury序列化 + 字典压缩 +原始Hessian


test33_FuryPlus_FastHessian

Fury序列化 + 字典压缩 +优化Hessian

最终版本


测试环境:

CPU: Apple M1 Pro

JDK: 1.8

备注:这种通过单测for循环的方式尽管每次都尽可能预热,虽并不绝对严谨,不过也基本反映出对比趋势。有没有更好的测试方式?当然有,先留个伏笔。


4.2单次调用场景:5倍性能提升

序列化+反序列化:20×1000次序列化+20×1000次反序列化测试:最终耗时压缩至23%

反序列化效果更明细:20×1次序列化+20×1000次反序列化测试:最终耗时压缩至17%


【20组】数据,每组序列化【1000】次,反序列化【1000】次
---------------------------------------------------------------------------
测试方法                       ,     数据字节长度,     耗时ms, 耗时较默认百分比
test01_Hessian                ,         314693,      4114,     100.00% 
test11_Kryo                   ,         256739,      1841,      44.75% 
test12_Kryo_Hessian           ,         256802,      2621,      63.71% 
test21_Fury                   ,         337680,      1304,      31.70% 
test22_Fury_Hessian           ,         337743,      2162,      52.55% 
test31_FuryPlus               ,         234287,       929,      22.58% 
test32_FuryPlus_Hessian       ,         234347,      1638,      39.82% 
test33_FuryPlus_FastHessian   ,         234347,       973,      23.65%

image.png



---------------------------------------------------------------------------
【20组】数据,每组序列化【1】次,反序列化【2000】次
---------------------------------------------------------------------------
测试方法                       ,     数据字节长度,     耗时ms, 耗时较默认百分比
test01_Hessian                ,         314693,      4830,     100.00% 
test11_Kryo                   ,         256739,      1587,      32.86% 
test12_Kryo_Hessian           ,         256802,      3132,      64.84% 
test21_Fury                   ,         337680,      1208,      25.01% 
test22_Fury_Hessian           ,         337743,      3125,      64.70% 
test31_FuryPlus               ,         234287,       766,      15.86% 
test32_FuryPlus_Hessian       ,         234347,      2133,      44.16% 
test33_FuryPlus_FastHessian   ,         234347,       842,      17.43%

image.png



4.3多下游场景:额外N倍性能提升

以上测试结果,只是反映单次的序列化和反序列化对比:即假设场景是“1个调用端,调用一个下游Service”。


如果一个场景是调用端需要调用多个Service传递此DTO:

  • 1个调用端,调用N个下游Service。只序列化1次,节省N-1次。
  • A调用B,B再调用C。如果B读取,只有反序列化消耗。B不需要修改数据的情况下,直接将A传入的数据再传给C,没有序列化消耗。


将会获得更大的性能提升,N个额外下游额外提升N倍:原Hessian方式会触发多次序列化。使用此方式,只需要序列化一次,得到的中间结果:byte[ ]之后发送成本接近为0。


同时,此优化也适用于非RPC调用,例如需要将DTO序列化存储的情况:存入Lindorm等。


4.4数据传输大小压缩25%-80%

数据压缩只是本次优化的副产品,20组测试数据,传输大小压缩至原75%左右。

由于测试数据中MapValue中字符串较大,对于日常的普通小DTO(Key能大量被字典覆盖的情况下),结果可能压缩至20%甚至更低。



5.落地部署使用

5.1接入步骤

如果想让客户端无感接入,可以继续改HessianInput,全局代理RPC的所有序列化操作,根据输入类型做判断路由是否走自己的序列化(不建议这种方式,影响面较大,这里不做展开)。

以下为普通接入方式:


5.1.1更新Jar包

将上述几个类(序列化入口、内置字典、自定义Fury字典序列化器、Hessian反序列化byte[]优化类)封装进公共类库,服务端、调用端更新引入。


5.1.2服务方发布新的接口签名

发布服务方法参数签名为:Object或者byte[ ],例如:int question(Object eventDTO);

如果发布签名为Object的参数,客户端调用时传入EventDTO这种DTO实例、或者 byte[]实例都是可以的,服务端按Object接收后,可以根据instanceof等方式判断具体类型后决定直接使用DTO或对byte[]进行额外反序列化。

需要注意一点:客户端不能传入纯Object实例,会引发RPC报错。


5.1.3服务端注册更新Hessian反序列化类

请参考“详细优化步骤3.2”部分。


5.1.4客户端调用使用

比较简单的两种方式:

  • 自行调用ZipFuryPlus.serializeObjectToByteArray( )方法,将DTO转为byte[ ],然后调用question(Object eventDTO);
  • 将序列化过程封装进公共jar中,对客户端提供一个question(EventDTO eventDTO)方法;


5.2细节问题:如何更新词典

目前demo为写死固定。如需对字典的维护,简单来讲就是用jar包直接固定,或者线上应用可以通过DRM等方式推送维护,注意发布顺序:


1、对字典只增加、不删除、不修改;2、先对Server端增加,再对Client增加;

本文就不展开讨论了。相信聪明的读者能够想出N种更新维护的姿势。


5.3其他可能优化展望

上述优化方案主要还是为了验证并确认笔者的揭榜思路,尽管已经取得了较明显的性能提升,这就是极限了吗?答案肯定随着更多的投入与打磨,还是有提升空间的,以下列出测试中发现与思考的几点以供参考:


  • 第三方框架升级:例如Fury0.9对比0.8就有一些字符串写入性能提升,相信随着后续迭代还有优化空间。(甚至不局限于Fury,其他不序列化框架随着时间的演化可能也会有性能提升,但从本质上都适用于本方案。)


  • 三方库中的一些通用判断等:在固定场景下,可以改部分源码:直接删除部分分支逻辑、减少if和函数调用嵌套、默认buffer大小修改等。缺点:收益不太大,后续维护成本更高,另外有一些JIT优化也会通过方法内联等方式达到此效果。


  • Fury序列化中对String是否为ASCII的判断:isLatin()方法,根据火焰图分析有一定性能开销,整体占比1%-5%。可以对固定path的value,通过内置业务字典,直接确认是否isLatin。


  • LazyMap:反序列化Map时,不实时构建Map,减少掉这部分性能,而在读取的第一次时才构建Map。适用于只需要读取EventDTO的基本属性的场景,这样反序列化性能还能提升很多。缺点-语义区别:Fury已支持,但是测试发现语义有一定区别,例如put操作,HashMap会返回oldValue,但是LazyMap不返回,所以早期Fury貌似是LazyMap继承HashMap,新版本不继承了。另外如果最终是要读取Map内容,那性能本质没区别。


  • DTO中的普通字段:本次测试,只在序列化过程中压缩替换了DTO中的耗时较大部分——Map的序列化。对于DTO的其他普通字段,如果某些字段也是相对固定的枚举Value,而在内存中是String,也可以按照此次优化方式进行字典化优化。


  • 业务侧优化:业务侧如果能主动消减一些不必要数据传输,能从源头上“压缩”。另外目前的数据中有循环依赖,例如map中的某个value又引用map自身。如果能确保Map中没有$ref,则可以在序列化框架中关闭ref检查,预期再提升10%左右。


  • 另外能不能直接在RPC默认的Hessian上面做优化?当然可以,方案如下:调用方改造一个FastHessian2Output,里面嵌入优化步骤2的字典压缩等方式进行自定义,甚至也可以遇到EventDTO这个类的时候再引用Fury搞成byte[ ],发挥空间也很大,服务方配套改造FastHessian2Input。调用方和服务方都用步骤3的方式替换RPC调用。就是侵入性影响面稍微大一些。



分享一些揭榜技巧


  • 搞明白问题想要什么:找好大致的方向,和揭榜对接人多聊聊,例如本次的榜单问题不仅仅是压缩数据长度、而更看重的是希望压缩CPU时间,这就基本直接排除了zip等压缩算法方向。


  • 尽早的建立一个测试和对比环境:工欲善其事必先利其器,以便对每项改动进行数据正确性和性能对比,可以更快更好的验证自己的想法。例如本次揭榜过程中,测试代码的大部分时间是通过自己搞的一系列本地单元测试,相较于在两个代码库分别写序列化和反序列化代码并且提交部署,测试效率提升十倍以上。上面的揭榜过程和结果,也是经过数百次测试和开发得到的,工具的效率很重要。


  • 找类库专家多交流:除了参考各种内外网文档资料外,过程中也和Fury的作者慕白、SOFARPC的均源同学进行了多项细节探讨,把自己的想法、测试进展等同步出来,中间获得了很多有益的反馈,避免了一些弯路。同时,也希望我们实践的一些反馈对类库的后续发展做出一些小贡献。


  • 快速测试评估第三方框架的可修改空间和改动效果:序列化框架的改动测试,我用了一个方法——把开源代码直接引入项目,而不是直接引入jar包。对于中间想对这些组件搞一些简单的修改看效果,效率提升还是很好的。


  • 过程无外乎就是“多调研、多实践、多思考”:要相信问题还是有一定复杂度和难度的,顺利时不要轻言成功,多想想有没有什么疏漏(甚至在写ATA文章的今天,为了跑一个测试数据,发现可能有一些不严谨的地方又改了些测试代码);挫折时也不要轻言放弃,这个项目必然会遇到各种困难和挫折,从最开始就要做好坚持的心理建设,坚持肯定是胜利的必要条件之一。


6.后记

我们大安全技术部搞的这个揭榜活动,过程中能感受到挑战与压力,但是目标感也非常足。这个揭榜过程是对技术和心态的综合锻炼:参与进去,勇于迈出第一步,经过上百次的尝试,数次徘徊在怀疑、失望与坚持之间,终于捅破了那几层窗户纸。


今天的这个文章是站在呈现结果的角度来描述过程,并没有完全覆盖过程中的各种曲折探索,对于其中的不尽之处,感兴趣的同学也欢迎随时与我交流探讨。






来源  |  阿里云开发者公众号

作者  | 泓诚



相关文章
|
Docker 容器
CPU内存不足分析Gitlab的内存消耗
CPU内存不足分析Gitlab的内存消耗
CPU内存不足分析Gitlab的内存消耗
|
8月前
|
存储 Kubernetes Perl
【CKA模拟题】查询消耗CPU最多的Pod
【CKA模拟题】查询消耗CPU最多的Pod
63 0
【CKA模拟题】查询消耗CPU最多的Pod
|
缓存 Java 大数据
深入解析JVM调优:解决OutOfMemoryError、内存泄露、线程死锁、锁争用和高CPU消耗问题
深入解析JVM调优:解决OutOfMemoryError、内存泄露、线程死锁、锁争用和高CPU消耗问题
270 0
|
移动开发 Java
Java进程如何定位消耗CPU的代码
Java进程如何定位消耗CPU的代码
361 0
|
Java 调度
JVM调优之Java进程消耗CPU过高
JVM调优之Java进程消耗CPU过高
279 0
|
SQL 缓存 固态存储
怎么找出消耗 CPU 的罪魁祸首?!
谁在消耗cpu? 用户+系统+IO等待+软硬中断+空闲
怎么找出消耗 CPU 的罪魁祸首?!
|
SQL 缓存 架构师
怎么找出消耗 CPU 的罪魁祸首?!
谁在消耗cpu?用户+系统+IO等待+软硬中断+空闲。用户空间CPU消耗,各种逻辑运算,正在进行大量tps,函数/排序/类型转化/逻辑IO访问…
|
监控 数据中心 Java
Confluence 6 其他 MBeans 和高 CPU 消耗线程
其他 MBeans 希望监控 Hibernate 和 Hazelcast(仅针对 Confluence 数据中心)你需要在你的 setenv.sh / setenv.bat 文件中添加下面的内容。
1162 0