测试一下:
/**
 * Demo entry point: de-duplicates a small list of users twice with the
 * three-argument {@code distinct} — once by id, once by case-insensitive name.
 *
 * <p>The results are collected into lists but not printed or otherwise used.
 *
 * @param args command-line arguments (unused)
 */
public static void main(String[] args) {
    // Immutable fixture. Replaces double-brace initialization, which creates an
    // anonymous ArrayList subclass just to populate the list.
    List<User> users = List.of(
            new User(1, "test1"),
            new User(2, "test1"),
            new User(2, "test2"),
            new User(3, "test3"),
            new User(3, "test4"));

    // De-duplicate by id; the merger returns its first argument.
    List<User> collect1 = new EnhancedStream<>(users.stream())
            .distinct(
                    User::getId,
                    // NOTE(review): `==` is only correct if User.getId() returns a primitive
                    // int; if it returns Integer this compares references — confirm.
                    (u1, u2) -> u1.getId() == u2.getId(),
                    (u1, u2) -> u1)
            .collect(Collectors.toList());

    // De-duplicate by name, ignoring case. The hash must agree with the equality
    // predicate: String.hashCode() is case-sensitive, so "Test1" and "test1" would be
    // equal under equalsIgnoreCase yet hash differently and never be merged. The hash
    // therefore folds every code point the same way equalsIgnoreCase does
    // (toLowerCase(toUpperCase(cp))) before combining.
    List<User> collect2 = new EnhancedStream<>(users.stream())
            .distinct(
                    user -> user.getName().codePoints()
                            .map(cp -> Character.toLowerCase(Character.toUpperCase(cp)))
                            .reduce(0, (hash, cp) -> 31 * hash + cp),
                    (u1, u2) -> u1.getName().equalsIgnoreCase(u2.getName()),
                    (u1, u2) -> u1)
            .collect(Collectors.toList());
}
通过动态代理
上面这种实现有很多冗余代码,可以考虑使用动态代理实现。首先编写代理接口类:让 `EnhancedStream` 继承 `Stream` 接口,增加 `distinct` 接口,并让所有返回 `Stream` 的接口改为返回 `EnhancedStream`,这样链式调用返回的对象上才能继续使用新增的 `distinct` 接口。
/**
 * A {@link Stream} that adds a {@code distinct} overload driven by caller-supplied
 * hash, equality and merge functions.
 *
 * <p>The {@code Stream}-returning operations re-declared below narrow their return
 * type to {@code EnhancedStream}, so the extra {@code distinct} overload is still
 * available after they are chained.
 *
 * @param <T> the type of the stream elements
 */
public interface EnhancedStream<T> extends Stream<T> {

    // ---------------------------------------------------------------- extension

    /**
     * De-duplicates the elements using caller-supplied notions of hash and equality.
     *
     * @param hashCode computes the hash of an element
     * @param equals   decides whether two elements count as duplicates
     * @param merger   combines two duplicate elements into the one that is kept
     * @return an enhanced stream for further chaining
     */
    EnhancedStream<T> distinct(
            ToIntFunction<T> hashCode,
            BiPredicate<T, T> equals,
            BinaryOperator<T> merger);

    // ------------------------------------------- BaseStream mode / lifecycle ops

    @Override
    EnhancedStream<T> sequential();

    @Override
    EnhancedStream<T> parallel();

    @Override
    EnhancedStream<T> unordered();

    @Override
    EnhancedStream<T> onClose(Runnable closeHandler);

    // ------------------------------------------------------------- element-wise

    @Override
    EnhancedStream<T> filter(Predicate<? super T> predicate);

    @Override
    <R> EnhancedStream<R> map(Function<? super T, ? extends R> mapper);

    @Override
    <R> EnhancedStream<R> flatMap(Function<? super T, ? extends Stream<? extends R>> mapper);

    @Override
    EnhancedStream<T> peek(Consumer<? super T> action);

    // ------------------------------------------------- de-duplication / ordering

    @Override
    EnhancedStream<T> distinct();

    @Override
    EnhancedStream<T> sorted();

