• 关于

    array_keys函数

    的搜索结果

回答

参考:在 php 中使用 json_encode() 内置函数(php > 5.2)可以使用得 php 中数据可以与其它语言很好的传递并且使用它。这个函数的功能是将数值转换成json数据存储格式。 <?php $arr = array ( 'Name'=>'你妹', 'Age'=>22 ); $jsonencode = json_encode($arr); echo $jsonencode; ?> 程序运行结果如下: {"Name":null,"Age":20}json_encode 函数中中文被编码成 null 了,Google 了一下,很简单,为了与前端紧密结合,Json 只支持 utf-8 编码,我认为是前端的 JavaScript 也是 utf-8 的原因。 <?php $array = array ( 'name'=>iconv('GB18030','utf-8','你妹'), 'sex'=>'men' ); echo json_encode($array); ?> 这个程序的运行结果为: {"title":"\u6d63\u72b2\ue785","body":"men"} iconv 转换格式时用GBK,gb2312 报错问题,是因为GB2312只收录6763个汉字,出现未收录的汉字时,iconv报错。 GBK向下完全兼容GB2312-80,支持更多的字,但仍然会出现错误,解决办法是用GB18030,GB18030基本兼容GBK,又扩充了百余字体。数组中所有中文在json_encode之后都不见了或者出现u6d63等。解决方法是用urlencode()函数处理以下,在json_encode之前,把所有数组内所有内容都用urlencode()处理一下,然用json_encode()转换成json字符串,最后再用urldecode()将编码过的中文转回来。 <?php /************************************************************** * * 使用特定function对数组中所有元素做处理 * @param string &$array 要处理的字符串 * @param string $function 要执行的函数 * @return boolean $apply_to_keys_also 是否也应用到key上 * @access public * *************************************************************/ function arrayRecursive(&$array, $function, $apply_to_keys_also = false) { static $recursive_counter = 0; if (++$recursive_counter > 1000) { die('possible deep recursion attack'); } foreach ($array as $key => $value) { if (is_array($value)) { arrayRecursive($array[$key], $function, $apply_to_keys_also); } else { $array[$key] = $function($value); } if ($apply_to_keys_also && is_string($key)) { $new_key = $function($key); if ($new_key != $key) { $array[$new_key] = $array[$key]; unset($array[$key]); } } } $recursive_counter--; } /************************************************************** * 将数组转换为JSON字符串(兼容中文) * @param array $array 要转换的数组 * @return string 转换得到的json字符串 * @access public * *************************************************************/ function JSON($array) { arrayRecursive($array, 'urlencode', true); $json = json_encode($array); return urldecode($json); } $array = array ( 'Name'=>'你妹', 'Age'=>22 ); echo JSON($array); ?> 运行结果如下: {"Name":"你妹","Age":"22"}

小旋风柴进 2019-12-02 02:03:09 0 浏览量 回答数 0

回答

