什么?CPU消耗要压降80% ——《事件序列化CPU开销压降》揭榜

简介: 本文为《事件CPU开销压降》揭榜报告,旨在解决风控系统间信息传递时事件体持续膨胀导致的序列化/反序列化CPU消耗过高的问题。

本文为《事件CPU开销压降》揭榜报告,同时也可以泛化为通用的SOFA RPC对复杂对象序列化性能优化方法:在确保反序列化结果正确性100%的前提下,通过前置自定义序列化,对高频对象进行业务字典压缩替换,并解决SOFA RPC默认的Hessian反序列化大byte[ ]性能问题,最终达到压缩序列化CPU消耗数倍的目的。



先上一张揭榜结果性能对比图:最终优化效果序列化CPU耗时相比默认Hessian降低至的20%左右,实现目标要求的5倍提升。


image.png


1.背景描述

风控各系统间均以事件方式进行信息传统,多年以来事件体持续膨胀,以交易事件为例,多个系统间传输平均大小为20K~50K,极端情况会出现>1M的情况。

目标要求:在保障风控引擎消费时最终事件信息不变和 RT 不增加的前提下,将事件的序列化/反序列 CPU 消耗降低80%。

好,让我们开始动手揭榜吧~


2.问题分析与优化方向调研

2.1 前置约束

由于本次揭榜问题是源于线上实践,解法需要最终能落地、能部署上线,所以有几个默认的约束:

  • 约束1:数据正确性需要100%:即反序列化(或解压缩)之后数据字段值、字段类型(含泛型签名等)、引用关系(含循环引用等)应该和原来RPC反序列化一致,如果正确性无法保证,则后续优化无意义。
  • 约束2:需要基于现有的SOFA协议:路由、限流切面、trace等都需要继续能用。
  • 约束3:对接入方的SOFA版本等不能有高版本要求:如果接入方需要必须升级到特定SOFA版本,则会对接入产生巨大障碍。例如:最新版SOFABoot支持Fury和Protobuf等(SOFARPC序列化配置),但是要求接入方都是最新SOFABoot版本,我们的安全服务对接的上游系统众多,如果对上游有高版本要求将会成为接入阻碍。
  • 约束4:接入、优化方式不能太复杂,需要能开箱即用。


2.2优化方向-要解决的几个问题:

2.2.1确定可以优化的腾挪空间

现有RPC序列化的链路很简单,流程示意:

image.png


能够腾挪的空间并不大,大致方向即:

  • 减少SOFARPC默认Hessian序列化/反序列化的消耗
  • 在此过程中的额外消耗增加要远小于前述减少的
  • 抵消后能形成较大正收益

2.2.2默认序列化是否能被替换、或还有优化空间?

  • 理论上可以替换,但是考虑到RPC兼容性,就有障碍了:除了接口参数,SOFA中的方法签名等等RPC参数也都走的Hessian,路由、限流等依赖这个,还要考虑上下游版本兼容性。


  • 不替换:也有优化空间,可以尝试换个其他框架序列化好后,拿序列化结果送给RPC让Hessian再搞一次。


2.2.3传输对象的内容是否还有压缩空间?

初步肉眼观察,感觉properties、extendData、baseInfoData这3个Map大字段及subEvents子事件字段较大,有一定的数据重复度。统计量化到两个比较重要的数据:


  • Map的Key重复度很大:Key的重复度97%


  • Map的Key长度占比很长:Key的长度比Value(只算String类的Value)还长很多,Key的长度占比接近60%。


个人觉得这也很好理解:CTU事件对接的上游尽管很多,但是每个上游需要传递的参数相对是固定的。Map中还有List和Map的嵌套,以及上游可能会把一些List<DTO>也转成Map再放到Map中。


一些考古发现:


属性的Key中,除了常用的英文驼峰命名的Key之外,还有很多是以数字组成的String,例如"118","155"。

image.png

目测是随着历史发展、接入方越来越多,前面大佬也注意到了Key的大小问题,并对常用的Key进行了字符串映射压缩优化,这已经是在EventDTO这个Map的泛型定义的基础上最好的优化了,在一定程度上遏制了事件属性变大的程度。


本次优化方向类似,但是不改变原对象,只在序列化中作额外动作。


2.3最终优化方案

在确保反序列化结果正确性100%的前提下:

  • 步骤1:在SOFA RPC默认序列化前置自定义序列化。
  • 步骤2:在步骤1过程中,对高频String压缩替换。
  • 步骤3:解决SOFA RPC默认的Hessian反序列化大byte[ ]性能问题。

image.png


3.优化详细步骤


3.1把复杂对象变为简单对象

主要思路:把复杂对象(DTO+Map)变为简单对象(byte[ ])。转换完成后,再将byte[ ]通过现有的RPC框架发送接收。


