运维开发网

基于Spark的FPGrowth(关联规则算法)

运维开发网 https://www.qedev.com 2020-04-10 11:27 出处:网络 作者:运维开发网整理
转载请标明出处:小帆的帆的专栏 例子:  总共有10000个消费者购买了商品,  其中购买尿布的有1000人,  购买啤酒的有2000人,  购买面包的有500人,  同时购买尿布和啤酒的有800人,  同时购买尿布的面包的有100人。 关联规则 关联规则:用于表示数据内隐含的关联性,例如:购买尿布的人往往会购买啤酒。 支持度(support) 支持度:{X, Y}同时出现的概率,例如:{尿布,啤

转载请标明出处:小帆的帆的专栏

例子: 
总共有10000个消费者购买了商品, 
其中购买尿布的有1000人, 
购买啤酒的有2000人, 
购买面包的有500人, 
同时购买尿布和啤酒的有800人, 
同时购买尿布的面包的有100人。

关联规则

关联规则:用于表示数据内隐含的关联性,例如:购买尿布的人往往会购买啤酒。

支持度(support)

支持度:{X, Y}同时出现的概率,例如:{尿布,啤酒}同时出现的概率

support=同时购买{X,Y}的人数总人数

{尿布,啤酒}的支持度 = 800 / 10000 = 0.08 
{尿布,面包}的支持度 = 100 / 10000 = 0.01

注意:{尿布,啤酒}的支持度等于{啤酒,尿布}的支持度,支持度没有先后顺序之分

置信度(confidence)

置信度:购买X的人,同时购买Y的概率,例如:购买尿布的人,同时购买啤酒的概率,而这个概率就是购买尿布时购买啤酒的置信度

confidence(X−>Y)=同时购买{X,Y}的人数购买X的人数

confidence(Y−>X)=同时购买{X,Y}的人数购买Y的人数

( 尿布 -> 啤酒 ) 的置信度 = 800 / 1000 = 0.8 
( 啤酒 -> 尿布 ) 的置信度 = 800 / 2000 = 0.4

Spark计算支持度和置信度

 
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106
  • 107
  • 108
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106
  • 107
  • 108
import org.apache.spark.sql.SQLContext import org.apache.spark.{SparkConf, SparkContext} object Test { def main(args: Array[String]) { val conf = new SparkConf().setAppName("Test").setMaster("local") val sc = new SparkContext(conf) val sqlContext = new SQLContext(sc) // 测试数据, 为方便分析问题 // 左边一列是用户,有三个用户a,b,c // 右边一列是公司,表示用户喜欢的公司 val testData = Array( ("a", "google"), ("a", "apple"), ("a", "mi"), ("b", "google"), ("b", "apple"), ("c", "google") ) val data = sc.parallelize(testData) // 最终我们要构造出这样的结果:公司A、公司B、支持度、A->B的置信度、B->A的置信度 // 要求支持度和置信度就需要三个值,喜欢A公司的人数,喜欢B公司的人数,同时喜欢A和B公司的人数 // 我们先求前两个 val companyCountRDD = data.map(a => (a._2, 1)).reduceByKey(_ + _) /** * (mi,1) * (google,3) * (apple,2) */ companyCountRDD.collect().foreach(println) // 要计算同时喜欢A和B公司的人数,要先知道A,B所有可能的组合 // 比如:1, 2, 3,;所有可能的组合就是(1,2),(1,3),(2,3) // 这里我们简单的用cartesian算子实现 // cartesian算子会得到这样的结果: // (1,1),(1,2),(1,3), // (2,1),(2,2),(2,3), // (3,1),(3,2),(3,3) // 然后filter算子,只保留左边大于右边的结果,这样能过滤掉相等的结果,如(1,1),还有重复的结果,如(2,1),因为我们已经有(1,2)了 val cartesianRDD = companyCountRDD.cartesian(companyCountRDD).filter(tuple => tuple._1._1 > tuple._2._1).map(t => ((t._1._1, t._2._1), (t._1._2, t._2._2))) // 这样我们不但得到了A和B的所有组合,还顺带聚合了计算用的到的数据 /** 公司A、公司B、喜欢A公司的人数、喜欢B公司的人数 * ((mi,google),(1,3)) * ((mi,apple),(1,2)) * ((google,apple),(3,2)) */ cartesianRDD.collect().foreach(println) // 下面开始计算,同时喜欢A和B公司的人数 // 比如a这个人,它喜欢google,apple,mi; 那么就是同时喜欢(mi,google),(mi,apple),(google,apple) // 所以我们先要将数据转换成(a, (google,apple,mi)) // 这个时候用户就没用了,我们只需要知道公司的组合 // 因此转换成(mi,google),(mi,apple),(google,apple) // 最后用flatMap将结果打散,再计数 val userCompaniesRDD = data.groupByKey().cache() val meanwhileRDD = userCompaniesRDD.map(_._2) // 这里采用了类似cartesian的做法计算所有的组合,然后过滤掉不需要的 .flatMap(iter => iter.flatMap(i => iter.map(j => (i, j))).filter(tuple => tuple._1 > tuple._2)) .map(tuple => (tuple, 1)) .reduceByKey(_ + _) // 计算用户总数,后面会用到 val userNum = userCompaniesRDD.count() /** 公司A、公司B、同时喜欢A和B公司的人数 * ((mi,apple),1) * ((mi,google),1) * ((google,apple),2) */ meanwhileRDD.collect().foreach(println) val calRDD = cartesianRDD.join(meanwhileRDD) /** 公司A、公司B、喜欢A公司的人数,喜欢B公司的人数,同时喜欢A和B公司的人数 * ((mi,apple),((1,2),1)) * ((mi,google),((1,3),1)) * ((google,apple),((3,2),2)) */ calRDD.collect.foreach(println) // 计算结果 val resultRDD = calRDD.map(t => { val aCompany = t._1._1 val bCompany = t._1._2 val aCount = t._2._1._1 val bCount = t._2._1._2 val aAndbCount = t._2._2 * 1.0 // 公司A、公司B、支持度、A->B的置信度、B->A的置信度 (aCompany, bCompany, aAndbCount / userNum, aAndbCount / aCount, aAndbCount / bCount) }) /** * (mi,apple,0.3333333333333333,1.0,0.5) * (mi,google,0.3333333333333333,1.0,0.3333333333333333) * (google,apple,0.6666666666666666,0.6666666666666666,1.0) */ resultRDD.collect.foreach(println) // 最后可以过滤掉数值太低的 // 支持度的阈值是1%,置信度阈值50% val support = 0.01 val confidence = 0.5 resultRDD.filter(a => a._3 > support && a._4 > confidence && a._5 > confidence).collect().foreach(println) } }

注意:cartesian这个算子很恐怖,如果要追求性能的话,还是要自己写一个算法

参考

本文的例子以及支持度,置信度的概念,总结自炼数成金-黄美灵老师的Spark MLlib 机器学习算法与源码解析课程课程文档

在推荐中,关联规则推荐使用的比较频繁,毕竟是通过概率来预测的,易于理解且准确度比较高,不过有一个缺点为,想要覆盖推荐物品的数量,就要降低支持度与置信度。过高的支持度与置信度会导致物品覆盖不过,这里需要其他的推荐方法合作,建议使用基于Spark的模型推荐算法(矩阵分解+ALS).

一FPGrowth算法描述:

FPGrowth算法

概念:支持度,置信度,提升度(Spark好像没有计算这个的函数,需要自己计算) 
列子:假如10000个消费者购买了商品,尿布1000个,啤酒2000个,面包500个,同时购买了尿布和啤酒800个,同时购买了尿布和面包100个。 
1)支持度:在所有项集中出现的可能性,项集同时含有,x与y的概率。是第一道门槛,衡量量是多少,可以理解为‘出镜率’,一般会支持初始值过滤掉低的规则。 
尿布和啤酒的支持度为:800/10000=8% 
2)置信度:在X发生的条件下,Y发生的概率。这是第二道门槛,衡量的是质量,设置最小的置信度筛选可靠的规则。 
尿布-》啤酒的置信度为:800/1000=80%,啤酒-》尿布的置信度为:800/2000=40% 
3)提升度:在含有x条件下同时含有Y的可能性(x->y的置信度)比没有x这个条件下含有Y的可能性之比:confidence(尿布=> 啤酒)/概率(啤酒)) = 80%/((2000+800)/10000) 。如果提升度=1,那就是没啥关系这两个。 
通过支持度和置信度可以得出强关联关系,通过提升的,可判别有效的强关联关系。