由于似乎没有人做过此事,因此我认为最好在某个地方提供参考。我已经通过基准测试或代码掠过来表征这些array_*功能。我试图将更有趣的Big-O放在顶部。此列表不完整。 注意:假设是哈希查找,所有计算出来的Big-O都是O(1),即使它实际上是O(n)。n的系数是如此之低,在大Big-O的特征开始生效之前,存储足够大的数组的内存开销会伤害您。例如,array_key_exists在N = 1和N = 1,000,000时的通话时间差为〜50%。 有趣的地方: isset/ array_key_exists比in_array和快得多array_search +(联盟)比array_merge(看起来更好)快一点。但是它的工作方式有所不同,因此请记住这一点。 shuffle 在与Big-O相同的层上 array_rand array_pop/ array_push比重新索引罚款更快array_shift/array_unshift 查询: array_key_existsO(n)但实际上接近O(1)-这是由于碰撞中的线性轮询,但是由于碰撞的机会非常小,因此系数也非常小。我发现您将哈希查找视为O(1)来给出更现实的big-O。例如,N = 1000和N = 100000之间的差异仅减慢了50%。 isset( $array[$index] )O(n)但实际上接近O(1)-它使用与array_key_exists相同的查找。由于是语言构造,因此如果密钥是硬编码的,将缓存查找,从而在重复使用同一密钥的情况下加快了查找速度。 in_array O(n)-这是因为它将对数组进行线性搜索,直到找到该值为止。 array_search O(n)-它使用与in_array相同的核心功能,但返回值。 队列功能: array_push O(∑ var_i,对于所有i) array_pop O(1) array_shift O(n)-必须重新索引所有键 array_unshift O(n + ∑ var_i,对于所有i)-必须重新索引所有键 数组相交,并集,减法: array_intersect_key 如果交集100%进行O(Max(param_i_size)* ∑param_i_count,对于所有i),如果交集0%相交O(∑param_i_size,对于所有i) array_intersect 如果交集100%对所有i执行O(n ^ 2 * ∑param_i_count,对于所有i),如果交集0%与O(n ^ 2)相交 array_intersect_assoc 如果交集100%进行O(Max(param_i_size)* ∑param_i_count,对于所有i),如果交集0%相交O(∑param_i_size,对于所有i) array_diff O(πparam_i_size,for all i)-那是所有param_sizes的乘积 array_diff_key O(∑ param_i_size,for i!= 1)-这是因为我们不需要遍历第一个数组。 array_merge O(∑ array_i,i!= 1)-不需要遍历第一个数组 (联合)O(n),其中n是第二个数组的大小(即array_first + array_second)-比array_merge少的开销,因为它不必重新编号 array_replace O(∑ array_i,对于所有i) 随机: shuffle 上) array_rand O(n)-需要线性轮询。 明显的Big-O: array_fill 上) array_fill_keys 上) range 上) array_splice O(偏移量+长度) array_slice O(偏移量+长度)或O(n)如果长度= NULL array_keys 上) array_values 上) array_reverse 上) array_pad O(pad_size) array_flip 上) array_sum 上) array_product 上) array_reduce 上) array_filter 上) array_map 上) array_chunk 上) array_combine 上) 我要感谢Eureqa使得找到函数的Big-O很容易。这是一个了不起的免费程序,可以为任意数据找到最佳拟合函数。 编辑: 对于那些怀疑PHP数组查找是的人O(N),我编写了一个基准测试(O(1)对于大多数实际值它们仍然有效)。 php数组查找图 $tests = 1000000; $max = 5000001; for( $i = 1; $i <= $max; $i += 10000 ) { //create lookup array $array = array_fill( 0, $i, NULL ); //build test indexes $test_indexes = array(); for( $j = 0; $j < $tests; $j++ ) { $test_indexes[] = rand( 0, $i-1 ); } //benchmark array lookups $start = microtime( TRUE ); foreach( $test_indexes as $test_index ) { $value = $array[ $test_index ]; unset( $value ); } $stop = microtime( TRUE ); unset( $array, $test_indexes, $test_index ); printf( "%d,%1.15f\n", $i, $stop - $start ); //time per 1mil lookups unset( $stop, $start ); } 问题来源于stack overflow

保持可爱mmm 2020-01-15 16:54:32 0 浏览量 回答数 0

问题

附加图像时 Text View 的 Resource Id 出错

蛮大人123 2019-12-01 20:02:28 866 浏览量 回答数 1

阿里云高校特惠,助力学生创业梦!0元体验,快速入门云计算!

学生动手场景应用,快速了解并掌握云服务器的各种新奇玩法!

回答

$GLOBALS ? ######回复 @程子帅 : 其实我没看懂你到底想要什么效果-_-!######是把配置文件写成$GLOBALS还是在类文件中使用 $GLOBALS('函数')?######写一个读取配置的公共函数######对的,我也是这么想的,但是不知道从哪下手?大神有没有这方面资料?###### 之前写的一个 /** * 获取和设置配置参数 支持批量定义 * 如果$key是关联型数组,则会按K-V的形式写入配置 * 如果$key是数字索引数组,则返回对应的配置数组 * @param string|array $name 配置变量 * @param mixed $value 配置值 * @return mixed */ function C($key,$value=null){ static $_config = array(); $args = func_num_args(); if($args == 1){ if(is_string($key)){ //如果传入的key是字符串 return isset($_config[$key])?$_config[$key]:null; } if(is_array($key)){ if(array_keys($key) !== range(0, count($key) - 1)){ //如果传入的key是关联数组 $_config = array_merge($_config, $key); }else{ $ret = array(); foreach ($key as $k) { $ret[$k] = isset($_config[$k])?$_config[$k]:null; } return $ret; } } }else{ $_config[$key] = $value; } return null; } 1: 常量 2: $_SERVER['xxx'] = xxxx; 这样定义就是超全局. ######  就用.ini之类的配置吧。。用 parse_ini_file 函数来解析。。 ######global######用一个类来操作配置文件。可以封装各种格式的配置文件。还可以控制格式和边界检查。######能不能简述一下这个类的流程,我实在是没有思路

kun坤 2020-05-27 14:53:45 0 浏览量 回答数 0

回答

$GLOBALS ? ######回复 @程子帅 : 其实我没看懂你到底想要什么效果-_-!######是把配置文件写成$GLOBALS还是在类文件中使用 $GLOBALS('函数')?######写一个读取配置的公共函数######对的,我也是这么想的,但是不知道从哪下手?大神有没有这方面资料?###### 之前写的一个 /** * 获取和设置配置参数 支持批量定义 * 如果$key是关联型数组,则会按K-V的形式写入配置 * 如果$key是数字索引数组,则返回对应的配置数组 * @param string|array $name 配置变量 * @param mixed $value 配置值 * @return mixed */ function C($key,$value=null){ static $_config = array(); $args = func_num_args(); if($args == 1){ if(is_string($key)){ //如果传入的key是字符串 return isset($_config[$key])?$_config[$key]:null; } if(is_array($key)){ if(array_keys($key) !== range(0, count($key) - 1)){ //如果传入的key是关联数组 $_config = array_merge($_config, $key); }else{ $ret = array(); foreach ($key as $k) { $ret[$k] = isset($_config[$k])?$_config[$k]:null; } return $ret; } } }else{ $_config[$key] = $value; } return null; } ###### 1: 常量 2: $_SERVER['xxx'] = xxxx; 这样定义就是超全局. ######  就用.ini之类的配置吧。。用 parse_ini_file 函数来解析。。 ######global######用一个类来操作配置文件。可以封装各种格式的配置文件。还可以控制格式和边界检查。######能不能简述一下这个类的流程,我实在是没有思路

montos 2020-05-30 23:40:10 0 浏览量 回答数 0

回答

$GLOBALS ? ######回复 @程子帅 : 其实我没看懂你到底想要什么效果-_-!######是把配置文件写成$GLOBALS还是在类文件中使用 $GLOBALS('函数')?######写一个读取配置的公共函数######对的,我也是这么想的,但是不知道从哪下手?大神有没有这方面资料?###### 之前写的一个 /** * 获取和设置配置参数 支持批量定义 * 如果$key是关联型数组,则会按K-V的形式写入配置 * 如果$key是数字索引数组,则返回对应的配置数组 * @param string|array $name 配置变量 * @param mixed $value 配置值 * @return mixed */ function C($key,$value=null){ static $_config = array(); $args = func_num_args(); if($args == 1){ if(is_string($key)){ //如果传入的key是字符串 return isset($_config[$key])?$_config[$key]:null; } if(is_array($key)){ if(array_keys($key) !== range(0, count($key) - 1)){ //如果传入的key是关联数组 $_config = array_merge($_config, $key); }else{ $ret = array(); foreach ($key as $k) { $ret[$k] = isset($_config[$k])?$_config[$k]:null; } return $ret; } } }else{ $_config[$key] = $value; } return null; } ###### 1: 常量 2: $_SERVER['xxx'] = xxxx; 这样定义就是超全局. ######  就用.ini之类的配置吧。。用 parse_ini_file 函数来解析。。 ######global######用一个类来操作配置文件。可以封装各种格式的配置文件。还可以控制格式和边界检查。######能不能简述一下这个类的流程,我实在是没有思路

kun坤 2020-06-06 18:55:43 0 浏览量 回答数 0

回答

有多种删除数组元素的方法,其中某些方法对某些特定任务比其他任务更有用。 删除一个数组元素 如果只想删除一个数组元素,则可以使用unset()或\array_splice()。 另外,如果您具有值并且不知道要删除元素的键,则可以使用\array_search()该键来获取键。 unset() 请注意,使用unset()数组键时不会更改/重新索引。如果要重新索引键,则可以使用它,\array_values()之后unset()将所有键转换为从0开始的数字枚举键。 码 "a", 1 => "b", 2 => "c"]; unset($array[1]); //↑ Key which you want to delete ?> 输出量 [ [0] => a [2] => c ] \array_splice() 方法 如果您使用\array_splice()这些键,则会自动为索引重新编制索引,但是关联键不会更改,相反,关联键会将\array_values()所有键转换为数字键。 还\array_splice()需要偏移量,不是关键!作为第二个参数。 码 "a", 1 => "b", 2 => "c"]; \array_splice($array, 1, 1); //↑ Offset which you want to delete ?> 输出量 [ [0] => a [1] => c ] array_splice()就像unset()通过引用获取数组一样,这意味着您不想将这些函数的返回值分配回数组。 删除多个数组元素 如果你想删除多个数组元素,不想打电话unset()或\array_splice()多次,你可以使用的功能\array_diff()或\array_diff_key()取决于如果你知道的值或要删除的元素的键。 \array_diff() 方法 如果知道要删除的数组元素的值,则可以使用\array_diff()。和以前一样,unset()它不会更改/重新索引数组的键。 码 "a", 1 => "b", 2 => "c"]; $array = \array_diff($array, ["a", "c"]); //└────────┘→ Array values which you want to delete ?> 输出量 [ [1] => b ] \array_diff_key() 方法 如果知道要删除的元素的键,则要使用\array_diff_key()。在这里,您必须确保将键作为第二个参数中的键而不是值来传递。否则,您必须使用翻转数组\array_flip()。而且这里的键不会更改/重新索引。 码 "a", 1 => "b", 2 => "c"]; $array = \array_diff_key($array, [0 => "xy", "2" => "xy"]); //↑ ↑ Array keys which you want to delete ?> 输出量 [ [1] => b ] 同样,如果您要使用unset()或\array_splice()删除具有相同值的多个元素,则可以使用\array_keys()获取特定值的所有键,然后删除所有元素。 问题来源于stack overflow

保持可爱mmm 2020-01-08 15:32:10 0 浏览量 回答数 0

回答