这一步骤的主要工作,就是选一个高效的序列化框架,然后接入进来,写个Util类。这里经过简单对比,选择了蚂蚁自己的Fury框架,封装一个简单的Util类,提供序列化和反序列化方法:

Fury Util

public class ZipFuryUtil {

    static final ThreadLocal<Fury> furyThreadLocal = new ThreadLocal<>();

    /**
     * 序列化操作
     *
     * @param obj
     * @return
     */
    public static byte[] serializeObjectToByteArray(Object obj) {
        byte[] serializedBytes = getFury().serialize(obj);
        return serializedBytes;
    }

    /**
     * 反序列化操作
     *
     * @param bytes
     * @return
     */
    public static EventDTO deserializeByteArrayToEvent(byte[] bytes) {
        EventDTO event = (EventDTO) getFury().deserialize(bytes);
        return event;
    }

    static ThreadSafeFury threadSafeFury = null;

    public static ThreadSafeFury getFury() {
        if (threadSafeFury == null) {
            synchronized (ZipFuryPlusUtil.class) {
                if (threadSafeFury == null) {
                    threadSafeFury = new ThreadPoolFury(
                            classLoader -> {
                                Fury fury = Fury.builder()
                                        .withRefTracking(true)
                                        .withCompatibleMode(CompatibleMode.SCHEMA_CONSISTENT)
                                        .build();

                                fury.registerSerializer(JSONArray.class, new ArrayListSerializer(fury));
                                fury.registerSerializer(JSONObject.class, new HashMapSerializer(fury));
                                fury.registerSerializer(Map.class, new HashMapSerializer(fury));
                                fury.registerSerializer(Collection.class, new ArrayListSerializer(fury));

                                fury.register(BaseDTO.class);
                                fury.register(EventDTO.class);
                                fury.register(FastEventDTO.class);
                                return fury;

                            },
                            100,
                            100,
                            300,
                            TimeUnit.SECONDS);
                }
            }
        }
        return threadSafeFury;
    }

}


技术选型备注:


  • 也测试过其他一些框架,例如Kryo框架:CPU比Fury消耗高30%左右,可以深度定制化改造Kryo源码,性能向Fury靠拢,但是这种自定义以后将难以维护(可能不兼容Kryo新版)。


  • 是不是可以自己再实现一个:实现一个新的序列化框架并不是本揭榜的核心,不如直接站在巨人肩膀上。


另外,有个前提假设:SOFA RPC对简单类型传输更加高效(备注:这个假设的方向没错,但是后来测试发现Hessian反序列化时有个坑,导致差点翻车,详见第3步优化)。

这步骤完成后,效果就有了:直接降低70%(不过别高兴的太早,有坑)


image.png


3.2对象内容进行业务压缩

主要方法:生成一份全局数据字典。在序列化过程中“注入”一个业务逻辑:遇到字典中的对象,替换为字典的Short索引。

说明:


  • 字典数据来源:目前只针对测试20条数据简单统计,真正上线需要人工校对一下并确定字典顺序。


  • 正确性100%:如果线上流量的key不在字典中,也可按原样正常序列化,无正确性问题。


  • 使用Short索引,较节省空间:仅用正数位支持32768个字典对象(前面问题分析统计不重复Key为591个,加上部分可枚举的Value值不超过2000个)。


  • 数据压缩替换除了对Key适用,也对Value适用:观察Value中无Short,不会冲突,如有,可取消对Value的压缩,影响不大。


  • 此项技术应用范围:其实不止适用于特定Fury框架,其他的如Kryo、Hessian,甚至FastJson等,此思路和方法都是适用的。


关键技术:不改变原Map对象,不生成新Map对象,只在byte[ ]结果生成之前“偷偷”修改。能够克服下面两种替换方式的缺点。

  • 如果修改原Map对象,缺点:① 序列化结束后还得修改回去,否则会引发严重数据正确性问题。② 对Map的修改,会产生多次map的put操作甚至resize操作,额外增加很多耗时。③ 修改原对象本身就是额外增加对所有Key遍历一遍,而且还需要处理好循环引用问题。
  • 生成新的Map对象来存储一个临时的压缩后结果,缺点基本同上,也需要进行多次put操作增加不必要耗时。

详细步骤如下:

3.2.1生成Key字典:

复用签名Key统计的代码,得到所有字典Key:

获取字典Key的代码


package com.alipay.securityservice.decision.util;

import com.alibaba.fastjson.JSON;
import com.alipay.ctu.service.event.model.EventDTO;
import org.junit.Test;

