# PySpark 介绍

spark (opens new window) 是一款分布式的计算框架,用于调度成百上千的服务器集群,计算TB、PB乃至EB级别的海量数据

PySpark 是由 Spark 官方开发的 Python 语言第三方库,其作用:

# 作为 Python 库进行数据处理
# 提交至Spark集群,进行分布式集群计算

# 构建 PySpark 执行环境入口对象

PySpark 的执行环境入口对象是类:Sparkcontext 的类对象

from pyspark import SparkConf, SparkContext

# 创建SparkConf类对象
conf = SparkConf().setMaster("local[*]").setAppName("test_spark_app")
# 基于Sparkconf类对象创建Sparkcontext类对象
sc = SparkContext(conf = conf)
# 打印PySpark的运行版本
print(sc.version)
# 停止Sparkcontext对象的运行(停止PySpark程序)
sc.stop()

# RDD对象

  • 弹性分布式数据集 Resilient Distributed Datasets
  • PySpark 支持多种数据(json、文件、数据库)的输入,在输入完成后,都会得到一个RDD类的对象
  • RDD 是 PySpark 中数据计算的载体,后续对数据进行各类计算,都是基于 RDD 对象进行,它可以:
# 提供数据存储,即数据输入
# 提供数据计算的各类方法
# RDD 的数据计算方法,数据输出返回值依旧是RDD对象

# 数据输入

  • 数据容器转RDD对象
from pyspark import Sparkconf, Sparkcontext

conf = SparkConf().setMaster("local[*]").setAppName("test_spark_app")
sc = SparkContext(conf=conf)

# 通过parallelize方法将Python容器对象加载到Spark内,成为RDD对象
rdd1 = sc.parallelize([1, 2, 3, 4, 5]) # list => [1,2,3,4,5]
rdd2 = sc.parallelize((1, 2, 3, 4, 5)) # tuple => [1,2,3,4,5]
rdd3 = sc.parallelize("abcd") # str => ['a','b','c','d']
rdd4 = sc.parallelize({1, 2, 3, 4, 5}) # set => [1,2,3,4,5]
rdd5 = sc.parallelize({"key1": "value1", "key2": "value2"}) # dict => ['key1','key2']

# 如果要查看RDD里面有什么内容,需要用collect()方法
print(rdd.collect())

注意

  • 字符串会被拆分出一个个的字符,存入RDD对象
  • 字典仅有key会被存入RDD对象
  • 读取文件转RDD对象
from pyspark import Sparkconf, Sparkcontext

conf = SparkConf().setMaster("local[*]").setAppName("test_spark_app")
sc = SparkContext(conf=conf)

rdd = sc.textFile(文件路径)

# 打印RDD内容
print(rdd.collect())

# 数据计算

  • map算子:是将RDD的数据一条条处理返回新的RDD

 
 
 


 





















# rdd.map(func)
# func: f:(T) -> U
# (T) -> U 这个方法接受一个参数传入,传入参数类型不限. 返回一个返回值,返回值类型不限
# (A) -> A 这个方法接受一个参数传入,传入参数类型不限. 返回一个返回值,返回值和传入参数类型一致

from pyspark import Sparkconf, Sparkcontext
# 需要配置PYSPARK_PYTHON参数
import os
os.environ['PYSPARK_PYTHON']= "D:/dev/python/python310/python.exe"T

conf = SparkConf().setMaster("local[*]").setAppName("test_spark_app")
sc = SparkContext(conf=conf)

rdd=sc.parallelize([12345])

# 通过map方法将全部数据都乘以10
def func(data):
	return data * 10

rdd2 = rdd.map(func)
# lambda 表达式
rdd2 = rdd.map(lambda x: x * 10)
# 链式调用
rdd2 = rdd.map(lambda x: x * 10).map(lambda x: x + 5)

# 打印RDD内容
print(rdd2.collect())
  • flatMap算子:对 rdd 执行 map 操作,然后进行解除嵌套操作
# 嵌套的
List = [[1, 2, 3], [4, 5, 6], [7, 8, 9]]