$GLOBALS ? ######回复 <a href=""http://my.oschina.net/u/926634"" class=""referer"" target=""blank"">@程子帅 : 其实我没看懂你到底想要什么效果--!######是把配置文件写成$GLOBALS还是在类文件中使用 $GLOBALS('函数')?######写一个读取配置的公共函数######对的,我也是这么想的,但是不知道从哪下手?大神有没有这方面资料?###### 之前写的一个 /** * 获取和设置配置参数 支持批量定义 * 如果$key是关联型数组,则会按K-V的形式写入配置 * 如果$key是数字索引数组,则返回对应的配置数组 * @param string|array $name 配置变量 * @param mixed $value 配置值 * @return mixed */ function C($key,$value=null){ static $_config = array(); $args = func_num_args(); if($args == 1){ if(is_string($key)){ //如果传入的key是字符串 return isset($_config[$key])?$_config[$key]:null; } if(is_array($key)){ if(array_keys($key) !== range(0, count($key) - 1)){ //如果传入的key是关联数组 $_config = array_merge($_config, $key); }else{ $ret = array(); foreach ($key as $k) { $ret[$k] = isset($_config[$k])?$_config[$k]:null; } return $ret; } } }else{ $_config[$key] = $value; } return null; } ###### 1: 常量 2: $_SERVER['xxx'] = xxxx; 这样定义就是超全局. ######  就用.ini之类的配置吧。。用 parse_ini_file 函数来解析。。 ######global######用一个类来操作配置文件。可以封装各种格式的配置文件。还可以控制格式和边界检查。######能不能简述一下这个类的流程,我实在是没有思路

一枚小鲜肉帅哥 2020-05-28 10:03:29 0 浏览量 回答数 0

问题

MAP函数是什么?

nicenelly 2019-12-01 21:26:42 1293 浏览量 回答数 0

回答

没有好的方法可以将数组存储到单个字段中。 您需要检查您的关系数据并对模式进行适当的更改。请参阅下面的示例以获取对此方法的参考。 如果必须将数组保存到单个字段中,则serialize()and unserialize()函数可以解决问题。但是您不能对实际内容执行查询。 作为序列化功能的替代方法,还有 json_encode()和json_decode()。 考虑以下数组 $a = array( 1 => array( 'a' => 1, 'b' => 2, 'c' => 3 ), 2 => array( 'a' => 1, 'b' => 2, 'c' => 3 ), ); 要将其保存在数据库中,您需要创建一个这样的表 $c = mysql_connect($server, $username, $password); mysql_select_db('test'); $r = mysql_query( 'DROP TABLE IF EXISTS test'); $r = mysql_query( 'CREATE TABLE test ( id INTEGER UNSIGNED NOT NULL, a INTEGER UNSIGNED NOT NULL, b INTEGER UNSIGNED NOT NULL, c INTEGER UNSIGNED NOT NULL, PRIMARY KEY (id) )'); 要使用记录,您可以执行诸如此类的查询(是的,请注意,请注意!) function getTest() { $ret = array(); $c = connect(); $query = 'SELECT * FROM test'; $r = mysql_query($query,$c); while ($o = mysql_fetch_array($r,MYSQL_ASSOC)) { $ret[array_shift($o)] = $o; } mysql_close($c); return $ret; } function putTest($t) { $c = connect(); foreach ($t as $k => $v) { $query = "INSERT INTO test (id,". implode(',',array_keys($v)). ") VALUES ($k,". implode(',',$v). ")"; $r = mysql_query($query,$c); } mysql_close($c); } putTest($a); $b = getTest(); 该connect()函数返回一个mysql连接资源 function connect() { $c = mysql_connect($server, $username, $password); mysql_select_db('test'); return $c; }

保持可爱mmm 2020-02-08 14:17:57 0 浏览量 回答数 0

回答

没有好的方法可以将数组存储到单个字段中。 您需要检查您的关系数据并对模式进行适当的更改。请参阅下面的示例以获取对此方法的参考。 如果必须将数组保存到单个字段中,则serialize()and unserialize()函数可以解决问题。但是您不能对实际内容执行查询。 作为序列化功能的替代方法,还有 json_encode()和json_decode()。 考虑以下数组 $a = array( 1 => array( 'a' => 1, 'b' => 2, 'c' => 3 ), 2 => array( 'a' => 1, 'b' => 2, 'c' => 3 ), ); 要将其保存在数据库中,您需要创建一个这样的表 $c = mysql_connect($server, $username, $password); mysql_select_db('test'); $r = mysql_query( 'DROP TABLE IF EXISTS test'); $r = mysql_query( 'CREATE TABLE test ( id INTEGER UNSIGNED NOT NULL, a INTEGER UNSIGNED NOT NULL, b INTEGER UNSIGNED NOT NULL, c INTEGER UNSIGNED NOT NULL, PRIMARY KEY (id) )'); 要使用记录,您可以执行诸如此类的查询(是的,请注意,请注意!) function getTest() { $ret = array(); $c = connect(); $query = 'SELECT * FROM test'; $r = mysql_query($query,$c); while ($o = mysql_fetch_array($r,MYSQL_ASSOC)) { $ret[array_shift($o)] = $o; } mysql_close($c); return $ret; } function putTest($t) { $c = connect(); foreach ($t as $k => $v) { $query = "INSERT INTO test (id,". implode(',',array_keys($v)). ") VALUES ($k,". implode(',',$v). ")"; $r = mysql_query($query,$c); } mysql_close($c); } putTest($a); $b = getTest(); 该connect()函数返回一个mysql连接资源 function connect() { $c = mysql_connect($server, $username, $password); mysql_select_db('test'); return $c; } 来源:stack overflow

保持可爱mmm 2020-05-13 14:17:03 0 浏览量 回答数 0

问题

javascript 如何在回调函数内,修改外部变量?

爵霸 2019-12-01 20:11:14 1401 浏览量 回答数 1

问题

Python:在Postgres中插入大型dataframe (1.2M行)的问题

kun坤 2019-12-30 09:34:45 0 浏览量 回答数 0

回答

号码 [...Array(5).keys()]; => [0, 1, 2, 3, 4] 角色迭代 String.fromCharCode(...[...Array('D'.charCodeAt(0) - 'A'.charCodeAt(0) + 1).keys()].map(i => i + 'A'.charCodeAt(0))); => "ABCD" 迭代 for (const x of Array(5).keys()) { console.log(x, String.fromCharCode('A'.charCodeAt(0) + x)); } => 0,"A" 1,"B" 2,"C" 3,"D" 4,"E" 作为功​​能 function range(size, startAt = 0) { return [...Array(size).keys()].map(i => i + startAt); } function characterRange(startChar, endChar) { return String.fromCharCode(...range(endChar.charCodeAt(0) - startChar.charCodeAt(0), startChar.charCodeAt(0))) } 作为键入函数 function range(size:number, startAt:number = 0):ReadonlyArray { return [...Array(size).keys()].map(i => i + startAt); } function characterRange(startChar:string, endChar:string):ReadonlyArray { return String.fromCharCode(...range(endChar.charCodeAt(0) - startChar.charCodeAt(0), startChar.charCodeAt(0))) } lodash.js _.range()函数 _.range(10); => [0, 1, 2, 3, 4, 5, 6, 7, 8, 9] _.range(1, 11); => [1, 2, 3, 4, 5, 6, 7, 8, 9, 10] _.range(0, 30, 5); => [0, 5, 10, 15, 20, 25] .range(0, -10, -1); => [0, -1, -2, -3, -4, -5, -6, -7, -8, -9] String.fromCharCode(....range('A'.charCodeAt(0), 'D'.charCodeAt(0) + 1)); => "ABCD" 没有库的旧的非es6浏览器: Array.apply(null, Array(5)).map(function (_, i) {return i;}); => [0, 1, 2, 3, 4] console.log([...Array(5).keys()]); 问题来源于stack overflow

保持可爱mmm 2020-01-13 17:25:33 0 浏览量 回答数 0

问题

oss的教程和api做得都不是很好

plbeast 2019-12-01 21:21:40 8229 浏览量 回答数 3

问题

如何插入两个表;一个表将插入1行,另一表将插入多行,两个表的一列具有相同的值

保持可爱mmm 2019-12-01 21:57:51 4 浏览量 回答数 1