    @Override
    EnhancedStream<T> sorted(Comparator<? super T> comparator);

    // ------------------------------------------------------------------ slicing

    @Override
    EnhancedStream<T> limit(long maxSize);

    @Override
    EnhancedStream<T> skip(long n);

    @Override
    EnhancedStream<T> takeWhile(Predicate<? super T> predicate);

    @Override
    EnhancedStream<T> dropWhile(Predicate<? super T> predicate);
}
然后,编写代理处理类 `EnhancedStreamHandler`(实现 `InvocationHandler`),完成方法代理:
public class EnhancedStreamHandler<T> implements InvocationHandler { private Stream<T> delegate; public EnhancedStreamHandler(Stream<T> delegate) { this.delegate = delegate; } private static final Method ENHANCED_DISTINCT; static { try { ENHANCED_DISTINCT = EnhancedStream.class.getMethod( "distinct", ToIntFunction.class, BiPredicate.class, BinaryOperator.class ); } catch (NoSuchMethodException e) { throw new Error(e); } } /** * 将EnhancedStream的方法与Stream的方法一一对应 */ private static final Map<Method, Method> METHOD_MAP = Stream.of(EnhancedStream.class.getMethods()) .filter(m -> !m.equals(ENHANCED_DISTINCT)) .filter(m -> !Modifier.isStatic(m.getModifiers())) .collect(Collectors.toUnmodifiableMap( Function.identity(), m -> { try { return Stream.class.getMethod( m.getName(), m.getParameterTypes()); } catch (NoSuchMethodException e) { throw new Error(e); } })); @Override public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { if (method.equals(ENHANCED_DISTINCT)) { //调用方法为扩展方法distinct return distinct( (EnhancedStream<T>) proxy, (ToIntFunction<T>) args[0], (BiPredicate<T, T>) args[1], (BinaryOperator<T>) args[2]); } else if (method.getReturnType() == EnhancedStream.class) { //对于返回类型为EnhancedStream的,证明是代理的方法调用,走代理 Method match = METHOD_MAP.get(method); //更相信代理对象为新的Stream this.delegate = (Stream) match.invoke(this.delegate, args); return proxy; } else { //否则,直接用代理类调用 return method.invoke(this.delegate, args); } } private static final class Key<E> { private final E e; private final ToIntFunction<E> hashCode; private final BiPredicate<E, E> equals; public Key(E e, ToIntFunction<E> hashCode, BiPredicate<E, E> equals) { this.e = e; this.hashCode = hashCode; this.equals = equals; } @Override public int hashCode() { return hashCode.applyAsInt(e); } @Override public boolean equals(Object obj) { if (!(obj instanceof Key)) { return false; } @SuppressWarnings("unchecked") Key<E> that = (Key<E>) obj; return equals.test(this.e, that.e); } } private 
EnhancedStream<T> distinct(EnhancedStream<T> proxy, ToIntFunction<T> hashCode, BiPredicate<T, T> equals, BinaryOperator<T> merger) { delegate = delegate.collect(Collectors.toMap( t -> new Key<>(t, hashCode, equals), Function.identity(), merger, //使用LinkedHashMap,保持入参原始顺序 LinkedHashMap::new)) .values() .stream(); return proxy; } }
最后编写工厂类,用于生成 `EnhancedStream` 的代理对象:
public class EnhancedStreamFactory { public static <E> EnhancedStream<E> newEnhancedStream(Stream<E> stream) { return (EnhancedStream<E>) Proxy.newProxyInstance( //必须用EnhancedStream的classLoader,不能用Stream的,因为Stream是jdk的类,ClassLoader是rootClassLoader EnhancedStream.class.getClassLoader(), //代理接口 new Class<?>[] {EnhancedStream.class}, //代理类 new EnhancedStreamHandler<>(stream) ); } }
这样,代码看上去更优雅了;就算 JDK 以后为 `Stream` 扩展更多方法,这里的代理逻辑也无需修改(新增的返回 `Stream` 的方法若想继续链式使用扩展的 `distinct`,只需在 `EnhancedStream` 接口中补充对应的覆写声明)。