Spark - SparkSQL使用详解7(案例实操1:各区域热门商品Top3)
七、案例实操1:各区域热门商品 Top3
1,数据说明
(1)首先 user_visit_action.txt 文件中存放了所有用户的行为记录,下面是截取其中的一部分内容:
- 文件中每行数据的详细字段说明如下:
编号 | 字段名称 | 字段类型 | 字段含义 |
1 | date | String | 用户点击行为的日期 |
2 | user_id | Long | 用户的 ID |
3 | session_id | String | Session 的 ID |
4 | page_id | Long | 某个页面的 ID |
5 | action_time | String | 动作的时间点 |
6 | search_keyword | String | 用户搜索的关键词 |
7 | click_category_id | Long | 某一个商品品类的 ID |
8 | click_product_id | Long | 某一个商品的 ID |
9 | order_category_ids | String | 一次订单中所有品类的 ID 集合 |
10 | order_product_ids | String | 一次订单中所有商品的 ID 集合 |
11 | pay_category_ids | String | 一次支付中所有品类的 ID 集合 |
12 | pay_product_ids | String | 一次支付中所有商品的 ID 集合 |
13 | city_id | Long | 城市 id |
- 完整数据:数据.zip
(2)文件 product_info.txt 中则存放的是商品信息,下面是该文件的部分内容:
(3)文件 city_info.txt 中则存放的是城市信息,下面是该文件的部分内容:
(2)运行结果如下说明数据已经准备完毕:
1 商品_1 自营 2 商品_2 自营 3 商品_3 第三方 4 商品_4 自营 5 商品_5 自营
(3)文件 city_info.txt 中则存放的是城市信息,下面是该文件的部分内容:
1 北京 华北 2 上海 华东 3 深圳 华南 4 广州 华南 5 武汉 华中 6 南京 华东 7 天津 华北
2,数据准备
(1)在实现具体需求之前,我们首先需要创建相应的 Hive 表,并将 txt 文件中的数据导入到表中,具体代码如下:
object Hello { def main(args: Array[String]): Unit = { // 创建 Spark 运行配置对象 val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Hello") //创建 SparkSession 对象 val spark: SparkSession = SparkSession.builder().config(sparkConf) .enableHiveSupport() // 添加Hive支持 .getOrCreate() // 导入隐式转换 import spark.implicits._ // 创建3张表并导入初始化数据 spark.sql( """ |CREATE TABLE `user_visit_action`( | `date` string, | `user_id` bigint, | `session_id` string, | `page_id` bigint, | `action_time` string, | `search_keyword` string, | `click_category_id` bigint, | `click_product_id` bigint, | `order_category_ids` string, | `order_product_ids` string, | `pay_category_ids` string, | `pay_product_ids` string, | `city_id` bigint) |row format delimited fields terminated by '\t' """.stripMargin) spark.sql( """ |load data local inpath 'datas/user_visit_action.txt' into table user_visit_action """.stripMargin) spark.sql( """ |CREATE TABLE `product_info`( | `product_id` bigint, | `product_name` string, | `extend_info` string) |row format delimited fields terminated by '\t' """.stripMargin) spark.sql( """ |load data local inpath 'datas/product_info.txt' into table product_info """.stripMargin) spark.sql( """ |CREATE TABLE `city_info`( | `city_id` bigint, | `city_name` string, | `area` string) |row format delimited fields terminated by '\t' """.stripMargin) spark.sql( """ |load data local inpath 'datas/city_info.txt' into table city_info """.stripMargin) // 查看目前所有的Hive表 spark.sql("show tables").show() // 查看其中 city_info 表的模式信息,包括列名、数据类型和其他元数据信息 spark.sql("DESCRIBE user_visit_action").show() // 查看 city_info 表的数据 spark.sql("""select * from city_info""").show //关闭 Spark spark.stop() } }
(2)运行结果如下说明数据已经准备完毕:
(3)由于我使用的是内置的 Hive,可以看到项目根目录下会出现一个 spark-warehouse 文件夹,该文件夹下的的每个子文件夹是一张表,子文件夹名为表名,里面则是表的数据文件。
3,需求说明
(1)我们需要计算各个区域前三大热门商品(这里的热门商品是从点击量的维度来看),并备注上每个商品在主要城市中的分布比例,超过两个城市用其他显示。最终结果类似如下形式:
地区 | 商品名称 | 点击次数 | 城市备注 |
华北 | 商品 A | 100000 | 北京 21.2%,天津 13.2%,其他 65.6% |
华北 | 商品 P | 80200 | 北京 63.0%,太原 10%,其他 27.0% |
华北 | 商品 M | 40000 | 北京 63.0%,太原 10%,其他 27.0% |
东北 | 商品 J | 92000 | 大连 28%,辽宁 17.0%,其他 55.0% |
(2)要实现该需求,大概思路如下:
- 查询出来所有的点击记录,并与 city_info 表连接,得到每个城市所在的地区,与 Product_info 表连接得到产品名称
- 按照地区和商品 id 分组,统计出每个商品在每个地区的总点击次数
- 每个地区内按照点击次数降序排列
- 只取前三名
- 城市备注需要自定义 UDAF 函数
4,功能实现
(1)首先我们自定义一个聚合函数 CityRemarkUDAF,用于计算城市的点击数量并生成城市备注信息。
// 自定义聚合函数的缓冲区数据类型 case class Buffer( var total : Long, var cityMap:mutable.Map[String, Long] ) // 自定义聚合函数:实现城市备注功能 // 1. 继承Aggregator, 定义泛型 // IN : 城市名称 // BUF : Buffer =>【总点击数量,Map[(city, cnt), (city, cnt)]】 // OUT : 备注信息 // 2. 重写6个方法 class CityRemarkUDAF extends Aggregator[String, Buffer, String]{ // 缓冲区初始化 override def zero: Buffer = { Buffer(0, mutable.Map[String, Long]()) } // 更新缓冲区数据 override def reduce(buff: Buffer, city: String): Buffer = { buff.total += 1 val newCount = buff.cityMap.getOrElse(city, 0L) + 1 buff.cityMap.update(city, newCount) buff } // 合并缓冲区数据 override def merge(buff1: Buffer, buff2: Buffer): Buffer = { buff1.total += buff2.total val map1 = buff1.cityMap val map2 = buff2.cityMap // 两个Map的合并操作 map2.foreach{ case (city , cnt) => { val newCount = map1.getOrElse(city, 0L) + cnt map1.update(city, newCount) } } buff1.cityMap = map1 buff1 } // 将统计的结果生成字符串信息 override def finish(buff: Buffer): String = { val remarkList = ListBuffer[String]() val totalcnt = buff.total val cityMap = buff.cityMap // 降序排列 val cityCntList = cityMap.toList.sortWith( (left, right) => { left._2 > right._2 } ).take(2) val hasMore = cityMap.size > 2 var rsum = 0L cityCntList.foreach{ case ( city, cnt ) => { val r = cnt * 100 / totalcnt remarkList.append(s"${city} ${r}%") rsum += r } } if ( hasMore ) { remarkList.append(s"其他 ${100 - rsum}%") } remarkList.mkString(", ") } // Spark 用于序列化缓冲区的编码器 override def bufferEncoder: Encoder[Buffer] = Encoders.product // Spark 用于序列化输出结果的编码器 override def outputEncoder: Encoder[String] = Encoders.STRING }
(2)接着我们程序中通过如下步骤进行统计计算:
- 首先将用户行为数据、产品信息、城市信息三张表进行 JOIN,并筛选出 click_product_id > -1 的记录。将查询结果命名为 t1。
- 接着使用用户自定义的 UDAF(CityRemarkUDAF),实现了对区域内商品点击数量的聚合,并生成城市备注信息。将聚合结果命名为 t2。
- 然后使用 Spark SQL 的窗口函数 rank(),在每个区域内对商品的点击数量进行排名。将排名结果命名为 t3。
- 最后从 t3 表中选择排名在前三位的数据,并显示在控制台上。
object Hello { def main(args: Array[String]): Unit = { // 创建 Spark 运行配置对象 val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Hello") //创建 SparkSession 对象 val spark: SparkSession = SparkSession.builder().config(sparkConf) .enableHiveSupport() // 添加Hive支持 .getOrCreate() // 查询基本数据 spark.sql( """ | select | a.*, | p.product_name, | c.area, | c.city_name | from user_visit_action a | join product_info p on a.click_product_id = p.product_id | join city_info c on a.city_id = c.city_id | where a.click_product_id > -1 """.stripMargin).createOrReplaceTempView("t1") // 根据区域,商品进行数据聚合 spark.udf.register("cityRemark", functions.udaf(new CityRemarkUDAF())) spark.sql( """ | select | area, | product_name, | count(*) as clickCnt, | cityRemark(city_name) as city_remark | from t1 group by area, product_name """.stripMargin).createOrReplaceTempView("t2") // 区域内对点击数量进行排行 spark.sql( """ | select | *, | rank() over( partition by area order by clickCnt desc ) as rank | from t2 """.stripMargin).createOrReplaceTempView("t3") // 取前3名 spark.sql( """ | select | * | from t3 where rank <= 3 """.stripMargin).show(false) //关闭 Spark spark.stop() } }
(3)运行结果如下: