spark

spark

框架模块

spark core:核心RDD

spark sql:针对结构化数据,本身针对离线计算。

spark streaming: 在core基础上,流式计算

运行模式

本地模式:local,以一个独立进程,通过其内部多个线程模拟spark(开发,测试)

standalone:集群,各个角色以独立进程的形式存在

YARN:集群,角色运行在YARN的容器内,

Kubernets:在Kubernets容器内

云服务:在云平台上运行

架构角色

YARN

resoumanger:集群管理

nodemanger:单机管理

applicationmaster:单任务管理者

task:单任务执行

image-20240821164007378

spark

master:集群资源管家

worker:单机资源管家

Driver:管理单任务 (在local模式在,既管理又干活)

Executor:运行单任务

local模式

local[N] 指定N个线程模拟

local[*] 对线程数无限制,根据cpu的cores设定

image-20240821164406582

sparkcount

创建sparkconf对象

通过sparkcontext创建对象

1
2
3
conf = SparkConf("local[*]").setAppName("WordCountHelloWorld")
# 通过 SparkConf 对象构建 SparkContext 对象
sc = SparkContext(conf=conf)

读文件

  • 读本地的文件

    1
    2
    file_rdd = sc.textFile("./data/input/word.tx")
    # "file:///tmp/pycharm_project_846/data/input/word.txt" linux上文件
  • 读HDFS上的文件

    1
    file_rdd = sc.textFile("hdfs://node1:8020/input/word.txt")

wordcount原理

image-20240822085308917

RDD

简介

RDD:弹性(数据讯息再内存或磁盘中)分布式(跨进程存储)数据集合,分布式框架下的统一数据抽象对象

特性:有分区;计算方法都会作用再每一个分片上;RDD之间又相互依赖关系;KV型RDD可以有分区器;分区数据的读取尽量靠近数据所在服务器

一份RDD本质分成了多个分区:

image-20240822095233696

KV型RDD:RDD内存存储的数据式二元元组(“hadoop”,3)(“hadoop”,1)(“flink”,3)

默认分区器:hash分区器

编程操作

程序入口:sparkcontext对象

RDD创建:

  • 并行化集合创建(本地转分布式RDD);

    1
    2
    3
    4
    5
    6
    7
    conf = SparkConf().setMaster("local[*]").setAppName("test")
    sc = SparkContext(conf=conf)
    # 本地转分布式RDD对象
    rdd = sc.parallelize([1,2,3,4,5,6,7,8,9],3)
    print("分区:",rdd.getNumPartitions())
    # 把RDD每个分区的对象collect到driver
    print("RDD内容:", rdd.collect())
  • 读取外部文件

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    # 读取本地文件
    file_rdd = sc.textFile("../../data/input/word.txt")
    print(file_rdd.getNumPartitions())
    print(file_rdd.collect())
    file_rdd2 = sc.textFile("../../data/input/word.txt",3)
    print(file_rdd2.getNumPartitions())
    file_rdd3 = sc.textFile("../../data/input/word.txt", 100)
    print(file_rdd3.getNumPartitions())

    # 读取HDFS
    file_rdd4 = sc.textFile("hdfs://node1:8020/input/word.txt")
    print(file_rdd4.collect())

    # 针对小文件
    rdd = sc.wholeTextFiles("../../data/input/tiny_files")
    print(rdd.map(lambda x:x[1]).collect())