2 FPGrowth特点

1)产生候选集,2)只需要两次遍历数据库,提高效率。 
再举个我们这里的列子,列如时间的原因是,使用最近的行为训练规则,太久的行为没有意义 
样本如下: 
列子:用户,时间,消费的漫画 
u1,20160925,成都1995,seven,神兽退散。 
u2,20160925,成都1995,seven,six。 
u1,20160922,成都1995,恶魔日记

比如产生了如下规则: 
规则:成都1995,seven->神兽退散 
这条规则: 
成都1995,seven的支持度2/3 
成都1995,seven-》神兽退散,的置信度1/2 
这里打个广告哈,成都1995,seven,神兽退散(漫画)比较真的比较好看,成都1995也要拍网剧了哈!! 
关联规则主要的难道在于频繁项集的筛选,apriori算法就是一个一个组合的,如果item数量很多,那么太慢了,FPGrowth算法速度比较快。 
我本身对FPGowth的树形结构产生频繁项集不是特别了解,以后可以研究下哈,核心点就是通过头树和树减少遍历次数吧 

算法 
输入:参数,样本 
输出:规则 
FPGrowth参考资料 
参考资料 
http://www.cnblogs.com/zhangchaoyang/articles/2198946.html

二Spark代码实现(修改了一下Spark的列子)

 
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
//数据样本: r z h k p z y x w v u t s s x o n r x z y m t s q e z x z y r q t p
 
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
package org.wq.scala.ml import org.apache.spark.mllib.fpm.FPGrowth import org.apache.spark.{SparkConf, SparkContext} /** * Created by Administrator on 2016/10/24. */ object FP_GrowthTest { def main(args:Array[String]){ val conf = new SparkConf().setAppName("FPGrowthTest").setMaster("local").set("spark.sql.warehouse.dir","E:/ideaWorkspace/ScalaSparkMl/spark-warehouse") val sc = new SparkContext(conf) //设置参数 //最小支持度 val minSupport=0.2 //最小置信度 val minConfidence=0.8 //数据分区 val numPartitions=2 //取出数据 val data = sc.textFile("data/mllib/sample_fpgrowth.txt") //把数据通过空格分割 val transactions=data.map(x=>x.split(" ")) transactions.cache() //创建一个FPGrowth的算法实列 val fpg = new FPGrowth() //设置训练时候的最小支持度和数据分区 fpg.setMinSupport(minSupport) fpg.setNumPartitions(numPartitions) //把数据带入算法中 val model = fpg.run(transactions) //查看所有的频繁项集,并且列出它出现的次数 model.freqItemsets.collect().foreach(itemset=>{ println( itemset.items.mkString("[", ",", "]")+","+itemset.freq) }) //通过置信度筛选出推荐规则则 //antecedent表示前项 //consequent表示后项 //confidence表示规则的置信度 //这里可以把规则写入到Mysql数据库中,以后使用来做推荐 //如果规则过多就把规则写入redis,这里就可以直接从内存中读取了,我选择的方式是写入Mysql,然后再把推荐清单写入redis model.generateAssociationRules(minConfidence).collect().foreach(rule=>{ println(rule.antecedent.mkString(",")+"-->"+ rule.consequent.mkString(",")+"-->"+ rule.confidence) }) //查看规则生成的数量 println(model.generateAssociationRules(minConfidence).collect().length) //并且所有的规则产生的推荐,后项只有1个,相同的前项产生不同的推荐结果是不同的行 //不同的规则可能会产生同一个推荐结果,所以样本数据过规则的时候需要去重 } }

上面规则是本地运行的,部署的话需要改下哈,代码如下

 
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
package org.wq.scala.ml import org.apache.spark.mllib.fpm.FPGrowth import org.apache.spark.{SparkConf, SparkContext} /** * Created by Administrator on 2016/10/24. */ object FP_Growth { def main(args:Array[String]){ if(args.length!=4){ println("请输入4个参数 购物篮数据路径 最小支持度 最小置信度 数据分区") System.exit(0) } val conf = new SparkConf().setAppName("FPGrowthTest") val sc = new SparkContext(conf) val data_path=args(0) //设置参数 //最小支持度 val minSupport=args(1).toDouble //最小置信度 val minConfidence=args(2).toDouble //数据分区 val numPartitions=args(3).toInt //取出数据 val data = sc.textFile(data_path) //把数据通过空格分割 val transactions=data.map(x=>x.split(" ")) transactions.cache() //创建一个FPGrowth的算法实列 val fpg = new FPGrowth() //设置训练时候的最小支持度和数据分区 fpg.setMinSupport(minSupport) fpg.setNumPartitions(numPartitions) //把数据带入算法中 val model = fpg.run(transactions) //查看所有的频繁项集,并且列出它出现的次数 model.freqItemsets.collect().foreach(itemset=>{ println( itemset.items.mkString("[", ",", "]")+","+itemset.freq) }) //通过置信度筛选出推荐规则则 //antecedent表示前项 //consequent表示后项 //confidence表示规则的置信度 //这里可以把规则写入到Mysql数据库中,以后使用来做推荐 //如果规则过多就把规则写入redis,这里就可以直接从内存中读取了,我选择的方式是写入Mysql,然后再把推荐清单写入redis model.generateAssociationRules(minConfidence).collect().foreach(rule=>{ println(rule.antecedent.mkString(",")+"-->"+ rule.consequent.mkString(",")+"-->"+ rule.confidence) }) //查看规则生成的数量 println(model.generateAssociationRules(minConfidence).collect().length) //并且所有的规则产生的推荐,后项只有1个,相同的前项产生不同的推荐结果是不同的行 //不同的规则可能会产生同一个推荐结果,所以样本数据过规则的时候需要去重 } }

三提交部署

上传jar与数据到主节点 

基于Spark的FPGrowth(关联规则算法) 

基于Spark的FPGrowth(关联规则算法)

 
 
  • 1
  • 2
  • 3
  • 4
  • 1
  • 2
  • 3
  • 4
#然后把数据文件scp到各个节点 cd /home/jar/data scp sample_fpgrowth.txt spark@slave1:/home/jar/data/ scp sample_fpgrowth.txt spark@slave2:/home/jar/data/

基于Spark的FPGrowth(关联规则算法)

然后提交给spark集群运行 
数据目录:/home/jar/data 
jar目录:/home/jar 
模型目录:/home/jar/model

 
 
  • 1
  • 1
spark-submit --class org.wq.scala.ml.FP_Growth --master spark://master:7077 --executor-memory 700m --num-executors 1 /home/jar/FP_Growth.jar /home/jar/data/sample_fpgrowth.txt 0.2 0.8 2

运行结果: 
频繁项集: 

基于Spark的FPGrowth(关联规则算法)

规则: 

基于Spark的FPGrowth(关联规则算法)

集群跑job信息 

基于Spark的FPGrowth(关联规则算法)

四:注意事项

1我使用了20w多样本计算,近2000个物品,支持度5%,置信70%,训练出来的规则很多,最后匹配的规则比较慢,而且物品的覆盖比较少。所以把近2000的物品修改为主推的近500,这样规则就减少了很多,切覆盖的物品也比较多。具体参数自己试下哈,样本和样本的结构不一样。 
2FpGrowth的训练其实比较快的,把样本量提升到了50w,训练的时间也是分钟级别的,10分钟左右吧,前提是支持度比高。在调整算法的时间,支持度很重要,关系到运行的时间,我把支持度调整的很低的时候,算法跑不出来,也会内存溢出(本身内存也不大哈)。不过时间多也无所谓,因为本身就是离线模型训练哈。 
3参数调整方案,多试试,觉得准确性和物品覆盖比较满意的时候就行了额,至于参数的自动迭代,完全没什么思路,除了输入不同参数,求最好。 
4训练的时候数据能cache就cache哈,会比叫快哈 
5 这个训练中,6个样本,都产生了85个规则,可以想象,样本量大了之后,规则暴多,所以把规则写入MySQL,group by,group_concat()(会mysql的应该明白我说啥)可以合并规则,把置信度高的放在前面,当然自己写代码可以哈。  s,t,x–>z–>1  s,t,x–>y–>1  合并为s,t,x–>z,y置信度高的在前面哈

0

精彩评论

暂无评论...
验证码 换一张
取 消