# Spark實戰尋找5億次訪問中，訪問次數最多的人

## 問題描述

• 1、用戶的id
• 2、用戶訪問的時間
• 3、用戶逗留的時間
• 4、用戶執行的操作
• 5、用戶的其餘數據（比如IP等等）

## 問題分析

5億條ID數據，首先可以用map將其緩存到RDD中，然後對RDD進行reduceByKey，最後找出出現最多的ID。思路很簡單，因此代碼量也不會很多

## 實現

### scala實現

RandomId.class

``````import scala.Serializable;
public class RandomId implements Serializable {
private static final long twist(long u, long v) {
return (((u & 0x80000000L) | (v & 0x7fffffffL)) >> 1) ^ ((v & 1) == 1 ? 0x9908b0dfL : 0);
}
private long[] state= new long[624];
private int left = 1;
public RandomId() {
for (int j = 1; j < 624; j++) {
state[j] = (1812433253L * (state[j - 1] ^ (state[j - 1] >> 30)) + j);
state[j] &= 0xfffffffffL;
}
}
public void next_state() {
int p = 0;
left = 624;
for (int j = 228; --j > 0; p++)
state[p] = state[p+397] ^ twist(state[p], state[p + 1]);
for (int j=397;--j>0;p++)
state[p] = state[p-227] ^ twist(state[p], state[p + 1]);
state[p] = state[p-227] ^ twist(state[p], state[0]);
}
public long next() {
if (--left == 0) next_state();
return state[624-left];
}
}
``````

``````import org.apache.spark.{SparkConf, SparkContext}
object ActiveVisitor {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("spark://master:7077").setAppName("ActiveVisitor")
val sc = new SparkContext(conf)
val list = 1 until 100000
val id =new RandomId()
var max = 0
var maxId = 0L
val lastNum = sc.parallelize(list).flatMap(num => {
var list2 = List(id.next())
for (i <- 1 to 50000){
list2 = id.next() :: list2
}
println(num +"%")
list2
}).map((_,1)).reduceByKey(_+_).foreach(x => {
if (x._2 > max){
max = x._2
maxId = x._1
println(x)
}
})
}
}
``````

``````import org.apache.spark.{SparkConf, SparkContext}
object ActiveVisitor {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("spark://master:7077").setAppName("ActiveVisitor")
val sc = new SparkContext(conf)
//生成一個0-9999的列表
val list = 1 until 10000
val id =new RandomId()
//這裡記錄最大的次數
var max = 0
//這裡記錄最大次數的ID
var maxId = 0L
val lastNum = sc.parallelize(list)
//第一步生成5億條數據
.flatMap(num => {
//遍歷list列表
//總共遍歷1萬次每次生成5萬個ID
var list2 = List(id.next())
for (i <- 1 to 50000){
list2 = id.next() :: list2
}
//這裡記錄當前生成ID的百分比
println(num/1000.0 +"%")
//返回生成完成後的list
//每次循環裡面都包含5萬個ID
list2
})
//遍歷5億條數據
//為每條數據出現標記1
.map((_,1))
//對標記後的數據進行處理
//得到每個ID出現的次數，即（ID，Count）
.reduceByKey(_+_)
//遍歷處理後的數據
.foreach(x => {
//將最大值存儲在max中
if (x._2 > max){
max = x._2
maxId = x._1
//若X比之前記錄的值大，則輸出該id和次數
//最後一次輸出結果，則是出現次數最多的的ID和以及其出現的次數
//當然出現次數最多的可能有多個ID
//這裡只輸出一個
println(x)
}
})
}
}
``````

## 運行得到結果

``````1%
5000%
2%
5001%
3%
5002%
4%
5003%
5%
5004%
6%
5005%
7%
5006%
8%
5007%
9%
5008%
10%
5009%
11%
5010%
12%
5011%
5012%
13%
5013%
14%
15%
5014%
...
...
...
``````

``````5634%
5635%
5636%
5637%
5638%
5639%
5640%
5641%
5642%
5643%
5644%
5645%
2019-03-05 11:52:14 INFO  ExternalSorter:54 - Thread 63 spilling in-memory map of 1007.3 MB to disk (2 times so far)
647%
648%
649%
650%
651%
652%
653%
654%
655%
656%
``````

## 結果

``````import org.apache.spark.{SparkConf, SparkContext}
object ActiveVisitor {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("spark://master:7077").setAppName("ActiveVisitor")
val sc = new SparkContext(conf)
//生成一個0-9999的列表
val list = 1 until 10000
val id =new RandomId()
//這裡記錄最大的次數
var max = 0
//這裡記錄最大次數的ID
var maxId = 0L
val lastNum = sc.parallelize(list)
//第一步生成5億條數據
.flatMap(num => {
//遍歷list列表
//總共遍歷1萬次每次生成5萬個ID
var list2 = List(id.next())
for (i <- 1 to 50000){
list2 = id.next() :: list2
}
//這裡記錄當前生成ID的百分比
println(num/1000.0 +"%")
//返回生成完成後的list
//每次循環裡面都包含5萬個ID
list2
})
//遍歷5億條數據
//為每條數據出現標記1
.map((_,1))
//對標記後的數據進行處理
//得到每個ID出現的次數，即（ID，Count）
.reduceByKey(_+_)
//為數據進行排序
//倒序
.sortByKey(false)
//次數最多的，在第一個，將其輸出
println(lastNum.first())
}
}
``````

### 相關文章

【機器學習】線性迴歸原理介紹

【機器學習】機器學習簡介

【機器學習】深度學習開發環境搭建

Spark實戰搭建我們的Spark分佈式架構