import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class ZipUtilTools {

    int  totalCount    = 0;
    int  countKey      = 0;
    int  countValue    = 0;
    long lengthKey     = 0;
    long lengthContent = 0;

    public static boolean isStringAscii(String str) {
        for (int i = 0; i < str.length(); i++) {
            if (str.charAt(i) > 127) {
                return false;
            }
        }
        return true;
    }

    private void addKeySet(Set<String> keySet, Map map, int depth, String path) {
        if (depth > 3) {
            return;
        }
        for (Object key : map.keySet()) {
            if (key instanceof String) {
                countKey++;
                String keyStr = (String) key;
                if (!(keyStr.length() == 16 && keyStr.startsWith("2088"))) {
                    if (isStringAscii(keyStr)) {
                        keySet.add(keyStr);
                    }
                }
                totalCount++;
                lengthKey += keyStr.length();
            }
            Object value = map.get(key);

            // 如果Value也进入压缩字典
            //if (value instanceof String) {
            //    String str = (String) value;
            //    if (str.length() <= 32 && (!str.startsWith("2088"))) {
            //        if (isStringAscii(str)) {
            //            keySet.add(str);
            //        }
            //    }
            //    totalCount++;
            //    lengthKey += str.length();
            //}

            if (value instanceof String) {
                countValue++;
                lengthContent += ((String) value).length();
            }
            if (value instanceof List) {
                List list = (List) value;
                for (Object o : list) {
                    if (o instanceof Map) {
                        addKeySet(keySet, (Map) o, depth + 1, path + "." + key);
                    }
                }
            }

            if (value instanceof Map) {
                addKeySet(keySet, (Map) map.get(key), depth + 1, path + "." + key);
            }
        }
    }

    @Test
    public void findMapKeys() {
        Set<String> keySet = new HashSet<>();

        for (String eventId : EventCatagory.eventCatagory.keySet()) {
            EventDTO event = EventCatagory.eventCatagory.get(eventId);
            addKeySet(keySet, event.getExtendData(), 0, "");
            addKeySet(keySet, event.getBaseInfoData(), 0, "");
            addKeySet(keySet, event.getEventProperties(), 0, "");
            for (EventDTO subEvent : event.getSubEvents()) {
                addKeySet(keySet, subEvent.getExtendData(), 0, "");
                addKeySet(keySet, subEvent.getBaseInfoData(), 0, "");
                addKeySet(keySet, subEvent.getEventProperties(), 0, "");
            }
        }
        System.out.println("不重复Key数量:         " + keySet.size());
        System.out.println("所有Key数量:          " + countKey);
        System.out.println("所有Key String长度:   " + lengthKey);
        System.out.println("所有Value String长度: " + lengthContent);

        System.out.println(JSON.toJSONString(keySet));
    }
}


根据结果,搞一个简单的固定的字典:

EventDict 字典类完整代码


/*
 * Ant Group
 * Copyright (c) 2004-2024 All Rights Reserved.
 */
package com.alipay.securityservice.decision.util;

import java.util.HashMap;
import java.util.Map;

public class EventDict {

    public static final Map<String, Short> DICT_STRING;

    private static final int MAX_KEY_SIZE = 32;

    static {
        DICT_STRING = new HashMap<>(eventDictStr.length * 2);
        for (short i = 0; i < eventDictStr.length; i++) {
            DICT_STRING.put(eventDictStr[i], i);
        }
    }

    /**
     * 压缩替换
     *
     * @param oldKey
     * @return
     */
    public static Object zip(Object oldKey) {
        if (oldKey instanceof String) {
            String str = (String) oldKey;
            if (str.length() >= MAX_KEY_SIZE) {
                return oldKey;
            }
            Short index = DICT_STRING.get(str);
            if (index != null) {
                return index;
            }
        }
        return oldKey;
    }

    /**
     * 解压缩替换
     *
     * @param key
     * @return
     */
    public static Object unZip(Object key) {
        if (key instanceof Short) {
            int index = (Short) key;
            if (index < 0 || index >= eventDictStr.length) {
                return key;
            }
            return eventDictStr[index];
        }
        return key;
    }



  
    /**
     * 顺序需要固定,后续有新的追加。生产环境可以搞成动态缓存名单
     */
    static final String[] eventDictStr = new String[] {"事件属性Key1","事件属性Key2"  ,……};
    

}


3.2.2序列化过程中进行字典替换

只在序列化过程中,在写入最终二进制流之前,进行字典查找与替换,不对原对象有任何修改。


这个操作过程中需要对map进行get操作,有一点点额外开销,对比收益来讲,还是很值得的(取决于字典的覆盖度,线上业务Key应该能覆盖统计90%以上,所以很好)。


一些小优化:map进行get操作查字典之前,先判断字符串长度(目前设置为32,超过则不进行字典判断:肯定不在,节省额外计算hashCode的消耗 )。



压缩替换示例:


原始数据Map:


{

"serviceMethodName":"invoke",

"other":"很长很长的字符串"

}


数据字典

1:serviceMethodName

2: invoke

3: other

压缩Map序列化二进制结构示意:


[Map对象,泛型<String,String>][i,1][i,2][i,3][S,很长很长的字符串]


(泛型描述不变)


对比默认Map序列化二进制结构示意:


