pyspark基本入门 主页                                            博客   留言 

今天为大家带来pyspark的快速入门,希望对大家的工作和学习有帮助。

基本概念介绍

首先介绍一下spark中常见的基本概念:

  • RDD:弹性分布式数据集的简称,是一个分布式对象集合,本质上是一个只读的分区记录集合。不能直接修改,只能通过一定的转换操作(map, reduce, join, group by)来创建新的RDD。
  • DAG:有向无环图,反应了RDD之间的依赖关系。
  • Executor:一个进程,负责运行任务。
  • Application:用户编写的spark应用程序。
  • Task:运行在Excutor上的工作单元。
  • Job:一个job包含多个RDD以及对应的RDD上的各种操作。
  • Stage:作业的基本调度单位。一个作业会被分为多组Task,每组任务称为一个stage。

其中,RDD是一种高度受限的内存模型,一次只能对RDD全集进行修改。 听完上述说明,大家可能理解起来很抽象,接下来我将介绍RDD编程模型,并通过程序例子来说明,方便大家理解。

RDD编程例子

1. 从文件系统中加载数据并转化成RDD格式

下面的例程可以将文本文件转化成RDD数据格式读入,便于Spark对RDD数据并行处理。

from pyspark import SparkConf, SparkContext
sc = SparkContext()
# 可以通过sc.textFiles来将text文件转化成RDD格式的数据。
# 如果是本地文件, 要加上 "file:///"
lines = sc.textFiles("file:///usr/local/sparl/example.txt")

# 下面三条语句是完全等价的
lines = sc.textFiles("hdfs://localhost:9000/user/hadoop/example.txt")
lines = sc.textFiles("/user/hadoop/example.txt")
lines = sc.textFiles("example.txt")
lines.foreach(print)

2. 将数组转化成RDD格式

array = [1, 2, 3, 4, 5]
# 通过sc.parallelize将数组转化成RDD格式
rdd = sc.parallelize(array)
rdd.foreach(print)
#1
#2
#3
#4
#5

3. RDD操作:Transformation

1. Filter
lines = sc.parallelize(['Spark is very fast', 'My name is LiLei'])
# 筛选出含有“Spark”的行,操作为并行。
linesWithSpark = lines.filter(lambda line: "Spark" in line)
# 每行并行打印
linesWithSpark.foreach(print)
# Spark is very fast
2. Map
lines = sc.parallelize(['Spark is very fast', 'My name is LiLei'])
# 每一行通过map并行处理。
words = lines.map(lambda line:line.split(" "))
words.foreach(print)
# ['Spark', 'is', 'very', 'fast']
# ['My', 'name', 'is', 'LiLie']
3. groupByKey
words = sc.parallelize([("Hadoop",1),("is",1),("good",1), \
  ("Spark",1),("is",1),("fast",1),("Spark",1),("is",1),("better",1)])
# groupByKey() 应用于 (K,V) 键值对的数据集时, 返回一个新的 (K, Iterable) 形式的数据集
words1 = words.groupByKey()
words1.foreach(print)
#('Hadoop', <pyspark.resultiterable.ResultIterable object at 0x7fb210552c88>)
#('better', <pyspark.resultiterable.ResultIterable object at 0x7fb210552e80>)
#('fast', <pyspark.resultiterable.ResultIterable object at 0x7fb210552c88>)
#('good', <pyspark.resultiterable.ResultIterable object at 0x7fb210552c88>)
#('Spark', <pyspark.resultiterable.ResultIterable object at 0x7fb210552f98>)
#('is', <pyspark.resultiterable.ResultIterable object at 0x7fb210552e10>)
4. reduceByKey
words = sc.parallelize([("Hadoop",1),("is",1),("good",1),("Spark",1), \
    ("is",1),("fast",1),("Spark",1),("is",1),("better",1)])
# reduceByKey:相同的key通过指定操作进行聚合,下方代码利用求和进行聚合
words1 = words.reduceByKey(lambda a,b:a+b)
words1.foreach(print)
#('good', 1)
#('Hadoop', 1)
#('better', 1)
#('Spark', 2)
#('fast', 1)
#('is', 3)

4. RDD操作:Action

由于Spark的惰性机制,当RDD通过Transformation操作,直到遇到Action操作后,才会执行真正的计算, 从文件中加载数据, 完成一次又一次Transformation操作, 最终, 完成Action操作得到结果。

rdd = sc.parallelize([1,2,3,4,5])
## rdd的数量
rdd.count()
#5
## 第一行rdd 
rdd.first()
#1
## 前三行rdd
rdd.take(3)
#[1, 2, 3]
rdd.reduce(lambda a,b:a+b)
#15

## 以数组的形式返回rdd中所有元素
rdd.collect()
#[1, 2, 3, 4, 5]
rdd.foreach(lambda elem:print(elem))

总结

通过将输入(文件,数组)转化成RDD,并将多个简单的Transformation和Action操作进行串联,Spark可以高效的完成很多复杂数据的处理。同时,在完成大规模的数据处理后,我们也可以利用Spark中内置的机器学习算法来对这些大规模的数据进行学习和建模。Spark中内部实现了很多分布式机器学习算法,例如SVM,Word2Vec等,我们将在之后的文章中详细讲解。

参考文献

  • https://zhenye-na.github.io/2021/01/13/api-oriented-programming-with-spark-2.html
  • 《Spark编程基础》:林子雨,赖永炫,陶继平