当前位置: > > > Spark - SparkSQL使用详解7(案例实操1:各区域热门商品Top3)

Spark - SparkSQL使用详解7(案例实操1:各区域热门商品Top3)

七、案例实操1:各区域热门商品 Top3

1,数据说明

(1)首先 user_visit_action.txt 文件中存放了所有用户的行为记录,下面是截取其中的一部分内容:

  • 文件中每行数据的详细字段说明如下:
编号 字段名称 字段类型 字段含义
1 date String 用户点击行为的日期
2 user_id Long 用户的 ID
3 session_id String SessionID
4 page_id Long 某个页面的 ID
5 action_time String 动作的时间点
6 search_keyword String 用户搜索的关键词
7 click_category_id Long 某一个商品品类的 ID
8 click_product_id Long 某一个商品的 ID
9 order_category_ids String 一次订单中所有品类的 ID 集合
10 order_product_ids String 一次订单中所有商品的 ID 集合
11 pay_category_ids String 一次支付中所有品类的 ID 集合
12 pay_product_ids String 一次支付中所有商品的 ID 集合
13 city_id Long 城市 id

(2)文件 product_info.txt 中则存放的是商品信息,下面是该文件的部分内容:
1	商品_1	自营
2	商品_2	自营
3	商品_3	第三方
4	商品_4	自营
5	商品_5	自营

(3)文件 city_info.txt 中则存放的是城市信息,下面是该文件的部分内容:
1	北京	华北
2	上海	华东
3	深圳	华南
4	广州	华南
5	武汉	华中
6	南京	华东
7	天津	华北

2,数据准备

(1)在实现具体需求之前,我们首先需要创建相应的 Hive 表,并将 txt 文件中的数据导入到表中,具体代码如下:
object Hello {
  def main(args: Array[String]): Unit = {
    // 创建 Spark 运行配置对象
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Hello")
    //创建 SparkSession 对象
    val spark: SparkSession = SparkSession.builder().config(sparkConf)
      .enableHiveSupport() // 添加Hive支持
      .getOrCreate()

    // 导入隐式转换
    import spark.implicits._

    // 创建3张表并导入初始化数据
    spark.sql(
      """
        |CREATE TABLE `user_visit_action`(
        |  `date` string,
        |  `user_id` bigint,
        |  `session_id` string,
        |  `page_id` bigint,
        |  `action_time` string,
        |  `search_keyword` string,
        |  `click_category_id` bigint,
        |  `click_product_id` bigint,
        |  `order_category_ids` string,
        |  `order_product_ids` string,
        |  `pay_category_ids` string,
        |  `pay_product_ids` string,
        |  `city_id` bigint)
        |row format delimited fields terminated by '\t'
      """.stripMargin)

    spark.sql(
      """
        |load data local inpath 'datas/user_visit_action.txt' into table user_visit_action
      """.stripMargin)

    spark.sql(
      """
        |CREATE TABLE `product_info`(
        |  `product_id` bigint,
        |  `product_name` string,
        |  `extend_info` string)
        |row format delimited fields terminated by '\t'
      """.stripMargin)

    spark.sql(
      """
        |load data local inpath 'datas/product_info.txt' into table product_info
      """.stripMargin)

    spark.sql(
      """
        |CREATE TABLE `city_info`(
        |  `city_id` bigint,
        |  `city_name` string,
        |  `area` string)
        |row format delimited fields terminated by '\t'
      """.stripMargin)

    spark.sql(
      """
        |load data local inpath 'datas/city_info.txt' into table city_info
      """.stripMargin)

    // 查看目前所有的Hive表
    spark.sql("show tables").show()

    // 查看其中 city_info 表的模式信息,包括列名、数据类型和其他元数据信息
    spark.sql("DESCRIBE user_visit_action").show()

    // 查看 city_info 表的数据
    spark.sql("""select * from city_info""").show

    //关闭 Spark
    spark.stop()
  }
}

(2)运行结果如下说明数据已经准备完毕:

(3)由于我使用的是内置的 Hive,可以看到项目根目录下会出现一个 spark-warehouse 文件夹,该文件夹下的的每个子文件夹是一张表,子文件夹名为表名,里面则是表的数据文件。

3,需求说明

(1)我们需要计算各个区域前三大热门商品(这里的热门商品是从点击量的维度来看),并备注上每个商品在主要城市中的分布比例,超过两个城市用其他显示。最终结果类似如下形式:
地区 商品名称 点击次数 城市备注
华北 商品 A 100000 北京 21.2%,天津 13.2%,其他 65.6%
华北 商品 P 80200 北京 63.0%,太原 10%,其他 27.0%
华北 商品 M 40000 北京 63.0%,太原 10%,其他 27.0%
东北 商品 J 92000 大连 28%,辽宁 17.0%,其他 55.0%