[Map对象,泛型<String,String>][S,serviceMethodName][S,invoke][S,other][S,很长很长的字符串]


放个简单的测试感受一下:


image.png


主要代码:

序列化调用入口


private void generalJavaWrite(Fury fury, MemoryBuffer buffer, Map map) {
            ClassResolver classResolver = fury.getClassResolver();
            RefResolver refResolver = fury.getRefResolver();
            Set<Entry> entrySet = map.entrySet();
            for (Map.Entry entry : entrySet) {
                Object key = EventDict.zip(entry.getKey());
                Object value = EventDict.zip(entry.getValue());

                writeKeyJavaRefOptimized(
                        fury, classResolver, refResolver, buffer, key, keyClassInfoWriteCache);
                writeJavaRefOptimized(
                        fury, classResolver, refResolver, buffer, value, valueClassInfoWriteCache);
            }
        }


EventDict.zip 查找字典


 public static Object zip(Object oldKey) {
        if (oldKey instanceof String) {
            String str = (String) oldKey;
            if (str.length() >= MAX_KEY_SIZE) {
                return oldKey;
            }
            Short index = DICT_STRING.get(str);
            if (index != null) {
                return index;
            }
        }
        return oldKey;
    }

3.2.3反序列化过程,还原

说明:

自定义map反序列化器,遇到Short,即查询压缩字典,将查询结果放入真实Map,无二次转换。

小技巧:查询压缩字典的时候,根据Short可以作为ArrayList的index查询,相对Map.get更高效,几乎无性能额外开销。


这就是2.2的反过程,比较简单,请直接看代码即可:

调用字典入口


private void generalJavaRead(Fury fury, MemoryBuffer buffer, Map map, int size) {
            for (int i = 0; i < size; i++) {
                Object key = fury.readRef(buffer, keyClassInfoReadCache);
                Object value = fury.readRef(buffer, valueClassInfoReadCache);

                key = EventDict.unZip(key);
                value = EventDict.unZip(value);
                map.put(key, value);
            }
        }


EventDict.unZip 字典还原


public static Object unZip(Object key) {
        if (key instanceof Short) {
            int index = (Short) key;
            if (index < 0 || index >= eventDictStr.length) {
                return key;
            }
            return eventDictStr[index];
        }
        return key;
    }


此阶段完成后,效果更加明显了:效果较上一步骤再降低30%


image.png


3.3SOFA RPC:Hessian优化

完成上面两个步骤之后,一切看起来还算顺利,效果也很明显。


但是,总感觉少了点什么?是的,这没有在RPC环境下实际测试啊。


兴致冲冲的部署到联调环境,发现时间消耗有一点缩减,但并没有按预期的这个优化比例缩减,本着实事求是的态度,得研究研究这个耗时主要来源:


  • RPC Hessian包裹发送与接收byte[ ]真的如预期一样是零消耗吗?
  • RPC中还有大量SOFA中间件的切面/工具,如awatch、guardian等等,这个基本是跟RPC调用相关固定的,暂时不在我们本优化范围内。

对于第一个问题,自己搭建一个测试对比环境,通过火焰图发现:Hessian2的反序列化耗时占比有40%多,相当于原有优化耗时消耗要翻倍了,当看到这个结果时,第一感觉:天啦噜,前面的尝试都白费了。


image.png 

这意味着什么?意味着前面两个步骤的优化耗时,实际需要翻一倍。

image.png

怎么办?(各种心理活动暂且不表)通过研究火焰图及Hessian源码,发现:Hessian中对大byte[]读取性能有问题,会对流多次缓冲-读取-中断,并会产生额外的Stream对象作为中转。


解决方法:读Hessian源码、咨询RPC同学、调试Hessian源码了解机制等等,找到可能的突破点,改造它!


3.3.1改造Hessian反序列化


关键技术提升点:自定义Hessian反序列化,只读取一次缓冲直接生成byte[]。


具体方法:改动Hessian2Input的readObject( )方法,修改其对byte块('b分块'和'B终块'类型)的处理,处理好内存buffer和缓冲流指针。

直接上代码吧:


修改后的1821-1914行 readObject方法


