一.引言
本地使用 spark paralize 数组 rdd 时需要构造一个随机数组,分别使用 java.util 和 scala.util 实现,下面记录下不同的 shuffle 方法以及踩到的坑。
编辑
二.java
1.API 错误版 ❌
java.util.collenctions 提供了 shuffle 的方法,支持将传入的 list 进行 shuffle:
编辑
直接初始化一个 0-10 的数组调用试一下:
val array = (0 to 10).toArray // java - 不生效 println("===================Java Shuffle V1=====================") val array2 = util.Arrays.asList(array) Collections.shuffle(array2) array2.toArray().foreach(println(_))
foreach 打印结果发现只返回一个 array 的 obejct 而不是 shuffle 后的数组,看来调用的有问题:
编辑
2.API 正确版 ✅
先看下源码为啥调用不对,源码要求传入的是 List[T],由于使用 scala 数组所以采用 util.Arrays.aslist 进行 java list 的转换:
编辑
经过该方法传入的 array 类型为 util.List[Array[Int]],所以 T 的类型是 Array[Int],相当于只传给 shuffle 方法一个元素,所以 shuffle 后只打印出 1 个 obejct 且不会乱序,因为相当于整个 Array[Int] 是一个元素,下面使用 : _* 方法进行修改:
println("===================Java Shuffle V2=====================") val list = java.util.Arrays.asList(array: _*) java.util.Collections.shuffle(list) list.toArray().foreach(println(_))
这里与上面的唯一区别是传入的参数 array -> array: _*:
编辑
这回没有问题了,再重新看下 asList 方法和 :_* :
编辑
asList 采用变长参数作为输入,java 的变长参数采用 ... ,_:* 方法会告诉函数将传入的 collection 的每个元素当做参数处理,从而将 Array[Int] 的每个 int 传给参数,最终构造好 List[Int] 并成功 shuffle,更详细的变长参数 *、:_* 方法可以参考:Scala 变长参数之*与:_*。
3.源码浅析
编辑
源码中具体的实现放在了 shuffle(list, rnd) 中,主方法 shuffle 主要负责初始化 Random 类并向下调用,下面看看源码:
A.shuffle 主函数
编辑
B.swap 辅助函数
编辑
主函数主要判断了原始数组的 size,具体执行逻辑都采用了 swap 函数,看了这个函数就豁然开朗了,shuffle 数组其实就是随机生成索引然后进行指针的替换,这个 swap 方法在很多基础的排序算法中都有用到。这个 SHUFFLE_THREAD 的大小为 5,如果数组长度超过这个限制就会进入 else 逻辑,else 逻辑采用了迭代器,可以有效减少内存的消耗,相关的内存实验可以参考:Scala - Iterator 与 Array 内存的思考。
C.代码实践
直接将上述两个方法拷贝到自己的编辑器,直接使用也可以实现 shuffle 的目的:
private static final int SHUFFLE_THRESHOLD = 5; private static void swap(Object[] arr, int i, int j) { Object tmp = arr[i]; arr[i] = arr[j]; arr[j] = tmp; } public static void shuffle(List<?> list, Random rnd) { int size = list.size(); if (size < SHUFFLE_THRESHOLD || list instanceof RandomAccess) { for (int i=size; i>1; i--) swap(list, i-1, rnd.nextInt(i)); } else { Object arr[] = list.toArray(); // Shuffle array for (int i=size; i>1; i--) swap(arr, i-1, rnd.nextInt(i)); // Dump array back into list // instead of using a raw type here, it's possible to capture // the wildcard but it will require a call to a supplementary // private method ListIterator it = list.listIterator(); for (int i=0; i<arr.length; i++) { it.next(); it.set(arr[i]); } } } public static void main(String args[]) { List<Integer> list = new ArrayList(); for (int i=0; i<=100; i++) { list.add(i); } Random random = new scala.util.Random(); shuffle(list, random); System.out.println(list); }
[9, 10, 6, 1, 5, 3, 7, 2, 0, 8, 4]
三.scala
1.API 正确版
scala 的 util 类提供了类似 java 的 shuffle 调用方法:
println("===================Scala Shuffle=====================") val shuffleArray = scala.util.Random.shuffle(array.toList) shuffleArray.foreach(println(_))
scala 的 api 没啥坑,唯一需要注意的就是需要把 array 转换为 list 传参:
编辑
2.仿java源码自定义版
上述 java 的源码实现也相对简洁,主要就是 random 实现随机索引 + swap 实现指针切换,下面用 scala 实现一个简单版本,为了适应不同的类型,这里采用了泛型类 T,泛型类相关知识可以参考:Scala Generic 泛型类详解 - T。
def swap[T](arr: Array[T], i: Int, j: Int): Unit = { val tmp = arr(i) arr(i) = arr(j) arr(j) = tmp } def shuffleSelf[T](array: Array[T]): Array[T] = { val random = new Random() array.indices.reverse.filter(_ > 1) foreach (index => { swap(array, index - 1, random.nextInt(index)) }) array } def main(args: Array[String]): Unit = { val array = (0 to 10).toArray // scala - 自定义 println("===================Scala Shuffle Self=====================") shuffleSelf(array).foreach(println(_)) }
shuffle 效果没有问题:
编辑
四.总结
上面总共使用了3中方法执行数组的 shuffle,下面总结一下几种方法的使用与效率,表格内容单位为 ms:
方法 \ 数组大小 - 耗时 | 4 | 10000 | 1000000 | 100000000 |
java.util.Collections.shuffle(array: _*) | 1 | 7 | 66 | 2552 |
scala.util.Random.shuffle(array.toList) | 3 | 61 | 163 | ...... |
shuffleSelf(array) | 11 | 27 | 307 | ...... |
后两个方法在数组足够大时运行缓慢,该用哪个方法 shuffle 不用我说了吧。😏