rdd.flatMap(lambda x:x.split(",")).collect())
# 解除了嵌套 List = [1, 2, 3, 4, 5, 6, 7, 8, 9]
  • reduceByKey算子:针对KV型 RDD,自动按照key分组,然后根据提供的聚合逻辑,完成组内数据(valve)的聚合操作
# rdd.reduceByKey(func)
# func:(V,V) -> V 接受2个传入参数(类型要一致),返回一个返回值,类型和传入要求一致

rdd = sc.parallelize([('a',1),('a',1),('b',1),('b',1),('b',1)])
result = rdd.reduceByKey(lambda a, b : a + b)
print(result.collect())
# 结果:[('b', 3),('a',2)]

reduceByKey算子

  • Filter算子:过滤想要的数据进行保留
# rdd.filter(func)
# func:(T) -> bool 传入1个参数进来随意类里,返回值必须是True or False
# 返回是True的数据被保留, False的数据被丢弃

rdd=sc.parallel1ze([1, 2, 3, 4, 5])
# 保留奇数
rdd.filter(lambda x:True if(x % 2 ==1) else False)
print(rdd.filter(lambda x:x % 2 ==1).collect())
  • distinct算子:对RDD数据进行去重,返回新RDD
# rdd.distinct() 无需传参
rdd = sc.parallelize([1, 1, 3, 3, 5, 5, 6, 6, 9, 9])
# 数据去重
print(rdd.distinct().coltect())
  • sortBy算子:对RDD数据进行排序,基于你指定的排序依据
# rdd.sortBy(func, ascending=False, numPartitions=1)
# func:(T) -> U:告知按照rdd中的哪个数据进行排序,如:lambda x:x[1]表示按照第二列元素进行排序
# ascending:True升序 False 降序
# numPartitions:用多少分区排序

final_rdd = rdd.sortBy(lambda x: x[1], ascending=False, numPartitions=1)

# 数据输出

  • collect算子:将RDD各个分区内的数据,统一收集到Driver中,形成一个List对象
  • reduce算子:对RDD数据集按照你传入的逻辑进行聚合
# rdd.reduce(func)
# func:(T, T) -> T 2参数传入1个返回值,返回值和参数要求类型一致

rdd = sc.parallel1ze(range(1, 10))
# 将rdd的数据进行累加求和
num = rdd.reduce(lambda a,b:a+b)
print(num)
  • take算子:取RDD的前N个元素,组合成list返回给你
rdd = sc.parallelize([3,2,1,4,5,6])
print(rdd.take(5))
# 返回[3,2,1,4,5]
  • count算子:算RDD有多少条数据,返回值是一个数字
rdd = sc.parallelize([3,2,1,4,5,6])
print(rdd.count())
# 返回 6
  • saveAsTextFile算子:将RDD的数据写入文本文件中
# 调用保存文件的算子,需要配置Hadoop依赖
# 下载Hadoop安装包 http://archive.apache.org/dist/hadoop/common/hadoop-3.0.0/hadoop-3.0.0.tar.gz
# 在Python代码中使用os模块配置:os.environ["HADOOP_HOME"]="HADOOP解压文件夹路径"
# 下载winutils.exe,并放入Hadoop解压文件夹的bin目录内
# https://raw.githubusercontent.com/steveloughran/winutils/master/hadoop-3.0.0/bin/winutils.exe
# 下载hadoop.dll,并放入:C:/Windows/System32 文件夹内
# https://raw.githubusercontent.com/steveloughran/winutils/master/hadoop-3.0.0/bin/hadoop.dll

# 修改RDD并行度为1,否则会生成多个文件
# 方式1,SparkConf对象设置属性全局并行度为1:
conf = SparkConf().setMaster("local[*]").setAppName("test_spark")
conf.set('spark.default.parallelism","1")
sc = SparkContext(conf=conf)
# 方式2,创建RDD的时候设置(parallelize方法传入numslices参数为1)
rdd1 = sc.parallelize([1, 2, 3, 4, 5], numSlices = 1)
rdd1 = sc.parallelize([1, 2, 3, 4, 5], 1)

rdd = sc.para1lelize([3,2,1,4,5,6])
rdd.saveAsTextFile("D:/output3")
rdd.saveAsTextFile("../data/output/test.txt")