case 'b':
            case 'B': {
                _isLastChunk = tag == 'B';
                _chunkLength = (read() << 8) + read();
                ByteArrayOutputStream bos=null;
                while (!_isLastChunk){
                    if (bos == null) {
                        bos = new ByteArrayOutputStream();
                    }
                    byte[] temp = new byte[_chunkLength];
                    int i = 0;
                    //处理完内存里的buffer
                    while (_offset < _length && i < _chunkLength) {
                        temp[i] = _buffer[_offset++];
                        i++;
                    }
                    int needRead = _chunkLength - i;
                    if (needRead > 0) {
                        _is.read(temp, i, needRead);
                    }
                    bos.write(temp);
                    //读下一个块
                    int code = read();
                    switch (code){
                        case 'b':
                            _isLastChunk = false;
                            _chunkLength = (read() << 8) + read();
                            break;
                        case 'B':
                            _isLastChunk = true;
                            _chunkLength = (read() << 8) + read();
                            break;
                        case 0x20: case 0x21: case 0x22: case 0x23:
                        case 0x24: case 0x25: case 0x26: case 0x27:
                        case 0x28: case 0x29: case 0x2a: case 0x2b:
                        case 0x2c: case 0x2d: case 0x2e: case 0x2f:
                            _isLastChunk = true;
                            _chunkLength = code - 0x20;
                            break;
                        default:
                            throw expect("byte[]", code);
                    }
                }
                byte[] res = new byte[_chunkLength];
                int i = 0;
                //处理完内存里的buffer
                while (_offset < _length && i < _chunkLength) {
                    res[i] = _buffer[_offset++];
                    i++;
                }
                int needRead = _chunkLength - i;
                if (needRead > 0) {
                    _is.read(res, i, needRead);
                }
                if (bos!=null){
                    bos.write(res);
                    res=bos.toByteArray();
                }
                for (i = 0; i < res.length; i++) {
                    res[i] = (byte) (res[i] & 0xff);
                }
                _chunkLength = 0;
                return res;
                // 优化前原始代码
                //_isLastChunk = tag == 'B';
                //_chunkLength = (read() << 8) + read();
                //
                //int data;
                //ByteArrayOutputStream bos = new ByteArrayOutputStream();
                //
                //while ((data = parseByte()) >= 0)
                //    bos.write(data);
                //
                //return bos.toByteArray();
            }

修改前原始代码


 case 'b':
    case 'B': {
      _isLastChunk = tag == 'B';
      _chunkLength = (read() << 8) + read();

      int data;
      ByteArrayOutputStream bos = new ByteArrayOutputStream();
      
      while ((data = parseByte()) >= 0)
        bos.write(data);

      return bos.toByteArray();
    }


此步骤效果:改造后,Hessian耗时占比几乎清零。整个流程中耗时压减50%。


3.3.2把自定义的改造注入到RPC中去

服务方在启动时,可以在afterPropertiesSet( ),注册自定义Hessian序列化器,确保高效。

注册自定义反序列化类

CustomHessianSerializerManager.addSerializer(SofaRequest.class, new FastSofaRequestHessianSerializer(serializerFactory, genericSerializerFactory));

serializerFactory, genericSerializerFactory这两个参数可通过反射获取,简单实现代码参考:

注册自定义反序列化类



@Override
    public void afterPropertiesSet() throws Exception {
        SofaRequestHessianSerializer serializer = (SofaRequestHessianSerializer) CustomHessianSerializerManager.getSerializer(
                SofaRequest.class);
        Class<?> clazz = serializer.getClass();
        Field[] fields = clazz.getDeclaredFields();

        SerializerFactory serializerFactory = null;
        SerializerFactory genericSerializerFactory = null;

        for (Field field : fields) {
            if (!field.isAccessible()) {
                field.setAccessible(true);
            }
            if ("serializerFactory".equals(field.getName())) {
                try {
                    serializerFactory = (SerializerFactory) field.get(serializer);
                } catch (IllegalAccessException e) {
                    throw new RuntimeException(e);
                }
            }
            if ("genericSerializerFactory".equals(field.getName())) {
                try {
                    genericSerializerFactory = (SerializerFactory) field.get(serializer);
                } catch (IllegalAccessException e) {
                    throw new RuntimeException(e);
                }
            }
        }

        CustomHessianSerializerManager.addSerializer(SofaRequest.class,
                new FastSofaRequestHessianSerializer(serializerFactory, genericSerializerFactory));
    }


注:这个对Hessian的性能优化后续计划会提交给SOFARPC的代码库,如果采纳合并的话,后续新版本就自动是byte[ ]读取高性能版本了,此步骤就可以省略了。



4.性能优化报告

4.1测试说明

用比较简单的测试方法:固定次数循环调用对比,单线程CPU跑满,所以可以直接以耗时对比(实际跑多次,多次之间肉眼取中位数成绩)。

几个方法说明:

测试名称

优化项目

说明

test01_Hessian

原始Hessian

用作基准对比

test11_Kryo

Kryo序列化

无RPC序列化包裹,仅测试Kryo框架

test12_Kryo_Hessian

Kryo序列化 + 原始Hessian


test21_Fury

Fury序列化

无RPC序列化包裹,仅测试Fury框架

test22_Fury_Hessian

Fury序列化 + 原始Hessian


test31_FuryPlus

Fury序列化 + 字典压缩

无RPC序列化包裹,测试Fury和字典

test32_FuryPlus_Hessian

Fury序列化 + 字典压缩 +原始Hessian


test33_FuryPlus_FastHessian

