增强 Stream 接口的 distinct 方法的一些思考(下)

简介: 增强 Stream 接口的 distinct 方法的一些思考(下)

测试下:

public static void main(String[] args) {
    List<User> users = new ArrayList<>() {{
        add(new User(1, "test1"));
        add(new User(2, "test1"));
        add(new User(2, "test2"));
        add(new User(3, "test3"));
        add(new User(3, "test4"));
    }};
    List<User> collect1 = new EnhancedStream<>(users.stream()).distinct(
            User::getId,
            (u1, u2) -> u1.getId() == u2.getId(),
            (u1, u2) -> u1
    ).collect(Collectors.toList());
    List<User> collect2 = new EnhancedStream<>(users.stream()).distinct(
            user -> user.getName().hashCode(),
            (u1, u2) -> u1.getName().equalsIgnoreCase(u2.getName()),
            (u1, u2) -> u1
    ).collect(Collectors.toList());
}


通过动态代理


上面这种实现有很多冗余代码,可以考虑使用动态代理实现,首先编写代理接口类,通过EnhancedStream继承Stream接口,增加distinct接口,并让所有返回Stream的接口返回EnhancedStream,这样才能让返回有新的distinct接口可以使用。

public interface EnhancedStream<T> extends Stream<T> {
    EnhancedStream<T> distinct(ToIntFunction<T> hashCode,
                               BiPredicate<T, T> equals,
                               BinaryOperator<T> merger);
    @Override
    EnhancedStream<T> filter(Predicate<? super T> predicate);
    @Override
    <R> EnhancedStream<R> map(
            Function<? super T, ? extends R> mapper);
    @Override
    <R> EnhancedStream<R> flatMap(
            Function<? super T, ? extends Stream<? extends R>> mapper);
    @Override
    EnhancedStream<T> distinct();
    @Override
    EnhancedStream<T> sorted();
    @Override
    EnhancedStream<T> sorted(Comparator<? super T> comparator);
    @Override
    EnhancedStream<T> peek(Consumer<? super T> action);
    @Override
    EnhancedStream<T> limit(long maxSize);
    @Override
    EnhancedStream<T> skip(long n);
    @Override
    EnhancedStream<T> takeWhile(Predicate<? super T> predicate);
    @Override
    EnhancedStream<T> dropWhile(Predicate<? super T> predicate);
    @Override
    EnhancedStream<T> sequential();
    @Override
    EnhancedStream<T> parallel();
    @Override
    EnhancedStream<T> unordered();
    @Override
    EnhancedStream<T> onClose(Runnable closeHandler);
}


然后,编写代理类EnhancedStreamHandler实现方法代理:


