23.pyspark.sql.functions.date_format(date, format)
将日期/时间戳/字符串转换为由第二个参数给定日期格式指定格式的字符串值。
一个模式可能是例如dd.MM.yyyy,可能会返回一个字符串,如“18 .03.1993”。可以使用Java类java.text.SimpleDateFormat的所有模式字母。
注意:尽可能使用像年份这样的专业功能。这些受益于专门的实施。
>>> df = sqlContext.createDataFrame([('2015-04-08',)], ['a']) >>> df.select(date_format('a', 'MM/dd/yyy').alias('date')).collect() [Row(date=u'04/08/2015')]
24.pyspark.sql.functions.date_sub(start, days)
返回start前days天的日期
>>> df = sqlContext.createDataFrame([('2015-04-08',)], ['d']) >>> df.select(date_sub(df.d, 1).alias('d')).collect() [Row(d=datetime.date(2015, 4, 7))]
25.pyspark.sql.functions.datediff(end, start)
返回从start到end的天数。
>>> df = sqlContext.createDataFrame([('2015-04-08','2015-05-10')], ['d1', 'd2']) >>> df.select(datediff(df.d2, df.d1).alias('diff')).collect() [Row(diff=32)]
26.pyspark.sql.functions.dayofmonth(col)
将给定日期的月份的天解压为整数。
>>> df = sqlContext.createDataFrame([('2015-04-08',)], ['a']) >>> df.select(dayofmonth('a').alias('day')).collect() [Row(day=8)]
27.pyspark.sql.functions.dayofyear(col)
将给定日期的年份中的某一天提取为整数。
>>> df = sqlContext.createDataFrame([('2015-04-08',)], ['a']) >>> df.select(dayofyear('a').alias('day')).collect() [Row(day=98)]
28.pyspark.sql.functions.desc(col)
基于给定列名称的降序返回一个排序表达式。
29.pyspark.sql.functions.exp(col)
计算给定值的指数。
30.pyspark.sql.functions.expm1(col)
计算给定值的指数减1。
31.pyspark.sql.functions.factorial(col)
计算给定值的阶乘。
>>> df = sqlContext.createDataFrame([(5,)], ['n']) >>> df.select(factorial(df.n).alias('f')).collect() [Row(f=120)]
34.pyspark.sql.functions.format_string(format, *cols)
以printf样式格式化参数,并将结果作为字符串列返回。
参数:● format – 要格式化的格式
● cols - 要格式化的列
>>> from pyspark.sql.functions import * >>> df = sqlContext.createDataFrame([(5, "hello")], ['a', 'b']) >>> df.select(format_string('%d %s', df.a, df.b).alias('v')).collect() [Row(v=u'5 hello')]
35.pyspark.sql.functions.hex(col)
计算给定列的十六进制值,可以是StringType,BinaryType,IntegerType或LongType
>>> sqlContext.createDataFrame([('ABC', 3)], ['a', 'b']).select(hex('a'), hex('b')).collect() [Row(hex(a)=u'414243', hex(b)=u'3')]
36.pyspark.sql.functions.hour(col)
将给定日期的小时数提取为整数。
>>> df = sqlContext.createDataFrame([('2015-04-08 13:08:15',)], ['a']) >>> df.select(hour('a').alias('hour')).collect() [Row(hour=13)]
38.pyspark.sql.functions.initcap(col)
在句子中将每个单词的第一个字母翻译成大写。
>>> sqlContext.createDataFrame([('ab cd',)], ['a']).select(initcap("a").alias('v')).collect() [Row(v=u'Ab Cd')]
39.pyspark.sql.functions.isnan(col)
如果列是NaN,则返回true的表达式。
>>> df = sqlContext.createDataFrame([(1.0, float('nan')), (float('nan'), 2.0)], ("a", "b")) >>> df.select(isnan("a").alias("r1"), isnan(df.a).alias("r2")).collect() [Row(r1=False, r2=False), Row(r1=True, r2=True)]
40.pyspark.sql.functions.kurtosis(col)
聚合函数:返回组中的值的峰度。
41.pyspark.sql.functions.last(col)
聚合函数:返回组中的最后一个值。
42.pyspark.sql.functions.last_day(date)
返回给定日期所属月份的最后一天。
43.pyspark.sql.functions.lit(col)
创建一个文字值的列
from pyspark.sql import Rowfrom pyspark.sql import functions as sf rdd = sc.parallelize([Row(name='Alice', level='a', age=5, height=80),Row(name='Bob', level='a', age=5, height=80),Row(name='Cycy', level='b', age=10, height=80),Row(name='Didi', level='b', age=12, height=75),Row(name='EiEi', level='b', age=10, height=70)]) df = rdd.toDF() print df.show() """ +---+------+-----+-----+ |age|height|level| name| +---+------+-----+-----+ | 5| 80| a|Alice| | 5| 80| a| Bob| | 10| 80| b| Cycy| | 12| 75| b| Didi| | 10| 70| b| EiEi| +---+------+-----+-----+ """ df2 = df.select("name", (df.age+1).alias("new_age"), sf.lit(2)) print df2.show() """ +-----+-------+---+ | name|new_age| 2| +-----+-------+---+ |Alice| 6| 2| | Bob| 6| 2| | Cycy| 11| 2| | Didi| 13| 2| | EiEi| 11| 2| +-----+-------+---+ """ # 也可以重命名 df2 = df.select("name", (df.age+1).alias("new_age"), sf.lit(2).alias("constant")) print df2.show() """ +-----+-------+--------+ | name|new_age|constant| +-----+-------+--------+ |Alice| 6| 2| | Bob| 6| 2| | Cycy| 11| 2| | Didi| 13| 2| | EiEi| 11| 2| +-----+-------+--------+ """
44.pyspark.sql.functions.log(arg1, arg2=None)
返回第二个参数的第一个基于参数的对数。
如果只有一个参数,那么这个参数就是自然对数。
>>> df.select(log(10.0, df.age).alias('ten')).map(lambda l: str(l.ten)[:7]).collect() ['0.30102', '0.69897'] >>> df.select(log(df.age).alias('e')).map(lambda l: str(l.e)[:7]).collect() ['0.69314', '1.60943']
45.pyspark.sql.functions.log1p(col)
计算给定值的自然对数加1。
46.pyspark.sql.functions.log2(col)
返回参数的基数为2的对数。
>>> sqlContext.createDataFrame([(4,)], ['a']).select(log2('a').alias('log2')).collect() [Row(log2=2.0)]
47.pyspark.sql.functions.lower(col)
将字符串列转换为小写。
48.pyspark.sql.functions.ltrim(col)
从左端修剪指定字符串值的空格。
49.pyspark.sql.functions.minute(col)
提取给定日期的分钟数为整数
>>> df = sqlContext.createDataFrame([('2015-04-08 13:08:15',)], ['a']) >>> df.select(minute('a').alias('minute')).collect() [Row(minute=8)]
51.pyspark.sql.functions.month(col)
将给定日期的月份提取为整数
>>> df = sqlContext.createDataFrame([('2015-04-08',)], ['a']) >>> df.select(month('a').alias('month')).collect() [Row(month=4)]
52.pyspark.sql.functions.months_between(date1, date2)
返回date1和date2之间的月数。
>>> df = sqlContext.createDataFrame([('1997-02-28 10:30:00', '1996-10-30')], ['t', 'd']) >>> df.select(months_between(df.t, df.d).alias('months')).collect() [Row(months=3.9495967...)]
53.pyspark.sql.functions.rand(seed=None)
生成一个随机列,其中包含均匀分布在 [0.0, 1.0) 中的独立且同分布 (i.i.d.) 样本。
54.pyspark.sql.functions.randn(seed=None)
从标准正态分布生成具有独立且同分布 (i.i.d.) 样本的列。
55.pyspark.sql.functions.reverse(col)
反转字符串列并将其作为新的字符串列返回
56.pyspark.sql.functions.rtrim(col)
从右端修剪指定字符串值的空格
57.pyspark.sql.functions.skewness(col)
聚合函数:返回组中值的偏度
58.pyspark.sql.functions.sort_array(col, asc=True)
集合函数:按升序对给定列的输入数组进行排序。
参数:col – 列或表达式名称
>>> df = sqlContext.createDataFrame([([2, 1, 3],),([1],),([],)], ['data']) >>> df.select(sort_array(df.data).alias('r')).collect()[Row(r=[1, 2, 3]), Row(r=[1]), Row(r=[])] >>> df.select(sort_array(df.data, asc=False).alias('r')).collect() [Row(r=[3, 2, 1]), Row(r=[1]), Row(r=[])]
59.pyspark.sql.functions.split(str, pattern)
将模式分割(模式是正则表达式)。
注:pattern是一个字符串表示正则表达式。
>>> df = sqlContext.createDataFrame([('ab12cd',)], ['s',]) >>> df.select(split(df.s, '[0-9]+').alias('s')).collect() [Row(s=[u'ab', u'cd'])]
60.pyspark.sql.functions.sqrt(col)
计算指定浮点值的平方根
61.pyspark.sql.functions.stddev(col)
聚合函数:返回组中表达式的无偏样本标准差
62.pyspark.sql.functions.sumDistinct(col)
聚合函数:返回表达式中不同值的总和
63.pyspark.sql.functions.to_date(col)
将StringType或TimestampType的列转换为DateType
>>> df = sqlContext.createDataFrame([('1997-02-28 10:30:00',)], ['t']) >>> df.select(to_date(df.t).alias('date')).collect() [Row(date=datetime.date(1997, 2, 28))]
64.pyspark.sql.functions.trim(col)
修剪指定字符串列的两端空格。
65.pyspark.sql.functions.trunc(date, format)
返回截断到格式指定单位的日期
参数: format – ‘year’, ‘YYYY’, ‘yy’ or ‘month’, ‘mon’, ‘mm’
>>> df = sqlContext.createDataFrame([('1997-02-28',)], ['d']) >>> df.select(trunc(df.d, 'year').alias('year')).collect() [Row(year=datetime.date(1997, 1, 1))] >>> df.select(trunc(df.d, 'mon').alias('month')).collect() [Row(month=datetime.date(1997, 2, 1))]
66.pyspark.sql.functions.var_samp(col)
聚合函数:返回组中值的无偏差
67.pyspark.sql.functions.variance(col)
聚合函数:返回组中值的总体方差
68.pyspark.sql.functions.array(*cols)
创建一个新的数组列。
参数: cols – 列名(字符串)列表或具有相同数据类型的列表达式列表。
>>> df.select(array('age', 'age').alias("arr")).collect() [Row(arr=[2, 2]), Row(arr=[5, 5])] >>> df.select(array([df.age, df.age]).alias("arr")).collect() [Row(arr=[2, 2]), Row(arr=[5, 5])]
69.pyspark.sql.functions.bin(col)
返回给定列的二进制值的字符串表示形式
>>> l=[('Alice',2),('Bob',5)] >>> df = sqlContext.createDataFrame(l,['name','age']) >>> df.select(bin(df.age).alias('c')).collect() [Row(c=u'10'), Row(c=u'101')]
70.pyspark.sql.functions.conv(col, fromBase, toBase)
将字符串列中的数字从一个基数转换为另一个基数。
>>> df = sqlContext.createDataFrame([("010101",)], ['n']) >>> df.select(conv(df.n, 2, 16).alias('hex')).collect() [Row(hex=u'15')]
71.pyspark.sql.functions.expr(str)
将表达式字符串分析到它表示的列中
>>> l=[('Alice',2),('Bob',5)] >>> df = sqlContext.createDataFrame(l,['name','age']) >>> df.select(expr("length(name)")).collect() [Row(length(name)=5), Row(length(name)=3)]
72.pyspark.sql.functions.from_utc_timestamp(timestamp, tz)
假设时间戳是UTC,并转换为给定的时区
>>> df = sqlContext.createDataFrame([('1997-02-28 10:30:00',)], ['t']) >>> df.select(from_utc_timestamp(df.t, "PST").alias('t')).collect() [Row(t=datetime.datetime(1997, 2, 28, 2, 30))]
73.pyspark.sql.functions.greatest(*cols)
返回列名称列表的最大值,跳过空值。该功能至少需要2个参数。如果所有参数都为空,它将返回null
>>> df = sqlContext.createDataFrame([(1, 4, 3)], ['a', 'b', 'c']) >>> df.select(greatest(df.a, df.b, df.c).alias("greatest")).collect() [Row(greatest=4)]
74.pyspark.sql.functions.instr(str, substr)
找到给定字符串中第一次出现substr列的位置。如果其中任一参数为null,则返回null。
注:位置不是从零开始的,但是基于1的索引,如果在str中找不到substr,则返回0。
>>> df = sqlContext.createDataFrame([('abcd',)], ['s',]) >>> df.select(instr(df.s, 'b').alias('s')).collect() [Row(s=2)]
75.pyspark.sql.functions.isnull(col)
如果列为null,则返回true的表达式
>>> df = sqlContext.createDataFrame([(1, None), (None, 2)], ("a", "b")) >>> df.select(isnull("a").alias("r1"), isnull(df.a).alias("r2")).collect() [Row(r1=False, r2=False), Row(r1=True, r2=True)]
76.pyspark.sql.functions.least(*cols)
返回列名称列表的最小值,跳过空值。该功能至少需要2个参数。如果所有参数都为空,它将返回null
>>> df = sqlContext.createDataFrame([(1, 4, 3)], ['a', 'b', 'c']) >>> df.select(least(df.a, df.b, df.c).alias("least")).collect() [Row(least=1)]
77.pyspark.sql.functions.length(col)
计算字符串或二进制表达式的长度
>>> sqlContext.createDataFrame([('ABC',)], ['a']).select(length('a').alias('length')).collect() [Row(length=3)]
78.pyspark.sql.functions.locate(substr, str, pos=0)
找到第一个出现的位置在位置pos后面的字符串列中。
注:位置不是从零开始,而是从1开始。如果在str中找不到substr,则返回0。
参数: substr – 一个字符串
str – 一个StringType的列
pos – 起始位置(基于零)
>>> df = sqlContext.createDataFrame([('abcd',)], ['s',]) >>> df.select(locate('b', df.s, 1).alias('s')).collect()[Row(s=2)]
79.pyspark.sql.functions.max(col)
聚合函数:返回组中表达式的最大值。
80.pyspark.sql.functions.mean(col)
聚合函数:返回组中的值的平均值
81.pyspark.sql.functions.min(col)
聚合函数:返回组中表达式的最小值。
82.pyspark.sql.functions.next_day(date, dayOfWeek)
返回晚于日期列值的第一个日期
星期几参数不区分大小写,并接受:“Mon”, “Tue”, “Wed”, “Thu”, “Fri”, “Sat”, “Sun”.
>>> df = sqlContext.createDataFrame([('2015-07-27',)], ['d']) >>> df.select(next_day(df.d, 'Sun').alias('date')).collect() [Row(date=datetime.date(2015, 8, 2))]
83.pyspark.sql.functions.repeat(col, n)
重复一个字符串列n次,并将其作为新的字符串列返回
>>> df = sqlContext.createDataFrame([('ab',)], ['s',]) >>> df.select(repeat(df.s, 3).alias('s')).collect() [Row(s=u'ababab')]
84.pyspark.sql.functions.round(col, scale=0)
如果scale> = 0,将e的值舍入为小数点的位数,或者在scale <0的时候将其舍入到整数部分。
>>> sqlContext.createDataFrame([(2.546,)], ['a']).select(round('a', 1).alias('r')).collect() [Row(r=2.5)]
85.pyspark.sql.functions.row_number()
窗口函数:返回窗口分区内从1开始的连续编号。
from pyspark.sql.window import Window df_r = df.withColumn('row_number', sf.row_number().over(Window.partitionBy("level").orderBy("age")).alias("rowNum")) # 其他写法 df_r = df.withColumn('row_number', sf.row_number().over(Window.partitionBy(df.level).orderBy(df.age)).alias("rowNum")) print df_r.show() """ +---+------+-----+-----+----------+ |age|height|level| name|row_number| +---+------+-----+-----+----------+ | 10| 80| b| Cycy| 1| | 10| 70| b| EiEi| 2| | 12| 75| b| Didi| 3| | 5| 80| a| Bob| 1| | 5| 80| a|Alice| 2| """
表示逆序,或者根据多个字段分组
df_r = df.withColumn('row_number', sf.row_number().over(Window.partitionBy(df.level, df.age).orderBy(sf.desc("name"))).alias("rowNum")) # 另一种写法 df_r = df.withColumn('row_number', sf.row_number().over(Window.partitionBy("level", "age").orderBy(sf.desc("name"))).alias("rowNum")) print df_r.show() """ +---+------+-----+-----+----------+ |age|height|level| name|row_number| +---+------+-----+-----+----------+ | 5| 80| a| Bob| 1| | 5| 80| a|Alice| 2| | 10| 70| b| EiEi| 1| | 10| 80| b| Cycy| 2| | 12| 75| b| Didi| 1| +---+------+-----+-----+----------+ """
86.pyspark.sql.functions.second(col)
将给定日期的秒数提取为整数
>>> df = sqlContext.createDataFrame([('2015-04-08 13:08:15',)], ['a']) >>> df.select(second('a').alias('second')).collect() [Row(second=15)]
87.pyspark.sql.functions.size(col)
集合函数:返回存储在列中的数组或映射的长度
参数:col – 列或表达式名称
>>> df = sqlContext.createDataFrame([([1, 2, 3],),([1],),([],)], ['data']) >>> df.select(size(df.data)).collect() [Row(size(data)=3), Row(size(data)=1), Row(size(data)=0)]
88.pyspark.sql.functions.substring(str, pos, len)
子字符串从pos开始,长度为len,当str是字符串类型时,或者返回从字节pos开始的字节数组的片段,当str是二进制类型时,长度
为len
>>> df = sqlContext.createDataFrame([('abcd',)], ['s',]) >>> df.select(substring(df.s, 1, 2).alias('s')).collect() [Row(s=u'ab')]
89.pyspark.sql.functions.sum(col)
聚合函数:返回表达式中所有值的总和。
90.pyspark.sql.functions.to_utc_timestamp(timestamp, tz)
假定给定的时间戳在给定的时区并转换为UTC
>>> df = sqlContext.createDataFrame([('1997-02-28 10:30:00',)], ['t']) >>> df.select(to_utc_timestamp(df.t, "PST").alias('t')).collect() [Row(t=datetime.datetime(1997, 2, 28, 18, 30))]
91.pyspark.sql.functions.year(col)
将给定日期的年份提取为整数
>>> df = sqlContext.createDataFrame([('2015-04-08',)], ['a']) >>> df.select(year('a').alias('year')).collect() [Row(year=2015)]
92.pyspark.sql.functions.when(condition, value)
评估条件列表并返回多个可能的结果表达式之一。如果不调用Column.otherwise(),则不匹配条件返回None
参数:condition – 一个布尔的列表达式.
value – 一个文字值或一个Column表达式
>>> df.select(when(df['age'] == 2, 3).otherwise(4).alias("age")).collect() [Row(age=3), Row(age=4)] >>> df.select(when(df.age == 2, df.age + 1).alias("age")).collect() [Row(age=3), Row(age=None)] df3 = df.withColumn("when", sf.when(df.age<7, "kindergarten").when((df.age>=7)&(df.age<11), 'low_grade').otherwise("high_grade")) print df3.show() """ +---+------+-----+-----+------------+ |age|height|level| name| when| +---+------+-----+-----+------------+ | 5| 80| a|Alice|kindergarten| | 5| 80| a| Bob|kindergarten| | 10| 80| b| Cycy| low_grade| | 12| 75| b| Didi| high_grade| | 10| 70| b| EiEi| low_grade| +---+------+-----+-----+------------+ """
93.pyspark.sql.functions.udf(f, returnType=StringType)
创建一个表示用户定义函数(UDF)的列表达式。
>>> from pyspark.sql.types import IntegerType >>> slen = udf(lambda s: len(s), IntegerType()) >>> df.select(slen(df.name).alias('slen')).collect() [Row(slen=5), Row(slen=3)]
udf只能对每一行进行操作,无法对groupBy后的数据处理。
from pyspark.sql import types as stdef ratio(a, b): if a is None or b is None or b == 0: r = -1.0 else: r = 1.0 * a / b return r col_ratio = udf(ratio, st.DoubleType()) df_udf = df.withColumn("ratio", col_ratio(df.age, df.height)) print df_udf.show() """ +---+------+-----+-----+-------------------+ |age|height|level| name| ratio| +---+------+-----+-----+-------------------+ | 5| 80| a|Alice| 0.0625| | 5| 80| a| Bob| 0.0625| | 10| 80| b| Cycy| 0.125| | 12| 75| b| Didi| 0.16| | 10| 70| b| EiEi|0.14285714285714285| +---+------+-----+-----+-------------------+ """