问题

网页中一段php报错,不知道原因求解答?报错

爱吃鱼的程序员 2020-06-20 16:44:11 0 浏览量 回答数 1

问题

JSON 数据类型操作

云栖大讲堂 2019-12-01 21:28:15 1749 浏览量 回答数 0

问题

关于php验证的函数问题:报错 

kun坤 2020-06-04 21:13:11 2 浏览量 回答数 1

问题

关于php验证的函数问题,报错

一枚小鲜肉帅哥 2020-05-28 13:19:04 7 浏览量 回答数 1

问题

在Twig中,检查数组的特定键是否存在

保持可爱mmm 2020-02-09 11:24:04 0 浏览量 回答数 1

问题

关于php验证的函数问题 - php报错

montos 2020-06-04 16:57:28 4 浏览量 回答数 1

回答

Spark 源码分析之ShuffleMapTask内存数据Spill和合并(文档详解):https://github.com/opensourceteams/spark-scala-maven/blob/master/md/ShuffleMapTaskSpillDiskFile.md Spark 源码分析之ShuffleMapTask内存数据Spill和合并更多资源分享SPARK 源码分析技术分享(视频汇总套装视频): https://www.bilibili.com/video/av37442139/github: https://github.com/opensourceteams/spark-scala-mavencsdn(汇总视频在线看): https://blog.csdn.net/thinktothings/article/details/84726769前置条件Hadoop版本: Hadoop 2.6.0-cdh5.15.0Spark版本: SPARK 1.6.0-cdh5.15.0JDK.1.8.0_191scala2.10.7技能标签Spark ShuffleMapTask 内存中的数据Spill到临时文件临时文件中的数据是如何定入的,如何按partition升序排序,再按Key升序排序写入(key,value)数据每个临时文件,都存入对应的每个分区有多少个(key,value)对,有多少次流提交数组,数组中保留每次流的大小如何把临时文件合成一个文件如何把内存中的数据和临时文件,进行分区,按key,排序后,再写入合并文件中内存中数据Spill到磁盘ShuffleMapTask进行当前分区的数据读取(此时读的是HDFS的当前分区,注意还有一个reduce分区,也就是ShuffleMapTask输出文件是已经按Reduce分区处理好的)SparkEnv指定默认的SortShuffleManager,getWriter()中匹配BaseShuffleHandle对象,返回SortShuffleWriter对象SortShuffleWriter,用的是ExternalSorter(外部排序对象进行排序处理),会把rdd.iterator(partition, context)的数据通过iterator插入到ExternalSorter中PartitionedAppendOnlyMap对象中做为内存中的map对象数据,每插入一条(key,value)的数据后,会对当前的内存中的集合进行判断,如果满足溢出文件的条件,就会把内存中的数据写入到SpillFile文件中满中溢出文件的条件是,每插入32条数据,并且,当前集合中的数据估值大于等于5m时,进行一次判断,会通过算法验证对内存的影响,确定是否可以溢出内存中的数据到文件,如果满足就把当前内存中的所有数据写到磁盘spillFile文件中SpillFile调用org.apache.spark.util.collection.ExternalSorter.SpillableIterator.spill()方法处理WritablePartitionedIterator迭代对象对内存中的数据进行迭代,DiskBlockObjectWriter对象写入磁盘,写入的数据格式为(key,value),不带partition的ExternalSorter.spillMemoryIteratorToDisk()这个方法将内存数据迭代对象WritablePartitionedIterator写入到一个临时文件,SpillFile临时文件用DiskBlockObjectWriter对象来写入数据临时文件的格式temp_local_+UUID遍历内存中的数据写入到临时文件,会记录每个临时文件中每个分区的(key,value)各有多少个,elementsPerPartition(partitionId) += 1 如果说数据很大的话,会每默认每10000条数据进行Flush()一次数据到文件中,会记录每一次Flush的数据大小batchSizes入到ArrayBuffer中保存并且在数据写入前,会进行排序,先按key的hash分区,先按partition的升序排序,再按key的升序排序,这样来写入文件中,以保证读取临时文件时可以分隔开每个临时文件的每个分区的数据,对于一个临时文件中一个分区的数据量比较大的话,会按流一批10000个(key,value)进行读取,读取的大小讯出在batchSizes数据中,就样读取的时候就非常方便了内存数据Spill和合并把数据insertAll()到ExternalSorter中,完成后,此时如果数据大的话,会进行溢出到临时文件的操作,数据写到临时文件后把当前内存中的数据和临时文件中的数据进行合并数据文件,合并后的文件只包含(key,value),并且是按partition升序排序,然后按key升序排序,输出文件名称:ShuffleDataBlockId(shuffleId, mapId, NOOP_REDUCE_ID) + UUID 即:"shuffle_" + shuffleId + "" + mapId + "" + reduceId + ".data" + UUID,reduceId为默认值0还会有一份索引文件: "shuffle_" + shuffleId + "" + mapId + "" + reduceId + ".index" + "." +UUID,索引文件依次存储每个partition的位置偏移量数据文件的写入分两种情况,一种是直接内存写入,没有溢出临时文件到磁盘中,这种是直接在内存中操作的(数据量相对小些),另外单独分析一种是有磁盘溢出文件的,这种情况是本文重点分析的情况ExternalSorter.partitionedIterator()方法可以处理所有磁盘中的临时文件和内存中的文件,返回一个可迭代的对象,里边放的元素为reduce用到的(partition,Iterator(key,value)),迭代器中的数据是按key升序排序的具体是通过ExternalSorter.mergeWithAggregation(),遍历每一个临时文件中当前partition的数据和内存中当前partition的数据,注意,临时文件数据读取时是按partition为0开始依次遍历的源码分析(内存中数据Spill到磁盘)ShuffleMapTask调用ShuffleMapTask.runTask()方法处理当前HDFS分区数据 调用SparkEnv.get.shuffleManager得到SortShuffleManager SortShuffleManager.getWriter()得到SortShuffleWriter 调用SortShuffleWriter.write()方法 SparkEnv.create() val shortShuffleMgrNames = Map( "hash" -> "org.apache.spark.shuffle.hash.HashShuffleManager", "sort" -> "org.apache.spark.shuffle.sort.SortShuffleManager", "tungsten-sort" -> "org.apache.spark.shuffle.sort.SortShuffleManager") val shuffleMgrName = conf.get("spark.shuffle.manager", "sort") val shuffleMgrClass = shortShuffleMgrNames.getOrElse(shuffleMgrName.toLowerCase, shuffleMgrName) val shuffleManager = instantiateClass[ShuffleManager](shuffleMgrClass) override def runTask(context: TaskContext): MapStatus = { // Deserialize the RDD using the broadcast variable. val deserializeStartTime = System.currentTimeMillis() val ser = SparkEnv.get.closureSerializer.newInstance() val (rdd, dep) = ser.deserialize[(RDD[_], ShuffleDependency[_, _, _])]( ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader) _executorDeserializeTime = System.currentTimeMillis() - deserializeStartTime metrics = Some(context.taskMetrics) var writer: ShuffleWriter[Any, Any] = null try { val manager = SparkEnv.get.shuffleManager writer = manager.getWriter[Any, Any](dep.shuffleHandle, partitionId, context) writer.write(rdd.iterator(partition, context).asInstanceOf[Iterator[_ <: Product2[Any, Any]]]) writer.stop(success = true).get } catch { case e: Exception => try { if (writer != null) { writer.stop(success = false) } } catch { case e: Exception => log.debug("Could not stop writer", e) } throw e } } SortShuffleWriter调用SortShuffleWriter.write()方法根据RDDDependency中mapSideCombine是否在map端合并,这个是由算子决定,reduceByKey中mapSideCombine为true,groupByKey中mapSideCombine为false,会new ExternalSorter()外部排序对象进行排序然后把records中的数据插入ExternalSorter对象sorter中,数据来源是HDFS当前的分区/* Write a bunch of records to this task's output / override def write(records: Iterator[Product2[K, V]]): Unit = { sorter = if (dep.mapSideCombine) { require(dep.aggregator.isDefined, "Map-side combine without Aggregator specified!") new ExternalSorter[K, V, C]( context, dep.aggregator, Some(dep.partitioner), dep.keyOrdering, dep.serializer) } else { // In this case we pass neither an aggregator nor an ordering to the sorter, because we don't // care whether the keys get sorted in each partition; that will be done on the reduce side // if the operation being run is sortByKey. new ExternalSorter[K, V, V]( context, aggregator = None, Some(dep.partitioner), ordering = None, dep.serializer) } sorter.insertAll(records) // Don't bother including the time to open the merged output file in the shuffle write time, // because it just opens a single file, so is typically too fast to measure accurately // (see SPARK-3570). val output = shuffleBlockResolver.getDataFile(dep.shuffleId, mapId) val tmp = Utils.tempFileWith(output) try { val blockId = ShuffleBlockId(dep.shuffleId, mapId, IndexShuffleBlockResolver.NOOP_REDUCE_ID) val partitionLengths = sorter.writePartitionedFile(blockId, tmp) shuffleBlockResolver.writeIndexFileAndCommit(dep.shuffleId, mapId, partitionLengths, tmp) mapStatus = MapStatus(blockManager.shuffleServerId, partitionLengths) } finally { if (tmp.exists() && !tmp.delete()) { logError(s"Error while deleting temp file ${tmp.getAbsolutePath}") } } }ExternalSorter.insertAll()方法该方法会把迭代器records中的数据插入到外部排序对象中ExternalSorter中的数据是不进行排序的,是以数组的形式存储的,健存的为(partition,key),值为Shuffle之前的RDD链计算结果 在内存中会对相同的key,进行合并操作,就是map端本地合并,合并的函数就是reduceByKey(+)这个算子中定义的函数maybeSpillCollection方法会判断是否满足磁盘溢出到临时文件,满足条件,会把当前内存中的数据写到磁盘中,写到磁盘中的数据是按partition升序排序,再按key升序排序,就是(key,value)的临时文件,不带partition,但是会记录每个分区的数量elementsPerPartition(partitionId- 记录每一次Flush的数据大小batchSizes入到ArrayBuffer中保存内存中的数据存在PartitionedAppendOnlyMap,记住这个对象,后面排序用到了这个里边的排序算法@volatile private var map = new PartitionedAppendOnlyMap[K, C] def insertAll(records: Iterator[Product2[K, V]]): Unit = { // TODO: stop combining if we find that the reduction factor isn't high val shouldCombine = aggregator.isDefined if (shouldCombine) { // Combine values in-memory first using our AppendOnlyMap val mergeValue = aggregator.get.mergeValue val createCombiner = aggregator.get.createCombiner var kv: Product2[K, V] = null val update = (hadValue: Boolean, oldValue: C) => { if (hadValue) mergeValue(oldValue, kv._2) else createCombiner(kv._2) } while (records.hasNext) { addElementsRead() kv = records.next() map.changeValue((getPartition(kv._1), kv._1), update) maybeSpillCollection(usingMap = true) } } else { // Stick values into our buffer while (records.hasNext) { addElementsRead() val kv = records.next() buffer.insert(getPartition(kv._1), kv._1, kv._2.asInstanceOf[C]) maybeSpillCollection(usingMap = false) } } } ExternalSorter.maybeSpillCollectionestimatedSize当前内存中数据预估占内存大小maybeSpill满足Spill条件就把内存中的数据写入到临时文件中调用ExternalSorter.maybeSpill()/** Spill the current in-memory collection to disk if needed.* @param usingMap whether we're using a map or buffer as our current in-memory collection*/ private def maybeSpillCollection(usingMap: Boolean): Unit = { var estimatedSize = 0L if (usingMap) { estimatedSize = map.estimateSize() if (maybeSpill(map, estimatedSize)) { map = new PartitionedAppendOnlyMap[K, C] } } else { estimatedSize = buffer.estimateSize() if (maybeSpill(buffer, estimatedSize)) { buffer = new PartitionedPairBuffer[K, C] } } if (estimatedSize > _peakMemoryUsedBytes) { _peakMemoryUsedBytes = estimatedSize } }ExternalSorter.maybeSpill()对内存中的数据遍历时,每遍历32个元素,进行判断,当前内存是否大于5m,如果大于5m,再进行内存的计算,如果满足就把内存中的数据写到临时文件中如果满足条件,调用ExternalSorter.spill()方法,将内存中的数据写入临时文件 /** Spills the current in-memory collection to disk if needed. Attempts to acquire more memory before spilling.* @param collection collection to spill to disk @param currentMemory estimated size of the collection in bytes @return true if collection was spilled to disk; false otherwise*/ protected def maybeSpill(collection: C, currentMemory: Long): Boolean = { var shouldSpill = false if (elementsRead % 32 == 0 && currentMemory >= myMemoryThreshold) { // Claim up to double our current memory from the shuffle memory pool val amountToRequest = 2 * currentMemory - myMemoryThreshold val granted = acquireOnHeapMemory(amountToRequest) myMemoryThreshold += granted // If we were granted too little memory to grow further (either tryToAcquire returned 0, // or we already had more memory than myMemoryThreshold), spill the current collection shouldSpill = currentMemory >= myMemoryThreshold } shouldSpill = shouldSpill || _elementsRead > numElementsForceSpillThreshold // Actually spill if (shouldSpill) { _spillCount += 1 logSpillage(currentMemory) spill(collection) _elementsRead = 0 _memoryBytesSpilled += currentMemory releaseMemory() } shouldSpill } ExternalSorter.spill()调用方法collection.destructiveSortedWritablePartitionedIterator进行排序,即调用PartitionedAppendOnlyMap.destructiveSortedWritablePartitionedIterator进行排序()方法排序,最终会调用WritablePartitionedPairCollection.destructiveSortedWritablePartitionedIterator()排序,调用方法WritablePartitionedPairCollection.partitionedDestructiveSortedIterator(),没有实现,调用子类PartitionedAppendOnlyMap.partitionedDestructiveSortedIterator()方法调用方法ExternalSorter.spillMemoryIteratorToDisk() 将磁盘中的数据写入到spillFile临时文件中 /** Spill our in-memory collection to a sorted file that we can merge later. We add this file into spilledFiles to find it later.* @param collection whichever collection we're using (map or buffer)*/ override protected[this] def spill(collection: WritablePartitionedPairCollection[K, C]): Unit = { val inMemoryIterator = collection.destructiveSortedWritablePartitionedIterator(comparator) val spillFile = spillMemoryIteratorToDisk(inMemoryIterator) spills.append(spillFile) }PartitionedAppendOnlyMap.partitionedDestructiveSortedIterator()调用排序算法WritablePartitionedPairCollection.partitionKeyComparator即先按分区数的升序排序,再按key的升序排序/** Implementation of WritablePartitionedPairCollection that wraps a map in which the keys are tuples of (partition ID, K)*/ private[spark] class PartitionedAppendOnlyMap[K, V] extends SizeTrackingAppendOnlyMap[(Int, K), V] with WritablePartitionedPairCollection[K, V] { def partitionedDestructiveSortedIterator(keyComparator: Option[Comparator[K]]) : Iterator[((Int, K), V)] = { val comparator = keyComparator.map(partitionKeyComparator).getOrElse(partitionComparator) destructiveSortedIterator(comparator) } def insert(partition: Int, key: K, value: V): Unit = { update((partition, key), value) }} /** A comparator for (Int, K) pairs that orders them both by their partition ID and a key ordering.*/ def partitionKeyComparatorK: Comparator[(Int, K)] = { new Comparator[(Int, K)] { override def compare(a: (Int, K), b: (Int, K)): Int = { val partitionDiff = a._1 - b._1 if (partitionDiff != 0) { partitionDiff } else { keyComparator.compare(a._2, b._2) } } } }}ExternalSorter.spillMemoryIteratorToDisk()创建blockId : temp_shuffle_ + UUID溢出到磁盘临时文件: temp_shuffle_ + UUID遍历内存数据inMemoryIterator写入到磁盘临时文件spillFile遍历内存中的数据写入到临时文件,会记录每个临时文件中每个分区的(key,value)各有多少个,elementsPerPartition(partitionId) 如果说数据很大的话,会每默认每10000条数据进行Flush()一次数据到文件中,会记录每一次Flush的数据大小batchSizes入到ArrayBuffer中保存/** Spill contents of in-memory iterator to a temporary file on disk.*/ private[this] def spillMemoryIteratorToDisk(inMemoryIterator: WritablePartitionedIterator) : SpilledFile = { // Because these files may be read during shuffle, their compression must be controlled by // spark.shuffle.compress instead of spark.shuffle.spill.compress, so we need to use // createTempShuffleBlock here; see SPARK-3426 for more context. val (blockId, file) = diskBlockManager.createTempShuffleBlock() // These variables are reset after each flush var objectsWritten: Long = 0 var spillMetrics: ShuffleWriteMetrics = null var writer: DiskBlockObjectWriter = null def openWriter(): Unit = { assert (writer == null && spillMetrics == null) spillMetrics = new ShuffleWriteMetrics writer = blockManager.getDiskWriter(blockId, file, serInstance, fileBufferSize, spillMetrics) } openWriter() // List of batch sizes (bytes) in the order they are written to disk val batchSizes = new ArrayBuffer[Long] // How many elements we have in each partition val elementsPerPartition = new Array[Long](numPartitions) // Flush the disk writer's contents to disk, and update relevant variables. // The writer is closed at the end of this process, and cannot be reused. def flush(): Unit = { val w = writer writer = null w.commitAndClose() _diskBytesSpilled += spillMetrics.shuffleBytesWritten batchSizes.append(spillMetrics.shuffleBytesWritten) spillMetrics = null objectsWritten = 0 } var success = false try { while (inMemoryIterator.hasNext) { val partitionId = inMemoryIterator.nextPartition() require(partitionId >= 0 && partitionId < numPartitions, s"partition Id: ${partitionId} should be in the range [0, ${numPartitions})") inMemoryIterator.writeNext(writer) elementsPerPartition(partitionId) += 1 objectsWritten += 1 if (objectsWritten == serializerBatchSize) { flush() openWriter() } } if (objectsWritten > 0) { flush() } else if (writer != null) { val w = writer writer = null w.revertPartialWritesAndClose() } success = true } finally { if (!success) { // This code path only happens if an exception was thrown above before we set success; // close our stuff and let the exception be thrown further if (writer != null) { writer.revertPartialWritesAndClose() } if (file.exists()) { if (!file.delete()) { logWarning(s"Error deleting ${file}") } } } } SpilledFile(file, blockId, batchSizes.toArray, elementsPerPartition) } 源码分析(内存数据Spill合并)SortShuffleWriter.insertAll即内存中的数据,如果有溢出,写入到临时文件后,可能会有多个临时文件(看数据的大小) 这时要开始从所有的临时文件中,shuffle出按给reduce输入数据(partition,Iterator),相当于要对多个临时文件进行合成一个文件,合成的结果按partition升序排序,再按Key升序排序 SortShuffleWriter.write 得到合成文件shuffleBlockResolver.getDataFile : 格式如 "shuffle_" + shuffleId + "" + mapId + "" + reduceId + ".data" + "." + UUID,reduceId为默认的0 调用关键方法ExternalSorter的sorter.writePartitionedFile,这才是真正合成文件的方法 返回值partitionLengths,即为数据文件中对应索引文件按分区从0到最大分区,每个分区的数据大小的数组 /* Write a bunch of records to this task's output / override def write(records: Iterator[Product2[K, V]]): Unit = { sorter = if (dep.mapSideCombine) { require(dep.aggregator.isDefined, "Map-side combine without Aggregator specified!") new ExternalSorter[K, V, C]( context, dep.aggregator, Some(dep.partitioner), dep.keyOrdering, dep.serializer) } else { // In this case we pass neither an aggregator nor an ordering to the sorter, because we don't // care whether the keys get sorted in each partition; that will be done on the reduce side // if the operation being run is sortByKey. new ExternalSorter[K, V, V]( context, aggregator = None, Some(dep.partitioner), ordering = None, dep.serializer) } sorter.insertAll(records) // Don't bother including the time to open the merged output file in the shuffle write time, // because it just opens a single file, so is typically too fast to measure accurately // (see SPARK-3570). val output = shuffleBlockResolver.getDataFile(dep.shuffleId, mapId) val tmp = Utils.tempFileWith(output) try { val blockId = ShuffleBlockId(dep.shuffleId, mapId, IndexShuffleBlockResolver.NOOP_REDUCE_ID) val partitionLengths = sorter.writePartitionedFile(blockId, tmp) shuffleBlockResolver.writeIndexFileAndCommit(dep.shuffleId, mapId, partitionLengths, tmp) mapStatus = MapStatus(blockManager.shuffleServerId, partitionLengths) } finally { if (tmp.exists() && !tmp.delete()) { logError(s"Error while deleting temp file ${tmp.getAbsolutePath}") } } } ExternalSorter.writePartitionedFile按方法名直译,把数据写入已分区的文件中如果没有spill文件,直接按ExternalSorter在内存中排序,用的是TimSort排序算法排序,单独合出来讲,这里不详细讲如果有spill文件,是我们重点分析的,这个时候,调用this.partitionedIterator按回按[(partition,Iterator)],按分区升序排序,按(key,value)中key升序排序的数据,并键中方法this.partitionedIterator()写入合并文件中,并返回写入合并文件中每个分区的长度,放到lengths数组中,数组索引就是partition/** Write all the data added into this ExternalSorter into a file in the disk store. This is called by the SortShuffleWriter.* @param blockId block ID to write to. The index file will be blockId.name + ".index". @return array of lengths, in bytes, of each partition of the file (used by map output tracker)*/ def writePartitionedFile( blockId: BlockId, outputFile: File): Array[Long] = { // Track location of each range in the output file val lengths = new Array[Long](numPartitions) if (spills.isEmpty) { // Case where we only have in-memory data val collection = if (aggregator.isDefined) map else buffer val it = collection.destructiveSortedWritablePartitionedIterator(comparator) while (it.hasNext) { val writer = blockManager.getDiskWriter(blockId, outputFile, serInstance, fileBufferSize, context.taskMetrics.shuffleWriteMetrics.get) val partitionId = it.nextPartition() while (it.hasNext && it.nextPartition() == partitionId) { it.writeNext(writer) } writer.commitAndClose() val segment = writer.fileSegment() lengths(partitionId) = segment.length } } else { // We must perform merge-sort; get an iterator by partition and write everything directly. for ((id, elements) <- this.partitionedIterator) { if (elements.hasNext) { val writer = blockManager.getDiskWriter(blockId, outputFile, serInstance, fileBufferSize, context.taskMetrics.shuffleWriteMetrics.get) for (elem <- elements) { writer.write(elem._1, elem._2) } writer.commitAndClose() val segment = writer.fileSegment() lengths(id) = segment.length } } } context.taskMetrics().incMemoryBytesSpilled(memoryBytesSpilled) context.taskMetrics().incDiskBytesSpilled(diskBytesSpilled) context.internalMetricsToAccumulators( InternalAccumulator.PEAK_EXECUTION_MEMORY).add(peakMemoryUsedBytes) lengths } this.partitionedIterator()直接调用ExternalSorter.merge()方法临时文件参数spills内存文件排序算法在这里调用collection.partitionedDestructiveSortedIterator(comparator),实际调的是PartitionedAppendOnlyMap.partitionedDestructiveSortedIterator,定义了排序算法partitionKeyComparator,即按partition升序排序,再按key升序排序/** Return an iterator over all the data written to this object, grouped by partition and aggregated by the requested aggregator. For each partition we then have an iterator over its contents, and these are expected to be accessed in order (you can't "skip ahead" to one partition without reading the previous one). Guaranteed to return a key-value pair for each partition, in order of partition ID.* For now, we just merge all the spilled files in once pass, but this can be modified to support hierarchical merging. Exposed for testing.*/ def partitionedIterator: Iterator[(Int, Iterator[Product2[K, C]])] = { val usingMap = aggregator.isDefined val collection: WritablePartitionedPairCollection[K, C] = if (usingMap) map else buffer if (spills.isEmpty) { // Special case: if we have only in-memory data, we don't need to merge streams, and perhaps // we don't even need to sort by anything other than partition ID if (!ordering.isDefined) { // The user hasn't requested sorted keys, so only sort by partition ID, not key groupByPartition(destructiveIterator(collection.partitionedDestructiveSortedIterator(None))) } else { // We do need to sort by both partition ID and key groupByPartition(destructiveIterator( collection.partitionedDestructiveSortedIterator(Some(keyComparator)))) } } else { // Merge spilled and in-memory data merge(spills, destructiveIterator( collection.partitionedDestructiveSortedIterator(comparator))) } } ExternalSorter.merge()方法0 until numPartitions 从0到numPartitions(不包含)分区循环调用IteratorForPartition(p, inMemBuffered),每次取内存中的p分区的数据readers是每个分区是读所有的临时文件(因为每份临时文件,都有可能包含p分区的数据),readers.map(_.readNextPartition())该方法内部用的是每次调一个分区的数据,从0开始,刚好对应的是p分区的数据readNextPartition方法即调用SpillReader.readNextPartition()方法对p分区的数据进行mergeWithAggregation合并后,再写入到合并文件中 /** Merge a sequence of sorted files, giving an iterator over partitions and then over elements inside each partition. This can be used to either write out a new file or return data to the user.* Returns an iterator over all the data written to this object, grouped by partition. For each partition we then have an iterator over its contents, and these are expected to be accessed in order (you can't "skip ahead" to one partition without reading the previous one). Guaranteed to return a key-value pair for each partition, in order of partition ID.*/ private def merge(spills: Seq[SpilledFile], inMemory: Iterator[((Int, K), C)]) : Iterator[(Int, Iterator[Product2[K, C]])] = { val readers = spills.map(new SpillReader(_)) val inMemBuffered = inMemory.buffered (0 until numPartitions).iterator.map { p => val inMemIterator = new IteratorForPartition(p, inMemBuffered) val iterators = readers.map(_.readNextPartition()) ++ Seq(inMemIterator) if (aggregator.isDefined) { // Perform partial aggregation across partitions (p, mergeWithAggregation( iterators, aggregator.get.mergeCombiners, keyComparator, ordering.isDefined)) } else if (ordering.isDefined) { // No aggregator given, but we have an ordering (e.g. used by reduce tasks in sortByKey); // sort the elements without trying to merge them (p, mergeSort(iterators, ordering.get)) } else { (p, iterators.iterator.flatten) } } } SpillReader.readNextPartition()readNextItem()是真正读数临时文件的方法,deserializeStream每次读取一个流大小,这个大小时在spill输出文件时写到batchSizes中的,某个是每个分区写一次流,如果分区中的数据很大,就按10000条数据进行一次流,这样每满10000次就再读一次流,这样就可以把当前分区里边的多少提交流全部读完一进来就执行nextBatchStream()方法,该方法是按数组batchSizes存储着每次写入流时的数据大小val batchOffsets = spill.serializerBatchSizes.scanLeft(0L)(_ + _)这个其实取到的值,就刚好是每次流的一位置偏移量,后面的偏移量,刚好是前面所有偏移量之和当前分区的流读完时,就为空,就相当于当前分区的数据全部读完了当partitionId=numPartitions,finished= true说明所有分区的所有文件全部读完了def readNextPartition(): Iterator[Product2[K, C]] = new Iterator[Product2[K, C]] { val myPartition = nextPartitionToRead nextPartitionToRead += 1 override def hasNext: Boolean = { if (nextItem == null) { nextItem = readNextItem() if (nextItem == null) { return false } } assert(lastPartitionId >= myPartition) // Check that we're still in the right partition; note that readNextItem will have returned // null at EOF above so we would've returned false there lastPartitionId == myPartition } override def next(): Product2[K, C] = { if (!hasNext) { throw new NoSuchElementException } val item = nextItem nextItem = null item } } /** * Return the next (K, C) pair from the deserialization stream and update partitionId, * indexInPartition, indexInBatch and such to match its location. * * If the current batch is drained, construct a stream for the next batch and read from it. * If no more pairs are left, return null. */ private def readNextItem(): (K, C) = { if (finished || deserializeStream == null) { return null } val k = deserializeStream.readKey().asInstanceOf[K] val c = deserializeStream.readValue().asInstanceOf[C] lastPartitionId = partitionId // Start reading the next batch if we're done with this one indexInBatch += 1 if (indexInBatch == serializerBatchSize) { indexInBatch = 0 deserializeStream = nextBatchStream() } // Update the partition location of the element we're reading indexInPartition += 1 skipToNextPartition() // If we've finished reading the last partition, remember that we're done if (partitionId == numPartitions) { finished = true if (deserializeStream != null) { deserializeStream.close() } } (k, c) } /* Construct a stream that only reads from the next batch / def nextBatchStream(): DeserializationStream = { // Note that batchOffsets.length = numBatches + 1 since we did a scan above; check whether // we're still in a valid batch. if (batchId < batchOffsets.length - 1) { if (deserializeStream != null) { deserializeStream.close() fileStream.close() deserializeStream = null fileStream = null } val start = batchOffsets(batchId) fileStream = new FileInputStream(spill.file) fileStream.getChannel.position(start) batchId += 1 val end = batchOffsets(batchId) assert(end >= start, "start = " + start + ", end = " + end + ", batchOffsets = " + batchOffsets.mkString("[", ", ", "]")) val bufferedStream = new BufferedInputStream(ByteStreams.limit(fileStream, end - start)) val sparkConf = SparkEnv.get.conf val stream = blockManager.wrapForCompression(spill.blockId, CryptoStreamUtils.wrapForEncryption(bufferedStream, sparkConf)) serInstance.deserializeStream(stream) } else { // No more batches left cleanup() null } } end