public class EnhancedStreamHandler<T> implements InvocationHandler {
    private Stream<T> delegate;
    public EnhancedStreamHandler(Stream<T> delegate) {
        this.delegate = delegate;
    }
    private static final Method ENHANCED_DISTINCT;
    static {
        try {
            ENHANCED_DISTINCT = EnhancedStream.class.getMethod(
                    "distinct", ToIntFunction.class, BiPredicate.class,
                    BinaryOperator.class
            );
        } catch (NoSuchMethodException e) {
            throw new Error(e);
        }
    }
    /**
     * 将EnhancedStream的方法与Stream的方法一一对应
     */
    private static final Map<Method, Method> METHOD_MAP =
            Stream.of(EnhancedStream.class.getMethods())
                    .filter(m -> !m.equals(ENHANCED_DISTINCT))
                    .filter(m -> !Modifier.isStatic(m.getModifiers()))
                    .collect(Collectors.toUnmodifiableMap(
                            Function.identity(),
                            m -> {
                                try {
                                    return Stream.class.getMethod(
                                            m.getName(), m.getParameterTypes());
                                } catch (NoSuchMethodException e) {
                                    throw new Error(e);
                                }
                            }));
    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        if (method.equals(ENHANCED_DISTINCT)) {
            //调用方法为扩展方法distinct
            return distinct(
                    (EnhancedStream<T>) proxy,
                    (ToIntFunction<T>) args[0],
                    (BiPredicate<T, T>) args[1],
                    (BinaryOperator<T>) args[2]);
        } else if (method.getReturnType() == EnhancedStream.class) {
            //对于返回类型为EnhancedStream的,证明是代理的方法调用,走代理
            Method match = METHOD_MAP.get(method);
            //更相信代理对象为新的Stream
            this.delegate = (Stream) match.invoke(this.delegate, args);
            return proxy;
        } else {
            //否则,直接用代理类调用
            return method.invoke(this.delegate, args);
        }
    }
    private static final class Key<E> {
        private final E e;
        private final ToIntFunction<E> hashCode;
        private final BiPredicate<E, E> equals;
        public Key(E e, ToIntFunction<E> hashCode,
                   BiPredicate<E, E> equals) {
            this.e = e;
            this.hashCode = hashCode;
            this.equals = equals;
        }
        @Override
        public int hashCode() {
            return hashCode.applyAsInt(e);
        }
        @Override
        public boolean equals(Object obj) {
            if (!(obj instanceof Key)) {
                return false;
            }
            @SuppressWarnings("unchecked")
            Key<E> that = (Key<E>) obj;
            return equals.test(this.e, that.e);
        }
    }
    private EnhancedStream<T> distinct(EnhancedStream<T> proxy,
                                       ToIntFunction<T> hashCode,
                                       BiPredicate<T, T> equals,
                                       BinaryOperator<T> merger) {
        delegate = delegate.collect(Collectors.toMap(
                t -> new Key<>(t, hashCode, equals),
                Function.identity(),
                merger,
                //使用LinkedHashMap,保持入参原始顺序
                LinkedHashMap::new))
                .values()
                .stream();
        return proxy;
    }
}


最后编写工厂类,生成EnhancedStream代理类:


public class EnhancedStreamFactory {
    public static <E> EnhancedStream<E> newEnhancedStream(Stream<E> stream) {
        return (EnhancedStream<E>) Proxy.newProxyInstance(
                //必须用EnhancedStream的classLoader,不能用Stream的,因为Stream是jdk的类,ClassLoader是rootClassLoader
                EnhancedStream.class.getClassLoader(),
                //代理接口
                new Class<?>[] {EnhancedStream.class},
                //代理类
                new EnhancedStreamHandler<>(stream)
        );
    }
}


这样,代码看上去更优雅了,就算 JDK 以后扩展更多方法,这里也可不用修改

相关文章
|
6月前
|
SQL 分布式计算 大数据
`collect_set`函数用于将一组数据收集到一个集合中
`collect_set`函数用于将一组数据收集到一个集合中
85 1
|
9月前
|
存储
Stream流中各阶段方法说明及组合示例
Stream流中各阶段方法说明及组合示例
74 1
|
4月前
|
Java
JDK8 Stream之如何给List属性去重?
JDK8 Stream之如何给List属性去重?
54 0
|
11月前
|
存储
Stream流示例、常见生成方式及Stream中间操作方法
Stream流示例、常见生成方式及Stream中间操作方法
95 0
Java8 新特性 list集合利用stream根据对象得某一属性对集合进行分组,附加8之前分组
Java8 新特性 list集合利用stream根据对象得某一属性对集合进行分组,附加8之前分组
565 4
|
Java
List Stream 的常规用法
List Stream 的常规用法
96 0
|
IDE 编译器 API
stream的实用方法和注意事项
相信大家一定都在项目开发中享受过stream带来的便利性和优雅的代码风格。接下来补充几个项目中不常见到但是同样实用的api,同时跟大家一起探讨stream这把双刃剑的另一面。
180 0
stream的实用方法和注意事项
增强 Stream 接口的 distinct 方法的一些思考(上)
增强 Stream 接口的 distinct 方法的一些思考(上)
|
机器学习/深度学习 算法
通用实现去重的两种方式
通用实现去重的两种方式
|
存储 并行计算 算法
【小家java】java8新特性之---Stream API 详解 (Map-reduce、Collectors收集器、并行流、groupby多字段分组)(中)
【小家java】java8新特性之---Stream API 详解 (Map-reduce、Collectors收集器、并行流、groupby多字段分组)(中)
【小家java】java8新特性之---Stream API 详解 (Map-reduce、Collectors收集器、并行流、groupby多字段分组)(中)