Fury序列化 + 字典压缩 +优化Hessian

最终版本


测试环境:

CPU: Apple M1 Pro

JDK: 1.8

备注:这种通过单测for循环的方式尽管每次都尽可能预热,虽并不绝对严谨,不过也基本反映出对比趋势。有没有更好的测试方式?当然有,先留个伏笔。


4.2单次调用场景:5倍性能提升

序列化+反序列化:20×1000次序列化+20×1000次反序列化测试:最终耗时压缩至23%

反序列化效果更明细:20×1次序列化+20×1000次反序列化测试:最终耗时压缩至17%


【20组】数据,每组序列化【1000】次,反序列化【1000】次
---------------------------------------------------------------------------
测试方法                       ,     数据字节长度,     耗时ms, 耗时较默认百分比
test01_Hessian                ,         314693,      4114,     100.00% 
test11_Kryo                   ,         256739,      1841,      44.75% 
test12_Kryo_Hessian           ,         256802,      2621,      63.71% 
test21_Fury                   ,         337680,      1304,      31.70% 
test22_Fury_Hessian           ,         337743,      2162,      52.55% 
test31_FuryPlus               ,         234287,       929,      22.58% 
test32_FuryPlus_Hessian       ,         234347,      1638,      39.82% 
test33_FuryPlus_FastHessian   ,         234347,       973,      23.65%

image.png



---------------------------------------------------------------------------
【20组】数据,每组序列化【1】次,反序列化【2000】次
---------------------------------------------------------------------------
测试方法                       ,     数据字节长度,     耗时ms, 耗时较默认百分比
test01_Hessian                ,         314693,      4830,     100.00% 
test11_Kryo                   ,         256739,      1587,      32.86% 
test12_Kryo_Hessian           ,         256802,      3132,      64.84% 
test21_Fury                   ,         337680,      1208,      25.01% 
test22_Fury_Hessian           ,         337743,      3125,      64.70% 
test31_FuryPlus               ,         234287,       766,      15.86% 
test32_FuryPlus_Hessian       ,         234347,      2133,      44.16% 
test33_FuryPlus_FastHessian   ,         234347,       842,      17.43%

image.png



4.3多下游场景:额外N倍性能提升

以上测试结果,只是反映单次的序列化和反序列化对比:即假设场景是“1个调用端,调用一个下游Service”。


如果一个场景是调用端需要调用多个Service传递此DTO:

  • 1个调用端,调用N个下游Service。只序列化1次,节省N-1次。
  • A调用B,B再调用C。如果B读取,只有反序列化消耗。B不需要修改数据的情况下,直接将A传入的数据再传给C,没有序列化消耗。


将会获得更大的性能提升,N个额外下游额外提升N倍:原Hessian方式会触发多次序列化。使用此方式,只需要序列化一次,得到的中间结果:byte[ ]之后发送成本接近为0。


同时,此优化也适用于非RPC调用,例如需要将DTO序列化存储的情况:存入Lindorm等。


4.4数据传输大小压缩25%-80%

数据压缩只是本次优化的副产品,20组测试数据,传输大小压缩至原75%左右。

由于测试数据中MapValue中字符串较大,对于日常的普通小DTO(Key能大量被字典覆盖的情况下),结果可能压缩至20%甚至更低。



5.落地部署使用

5.1接入步骤

如果想让客户端无感接入,可以继续改HessianInput,全局代理RPC的所有序列化操作,根据输入类型做判断路由是否走自己的序列化(不建议这种方式,影响面较大,这里不做展开)。

以下为普通接入方式:


5.1.1更新Jar包

将上述几个类(序列化入口、内置字典、自定义Fury字典序列化器、Hessian反序列化byte[]优化类)封装进公共类库,服务端、调用端更新引入。


5.1.2服务方发布新的接口签名

发布服务方法参数签名为:Object或者byte[ ],例如:int question(Object eventDTO);

如果发布签名为Object的参数,客户端调用时传入EventDTO这种DTO实例、或者 byte[]实例都是可以的,服务端按Object接收后,可以根据instanceof等方式判断具体类型后决定直接使用DTO或对byte[]进行额外反序列化。

需要注意一点:客户端不能传入纯Object实例,会引发RPC报错。


5.1.3服务端注册更新Hessian反序列化类

请参考“详细优化步骤3.2”部分。


5.1.4客户端调用使用

比较简单的两种方式:

  • 自行调用ZipFuryPlus.serializeObjectToByteArray( )方法,将DTO转为byte[ ],然后调用question(Object eventDTO);
  • 将序列化过程封装进公共jar中,对客户端提供一个question(EventDTO eventDTO)方法;


5.2细节问题:如何更新词典

目前demo为写死固定。如需对字典的维护,简单来讲就是用jar包直接固定,或者线上应用可以通过DRM等方式推送维护,注意发布顺序:


1、对字典只增加、不删除、不修改;2、先对Server端增加,再对Client增加;

本文就不展开讨论了。相信聪明的读者能够想出N种更新维护的姿势。


5.3其他可能优化展望

上述优化方案主要还是为了验证并确认笔者的揭榜思路,尽管已经取得了较明显的性能提升,这就是极限了吗?答案肯定随着更多的投入与打磨,还是有提升空间的,以下列出测试中发现与思考的几点以供参考:


  • 第三方框架升级:例如Fury0.9对比0.8就有一些字符串写入性能提升,相信随着后续迭代还有优化空间。(甚至不局限于Fury,其他不序列化框架随着时间的演化可能也会有性能提升,但从本质上都适用于本方案。)


  • 三方库中的一些通用判断等:在固定场景下,可以改部分源码:直接删除部分分支逻辑、减少if和函数调用嵌套、默认buffer大小修改等。缺点:收益不太大,后续维护成本更高,另外有一些JIT优化也会通过方法内联等方式达到此效果。


  • Fury序列化中对String是否为ASCII的判断:isLatin()方法,根据火焰图分析有一定性能开销,整体占比1%-5%。可以对固定path的value,通过内置业务字典,直接确认是否isLatin。


  • LazyMap:反序列化Map时,不实时构建Map,减少掉这部分性能,而在读取的第一次时才构建Map。适用于只需要读取EventDTO的基本属性的场景,这样反序列化性能还能提升很多。缺点-语义区别:Fury已支持,但是测试发现语义有一定区别,例如put操作,HashMap会返回oldValue,但是LazyMap不返回,所以早期Fury貌似是LazyMap继承HashMap,新版本不继承了。另外如果最终是要读取Map内容,那性能本质没区别。


  • DTO中的普通字段:本次测试,只在序列化过程中压缩替换了DTO中的耗时较大部分——Map的序列化。对于DTO的其他普通字段,如果某些字段也是相对固定的枚举Value,而在内存中是String,也可以按照此次优化方式进行字典化优化。


  • 业务侧优化:业务侧如果能主动消减一些不必要数据传输,能从源头上“压缩”。另外目前的数据中有循环依赖,例如map中的某个value又引用map自身。如果能确保Map中没有$ref,则可以在序列化框架中关闭ref检查,预期再提升10%左右。


  • 另外能不能直接在RPC默认的Hessian上面做优化?当然可以,方案如下:调用方改造一个FastHessian2Output,里面嵌入优化步骤2的字典压缩等方式进行自定义,甚至也可以遇到EventDTO这个类的时候再引用Fury搞成byte[ ],发挥空间也很大,服务方配套改造FastHessian2Input。调用方和服务方都用步骤3的方式替换RPC调用。就是侵入性影响面稍微大一些。



分享一些揭榜技巧


  • 搞明白问题想要什么:找好大致的方向,和揭榜对接人多聊聊,例如本次的榜单问题不仅仅是压缩数据长度、而更看重的是希望压缩CPU时间,这就基本直接排除了zip等压缩算法方向。


  • 尽早的建立一个测试和对比环境:工欲善其事必先利其器,以便对每项改动进行数据正确性和性能对比,可以更快更好的验证自己的想法。例如本次揭榜过程中,测试代码的大部分时间是通过自己搞的一系列本地单元测试,相较于在两个代码库分别写序列化和反序列化代码并且提交部署,测试效率提升十倍以上。上面的揭榜过程和结果,也是经过数百次测试和开发得到的,工具的效率很重要。


  • 找类库专家多交流:除了参考各种内外网文档资料外,过程中也和Fury的作者慕白、SOFARPC的均源同学进行了多项细节探讨,把自己的想法、测试进展等同步出来,中间获得了很多有益的反馈,避免了一些弯路。同时,也希望我们实践的一些反馈对类库的后续发展做出一些小贡献。


  • 快速测试评估第三方框架的可修改空间和改动效果:序列化框架的改动测试,我用了一个方法——把开源代码直接引入项目,而不是直接引入jar包。对于中间想对这些组件搞一些简单的修改看效果,效率提升还是很好的。


  • 过程无外乎就是“多调研、多实践、多思考”:要相信问题还是有一定复杂度和难度的,顺利时不要轻言成功,多想想有没有什么疏漏(甚至在写ATA文章的今天,为了跑一个测试数据,发现可能有一些不严谨的地方又改了些测试代码);挫折时也不要轻言放弃,这个项目必然会遇到各种困难和挫折,从最开始就要做好坚持的心理建设,坚持肯定是胜利的必要条件之一。


6.后记

我们大安全技术部搞的这个揭榜活动,过程中能感受到挑战与压力,但是目标感也非常足。这个揭榜过程是对技术和心态的综合锻炼:参与进去,勇于迈出第一步,经过上百次的尝试,数次徘徊在怀疑、失望与坚持之间,终于捅破了那几层窗户纸。