(2)要实现该需求,大概思路如下:
  • 查询出来所有的点击记录,并与 city_info 表连接,得到每个城市所在的地区,与 Product_info 表连接得到产品名称
  • 按照地区和商品 id 分组,统计出每个商品在每个地区的总点击次数
  • 每个地区内按照点击次数降序排列
  • 只取前三名
  • 城市备注需要自定义 UDAF 函数

4,功能实现

(1)首先我们自定义一个聚合函数 CityRemarkUDAF,用于计算城市的点击数量并生成城市备注信息。
// 自定义聚合函数的缓冲区数据类型
case class Buffer( var total : Long, var cityMap:mutable.Map[String, Long] )

// 自定义聚合函数:实现城市备注功能
// 1. 继承Aggregator, 定义泛型
//    IN : 城市名称
//    BUF : Buffer =>【总点击数量,Map[(city, cnt), (city, cnt)]】
//    OUT : 备注信息
// 2. 重写6个方法
class CityRemarkUDAF extends Aggregator[String, Buffer, String]{
  // 缓冲区初始化
  override def zero: Buffer = {
    Buffer(0, mutable.Map[String, Long]())
  }

  // 更新缓冲区数据
  override def reduce(buff: Buffer, city: String): Buffer = {
    buff.total += 1
    val newCount = buff.cityMap.getOrElse(city, 0L) + 1
    buff.cityMap.update(city, newCount)
    buff
  }

  // 合并缓冲区数据
  override def merge(buff1: Buffer, buff2: Buffer): Buffer = {
    buff1.total += buff2.total

    val map1 = buff1.cityMap
    val map2 = buff2.cityMap

    // 两个Map的合并操作
    map2.foreach{
      case (city , cnt) => {
        val newCount = map1.getOrElse(city, 0L) + cnt
        map1.update(city, newCount)
      }
    }
    buff1.cityMap = map1
    buff1
  }

  // 将统计的结果生成字符串信息
  override def finish(buff: Buffer): String = {
    val remarkList = ListBuffer[String]()

    val totalcnt = buff.total
    val cityMap = buff.cityMap

    // 降序排列
    val cityCntList = cityMap.toList.sortWith(
      (left, right) => {
        left._2 > right._2
      }
    ).take(2)

    val hasMore = cityMap.size > 2
    var rsum = 0L
    cityCntList.foreach{
      case ( city, cnt ) => {
        val r = cnt * 100 / totalcnt
        remarkList.append(s"${city} ${r}%")
        rsum += r
      }
    }
    if ( hasMore ) {
      remarkList.append(s"其他 ${100 - rsum}%")
    }

    remarkList.mkString(", ")
  }

  // Spark 用于序列化缓冲区的编码器
  override def bufferEncoder: Encoder[Buffer] = Encoders.product

  // Spark 用于序列化输出结果的编码器
  override def outputEncoder: Encoder[String] = Encoders.STRING
}

(2)接着我们程序中通过如下步骤进行统计计算:
  • 首先将用户行为数据、产品信息、城市信息三张表进行 JOIN,并筛选出 click_product_id > -1 的记录。将查询结果命名为 t1
  • 接着使用用户自定义的 UDAFCityRemarkUDAF),实现了对区域内商品点击数量的聚合,并生成城市备注信息。将聚合结果命名为 t2
  • 然后使用 Spark SQL 的窗口函数 rank(),在每个区域内对商品的点击数量进行排名。将排名结果命名为 t3
  • 最后从 t3 表中选择排名在前三位的数据,并显示在控制台上。
object Hello {
  def main(args: Array[String]): Unit = {
    // 创建 Spark 运行配置对象
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Hello")
    //创建 SparkSession 对象
    val spark: SparkSession = SparkSession.builder().config(sparkConf)
      .enableHiveSupport() // 添加Hive支持
      .getOrCreate()

    // 查询基本数据
    spark.sql(
      """
        |  select
        |     a.*,
        |     p.product_name,
        |     c.area,
        |     c.city_name
        |  from user_visit_action a
        |  join product_info p on a.click_product_id = p.product_id
        |  join city_info c on a.city_id = c.city_id
        |  where a.click_product_id > -1
            """.stripMargin).createOrReplaceTempView("t1")

    // 根据区域,商品进行数据聚合
    spark.udf.register("cityRemark", functions.udaf(new CityRemarkUDAF()))
    spark.sql(
      """
        |  select
        |     area,
        |     product_name,
        |     count(*) as clickCnt,
        |     cityRemark(city_name) as city_remark
        |  from t1 group by area, product_name
            """.stripMargin).createOrReplaceTempView("t2")

    // 区域内对点击数量进行排行
    spark.sql(
      """
        |  select
        |      *,
        |      rank() over( partition by area order by clickCnt desc ) as rank
        |  from t2
            """.stripMargin).createOrReplaceTempView("t3")

    // 取前3名
    spark.sql(
      """
        | select
        |     *
        | from t3 where rank <= 3
            """.stripMargin).show(false)

    //关闭 Spark
    spark.stop()
  }
}

(3)运行结果如下:
评论0