当前位置: > > > Spark - RDD使用详解14(案例实操3:统计页面单跳转换率)

Spark - RDD使用详解14(案例实操3:统计页面单跳转换率)

十七、案例实操3:统计页面单跳转换率

1,数据准备

(1)我们有一个电商网站的用户行为数据文件 user_visit_action.txt,下面是截取里面一部分内容:
user_visit_action.txt.zip

(2)该文件主要包含用户的 4 种行为:搜索,点击,下单,支付。数据规则如下:
  • 数据文件中每行数据采用逗号分隔数据
  • 每一行数据表示用户的一次行为,这个行为只能是 4 种行为的一种
  • 如果搜索关键字为 null,表示数据不是搜索数据
  • 如果点击的品类 ID 和产品 ID-1,表示数据不是点击数据
  • 针对于下单行为,一次可以下单多个商品,所以品类 ID 和产品 ID 可以是多个,id 之间采用中划线分隔,如果本次不是下单行为,则数据采用 null 表示
  • 支付行为和下单行为类似

(3)每行数据的详细字段说明:
编号 字段名称 字段类型 字段含义
1 date String 用户点击行为的日期
2 user_id Long 用户的 ID
3 session_id String SessionID
4 page_id Long 某个页面的 ID
5 action_time String 动作的时间点
6 search_keyword String 用户搜索的关键词
7 click_category_id Long 某一个商品品类的 ID
8 click_product_id Long 某一个商品的 ID
9 order_category_ids String 一次订单中所有品类的 ID 集合
10 order_product_ids String 一次订单中所有商品的 ID 集合
11 pay_category_ids String 一次支付中所有品类的 ID 集合
12 pay_product_ids String 一次支付中所有商品的 ID 集合
13 city_id Long 城市 ID

2,需求描述

(1)页面单跳转化率是网站转化率的一种统计形式。假设一个用户在一次 Session 过程中访问的页面路径 3,5,7,9,10,21,那么页面 3 跳到页面 5 叫一次单跳,7-9 也叫一次单跳,那么单跳转化率就是要统计页面点击的概率
  • 计算 3-5 的单跳转化率,先获取符合条件的 Session 对于页面 3 的访问次数(PV) 为 A,然后获取符合条件的 Session 中访问了页面 3 又紧接着访问了页面 5 的次数为 B,那么 B/A 就是 3-5 的页面单跳转化率。

(2)现要求分析用户行为数据文件,统计出 1,2,3,4,5,6,7 这几个页面的单跳转换率:
  • 122334455667这几个单跳转换率。

3,实现代码
(1)整个需求实现的思路是读取用户行为数据文件,然后将每行数据转换为 UserVisitAction象。接着计算得到单跳转换率的分母和分子,最后将分子除以分母即为最终需要的单跳转换率。具体细节如下:
  • 计算分母部分,代码首先过滤出符合条件的用户行为数据,即包含在指定页面连续跳转序列中的页面。然后对这些页面进行统计,得到每个页面的点击次数,并将结果收集为一个 Map,其中键为页面 ID,值为对应页面的点击次数。
  • 计算分子部分,代码首先根据 Session ID 对数据进行分组,然后根据访问时间对每个分组中的数据进行排序。接着,将排序后的数据按顺序两两组合成页面跳转的序列,并过滤掉不符合指定页面连续跳转序列的部分。最后,将过滤后的页面跳转序列映射为 ((Long, Long), Int) 的格式,其中元组表示页面跳转的起始页面和目标页面,Int 表示该跳转序列出现的次数。
  • 接下来,代码对分子部分的数据进行统计,将相同的页面跳转序列进行聚合求和,得到每个页面跳转序列的出现次数。
  • 最后,代码计算并输出每个页面跳转序列的单跳转换率,通过将分子部分的出现次数除以分母部分对应页面的点击次数得到。
