pyspark
from pyspark.sql import SparkSession, SQLContext
from pyspark.sql.types import *
import pyspark.sql.functions as f
spark = SparkSession \
    .builder \
    .appName("AppName") \
    .getOrCreate()
sc = spark.sparkContext
sqlContext = SQLContext(sc)
Read a CSV file
#file_name = "hdfs:///user/
#file_name = "file:///home/
# method 1: build the schema programmatically
# input_schema = StructType()
# for col in column_names:
#     input_schema.add(StructField(col, StringType(), True))
# method 2 (Spark 2.3+): a DDL-formatted string also works as the schema
# input_schema = "col0 INT, col2 DOUBLE"
df = spark.read.csv(file_name, schema=input_schema, sep='\t')
Select columns
df = df[['clk', 'site', 'category', 'location', 'ctr']]
Modify a column
df = df.withColumn('ctr', f.col('ctr')/1000000)
# add a constant column
df = df.withColumn('constant', f.lit(10))
Filter rows
df = df.filter(
    (f.col('site') == '1') &
    (f.col('category') == 'FOCUS2')
)
Sort
df = df.orderBy(["age", "name"], ascending=[0, 1])
http://spark.apache.org/docs/2.1.0/api/python/pyspark.sql.html#pyspark.sql.DataFrame.orderBy
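An equivalent way to express the same sort with Column expressions (a small sketch; the age/name columns are only illustrative):
df = df.orderBy(f.col("age").desc(), f.col("name").asc())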
Group and aggregate
df = df.groupBy(['location']).agg(
    f.count('clk'),
    f.sum('clk'),
    f.sum('ctr'),
    f.sum('ctr') / f.sum('clk'))
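The default output column names such as sum(clk) are awkward to reference later; a minimal variant of the same aggregation that names the results with Column.alias (the alias strings are only illustrative):
df = df.groupBy('location').agg(
    f.count('clk').alias('pv'),
    f.sum('clk').alias('clk_sum'),
    f.sum('ctr').alias('ctr_sum'),
    (f.sum('ctr') / f.sum('clk')).alias('avg_ctr'))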
- Use the aggregate functions from pyspark.sql.functions, as in the example above.
- pandas_udf (GROUPED_MAP): each group is converted to a pd.DataFrame, so make sure a single group fits in memory; the return type declared in pandas_udf must match the schema of the pd.DataFrame the function returns.
import pandas as pd
from pyspark.sql.types import *
from pyspark.sql.functions import pandas_udf, PandasUDFType
uid_schema = StructType()
uid_schema.add(StructField("uid", LongType(), True))
uid_schema.add(StructField("uid_finish_pv", LongType(), True))
uid_schema.add(StructField("uid_finish_clk", LongType(), True))
print("uid schema:", uid_schema)
@pandas_udf(uid_schema, PandasUDFType.GROUPED_MAP)
def uid_extract(pdf):
    d = {}
    d['uid'] = [pdf['uid'][0]]
    d['uid_finish_pv'] = [len(pdf['finish'])]
    d['uid_finish_clk'] = [sum(pdf['finish'])]
    df = pd.DataFrame(d, columns=uid_schema.fieldNames())
    return df
uid_df = data_df.groupby(['uid']).apply(uid_extract)
join
left_df.join(right_df, ["request_id", "location"])
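By default this is an inner join; DataFrame.join also accepts the join type as a third argument. A small sketch keeping every row of left_df:
# unmatched rows from left_df get null values in right_df's columns
joined_df = left_df.join(right_df, ["request_id", "location"], "left")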
Convert between RDD and DataFrame
from pyspark.sql import SparkSession, SQLContext
from pyspark.sql.types import *
spark = SparkSession \
    .builder \
    .appName("StatLocationPctr") \
    .getOrCreate()
sc = spark.sparkContext
sqlContext = SQLContext(sc)
# DataFrame to RDD
df.rdd
# RDD to DataFrame
input_schema = StructType()
input_schema.add(StructField("feature", StringType(), True))
input_schema.add(StructField("value", StringType(), True))
df = sqlContext.createDataFrame(rdd, input_schema)
df = sc.parallelize([
    (1, "foo", 2.0, "2016-02-16"),
    (2, "bar", 3.0, "2016-02-16")
]).toDF(["id", "x", "y", "date"])
Save a DataFrame or RDD to files
df.coalesce(1).write.save(output_dir, format="csv", sep='\t')
df.repartition(1).write.save(output_dir, format="csv", sep='\t')
df.rdd.saveAsTextFile(output_dir)
rdd.saveAsTextFile(output_dir)
rdd.repartition(1).saveAsTextFile(output_dir)
# write the output partitioned by column values
df.write.partitionBy('year', 'month').save(dict_output_dir, format="csv", sep='\t')
When the output needs a specific format, use map to build each record as a formatted string first, then save it (see the sketch below).
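A minimal sketch, reusing output_dir and the column names (site, category, ctr) from the examples above; adjust the fields and separator to the format you need:
lines = df.rdd.map(lambda row: '\t'.join([str(row['site']), str(row['category']), str(row['ctr'])]))
lines.saveAsTextFile(output_dir)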
If the DataFrame was sorted before saving, repartition will shuffle it and break the order, while coalesce will not.
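For example, to write a sorted result into a single file (a sketch based on the note above; 'ctr' is just the column from the earlier examples):
df.orderBy('ctr', ascending=False).coalesce(1).write.save(output_dir, format="csv", sep='\t')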
Configuring Python for PySpark
Set SPARK_HOME, either via export SPARK_HOME=~/spark-2.4.0-bin-hadoop2.7 or in ~/spark-2.4.0-bin-hadoop2.7/conf/spark-env.sh; the latter overrides the former.
Set PATH so that spark-submit can be found: export PATH=~/bin:~/hadoop/bin:~/spark-2.4.0-bin-hadoop2.7/bin:$PATH
Set the Python interpreter through environment variables:
export PYSPARK_PYTHON=/usr/local/bin/python2.7
export PYSPARK_DRIVER_PYTHON=/usr/local/bin/python2.7
export SPARK_YARN_USER_ENV="PYSPARK_PYTHON=/usr/local/bin/python2.7"
Or set the Python interpreter when submitting the job:
spark-submit \
    --master local \
    --conf "spark.pyspark.python=/home/appops/Python/bin/python" \
    --conf "spark.pyspark.driver.python=/home/appops/Python/bin/python" \
JAR packages: extra jars such as hadoop-lzo-0.4.20.jar go under ~/spark-2.4.0-bin-hadoop2.7/jars/.