RDD算子:分布式集合上的API

  • Transformation转换算子 RDD——>RDD

    构建执行计划,对RDD迭代,无Action不干活

    1. map算子(接收处理函数)

      1
      print(rdd.map(lambda x: x*10).collect())
    2. flatmap算子(先map 再解除嵌套)

      1
      2
      3
      rdd = sc.parallelize(["hadoop spark map", "spark flink", "hadoop spark"])
      print(rdd.map(lambda x: (x.split(" "))).collect())
      print(rdd.flatMap(lambda x: (x.split(" "))).collect())
    3. reducebykey (自动按照key分组,完成组内数据的聚合)

      1
      2
      3
      # 传入的参数只是对value
      rdd = sc.parallelize([['a',1], ['b',1], ['c',1], ['a',2]])
      print(rdd.reduceByKey(lambda a,b: a+b).collect())
    4. mapvalues 只针对value的map算子

    1
    2
    rdd = sc.parallelize([('a',1), ('b',1), ('c',1)])
    print(rdd.mapValues(lambda value: value*10).collect())
    1. groupby (确定按照k还是v分组)
    1
    2
    3
    rdd = sc.parallelize([('a',1), ('b',1), ('c',1),('a',4), ('b',4), ('c',6)])
    print(rdd.groupBy(lambda x:x[0]).map(lambda x:(x[0], list(x[1]))).collect())

    1. Filter (过滤数据,结果为True的保留)
    1
    2
    3
    rdd = sc.parallelize([1,2,3,4,5,6])
    result = rdd.filter(lambda x: x %2 == 1)
    print(result.collect())
    1. distinct(去重)
    1
    2
    rdd = sc.parallelize([1, 1, 2, 2, 2, 3, 4, 5, 6])
    print(rdd.distinct().collect())
    1. union(2个RDD合并成一个)

      1
      2
      3
      4
      5
      rdd = sc.parallelize([1, 1, 2, 2, 2, 3, 4, 5, 6])

      rdd2 = sc.parallelize(['a','v'])
      rdd3 = rdd.union(rdd2)
      print(rdd3.collect())
    2. join (对两个RDD执行JOIN操作 只用于二元组, 内连接,左外,右外)

      1
      2
      3
      4
      5
      6
      7
      8
      9
      rdd = sc.parallelize([(1001, 'zhangsan'), (1002, 'lisi'),
      (1003, 'wangwu'), (1004, 'laoliu')]
      )
      rdd2 = sc.parallelize([(1001, '科技部'), (1002,'教育部')])
      # 按照key关联
      rdd3 = rdd.join(rdd2)
      print(rdd3.collect())
      print(rdd.leftOuterJoin(rdd2).collect())
      print(rdd.rightOuterJoin(rdd2).collect())
    3. intersection (求两个RDD的交集)

      rdd.intersection(rdd2)

    4. glom (将RDD数据加上嵌套, 这个嵌套按照分区处理)

      rdd.glom().collect()

    5. groupbykey(针对KV型,自动按照Key分组)

    6. sortby(对RDD数据进行排序)

    rdd.sortby(func, ascending=True)
    
    
    1
    2
    3
    rdd = sc.parallelize([('a', 1), ('a',3), ('c', 5), ('b', 2)])
    print(rdd.sortBy(lambda x :x[0], ascending=True, numPartitions=1).collect())

    1. sortbykey(对KV型的,按Key排序)

      sortbykey(ascending=True, numparttions, func)

      func: 再排序前对数据key处理

    2. mappartitions (对每个分区只有一次IO)结果与map无差异

      func里面传入的是迭代器对象

    3. partitionby (自定义分区操作)

    4. repartition(对RDD分区重新分区,仅改变数量,不约定新分区规则)

    案例:对订单数据,提取背景的数据,组合北京合商品类别进行输出,并对结果进行去重,得到北京售卖的商品类别信息

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    rdd = sc.textFile("../../data/input/order.text")
    file_rdd = rdd.flatMap(lambda line: line.split("|"))
    dict_rdd = file_rdd.map(lambda json_dir : json.loads(json_dir))
    # 过滤数据
    bj_rdd = dict_rdd.filter(lambda d : d['areaName']=='北京')
    # 组合北京合商品类型
    ctg_rdd = bj_rdd.map(lambda d : d['areaName'] + '-' + d['category'])
    # 去重
    res_rdd = ctg_rdd.distinct()
    print(res_rdd.collect())
  • Action算子 RDD——> not RDD

    1. countBykey(对key进行计数)

    2. collect(把RDD各个分区数据收集到driver,形成一个List对象)

    3. reduce (对RDD数据集按照传入逻辑聚合)

    4. fold (带有初始值的聚合)

      rdd.fold(10, lambda a,b:a+b) 初始值会在分区内 和 分区键聚合 都用到

    5. first (取出RDD的第一个元素)

    6. take (取RDD的前N个元素) 返回list

    7. top (对RDD结果进行降序排序,取前N个)

    8. count (RDD中数据量)

    9. takesample(随机抽样RDD数据)

      takesample(True, 采样数,随机数种子) True:允许重复取一个数字

    10. takeordered 对RDD排序取前N个

      takeordered(前N个, 对排序数据更改) 默认升序

    11. foreach (对每个元素进行你想要的操作) +func 由excutor直接输出

    12. saveastextfile (将RDD数据写入文件)由excutor直接输出

    13. mappartitions ()

    14. foreachpartiton (一次处理一整个分区的数据)

    流水线的开关

RDD缓存

RDD是过程数据,老旧没用的会从内存清理

RDD缓存API:rdd.cache() rdd.persist()

Spark案例练习

任务:搜索关键词统计,用户搜索点击统计,搜索时间段统计

广播变量

进程内资源共享,一个进程里面的数据发了一份即可

broadcast = sc.broadcast(原数据)

value = broadcast.value

本地集合对象和分布式对象操作时需要使用广播变量避免多次网络IO

累加器:

ac = sc.accumalate(0)

能把每个分区的数据反映到全局变量中

Spark SQL

  • 概述

Spark的用于处理结构化数据的module

pandas: dataframe 二维表,单机集合

sparkcore:RDD 无标准数据结构,分布式集合

sparksql:datatframe 二维表,分布式集合

dataset:用于java和scala

dataframe: 用于python java scala

  • sparkSession对象:sparksql的入口对象,也可以用于RDD编程
  • dataframe组成:structtype描述表结构。structfield描述一个列的信息, row对象记录一行数据, column记录一列列数据并包含列的信息