thinktothings 2019-12-02 01:47:56 0 浏览量 回答数 0

回答

对于每一个请求,函数计算服务会根据请求头部的 Authorization 字段来校验是否合法(设置了 HTTP 触发器的允许匿名访问的函数除外)。客户端须使用与函数计算服务端一致的签名算法才能通过验证,对于未包含签名字段或者签名错误的请求,函数计算服务将会返回 HTTP 403 错误。 本文对函数计算的签名校验算法进行介绍。 签名算法 signature = base64(hmac-sha256(HTTP_METHOD + "\n" + CONTENT-MD5 + "\n" + CONTENT-TYPE + "\n" + DATE + "\n" + CanonicalizedFCHeaders + CanonicalizedResource)) // Authorization字段介绍 Authorization = "FC " + accessKeyID + ":" + signature HTTP_METHOD 表示大写的 HTTP Method (如:PUT, GET, POST, DELETE) CONTENT-MD5 表示请求内容数据的 MD5 值。如果请求的 Header 中没有传 Content-MD5,则此处填入空串 CONTENT-TYPE 表示请求内容的类型 DATE 表示此次操作的时间,不能为空,目前只支持 GMT 格式 注意:客户端需要保证生成的时间与函数计算服务端的时间相差不超过15分钟,否则函数服务将拒绝此请求 CanonicalizedFCHeaders 表示所有以 x-fc- 为前缀的 HTTP 头组成的字符串,生成方式见下文 CanonicalizedResource 表示请求的 URL 的 Path ,一般来说是先对收到的 Path decode,再去掉请求的 Path 里的 Params 内容。 Path 的结构为:$api-version/api-path api-version:API 版本,当前版本为 2016-08-15。 api-path:访问各个接口的路径,例如创建 service 为 /services,其他 path 请参考 API 定义。 需要认证的 HTTP 触发器的CanonicalizedResource 与其他请求的CanonicalizedResource 不同,下面对两种情况分别进行介绍。 普通请求的 CanonicalizedResource (普通请求为除了需要访问带认证的 HTTP 触发器的请求外的所有请求): 首先对收到的 Path 进行 url 解码,普通请求的 CanonicalizedResource 会只取到?前面的内容,即舍弃传入的各个Params 需要认证的 HTTP 触发器的CanonicalizedResource :如果有 Params ,则以回车符 \n 分隔各个参数,Params 中的各个参数 key - value 对按照字母序进行排序(如果 Params 里的 key 对应多个 value ,即对 key - value 整体进行排序)。如果没有 Params,最后也以 \n 结束。例如 // 需要认证的 HTTP 触发器的 url 的真实 path /2016-08-15/proxy/service-name/func-name/path-with-%20-space/action?x=1&a=2&x=3&with%20space=foo%20bar // url decode 后的结果 /2016-08-15/proxy/service-name/func-name/path-with- -space/action?x=1&a=2&x=3&with space=foo bar // 需要认证的 HTTP 触发器的 CanonicalizedResource /2016-08-15/proxy/service-name/func-name/path-with- -space/action\na=2\nwith space=foo bar\nx=1\nx=3 // 普通请求的 url 的真实 path /2016-08-15/service-name/func-name/path-with-%20-space/action?x=1&a=2&x=3&with%20space=foo%20bar // url decode 后的结果 /2016-08-15/service-name/func-name/path-with- -space/action?x=1&a=2&x=3&with space=foo bar // 普通请求的 CanonicalizedResource /2016-08-15/service-name/func-name/path-with- -space/action hmac-sha256需要以用户的 AccessKeySecret 为 Key 伪代码如下: // 构造字符串的过程 function composeStringToSign(method, path, headers, queries) { var contentMD5 = headers['content-md5'] || ''; var contentType = headers['content-type'] || ''; var date = headers['date']; var signHeaders = buildCanonicalHeaders(headers, 'x-fc-'); var u = url.parse(path); var pathUnescaped = decodeURIComponent(u.pathname); var str = ${method}\n${contentMD5}\n${contentType}\n${date}\n${signHeaders}${pathUnescaped}; if (queries) { var params = []; Object.keys(queries).forEach(function (key) { var values = queries[key]; var type = typeof values; if (type === 'string') { params.push(${key}=${values}); return; } if (type === 'object' && values instanceof Array) { queries[key].forEach(function (value) { params.push(${key}=${value}); }); } }); params.sort(); str += '\n' + params.join('\n'); } return str; } // 使用 hmac-sha256 和 base64 计算签名的过程,其中 source 参数为构造出的字符串 function signString(source, secret) { const buff = crypto.createHmac('sha256', secret) .update(source, 'utf8') .digest(); return new Buffer(buff, 'binary').toString('base64'); } CanonicalizedFCHeaders 生成步骤如下: 找出请求头中所有以 x-fc- 开头的字段(不区分大小写) 对于符合前缀的字段,先将字段名转换成小写 对于每一个字段,生成一个子串 ${key}:${value}\n , ${key} 是 HTTP 头的名称(转换成小写) ${value} 是 HTTP 头的值 例如:X-Fc-Invocation-Type:Sync 变成 x-fc-invocation-type:Sync\n 然后将这些字段按字段名从小到大排序 将上述生成的子串连接成一个整串 伪代码如下: // javascript // prefix = 'x-fc-' function buildCanonicalHeaders(headers, prefix) { var list = []; var keys = Object.keys(headers); var fcHeaders = {}; for (var i = 0; i < keys.length; i++) { var key = keys[i]; var lowerKey = key.toLowerCase().trim(); if (lowerKey.startsWith(prefix)) { list.push(lowerKey); fcHeaders[lowerKey] = headers[key]; } } list.sort(); var canonical = ''; for (var _i = 0; _i < list.length; _i++) { var _key = list[_i]; canonical += ${_key}:${fcHeaders[_key]}\n; } return canonical; } Authorization 字段 Authorization 可由上文计算得出的 signature 构造出来,构造方法如下: Authorization = "FC " + accessKeyID + ":" + signature 请求示例 请求: GET /2016-08-15/services?limit=100&nextToken=&prefix=&startKey= HTTP/1.1 Host: 1237050315505682.fc.cn-shanghai.aliyuncs.com User-Agent: go-sdk-0.1 Accept: application/json Authorization: FC LTAIUyt0Yeq1rgqo:GBmoz6OwC7bobTlD1jboBZ9PkaZ1e4cKsQ+5/dlLTns= Date: Mon, 08 May 2017 03:08:31 GMT X-User-Agent: go-resty v0.11 - https://github.com/go-resty/resty Accept-Encoding: gzip 响应: HTTP/1.1 200 OK Content-Type: application/json; charset=utf-8 X-Fc-Request-Id: ab7c7602-0922-f04f-b4ee-923cd7df7fb0 Date: Mon, 08 May 2017 03:08:31 GMT Transfer-Encoding: chunked 代码示例 可以参考我们已经发布的SDK中签名部分的代码: fc-nodejs-sdk

1934890530796658 2020-03-27 17:55:35 0 浏览量 回答数 0

问题

PHP中FOR与FOREACH的性能

保持可爱mmm 2020-02-07 22:50:15 0 浏览量 回答数 1

问题

关于PHPsdk中的listobject函数问题

堕落了信仰 2019-12-01 21:38:25 8299 浏览量 回答数 4

问题

【阿里云产品公测】以开发者角度看ACE服务『ACE应用构建指南』

mr_wid 2019-12-01 21:10:06 20092 浏览量 回答数 6
阿里云大学 云服务器ECS com域名 网站域名whois查询 开发者平台 小程序定制 小程序开发 国内短信套餐包 开发者技术与产品 云数据库 图像识别 开发者问答 阿里云建站 阿里云备案 云市场 万网 阿里云帮助文档 免费套餐 开发者工具 企业信息查询 小程序开发制作 视频内容分析 企业网站制作 视频集锦 代理记账服务 2020阿里巴巴研发效能峰会 企业建站模板 云效成长地图 高端建站