大数据Spark电影评分数据分析

| 2022-09-21

目录

  • ​1 数据 ETL​
  • ​2 使用 SQL 分析​
  • ​3 使用 DSL 分析​
  • ​4 保存结果数据​
  • ​5 案例完整代码​
  • ​6 Shuffle 分区数目问题​

    1 数据 ETL

    使用电影评分数据进行数据分析,分别使用DSL编程和SQL编程,熟悉数据处理函数及SQL使用,业务需求说明:对电影评分数据进行统分析,获取Top10电影(电影评分平均值最高,并且每个电影被评分的次数大于2000)。数据集ratings.dat总共100万条数据,数据格式如下每行数据各个字段之间使用双冒号分开:

    大数据Spark电影评分数据分析_sql

    数据处理分析步骤如下:

    1. 第一步、读取电影评分数据,从本地文件系统读取
    2. 第二步、转换数据,指定Schema信息,封装到DataFrame
    3. 第三步、基于SQL方式分析
    4. 第四步、基于DSL方式分析

    读取电影评分数据,将其转换为DataFrame,使用指定列名方式定义Schema信息,代码如下:

    // 构建SparkSession实例对象
    val spark: SparkSession = SparkSession.builder()
      .master("local[4]")
      .appName(this.getClass.getSimpleName.stripSuffix("$"))
      .getOrCreate()
    // 导入隐式转换
    
    import spark.implicits._
    
    // 1. 读取电影评分数据,从本地文件系统读取
    val rawRatingsDS: Dataset[String] = spark.read.textFile("datas/ml-1m/ratings.dat")
    // 2. 转换数据
    val ratingsDF: DataFrame = rawRatingsDS
      // 过滤数据.
      .filter(line => null != line && line.trim.split("::").length == 4)
      // 提取转换数据
      .mapPartitions { iter =>
        iter.map { line =>
          // 按照分割符分割,拆箱到变量中
          val Array(userId, movieId, rating, timestamp) = line.trim.split("::")
          // 返回四元组
          (userId, movieId, rating.toDouble, timestamp.toLong)
        }
      }
      // 指定列名添加Schema
      .toDF("userId", "movieId", "rating", "timestamp")
    /*
    root
    |-- userId: string (nullable = true)
    |-- movieId: string (nullable = true)
    |-- rating: double (nullable = false)
    |-- timestamp: long (nullable = false)
    */
    //ratingsDF.printSchema()
    /*
    +------+-------+------+---------+
    |userId|movieId|rating|timestamp|
    +------+-------+------+---------+
    | 1| 1193| 5.0|978300760|
    | 1| 661| 3.0|978302109|
    | 1| 594| 4.0|978302268|
    | 1| 919| 4.0|978301368|
    +------+-------+------+---------+
    */
    //ratingsDF.show(4)

    2 使用 SQL 分析

    首先将DataFrame注册为临时视图,再编写SQL语句,最后使用SparkSession执行,代码如下:

    // TODO: 基于SQL方式分析
    // 第一步、注册DataFrame为临时视图
    ratingsDF.createOrReplaceTempView("view_temp_ratings")
    // 第二步、编写SQL
    val top10MovieDF: DataFrame = spark.sql(
      """
        |SELECT
        | movieId, ROUND(AVG(rating), 2) AS avg_rating, COUNT(movieId) AS cnt_rating
        |FROM
        | view_temp_ratings
        |GROUP BY
        | movieId
        |HAVING
        | cnt_rating > 2000
        |ORDER BY
        | avg_rating DESC, cnt_rating DESC
        |LIMIT
        | 10
    """.stripMargin)
    //top10MovieDF.printSchema()
    top10MovieDF.show(10, truncate = false)

    应用scala的stripMargin方法,在scala中stripMargin默认是“|”作为出来连接符,在多行换行的行头前面加一个“|”符号即可。

    代码实例:

    val speech = “”"abc

    |def""".stripMargin

    运行的结果为:

    abc

    ldef

    运行程序结果如下:

    大数据Spark电影评分数据分析_big data_02

    3 使用 DSL 分析

    调用Dataset中函数,采用链式编程分析数据,核心代码如下:

    // TODO: 基于DSL=Domain Special Language(特定领域语言) 分析
    
    import org.apache.spark.sql.functions._
    
    val resultDF: DataFrame = ratingsDF
      // 选取字段
      .select($"movieId", $"rating")
      // 分组:按照电影ID,获取平均评分和评分次数
      .groupBy($"movieId")
      .agg( //
        round(avg($"rating"), 2).as("avg_rating"), //
        count($"movieId").as("cnt_rating") //
      )
      // 过滤:评分次数大于2000
      .filter($"cnt_rating" > 2000)
      // 排序:先按照评分降序,再按照次数降序
      .orderBy($"avg_rating".desc, $"cnt_rating".desc)
      // 获取前10
      .limit(10)
    //resultDF.printSchema()
    resultDF.show(10)

    Round函数返回一个数值,该数值是按照指定的小数位数进行四舍五入运算的结果。除数值外,也可对日期进行舍入运算。

    round(3.19, 1) 将 3.19 四舍五入到一个小数位 (3.2)

    round(2.649, 1) 将 2.649 四舍五入到一个小数位 (2.6)

    round(-5.574, 2) 将 -5.574 四舍五入到两小数位 (-5.57)

    其中使用SparkSQL中自带函数库functions,在org.apache.spark.sql.functions中,包含常用函

    数,有些与Hive中函数库类似,但是名称不一样。

    大数据Spark电影评分数据分析_数据_03

    使用需要导入函数库:import org.apache.spark.sql.functions._

    4 保存结果数据

    将分析结果数据保存到外部存储系统中,比如保存到MySQL数据库表中或者CSV文件中。

    // TODO: 将分析的结果数据保存MySQL数据库和CSV文件
    // 结果DataFrame被使用多次,缓存
    resultDF.persist(StorageLevel.MEMORY_AND_DISK)
    // 1. 保存MySQL数据库表汇总
    resultDF
      .coalesce(1) // 考虑降低分区数目
      .write
      .mode("overwrite")
      .option("driver", "com.mysql.cj.jdbc.Driver")
      .option("user", "root")
      .option("password", "123456")
      .jdbc(
        "jdbc:mysql://node1.oldlu.cn:3306/?serverTimezone=UTC&characterEncoding=utf8&useUnic
          ode = true",
          "db_test.tb_top10_movies",
          new Properties ()
          )
          // 2. 保存CSV文件:每行数据中个字段之间使用逗号隔开
          resultDF
          .coalesce (1)
          .write.mode ("overwrite")
          .csv ("datas/top10-movies")
          // 释放缓存数据
          resultDF.unpersist ()

    查看数据库中结果表的数据:

    大数据Spark电影评分数据分析_数据_04

    5 案例完整代码

    电影评分数据分析,经过数据ETL、数据分析(SQL分析和DSL分析)及最终保存结果,整套

    数据处理分析流程,其中涉及到很多数据细节,完整代码如下

    import java.util.Properties
    import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
    import org.apache.spark.storage.StorageLevel
    
    /**
     * 需求:对电影评分数据进行统计分析,获取Top10电影(电影评分平均值最高,并且每个电影被评分的次数大于2000)
     */
    object SparkTop10Movie {
      def main(args: Array[String]): Unit = {
        // 构建SparkSession实例对象
        val spark: SparkSession = SparkSession.builder()
          .master("local[4]")
          .appName(this.getClass.getSimpleName.stripSuffix("$"))
          // TODO: 设置shuffle时分区数目
          .config("spark.sql.shuffle.partitions", "4")
          .getOrCreate()
        // 导入隐式转换
        import spark.implicits._
        // 1. 读取电影评分数据,从本地文件系统读取
        val rawRatingsDS: Dataset[String] = spark.read.textFile("datas/ml-1m/ratings.dat")
        // 2. 转换数据
        val ratingsDF: DataFrame = rawRatingsDS
          // 过滤数据
          .filter(line => null != line && line.trim.split("::").length == 4)
          // 提取转换数据
          .mapPartitions { iter =>
            iter.map { line =>
              // 按照分割符分割,拆箱到变量中
              val Array(userId, movieId, rating, timestamp) = line.trim.split("::")
              // 返回四元组
              (userId, movieId, rating.toDouble, timestamp.toLong)
            }
          }
          // 指定列名添加Schema
          .toDF("userId", "movieId", "rating", "timestamp")
        /*
        root
        |-- userId: string (nullable = true)
        |-- movieId: string (nullable = true)
        |-- rating: double (nullable = false)
        |-- timestamp: long (nullable = false)
        */
        //ratingsDF.printSchema()
        /*
        +------+-------+------+---------+
        |userId|movieId|rating|timestamp|
        +------+-------+------+---------+
        | 1| 1193| 5.0|978300760|
        | 1| 661| 3.0|978302109|
        | 1| 594| 4.0|978302268|
        | 1| 919| 4.0|978301368|
        +------+-------+------+---------+
        */
        //ratingsDF.show(4)
        // TODO: 基于SQL方式分析
        // 第一步、注册DataFrame为临时视图
        ratingsDF.createOrReplaceTempView("view_temp_ratings")
        // 第二步、编写SQL
        val top10MovieDF: DataFrame = spark.sql(
          """
            |SELECT
            | movieId, ROUND(AVG(rating), 2) AS avg_rating, COUNT(movieId) AS cnt_rating
            |FROM
            | view_temp_ratings
            |GROUP BY
            | movieId
            |HAVING
            | cnt_rating > 2000
            |ORDER BY
            | avg_rating DESC, cnt_rating DESC
            |LIMIT
            | 10
    """.stripMargin)
        //top10MovieDF.printSchema()
        top10MovieDF.show(10, truncate = false)
        println("===============================================================")
        // TODO: 基于DSL=Domain Special Language(特定领域语言) 分析
        import org.apache.spark.sql.functions._
        val resultDF: DataFrame = ratingsDF
          // 选取字段
          .select($"movieId", $"rating")
          // 分组:按照电影ID,获取平均评分和评分次数
          .groupBy($"movieId")
          .agg( //
            round(avg($"rating"), 2).as("avg_rating"), //
            count($"movieId").as("cnt_rating") //
          )
          // 过滤:评分次数大于2000
          .filter($"cnt_rating" > 2000)
          // 排序:先按照评分降序,再按照次数降序
          .orderBy($"avg_rating".desc, $"cnt_rating".desc)
          // 获取前10
          .limit(10)
        //resultDF.printSchema()
        resultDF.show(10)
        // TODO: 将分析的结果数据保存MySQL数据库和CSV文件
        // 结果DataFrame被使用多次,缓存
        resultDF.persist(StorageLevel.MEMORY_AND_DISK)
        // 1. 保存MySQL数据库表汇总
        resultDF
          .coalesce(1) // 考虑降低分区数目
          .write
          .mode("overwrite")
          .option("driver", "com.mysql.cj.jdbc.Driver")
          .option("user", "root")
          .option("password", "123456")
          .jdbc(
            "jdbc:mysql://node1.oldlu.cn:3306/?serverTimezone=UTC&characterEncoding=utf8&useUnic
              ode = true",
              "db_test.tb_top10_movies",
              new Properties ()
              )
              // 2. 保存CSV文件:每行数据中个字段之间使用逗号隔开
              resultDF
              .coalesce (1)
              .write.mode ("overwrite")
              .csv ("datas/top10-movies")
              // 释放缓存数据
              resultDF.unpersist ()
              // 应用结束,关闭资源
              Thread.sleep (10000000)
              spark.stop ()
              }
              }

    6 Shuffle 分区数目问题

    运行上述程序时,查看WEB UI监控页面发现,某个Stage中有200个Task任务,也就是说RDD有200分区Partition。大数据Spark电影评分数据分析_big data_05

    原因:在SparkSQL中当Job中产生Shuffle时,默认的分区数(spark.sql.shuffle.partitions )为

    200,在实际项目中要合理的设置。在构建SparkSession实例对象时,设置参数的值:

    // 构建SparkSession实例对象
    val spark: SparkSession = SparkSession.builder()
    .master("local[4]")
    .appName(this.getClass.getSimpleName.stripSuffix("$"))
    // TODO: 设置shuffle时分区数目
    .config("spark.sql.shuffle.partitions", "4")
    .getOrCreate()
    // 导入隐式转换
    import spark.implicits._