Skip to content

pyspark

from pyspark.sql import SparkSession, SQLContext
from pyspark.sql.types import *
import pyspark.sql.functions as f

spark = SparkSession\
    .builder\
    .appName("AppName")\
    .getOrCreate()
sc = spark.sparkContext
sqlconext = SQLContext(sc)

读取csv文件

#file_name = "hdfs:///user/
#file_name = "file:///home/

# method 1:
# input_schema = StructType()
# for col in column_names:
#     input_schema.add(StructField(col, StringType(), True))              
# method 2:
# input_schema = spark.read.schema("col0 INT, col2 DOUBLE")

df = spark.read.csv(file_name, schema=input_schema, sep='\t')

选择列

df = df[['clk', 'site', 'category', 'location', 'ctr']]

修改列

df = df.withColumn('ctr', f.col('ctr')/1000000)

# 增加常量列
df = df.withColumn('constant', f.lit(10))

过滤行

df = df.filter(
        (f.col('site') == '1') & 
        (f.col('category') == 'FOCUS2')
        )

排序

df = df.orderBy(["age", "name"], ascending=[0, 1])
http://spark.apache.org/docs/2.1.0/api/python/pyspark.sql.html#pyspark.sql.DataFrame.orderBy

分组聚合

df = df.groupBy(['location']).agg(
        f.count('clk'),
        f.sum('clk'), 
        f.sum('ctr'), 
        f.sum('ctr')/f.sum('clk'))
  • 使用pyspark.sql.functions里的聚合函数

  • pandas_udf 分组数据会转化为pd.DataFrame, 需要注意内存是否放的下分组数据, pandas_udf设置的返回类型 需要与函数返回的pd.DataFrame类型一致

import pandas as pd
from pyspark.sql.types import *
from pyspark.sql.functions import pandas_udf, PandasUDFType

uid_schema = StructType()
uid_schema.add(StructField("uid", LongType(), True))
uid_schema.add(StructField("uid_finish_pv", LongType(), True))
uid_schema.add(StructField("uid_finish_clk", LongType(), True))
print("uid schema:", uid_schema)
@f.pandas_udf(uid_schema, f.PandasUDFType.GROUPED_MAP)
def uid_extract(pdf):
    d = {}
    d['uid'] = [pdf['uid'][0]]
    d['uid_finish_pv'] =  [len(pdf['finish'])]
    d['uid_finish_clk'] = [sum(pdf['finish'])]
    df = pd.DataFrame(d, columns=uid_schema.fieldNames())
    return df
uid_df = data_df.groupby(['uid']).apply(uid_extract)

join

left_df.join(right_df, ["request_id", "location"])

RDD 和 DataFrame 转换

from pyspark.sql import SparkSession, SQLContext
from pyspark.sql.types import *

spark = SparkSession\
    .builder\
    .appName("StatLocationPctr")\
    .getOrCreate()
sc = spark.sparkContext
sqlContext = SQLContext(sc)

# DataFrame to RDD
df.rdd 

# RDD to DataFrame
input_schema = StructType()
input_schema.add(StructField("feature", StringType(), True))
input_schema.add(StructField("value", StringType(), True))
df = sqlContext.createDataFrame(rdd, input_schema)

df = sc.parallelize([
    (1, "foo", 2.0, "2016-02-16"),
    (2, "bar", 3.0, "2016-02-16")
]).toDF(["id", "x", "y", "date"])

保存rdd到文件

df.coalesce(1).write.save(output_dir, format="csv", sep='\t')
df.repartition(1).write.save(output_dir, format="csv", sep='\t')
df.rdd.saveAsTextFile(output_dir)
rdd.saveAsTextFile(output_dir)
rdd.repartition(1).saveAsTextFile(output_dir) 

# 按分区存储文件
df.write.partitionBy('year', 'month').save(dict_output_dir, format="csv", sep='\t')

需要特定格式的输出时, 可以使用map方法先拼接成特定格式的字符串,然后再输出

在保存df之前,如果对df进行了排序, repartition会打乱顺序, coalesce不会

pyspark python

export SPARK_HOME=~/spark-2.4.0-bin-hadoop2.7 或者在~/spark-2.4.0-bin-hadoop2.7/conf/spark-env.sh 中配置SPARK_HOME, 后者会覆盖前者

export PATH=~/bin:~/hadoop/bin:~/spark-2.4.0-bin-hadoop2.7/bin:$PATH 设置path, 可以找到spark-submit

设置python 通过环境变量设置 export PYSPARK_PYTHON=/usr/local/bin/python2.7 export PYSPARK_DRIVER_PYTHON=/usr/local/bin/python2.7 export SPARK_YARN_USER_ENV="PYSPARK_PYTHON=/usr/local/bin/python2.7"

提交程序时设置python spark-submit \ --master local \ --conf "spark.pyspark.python=/home/appops/Python/bin/python" \ --conf "spark.pyspark.driver.python=/home/appops/Python/bin/python" \

jar包 ~/spark-2.4.0-bin-hadoop2.7/jars/ hadoop-lzo-0.4.20.jar