今天的这个文章是站在呈现结果的角度来描述过程,并没有完全覆盖过程中的各种曲折探索,对于其中的不尽之处,感兴趣的同学也欢迎随时与我交流探讨。






来源  |  阿里云开发者公众号

作者  | 泓诚



相关文章
|
9天前
|
调度 云计算 芯片
云超算技术跃进,阿里云牵头制定我国首个云超算国家标准
近日,由阿里云联合中国电子技术标准化研究院主导制定的首个云超算国家标准已完成报批,不久后将正式批准发布。标准规定了云超算服务涉及的云计算基础资源、资源管理、运行和调度等方面的技术要求,为云超算服务产品的设计、实现、应用和选型提供指导,为云超算在HPC应用和用户的大范围采用奠定了基础。
179604 21
|
1天前
|
弹性计算 人工智能 安全
对话 | ECS如何构筑企业上云的第一道安全防线
随着中小企业加速上云,数据泄露、网络攻击等安全威胁日益严重。阿里云推出深度访谈栏目,汇聚产品技术专家,探讨云上安全问题及应对策略。首期节目聚焦ECS安全性,提出三道防线:数据安全、网络安全和身份认证与权限管理,确保用户在云端的数据主权和业务稳定。此外,阿里云还推出了“ECS 99套餐”,以高性价比提供全面的安全保障,帮助中小企业安全上云。
对话 | ECS如何构筑企业上云的第一道安全防线
|
18天前
|
人工智能 自然语言处理 前端开发
从0开始打造一款APP:前端+搭建本机服务,定制暖冬卫衣先到先得
通义灵码携手科技博主@玺哥超carry 打造全网第一个完整的、面向普通人的自然语言编程教程。完全使用 AI,再配合简单易懂的方法,只要你会打字,就能真正做出一个完整的应用。
9518 25
|
4天前
|
机器学习/深度学习 分布式计算 供应链
阿里云先知安全沙龙(上海站) ——大模型基础设施安全攻防
大模型基础设施的安全攻防体系涵盖恶意输入防御和基础设施安全,包括框架、三方库、插件、平台、模型和系统安全。关键漏洞如CVE-2023-6019(Ray框架命令注入)、CVE-2024-5480(PyTorch分布式RPC)及llama.cpp中的多个漏洞,强调了代码安全性的重要性。模型文件安全方面,需防范pickle反序列化等风险,建议使用Safetensors格式。相关实践包括构建供应链漏洞库、智能化漏洞分析和深度检测,确保全方位防护。
|
6天前
|
JSON 分布式计算 数据处理
加速数据处理与AI开发的利器:阿里云MaxFrame实验评测
随着数据量的爆炸式增长,传统数据分析方法逐渐显现出局限性。Python作为数据科学领域的主流语言,因其简洁易用和丰富的库支持备受青睐。阿里云推出的MaxFrame是一个专为Python开发者设计的分布式计算框架,旨在充分利用MaxCompute的强大能力,提供高效、灵活且易于使用的工具,应对大规模数据处理需求。MaxFrame不仅继承了Pandas等流行数据处理库的友好接口,还通过集成先进的分布式计算技术,显著提升了数据处理的速度和效率。
|
22天前
|
Cloud Native Apache 流计算
资料合集|Flink Forward Asia 2024 上海站
Apache Flink 年度技术盛会聚焦“回顾过去,展望未来”,涵盖流式湖仓、流批一体、Data+AI 等八大核心议题,近百家厂商参与,深入探讨前沿技术发展。小松鼠为大家整理了 FFA 2024 演讲 PPT ,可在线阅读和下载。
5158 15
资料合集|Flink Forward Asia 2024 上海站
|
1月前
|
人工智能 自动驾驶 大数据
预告 | 阿里云邀您参加2024中国生成式AI大会上海站,马上报名
大会以“智能跃进 创造无限”为主题,设置主会场峰会、分会场研讨会及展览区,聚焦大模型、AI Infra等热点议题。阿里云智算集群产品解决方案负责人丛培岩将出席并发表《高性能智算集群设计思考与实践》主题演讲。观众报名现已开放。
|
14天前
|
Docker 容器
|
2天前
|
机器学习/深度学习 人工智能 安全
通义视觉推理大模型QVQ-72B-preview重磅上线
Qwen团队推出了新成员QVQ-72B-preview,这是一个专注于提升视觉推理能力的实验性研究模型。提升了视觉表示的效率和准确性。它在多模态评测集如MMMU、MathVista和MathVision上表现出色,尤其在数学推理任务中取得了显著进步。尽管如此,该模型仍存在一些局限性,仍在学习和完善中。
|
17天前
|
消息中间件 人工智能 运维
12月更文特别场——寻找用云高手,分享云&AI实践
我们寻找你,用云高手,欢迎分享你的真知灼见!
1323 76