dataframe代码构建

  • 基于RDD

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    rdd = sc.textFile("../data/input/sql/people.txt").\
    map(lambda line: line.split(",")).\
    map(lambda line: (line[0], int(line[1])))

    # 参数1:rdd 参数2:列名
    df = spark.createDataFrame(rdd, schema=['name', 'age'] )
    df.printSchema()
    # n:展示多少数据 truncate:是否对数据进行截断
    df.show(20, truncate=False)

    df.createOrReplaceTempView("people")
    spark.sql("select * from people where age > 30").show()

  • 读取外部数据:parquet:SPARK的列式存储文件格式

  • 编程操作:

    1. DSL语法 df.where().limit()

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      # df.select(["id", "subject"]).show()
      # df.select("subject", "score").show()
      # df.select(id_column, subject_column).show()

      # # filter
      # df.filter("score < 99").show()
      # df.filter(df["score"] < 99).show()
      #
      # # where
      # df.where("score < 99").show()
      # df.where(df["score"] < 99).show()

      # groupby 分组 返回值groupeddata,不是df了 得接上聚合mean, max,count
      df.groupBy("subject").count().show()
      df.groupBy(df["subject"]).count().show()
    2. SQL语法 spark.sql()

      1
      2
      3
      4
      5
      6
      7
      8
      df.createTempView("score")
      df.createOrReplaceTempView("score_1")
      df.createGlobalTempView("score_2")

      spark.sql("select subject, count(*) from score group by subject").show()
      spark.sql("select subject, count(*) from score_1 group by subject").show()
      spark.sql("select subject, count(*) from global_temp.score_2 group by subject").show()

  • wordcount案例

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    rdd = sc.textFile("../data/input/word.txt").\
    flatMap(lambda x: x.split(" ")).\
    map(lambda x: [x])

    df = rdd.toDF(["word"])
    df.createTempView("words")
    spark.sql("select word, count(*) as cnt from words group by word order by cnt desc ").show()

    df =spark.read.format("text").load("../data/input/word.txt")
    # withcolumn:对已存在的列做新操作,返回一个新列
    df2 = df.withColumn("value", F.explode(F.split(df['value'], " ")))
    df2.groupBy("value").count().\
    withColumnRenamed("value", "word").\
    withColumnRenamed("count", "cnt").\
    orderBy("cnt", ascending=False).show()
  • 电影评分案例

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    schema= StructType().add("user_id", StringType(), nullable=True).add("movie_id", IntegerType(), nullable=True).add("rank", IntegerType(), nullable=True).add("ts", StringType(), nullable=True)
    df = spark.read.format("csv").option("header", False).option("sep", "\t").option("encoding", "utf-8").schema(schema).load("../data/input/sql/u.data")

    df.groupBy("user_id").avg("rank").withColumnRenamed('avg(rank)','avg_rank').\
    withColumn('avg_rank', F.round('avg_rank', 2)).\
    orderBy('avg_rank', ascending=False).show()

    df.createTempView("movie")
    spark.sql("select movie_id, round(avg(rank), 2) as avg_rank from movie group by movie_id order by avg_rank").show()

    # 查询大于平均分电影数量
    num = df.filter(df["rank"] > df.select(F.avg(df["rank"])).first()['avg(rank)']).count()
    print(num)

    # 查询大于3分电影中打分最多次的用户,其平均分
    usr_id = df.where(df["rank"]> 3).groupBy("user_id").count().withColumnRenamed('count', 'cnt').\
    orderBy('cnt', ascending=False).limit(1).first()["user_id"]
    df.filter(df["user_id"]==usr_id).select(F.round(F.avg(df["rank"]),2)).show()

    # 查询每个用户的最低和最高打分
    df.groupBy('user_id').\
    agg(
    F.round( F.avg("rank"), 2).alias('avg_rank'),
    F.min('rank').alias('min_rank'),
    F.max('rank').alias('max_rank')

    ).show()

    # 查询评分超过100次的电影的平均分排名top10
    df.groupBy('movie_id'). agg(
    F.count('movie_id').alias('cnt'),
    F.round(F.avg('rank'),2).alias('avg_rank')
    ).where('cnt > 100' ).orderBy('avg_rank', ascending=False).\
    limit(10).show()

    # agg : 是groupeddata对象的api,在里面可以写多个聚合
    # alias: 是column对象api,对列改名
    # withcolumnRename: 是dataframe的api
    # orderby:dataframe的api
    # first: dataframe的API 返回第一个row对象,不再是df
  • shuffle分区数目设置

    1
    2
    3
    4
    spark = SparkSession.builder.appName("test").\
    master("local[*]").\
    config("spark.sql.shuffle.partitions", "2").\
    getOrCreate()
  • 数据清洗API


spark
https://wendyflv.github.io/2024/08/21/spark/
作者
Wendyflv
发布于
2024年8月21日
许可协议