object Hello {
  def main(args: Array[String]): Unit = {
    // 创建 Spark 运行配置对象
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Hello")
    // 创建 Spark 上下文环境对象(连接对象)
    val sc: SparkContext = new SparkContext(sparkConf)

    // 读取用户行为数据文件
    val fileRDD = sc.textFile("datas/user_visit_action.txt")

    // 将每行数据转换为UserVisitAction对象
    val actionDataRDD = fileRDD.map(
      action => {
        val datas = action.split(",")
        UserVisitAction(
          datas(0),
          datas(1).toLong,
          datas(2),
          datas(3).toLong,
          datas(4),
          datas(5),
          datas(6).toLong,
          datas(7).toLong,
          datas(8),
          datas(9),
          datas(10),
          datas(11),
          datas(12).toLong
        )
      }
    )

    // 对数据进行缓存(因为后面会多次用到)
    actionDataRDD.cache()

    // 对指定的页面连续跳转进行统计
    // 1-2,2-3,3-4,4-5,5-6,6-7
    val ids = List[Long](1,2,3,4,5,6,7)
    val okflowIds: List[(Long, Long)] = ids.zip(ids.tail)

    // 计算分母
    // 统计每个页面的点击次数,并将结果收集为一个Map,键为页面ID,值为对应页面的点击次数
    val pageidToCountMap: Map[Long, Long] = actionDataRDD.filter(
      action => {
        // 过滤出符合条件的用户行为数据,即包含在指定页面连续跳转序列中的页面
        ids.init.contains(action.page_id)
      }
    ).map(
      action => {
        (action.page_id, 1L)
      }
    ).reduceByKey(_ + _).collect().toMap

    // 计算分子
    // 根据session进行分组
    val sessionRDD: RDD[(String, Iterable[UserVisitAction])] = actionDataRDD.groupBy(_.session_id)

    // 分组后,根据访问时间进行排序(升序)
    val mvRDD: RDD[(String, List[((Long, Long), Int)])] = sessionRDD.mapValues(
      iter => {
        val sortList: List[UserVisitAction] = iter.toList.sortBy(_.action_time)

        // 【1,2,3,4】
        // 【1,2】,【2,3】,【3,4】
        // 【1-2,2-3,3-4】
        // Sliding : 滑窗
        // 【1,2,3,4】
        // 【2,3,4】
        // zip : 拉链
        val flowIds: List[Long] = sortList.map(_.page_id)
        val pageflowIds: List[(Long, Long)] = flowIds.zip(flowIds.tail)

        // 将不合法的页面跳转进行过滤
        pageflowIds.filter(
          t => {
            // 过滤掉不符合指定页面连续跳转序列的部分
            okflowIds.contains(t)
          }
        ).map(
          t => {
            (t, 1)
          }
        )
      }
    )
    // ((1,2),1)
    val flatRDD: RDD[((Long, Long), Int)] = mvRDD.map(_._2).flatMap(list=>list)
    // ((1,2),1) => ((1,2),sum)
    val dataRDD = flatRDD.reduceByKey(_+_)

    // 计算单跳转换率
    // 分子除以分母
    dataRDD.foreach{
      case ( (pageid1, pageid2), sum ) => {
        val lon: Long = pageidToCountMap.getOrElse(pageid1, 0L)

        println(s"页面${pageid1}跳转到页面${pageid2}单跳转换率为:" + ( sum.toDouble/lon ))
      }
    }

    //关闭 Spark
    sc.stop()
  }

  //用户访问动作表
  case class UserVisitAction(
                date: String,//用户点击行为的日期
                user_id: Long,//用户的ID
                session_id: String,//Session的ID
                page_id: Long,//某个页面的ID
                action_time: String,//动作的时间点
                search_keyword: String,//用户搜索的关键词
                click_category_id: Long,//某一个商品品类的ID
                click_product_id: Long,//某一个商品的ID
                order_category_ids: String,//一次订单中所有品类的ID集合
                order_product_ids: String,//一次订单中所有商品的ID集合
                pay_category_ids: String,//一次支付中所有品类的ID集合
                pay_product_ids: String,//一次支付中所有商品的ID集合
                city_id: Long //城市 id
              )
}

(2)运行结果如下:
评论0