
大数据作为一种技术手段,可以将业务和技术结合起来,使得医院、卫生部门、保险公司以及个人都能从中得到相应的价值。医疗大数据是医疗行业的未来,未来肯定是一个智慧医院,数字化医院。 医疗保健行业面临的挑战(1) 医疗保健的变革 金融风险转移 扩大覆盖范围(ACA) 新产品 付款人和供应商合作(2) 慢性病的频率和成本都在上升 II型糖尿病 哮喘 心脏病 肥胖现在也被列为一种病(3) 人口统计与行为 人口老龄化 护理改进/协调 患者/成员参与 生活方式选择,推动医疗成本上升(4) 技术变革 新的沟通渠道 社交媒体的发展。。。。微信、微博、QQ等 移动医疗、远程医疗 DIY保健 大数据分析 医疗大数据应用2.1治疗方法比较研究 这里可以通过比较患有相同疾病的人的治疗方法和病人的住院时间以及治疗过程,分析哪些方法是有效的,哪些治疗或检查是不必要的。通过不断收集有效的案列,得出某种疾病的最佳治疗方案。这样既为医院节约了资源也同时为患者减省了成本。 本案列就是国外一个案列,将医院所有患有肺炎的病人的治疗流程进行分析,判断哪些人的治疗流程合适,哪些人的治疗流程超额占用了资源,然后加以改进。2.2临床决策支持系统 临床决策支持系统是分析医生输入的目录,比较其与医疗指引的不同,从而提醒医生潜在的风险或遗漏的检查等,如药物不良反应。大数据技术可以使得这个决策支持平台更加的智能,可以收集不同的治疗方案,利用图像识别技术,识别医疗影像、挖掘医疗文献,从而建立疾病的专家库。在医生诊断过程中给出建议给医生参考。 2.3 医疗保险索赔应用 这是一个与传统保险业务相结合的应用。现在保险行业经常会发现存在医疗的超额赔付、骗保等现象。通过大数据分析,可以结合各方面数据对保险人进行分析,例如信用数据,支付能力、健康状况等,防止被包人,利用空隙损害国家利益,骗取高额保险。 2.4 收集病人住院信息 医院都希望能够解决医疗资源,患者也希望节约成本,通过收集医院病人的患病类型,住院类型等类型,将患者进行聚类分析,可以清楚的了解医院看病的类型分类。通过分析,可以预防一些流感性病毒、传染性病毒的传播等。 2.5基因分析 基因分析我觉得是未来一个比较好的应用方向,可以建立每个人的基因库,检测每个人身上潜在的隐性的疾病基因。根据自己的基因库,未来可以有很多的应用: (1) 当患病后,可以根据自己的患病情况,用药情况,结合自身的基因测序结果分析。(2) 分析两种不同疾病之间是否存在关联,比如肥胖和哮喘。(3) 将不同的基因组之间进行关联分析,发掘疾病治疗的有效方法。 2.6 智能穿戴 采集数据源:个人体征数据:血液含氧量、血压、心率、心电图和皮肤温度等。个人特征数据:年龄、性别、职业、收入、工作区域、社会关系等。 实现流程 通过为用户提供可穿戴式设备,连续记录用户的体征数据(人的血压、体温等体征数据),然后将这些数据传输到手机上,进而链接云端的数据库,然后云端分析师通过数据分析得出用户自身的连续体征波动规律,当波动出现异常时则会有预警出现,然后给出个性化的健康管理建议。 应用方向预测疾病:通过大数据分析技术为用户提前预测疾病,提供个性化的健康管理方案。降低医疗成本:利用大数据实现疾病预测的价值,能够提供健康管理方案,维持健康的成本低于疾病医治的成本,所以能够降低医疗成本。 医疗大数据的行业信息 医疗大数据现在国内发展的速度还比较慢,市场还需要一些时间去培育,这里介绍几家大数据的公司发展情况。 2009年,Apixio公司成立于美国加利福尼亚州圣马特奥,其名为HCC Profiler的医疗大数据分析平台利用非结构化数据分析并预测患者的健康状况,该平台主要针对慢性病进行分析。同时,公司新开发了一个名为Apixio Iris的平台,其背后有着更庞大的数据库作为支撑。Apixio公司的CEO Darren Schulte表示将会把D轮融资的资金用于提升现有平台的分析精准度和研发健康管理类APP。 医疗大数据领域的新星公司Apixio在新一轮D轮融资中获1930万美元资金。Apixio旨在为医疗机构提供大数据分析平台,方便医生进行更精确的诊疗。此轮融资的投资机构由SSM Partners领投,First Analysis和Bain Capital Ventures跟投。截至目前,Apixio共获投4188万美元。 2013年,一家成立于美国加利福尼亚州圣马特奥的名为Lumiata公司,也致力于利用大数据帮助医生分析患者的病情和预测病程,该公司的特点是将电子病历和病理生理学等数据整合,通过图谱分析的方式模拟人类的多维推理过程,从而分析患者的病情,并作出预测。Lumiata公司于2014年获得两次A轮融资,获投金额为1000万美元。 在国内,华润万里云医疗也在医疗大数据领域进行研究探索,该公司成立于2009年8月,项目为医学影像大数据云平台,公司致力于在基层医院、患者和专家之间形成高效、专业的连接,为基层医院和患者提供创新型影像服务。华润万里云医疗于今年3月获阿里健康投资的2.25亿元A轮融资。
Spark机器学习之推荐引擎 一. 最小二乘法建立模型 关于最小二乘法矩阵分解,我们可以参阅: 一、矩阵分解模型。 用户对物品的打分行为可以表示成一个评分矩阵A(m*n),表示m个用户对n各物品的打分情况。如下图所示: 其中,A(i,j)表示用户user i对物品item j的打分。但是,ALS 的核心就是下面这个假设:的打分矩阵 A 可以用两个小矩阵和的乘积来近似:。这样我们就把整个系统的自由度从一下降到了。我们接下来就聊聊为什么 ALS 的低秩假设是合理的。世上万千事物,人们的喜好各不相同。但。举个例子,我喜欢看略带黑色幽默的警匪电影,那么大家根据这个描述就知道我大概会喜欢昆汀的《低俗小说》、《落水狗》和韦家辉的《一个字头的诞生》。这些电影都符合我对自己喜好的描述,也就是说他们在这个抽象的低维空间的投影和我的喜好相似。再抽象一些,把人们的喜好和电影的特征都投到这个低维空间,一个人的喜好映射到了一个低维向量,一个电影的特征变成了纬度相同的向量,那么这个人和这个电影的相似度就可以表述成这两个向量之间的内积。 我们把打分理解成相似度,那么“打分矩阵A(mn)”就可以由“用户喜好特征矩阵U(mk)”和“产品特征矩阵V(n*k)”的乘积来近似了。矩阵U、矩阵V如下图所示: U V 二、交替最小二乘法(ALS)。矩阵分解模型的损失函数为: 有了损失函数之后,下面就开始谈优化方法了,通常的优化方法分为两种:交叉最小二乘法(alternative least squares)和随机梯度下降法(stochastic gradient descent)。本文使用算法的思想就是:我们先随机生成然后固定它求解,再固定求解,这样交替进行下去,直到取得最优解min(C)。因为每步迭代都会降低误差,并且误差是有下界的,所以 ALS 一定会收敛。但由于问题是非凸的,ALS 并不保证会收敛到全局最优解。但在实际应用中,ALS 对初始点不是很敏感,是不是全局最优解造成的影响并不大。 算法的执行步骤: 1、先随机生成一个。一般可以取0值或者全局均值。 2、固定(即:认为是已知的常量),来求解。 此时,损失函数为: 由于C中只有Vj一个未知变量,因此C的最优化问题转化为最小二乘问题,用最小二乘法求解Vj的最优解: 固定j(j=1,2,......,n),则:C的导数 按照上式依次计算v1,v2,......,vn,从而得到。 3、固定(即:认为是已知的量),来求解。 此时,损失函数为: 同理,用步骤2中类似的方法,可以计算ui的值: 依照上式依次计算u1,u2,......,um,从而得到。 4、循环执行步骤2、3,直到损失函数C的值收敛(或者设置一个迭代次数N,迭代执行步骤2、3 N次后停止)。这样,就得到了C最优解对应的矩阵U、V。 MovieLens 数据该数据集由用户ID,影片ID,评分,时间戳组成 我们只需要前3个字段 复制代码/ Load the raw ratings data from a file. Replace 'PATH' with the path to the MovieLens data /val rawData = sc.textFile("/PATH/ml-100k/u.data")rawData.first()// 14/03/30 13:21:25 INFO SparkContext: Job finished: first at :17, took 0.002843 s// res24: String = 196 242 3 881250949 / Extract the user id, movie id and rating only from the dataset /val rawRatings = rawData.map(_.split("t").take(3))rawRatings.first()// 14/03/30 13:22:44 INFO SparkContext: Job finished: first at :21, took 0.003703 s// res25: Array[String] = Array(196, 242, 3)复制代码 MLlib ALS模型MLlib导入ALS模型: import org.apache.spark.mllib.recommendation.ALS我们看一下ALS.train函数: 复制代码ALS.train/* <console>:13: error: ambiguous reference to overloaded definition, both method train in object ALS of type (ratings: org.apache.spark.rdd.RDD[org.apache.spark.mllib.recommendation.Rating], rank: Int, iterations: Int)org.apache.spark.mllib.recommendation.MatrixFactorizationModel and method train in object ALS of type (ratings: org.apache.spark.rdd.RDD[org.apache.spark.mllib.recommendation.Rating], rank: Int, iterations: Int, lambda: Double)org.apache.spark.mllib.recommendation.MatrixFactorizationModel match expected type ? ALS.train ^ */复制代码 我们可以得知train函数需要四个参数:ratings: org.apache.spark.rdd.RDD[org.apache.spark.mllib.recommendation.Rating], rank: Int, iterations: Int, lambda: Double ratings org.apache.spark.mllib.recommendation.Rating类是对用户ID,影片ID,评分的封装 我们可以这样生成Rating的org.apache.spark.rdd.RDD: val ratings = rawRatings.map { case Array(user, movie, rating) => Rating(user.toInt, movie.toInt, rating.toDouble) }ratings.first()// 14/03/30 13:26:43 INFO SparkContext: Job finished: first at :24, took 0.002808 s// res28: org.apache.spark.mllib.recommendation.Rating = Rating(196,242,3.0) rank 对应ALS模型中的因子个数,即“两个小矩阵和”中的k iterations 对应运行时的迭代次数 lambda: 控制模型的正则化过程,从而控制模型的过拟合情况。 由此,我们可以得到模型: 复制代码/ Train the ALS model with rank=50, iterations=10, lambda=0.01 /val model = ALS.train(ratings, 50, 10, 0.01)// ...// 14/03/30 13:28:44 INFO MemoryStore: ensureFreeSpace(128) called with curMem=7544924, maxMem=311387750// 14/03/30 13:28:44 INFO MemoryStore: Block broadcast_120 stored as values to memory (estimated size 128.0 B, free 289.8 MB)// model: org.apache.spark.mllib.recommendation.MatrixFactorizationModel = org.apache.spark.mllib.recommendation.MatrixFactorizationModel@7c7fbd3b / Inspect the user factors /model.userFeatures// res29: org.apache.spark.rdd.RDD[(Int, Array[Double])] = FlatMappedRDD[1099] at flatMap at ALS.scala:231 / Count user factors and force computation /model.userFeatures.count// ...// 14/03/30 13:30:08 INFO SparkContext: Job finished: count at :26, took 5.009689 s// res30: Long = 943 model.productFeatures.count// ...// 14/03/30 13:30:59 INFO SparkContext: Job finished: count at :26, took 0.247783 s// res31: Long = 1682 / Make a prediction for a single user and movie pair / val predictedRating = model.predict(789, 123)复制代码 二. 使用推荐模型 用户推荐用户推荐,向给定用户推荐物品。这里,我们给用户789推荐前10个他可能喜欢的电影。我们可以先解析下电影资料数据集 该数据集是由“|”分割,我们只需要前两个字段电影ID和电影名称 val movies = sc.textFile("/PATH/ml-100k/u.item")val titles = movies.map(line => line.split("\|").take(2)).map(array => (array(0).toInt, array(1))).collectAsMap()titles(123)// res68: String = Frighteners, The (1996)我们看一下预测的结果: 复制代码/ Make predictions for a single user across all movies /val userId = 789val K = 10val topKRecs = model.recommendProducts(userId, K)println(topKRecs.mkString("n"))/* Rating(789,715,5.931851273771102)Rating(789,12,5.582301095666215)Rating(789,959,5.516272981542168)Rating(789,42,5.458065302395629)Rating(789,584,5.449949837103569)Rating(789,750,5.348768847643657)Rating(789,663,5.30832117499004)Rating(789,134,5.278933936827717)Rating(789,156,5.250959077906759)Rating(789,432,5.169863417126231)*/topKRecs.map(rating => (titles(rating.product), rating.rating)).foreach(println)/*(To Die For (1995),5.931851273771102)(Usual Suspects, The (1995),5.582301095666215)(Dazed and Confused (1993),5.516272981542168)(Clerks (1994),5.458065302395629)(Secret Garden, The (1993),5.449949837103569)(Amistad (1997),5.348768847643657)(Being There (1979),5.30832117499004)(Citizen Kane (1941),5.278933936827717)(Reservoir Dogs (1992),5.250959077906759)(Fantasia (1940),5.169863417126231)*/复制代码我们再来看一下实际上的结果是: 复制代码val moviesForUser = ratings.keyBy(_.user).lookup(789)// moviesForUser: Seq[org.apache.spark.mllib.recommendation.Rating] = WrappedArray(Rating(789,1012,4.0), Rating(789,127,5.0), Rating(789,475,5.0), Rating(789,93,4.0), ...// ...println(moviesForUser.size)// 33moviesForUser.sortBy(-_.rating).take(10).map(rating => (titles(rating.product), rating.rating)).foreach(println)/*(Godfather, The (1972),5.0)(Trainspotting (1996),5.0)(Dead Man Walking (1995),5.0)(Star Wars (1977),5.0)(Swingers (1996),5.0)(Leaving Las Vegas (1995),5.0)(Bound (1996),5.0)(Fargo (1996),5.0)(Last Supper, The (1995),5.0)(Private Parts (1997),4.0)*/复制代码很遗憾,一个都没对上~不过,这很正常。因为预测的结果恰好都是用户789没看过的电影,其预测的评分都在5.0以上,而实际上的结果是根据用户789已经看过的电影按评分排序获得的,这也体现的推荐系统的作用~ 物品推荐物品推荐,给定一个物品,哪些物品和它最相似。这里我们使用余弦相似度: Cosine相似度计算 将查询语句的特征词的权值组成向量 a 网页中对应的特征词的权值组成向量 b 查询语句与该网页的Cosine相似度: / Compute the cosine similarity between two vectors /def cosineSimilarity(vec1: DoubleMatrix, vec2: DoubleMatrix): Double = { vec1.dot(vec2) / (vec1.norm2() * vec2.norm2()) } jblas线性代数库这里MLlib库需要依赖jblas线性代数库,如果大家编译jblas的jar包有问题,可以到我的百度云上获取。把jar包加到lib文件夹后,记得在spark-env.sh添加配置: SPARK_DIST_CLASSPATH="$SPARK_DIST_CLASSPATH:$SPARK_LIBRARY_PATH/jblas-1.2.4-SNAPSHOT.jar" import org.jblas.DoubleMatrixval aMatrix = new DoubleMatrix(Array(1.0, 2.0, 3.0))// aMatrix: org.jblas.DoubleMatrix = [1.000000; 2.000000; 3.000000] 求各个产品的余弦相似度: val sims = model.productFeatures.map{ case (id, factor) => val factorVector = new DoubleMatrix(factor) val sim = cosineSimilarity(factorVector, itemVector) (id, sim) }求相似度最高的前10个相识电影。第一名肯定是自己,所以要取前11个,再除去第1个: 复制代码val sortedSims2 = sims.top(K + 1)(Ordering.by[(Int, Double), Double] { case (id, similarity) => similarity })sortedSims2.slice(1, 11).map{ case (id, sim) => (titles(id), sim) }.mkString("n")/* (Hideaway (1995),0.6932331537649621)(Body Snatchers (1993),0.6898690594544726)(Evil Dead II (1987),0.6897964975027041)(Alien: Resurrection (1997),0.6891221044611473)(Stephen King's The Langoliers (1995),0.6864214133620066)(Liar Liar (1997),0.6812075443259535)(Tales from the Crypt Presents: Bordello of Blood (1996),0.6754663844488256)(Army of Darkness (1993),0.6702643811753909)(Mystery Science Theater 3000: The Movie (1996),0.6594872765176396)(Scream (1996),0.6538249646863378)*/复制代码 三.推荐模型评估 1.MSE/RMSE均方差(MSE),就是对各个实际存在评分的项,pow(预测评分-实际评分,2)的值进行累加,在除以项数。而均方根差(RMSE)就是MSE开根号。 我们先用ratings生成(user,product)RDD,作为model.predict()的参数,从而生成以(user,product)为key,value为预测的rating的RDD。然后,用ratings生成以(user,product)为key,实际rating为value的RDD,并join上前者: 复制代码val usersProducts = ratings.map{ case Rating(user, product, rating) => (user, product)}val predictions = model.predict(usersProducts).map{ case Rating(user, product, rating) => ((user, product), rating) }val ratingsAndPredictions = ratings.map{ case Rating(user, product, rating) => ((user, product), rating) }.join(predictions)ratingsAndPredictions.first()//res21: ((Int, Int), (Double, Double)) = ((291,800),(2.0,2.052364223387371))复制代码使用MLLib的评估函数,我们要传入一个(actual,predicted)的RDD。actual和predicted左右位置可以交换: 复制代码import org.apache.spark.mllib.evaluation.RegressionMetricsval predictedAndTrue = ratingsAndPredictions.map { case ((user, product), (actual, predicted)) => (actual, predicted) }val regressionMetrics = new RegressionMetrics(predictedAndTrue)println("Mean Squared Error = " + regressionMetrics.meanSquaredError)println("Root Mean Squared Error = " + regressionMetrics.rootMeanSquaredError)// Mean Squared Error = 0.08231947642632852// Root Mean Squared Error = 0.2869137090247319复制代码 MAPK/MAPK值平均准确率(MAPK)可以简单的这么理解: 设定推荐K=10,即推荐10个物品。预测该用户评分最高的10个物品ID作为文本1,实际上用户评分过所有物品ID作为文本2,求二者的相关度。(个人认为该评估方法在这里不是很适用) 我们可以按评分排序预测物品ID,再从头遍历,如果该预测ID出现在实际评分过ID的集合中,那么就增加一定分数(当然,排名高的应该比排名低的增加更多的分数,因为前者更能体现推荐的准确性)。最后将累加得到的分数除以min(K,actual.size) 如果是针对所有用户,我们需要把各个用户的累加分数进行累加,在除以用户数。 在MLlib里面,使用的是全局平均准确率(MAP,不设定K)。它需要我们传入(predicted.Array,actual.Array)的RDD。 现在,我们先来生成predicted: 我们先生成产品矩阵: / Compute recommendations for all users /val itemFactors = model.productFeatures.map { case (id, factor) => factor }.collect()val itemMatrix = new DoubleMatrix(itemFactors)println(itemMatrix.rows, itemMatrix.columns)// (1682,50)以便工作节点能够访问到,我们把该矩阵以广播变量的形式分发出去: // broadcast the item factor matrixval imBroadcast = sc.broadcast(itemMatrix)“”,矩阵相乘,计算出评分。scores.data.zipWithIndex,scores.data再按评分排序。生成recommendedIds,构建(userId, recommendedIds)RDD。 复制代码val allRecs = model.userFeatures.map{ case (userId, array) => val userVector = new DoubleMatrix(array) val scores = imBroadcast.value.mmul(userVector) val sortedWithId = scores.data.zipWithIndex.sortBy(-_._1) val recommendedIds = sortedWithId.map(_._2 + 1).toSeq (userId, recommendedIds)}复制代码生成actual: // next get all the movie ids per user, grouped by user idval userMovies = ratings.map{ case Rating(user, product, rating) => (user, product) }.groupBy(_._1)// userMovies: org.apache.spark.rdd.RDD[(Int, Seq[(Int, Int)])] = MapPartitionsRDD[277] at groupBy at :21生成(predicted.Array,actual.Array)的RDD,并使用评估函数: 复制代码import org.apache.spark.mllib.evaluation.RankingMetricsval predictedAndTrueForRanking = allRecs.join(userMovies).map{ case (userId, (predicted, actualWithIds)) => val actual = actualWithIds.map(_._2) (predicted.toArray, actual.toArray)}val rankingMetrics = new RankingMetrics(predictedAndTrueForRanking)println("Mean Average Precision = " + rankingMetrics.meanAveragePrecision)// Mean Average Precision = 0.07171412913757183复制代码
最近看到一篇文章介绍了数据分析与挖掘的十大经典算法:C4.5,K-Means,SVM,Apriori,EM,PageRank,AdaBoost,KNN,Native Bayes,CART。 1. C4.5 C4.5算法是机器学习算法中的一种分类决策树算法,其核心算法是ID3算法. C4.5算法继承了ID3算法的优点,并在以下几方面对ID3算法进行了改进: 1) 用信息增益率来选择属性,克服了用信息增益选择属性时偏向选择取值多的属性的不足; 2) 在树构造过程中进行剪枝; 3) 能够完成对连续属性的离散化处理; 4) 能够对不完整数据进行处理。 C4.5算法有如下优点:产生的分类规则易于理解,准确率较高。其缺点是:在构造树的过程中,需要对数据集进行多次的顺序扫描和排序,因而导致算法的低效。 2. The k-means algorithm 即K-Means算法 k-means algorithm算法是一个聚类算法,把n的对象根据他们的属性分为k个分割,k < n。它与处理混合正态分布的最大期望算法很相似,因为他们都试图找到数据中自然聚类的中心。它假设对象属性来自于空间向量,并且目标是使各个群组内部的均 方误差总和最小。 3. Support vector machines 支持向量机,英文为Support Vector Machine,简称SV机(论文中一般简称SVM)。它是一种監督式學習的方法,它广泛的应用于统计分类以及回归分析中。支持向量机将向量映射到一个更 高维的空间里,在这个空间里建立有一个最大间隔超平面。在分开数据的超平面的两边建有两个互相平行的超平面。分隔超平面使两个平行超平面的距离最大化。假 定平行超平面间的距离或差距越大,分类器的总误差越小。一个极好的指南是C.J.C Burges的《模式识别支持向量机指南》。van der Walt 和 Barnard 将支持向量机和其他分类器进行了比较。 4. The Apriori algorithm Apriori算法是一种最有影响的挖掘布尔关联规则频繁项集的算法。其核心是基于两阶段频集思想的递推算法。该关联规则在分类上属于单维、单层、布尔关联规则。在这里,所有支持度大于最小支持度的项集称为频繁项集,简称频集 5. 最大期望(EM)算法 在统计计算中,最大期望(EM,Expectation–Maximization)算法是在概率(probabilistic)模型中寻找参数最大似然 估计的算法,其中概率模型依赖于无法观测的隐藏变量(Latent Variabl)。最大期望经常用在机器学习和计算机视觉的数据集聚(Data Clustering)领域。 6. PageRank PageRank是Google算法的重要内容。2001年9月被授予美国专利,专利人是Google创始人之一拉里·佩奇(Larry Page)。因此,PageRank里的page不是指网页,而是指佩奇,即这个等级方法是以佩奇来命名的。 PageRank根据网站的外部链接和内部链接的数量和质量俩衡量网站的价值。PageRank背后的概念是,每个到页面的链接都是对该页面的一次投票, 被链接的越多,就意味着被其他网站投票越多。这个就是所谓的“链接流行度”——衡量多少人愿意将他们的网站和你的网站挂钩。PageRank这个概念引自 学术中一篇论文的被引述的频度——即被别人引述的次数越多,一般判断这篇论文的权威性就越高。 7. AdaBoost Adaboost是一种迭代算法,其核心思想是针对同一个训练集训练不同的分类器(弱分类器),然后把这些弱分类器集合起来,构成一个更强的最终分类器 (强分类器)。其算法本身是通过改变数据分布来实现的,它根据每次训练集之中每个样本的分类是否正确,以及上次的总体分类的准确率,来确定每个样本的权 值。将修改过权值的新数据集送给下层分类器进行训练,最后将每次训练得到的分类器最后融合起来,作为最后的决策分类器。 8. kNN: k-nearest neighbor classification K最近邻(k-Nearest Neighbor,KNN)分类算法,是一个理论上比较成熟的方法,也是最简单的机器学习算法之一。该方法的思路是:如果一个样本在特征空间中的k个最相似(即特征空间中最邻近)的样本中的大多数属于某一个类别,则该样本也属于这个类别。 9. Naive Bayes 在众多的分类模型中,应用最为广泛的两种分类模型是决策树模型(Decision Tree Model)和朴素贝叶斯模型(Naive Bayesian Model,NBC)。 朴素贝叶斯模型发源于古典数学理论,有着坚实的数学基础,以 及稳定的分类效率。同时,NBC模型所需估计的参数很少,对缺失数据不太敏感,算法也比较简单。理论上,NBC模型与其他分类方法相比具有最小的误差率。 但是实际上并非总是如此,这是因为NBC模型假设属性之间相互独立,这个假设在实际应用中往往是不成立的,这给NBC模型的正确分类带来了一定影响。在属 性个数比较多或者属性之间相关性较大时,NBC模型的分类效率比不上决策树模型。而在属性相关性较小时,NBC模型的性能最为良好。 10. CART: 分类与回归树 CART, Classification and Regression Trees。 在分类树下面有两个关键的思想。第一个是关于递归地划分自变量空间的想法;第二个想法是用验证数据进行剪枝。
数据分析与挖掘,指的是通过对大量的数据进行观察与分析。发掘其中的未知的,潜在的、对决策有价值的关系、模式和趋势,并利用这些规则建立决策模型、提供预测性支持的方法和过程。 作为一名大数据开发工程师,什么能力才是我们我们的核心竞争力,答案是肯定的,那就是数据分析与挖掘。只有让数据产生价值才是数据开发工程师的职责。下面我将从几个方面介绍数据挖掘: 1.数据挖掘的基本任务 数据挖据的基本任务包括利用分类与预测、聚类分析、关联规则、时序模式、偏差检验、智能推荐等方法,帮助企业提取数据中蕴含的商业价值,提高企业的竞争力。2.数据挖掘的过程 2.1 定义数据挖掘目标 针对具体的数据挖掘需求,我们首先要做的就是明确本次数据挖掘的目标是什么?预期达到怎样的效果?因此我们在进行数据挖掘工作前,必须先了解项目相关背景知识,弄清用户的需求。比如某电商平台的数据挖掘应用,可定义一下数据挖掘目标: (1) 分析挖掘用户数据,建立用户画像与物品画像等(2) 基于用户画像实现动态商品智能推荐,帮助用户快速发现自己感兴趣的商品,同时确保给用户推荐的也是企业所期望的,实现用户与企业的双赢。(3) 对平台客户进行群体细分,了解不同客户的贡献度与消费特征,分析哪些客户是最有价值的,哪些是需要重点的,对不同价值的客户采取不同的营销策略,将有限的资源投放到最有价值的客户身上,实现精准化营销。(4) 基于商品的历史销售情况,综合节假日、气候和竞争对手等影响因素,对商品销售量进行趋势预测,方便企业准备库存。 2.2 数据抽样 在明确了数据挖掘的目标后,接下来要做的工作就是抽取企业的数据挖掘库中的数据子集,随着现在大数据技术的发展,也有很多企业进行全量数据的抽取,不过为了数据挖掘的效率,可以选择抽样使用数据,节省系统资源。抽样有很多的方法:比如随机抽样、等距抽样、分层抽样、分类抽样等。 2.3 数据预处理 当采集的数据维度过大,如何进行降维处理、缺失值处理等都是数据预处理过程中要解决的问题。如何对数据进行预处理以改善数据质量,并最终达到完善数据挖掘结果。目前数据预处理一般包括:数据筛选、数据质量转换、缺失值处理、坏数据处理、数据标准化、数据规约等。 2.4 挖掘建模 数据挖掘建模是数据挖掘的核心工作,选择哪种算法进行模型构建?在生成最终的数据集后,就可以在此基础上建立模型来进行聚类分析了。建立模型阶段主要是选择和应用各种建模技术,同时对它们的参数进行校准以达到最优值。在明确建模技术和算法后需要确定模型参数和输入变量。模型参数包括类的个数和最大迭代步数等。在建模过程中,我们采用多种技术手段,并将建模效果进行对比。需要挑选合适的变量参与建模。参与建模的变量太多会削弱主要业务属性的影响,并给理解分群结果带来困难;变量太少则不能全面覆盖需要考察的各方面属性,可能会遗漏一些重要的属性关系。输入变量的选择对建立满意的模型至关重要。 建立模型是一个螺旋上升,不断优化的过程,在每一次聚类结束后,需要判断聚类结果在业务上是否有意义,其各群特征是否明显。如果结果不理想,则需要调整聚类模型,对模型进行优化,称之为聚类优化。聚类优化可通过调整聚类个数及调整聚类变量输入来实现,也可以通过多次运行,选择满意的结果。通常可以依据以下原则判断聚类结果是否理想:类间特征差异是否明显;群内特征是否相似;聚类结果是否易于管理及是否具有业务指导意义。 2.5 模型评价 建模的过程中会得到一系列的分析结果,它们是对目标问题多侧面的描述,这时需要对它们进行验证和评价,以得到合理的,完备的决策信息。对产生的模型结果需要进行对比验证、准确度验证、支持度验证等检验以确定模型的价值。在这个阶段需要引入更多层面和背景的用户进行测试和验证,通过对几种模型的综合比较,产生最后的优化模型。根据业务对模型进行解释应用,不同的模型的评价方法往往也不同。 3.常用的数据挖据建模工具 3.1 SAS Enterprise Miner Enterprise Miner(EM)是SAS提供的一个图形化界面、菜单驱动的、拖拉式操作、对用户非常友好且功能非常强大的集成的数据挖掘系统。它集成了: (1) 数据获取工具(2) 数据抽样工具(3) 数据筛选工具(4) 数据变量转换工具(5) 数据挖据数据库(6) 数据挖掘过程(7) 多种形式的回归工具(8) 为建立决策树的数据剖分工具(9) 决策树浏览工具(10) 人工神经元网络(11) 数据挖据的评价工具在SAS/EM中,可利用具有明确代表意义的图形化的模块将这些数据挖掘工具单元组成一个数据流程图,并以此来组织你的数据挖掘过程。对于有经验的数据挖掘专家,SAS/EM提供大量的选项,可让有经验的数据分析人员进行精细化调整分析处理。 3.2 IBM SPSS Modeler IBM SPSS Modeler原名Clementine,2009年被IBM收购以后对产品进行性能和功能进行了大幅度的改进和提升,几乎一年一个版本。它封装了了先进的统计学和数据挖掘技术来获取预测分析。SPSS Modeler提供图形化的界面,屏蔽了数据挖据算法的复杂性和操作的繁琐,让使用者只需要聚焦如何使用数据挖掘技术去解决实际的商业问题。 3.3 Python Python是一种面向对象、解释型的计算机程序设计语言,它拥有高效的数据结构,能简单的进行面向对象的编程。python本身不提供数据挖掘环境,但是python它有各种数据挖掘的扩展库。比如比较常见的有:Numpy、Scipy、Matplotlib等,他们分别为Python提供快速数组处理、科学计算以及绘图的能力,在用到机器学习和人工神经网络时,我们会用到SKlearn库和Keras库,它提供了完善的机器学习工具箱,包括:数据的预处理、分析、回归、预测、模型分析等。正是由于有了这些扩展库,python才是数据分析与挖据常用的语言。 3.4 SQL Server 微软公司的SQL Server中集成了数据挖掘组件--Analysis Servers,借助于SQL Server数据库管理功能可以很好的集成在SQL Servers中,SQL Server 2008中提供很多数据挖掘算法,比如:决策树算法、聚类分析算法、Native Bayes算法、关联规则算法等9种算法。但是这些模型的建立都依赖与SQL Server平台,所以平台移植性比较差。 3.5 RapidMiner RapidMiner也成为YALE,提供图形化的操作界面,采用树状结构来组织分析组件,树上每个节点表示不同的运算符。RapidMiner中提供了大量的运算符,包括数据预处理、变换、建模、评估等各个环节。RapidMiner是基于Java开发的,基于Weka来构建的,所以Yale可以调用Weka中的组件,Yale中还提供扩展套件Radoop,可以和Hadoop结合起来用,在Hadoop集群中运行任务。 3.6 Weka Weka的全名是怀卡托智能分析环境(Waikato Environment for Knowledge Analysis),是一款免费的,非商业化(与之对应的是SPSS公司商业数据挖掘产品--Clementine )的,基于JAVA环境下开源的机器学习(machine learning)以及数据挖掘(data mining)软件。它和它的源代码可在其官方网站下载。有趣的是,该软件的缩写WEKA也是New Zealand独有的一种鸟名,而Weka的主要开发者同时恰好来自New Zealand的the University of Waikato。WEKA作为一个公开的数据挖掘工作平台,集合了大量能承担数据挖掘任务的机器学习算法,包括对数据进行预处理,分类,回归、聚类、关联规则以及在新的交互式界面上的可视化。跟很多电子表格或数据分析软件一样,WEKA所处理的数据集是一个二维的表格。 3.7 KnimeKNIME是一个基于Eclipse平台开发,模块化的数据挖掘系统。它能够让用户可视化创建数据流(也就常说的pipeline),选择性的执行部分或所有分解步骤,然后通过数据和模型上的交互式视图研究执行后的结果。可以扩展使用Weka中的算法,同时Knime也提供基于数据流的方式来组织数据挖掘过程,每个节点都有数据的输入/输出端口,用接收或输出计算结果。 3.8 TipDM TipDM(顶尖大数据挖掘平台)使用Java语言开发,能从各种数据源获取数据,构建数据挖掘模型。TipDM目前已经集成了数十种预测算法和分析技术,支持数据挖掘流程所需要的主要过程,并提供开发的应用接口和算法,能够满足各种复杂的应用需求。 4.数加平台 数加平台是阿里云提供的数据处理管理平台,平台提供了各种模块:机器学习、推荐引擎、数据可视化(Datav)等,数加平台为DT时代的大数据处理的强力引擎。平台屏蔽了数据分析与挖掘过程中的复杂性,以图像化的界面构建数据处理流程,让大数据处理工程师只要专注于数据挖掘的分析过程。 数加平台是基于阿里云的ODPS平台,所以具有强大的数据处理能力,各种计算引擎和工具相互结合在一起形成数加生态体系,就开源的Hadoop生态体系一样。 数加的生态体系组成如下:
一、简介 Apache Flume是一个分布式的、可靠的、可用的系统,可用于从不同的数据源中高效地收集、聚合和移动海量日志数据到集中式数据存储系统。 ODPS Sink是基于ODPS DataHub Service开发的Flume插件,可以将Flume的Event数据导入到ODPS中。插件兼容Flume的原有功能特性,支持ODPS表自定义分区、且可以自动创建分区。 二、环境要求1、JDK(1.6以上,推荐1.7)2、Flume-NG 1.x 三、插件部署1、下载ODPS Sink插件并解压:aliyun-odps-flume-plugin2、Flume-NG 1.x下载:https://flume.apache.org/download.html(1)下载 apache-flume-1.6.0-bin.tar.gz(2)下载apache-flume-1.6.0-src.tar.gz3、Flume的安装 (1) 解压apache-flume-1.6.0-src.tar.gz和apache-flume-1.6.0-bin.tar.gz (2) 将apache-flume-1.6.0-src中的文件复制到apache-flume-1.6.0-bin中4、部署ODPS Sink插件:将文件夹odps_sink移动到Apache Flume安装目录下:$ mkdir {YOUR_APACHE_FLUME_DIR}/plugins.d$mv odps_sink/ { YOUR_APACHE_FLUME_DIR }/plugins.d/移动后,核验ODPS Sink插件是否已经在相应目录:$ ls { YOUR_APACHE_FLUME_DIR}/plugins.dodps_sink部署完成后,只需要在Flume的配置文件中将sink的type字段配置为:com.aliyun.odps.flume.sink.OdpsSink即可使用 四、配置示例例:将日志文件中的结构化数据进行解析,并上传到ODPS表中需要上传的日志文件格式如下(每行为一条记录,字段之间逗号分隔): test_basic.log some,log,line1some,log,line2...第一步、在ODPS 的 project创建ODPS Datahub表建表语句如下所示:CREATE TABLE hub_table_basic (col1 STRING, col2 STRING) PARTITIONED BY (pt STRING) INTO 1 SHARDS HUBLIFECYCLE 1; 第二步、创建Flume作业配置文件:在Flume安装目录的conf/文件夹下创建名为odps_basic.conf的文件,并输入内容如下: odps_basic.conf A single-node Flume configuration for ODPS Name the components on this agent a1.sources = r1a1.sinks = k1a1.channels = c1 Describe/configure the source a1.sources.r1.type = execa1.sources.r1.command = cat {YOUR_LOG_DIRECTORY}/test_basic.log Describe the sink a1.sinks.k1.type = com.aliyun.odps.flume.sink.OdpsSinka1.sinks.k1.accessID = {YOUR_ALIYUN_ODPS_ACCESS_ID}a1.sinks.k1.accessKey = {YOUR_ALIYUN_ODPS_ACCESS_KEY}a1.sinks.k1.odps.endPoint = http://service.odps.aliyun.com/apia1.sinks.k1.odps.datahub.endPoint = http://dh.odps.aliyun.coma1.sinks.k1.odps.project = {YOUR_ALIYUN_ODPS_PROJECT}a1.sinks.k1.odps.table = hub_table_basica1.sinks.k1.odps.partition = 20150814a1.sinks.k1.batchSize = 100a1.sinks.k1.serializer = DELIMITEDa1.sinks.k1.serializer.delimiter = ,a1.sinks.k1.serializer.fieldnames = col1,,col2a1.sinks.k1.serializer.charset = UTF-8a1.sinks.k1.shard.number = 1a1.sinks.k1.shard.maxTimeOut = 60a1.sinks.k1.autoCreatePartition = true Use a channel which buffers events in memory a1.channels.c1.type = memorya1.channels.c1.capacity = 1000a1.channels.c1.transactionCapacity = 1000 Bind the source and sink to the channel a1.sources.r1.channels = c1a1.sinks.k1.channel = c1 第三步:启动Flume启动Flume并指定agent的名称和配置文件路径,-Dflume.root.logger=INFO,console选项可以将日志实时输出到控制台。$ cd { YOUR_APACHE_FLUME_DIR}$ bin/flume-ng agent -n a1 -c conf -f conf/odps_basic.conf -Dflume.root.logger=INFO,console写入成功,显示日志如下:...Write success. Event count: 2...在ODPS Datahub表中即可查到数据; 多数据源上传到ODPS多个数据上传到odps,只需要配置对应的source和channel,可以有一下几种上传方式:(1) 多个source和一个channel和一个sink (2) 多个source和多个channel和一个sink (3) 多个source,多个channel和多个sink,输出到多个地方存储 (4)多个agent的复杂情况: 下面给出(1)中情况的配置: odps_basic.conf A single-node Flume configuration for ODPS Name the components on this agent a1.sources = r1 r2a1.sinks = k1a1.channels = c1 Describe/configure the source a1.sources.r1.type = execa1.sources.r1.command = cat {YOUR_LOG_DIRECTORY}/test_basic.log source2的配置 a1.sources.r2.type = execa1.sources.r2.command = cat {YOUR_LOG_DIRECTORY}/test_basic2.log Describe the sink a1.sinks.k1.type = com.aliyun.odps.flume.sink.OdpsSinka1.sinks.k1.accessID = {YOUR_ALIYUN_ODPS_ACCESS_ID}a1.sinks.k1.accessKey = {YOUR_ALIYUN_ODPS_ACCESS_KEY}a1.sinks.k1.odps.endPoint = http://service.odps.aliyun.com/apia1.sinks.k1.odps.datahub.endPoint = http://dh.odps.aliyun.coma1.sinks.k1.odps.project = {YOUR_ALIYUN_ODPS_PROJECT}a1.sinks.k1.odps.table = hub_table_basica1.sinks.k1.odps.partition = 20150814a1.sinks.k1.batchSize = 100a1.sinks.k1.serializer = DELIMITEDa1.sinks.k1.serializer.delimiter = ,a1.sinks.k1.serializer.fieldnames = col1,,col2a1.sinks.k1.serializer.charset = UTF-8a1.sinks.k1.shard.number = 1a1.sinks.k1.shard.maxTimeOut = 60a1.sinks.k1.autoCreatePartition = true Use a channel which buffers events in memory a1.channels.c1.type = memorya1.channels.c1.capacity = 1000a1.channels.c1.transactionCapacity = 1000 Bind the source and sink to the channel a1.sources.r1.channels = c1a1.sinks.k1.channel = c1 source2的channel a1.sources.r2.channels = c2 可能遇到的问题:1、在数据sink阶段报错,数据无法传递这个错误是由于数据的最上面加了一行注释,它默认读取改行导致数据的行数与配置文件中 配置的行数不一致,所以报上面这个错,删出上面的注释行问题就解决了。 2、OOM 问题:flume 报错:java.lang.OutOfMemoryError: GC overhead limit exceeded或者:java.lang.OutOfMemoryError: Java heap spaceException in thread "SinkRunner-PollingRunner-DefaultSinkProcessor" java.lang.OutOfMemoryError: Java heap spaceFlume 启动时的最大堆内存大小默认是 20M,线上环境很容易 OOM,因此需要你在 flume-env.sh 中添加 JVM 启动参数: JAVA_OPTS="-Xms8192m -Xmx8192m -Xss256k -Xmn2g -XX:+UseParNewGC -XX:+UseConcMarkSweepGC -XX:-UseGCOverheadLimit"然后在启动 agent 的时候一定要带上 -c conf 选项,否则 flume-env.sh 里配置的环境变量不会被加载生效
Python基础框架和工具 最近在学Python金融大数据分析,在安装Python进行大数据分析的环境时遇到很多问题,例如:在安装pandas包时候就要到各种错误,总是缺少很多安装包,最后发现利用Python的Anaconda进行科学计算环境的搭建非常方便。 Anaconda是和Canopy类似的科学计算环境,安装非常方便,而且自带的conda包管理器也十分强大。 1、 Anaconda介绍:我们可以从http://continuum.io/downloads上下载适合你操作系统的Anaconda,那么我们为啥用Anaconda进行Python部署呢?有一下几个方面的因素:(1) 库/软件包 可以安装100多个重要的Python库和软件包,而且可以版本一致的方式安装所有的软件包。(2) 开源 Anaconda是免费开源的,而且分发版本中所有的库和软件包也是如此。(3) 跨平台 Anaconda可以运用于Windows、Mac OS 和Linux平台。(4) 自动更新 Anaconda中的库和软件包可以免费在线自动更新。 (5) Conda软件包管理程序 Conda软件管理程序可以并行使用多个Python版本和多个版本的库。 Anaconda的安装非常简单,在windows下只需双击安装程序,然后安装提示一步步下去即可,在Linux中,在shell中进入安装程序所在的目录,然后输入:bash Anaconda -1.x.x-Linux-x86[_64].sh 安装完成后,就可以利用这100多个库和软件包了,anaconda所包含的一些库和软件包: 2、安装模板:Anaconda已经自带了很多的科学计算用的库和模板,但是这还不够,有时候我们需要安装一些其他模板:condaanaconda自带了conda命令用于安装模板和更新模板,比如: 安装模板 conda install scipy 更新模板 conda update scipy 更新所有的模块 conda update --all pippip是Python自带的模块安装工具,比如:pip install requestspip install requests --upgrade 升级Anaconda新版本发布后,可以利用pyenv来安装最新版本,也可以利用Anaconda自带的更新工具升级:conda update condaconda update anaconda python开发常用的IDE:(1) Spyder(2) IPython(3)PyCharm 下面用conda创建一个名叫python2的版本为python2.7的环境。conda create -n python2 python=2.7这样就会在Anaconda安装目录下的envs目录下创建python2这个目录。 向其中安装扩展可以:直接用 conda install 并用 -n 指明安装到的环境,这里自然就是 python2 ,例如:conda install -n python2 pandas想使用Spyder,可以配置好环境变量后。在cmd窗口使用Spyder就可以打开窗口了:
最近在学习使用阿里云的推荐引擎时,在使用的过程中用到很多推荐算法,所以就研究了一下,这里主要介绍一种推荐算法—基于物品的协同过滤算法。ItemCF算法不是根据物品内容的属性计算物品之间的相似度,而是通过分析用户的行为记录来计算用户的相似度。该算法认为物品A和物品B相似的依据是因为喜欢物品A的用户也喜欢物品B。基于物品的协同过滤算法实现步骤:1、计算物品之间的相似度2、根据物品的相似度和用户的历史行为记录给用户生成推荐列表下面我们一起来看一下这两部是如何实现的:一、计算物品之间的相似度通过查询一下资料,ItemCF的物品相似度计算模型如下:公式中|N(i)|表示喜欢物品i的用户数,|N(j)|表示喜欢物品j的用户数, |N(i)∩N(j)|表示同时喜欢物品i和物品j的用户数。从上面的公式我们可以看出物品i和物品j相似是因为他们共同别很多的用户喜欢,相似度越高表示同时喜欢他们的用户数越多。下面举例讲解一下相似度的计算过程:假设用户A对物品a,b,d有过评价,用户B对物品b,c,e有过评价,如下图: A : a b dB : b c eC : c dD : b c dE : a d根据上面用户的行为构建:用户——物品倒排表:例如:物品a有用户A和E做过评价。a : A Eb : A B Dc : B C Dd : A C D Ee : B根据上面的倒排表我们可以构建一个相似度矩阵: 图 1.1 计算物品的相似度图中最左边的是用户输入的用户行为记录,每一行代表用户感兴趣的物品集合,然后对每个物品集合,我们将里面的物品两两加一,得到一个矩阵。最终将这些矩阵进行相加得到上面的C矩阵。其中Ci记录了同时喜欢物品i和j的用户数。这样我们就得到了物品之间的相似度矩阵W。 二、根据物品的相似度和用户的历史行为记录给用户生成推荐列表ItemCF通过下面的公式计算用户u对一个物品j的兴趣: 这里的N(u)代表用户喜欢的物品的集合,S(j,k)是和物品j最相似的的k个物品的集合,wij是物品j和i的相似度,r_ui代表用户u对物品i的兴趣。该公式的含义是,和用户历史上最感兴趣的物品月相似的物品,越有可能在用户的推荐列表中获得比较高的排名。下面是查阅资料找到的一些优化方法:(1)、用户活跃度对物品相似度的影响即认为活跃用户对物品相似度的贡献应该小于不活跃的用户,所以增加一个IUF(Inverse User Frequence)参数来修正物品相似度的计算公式:用这种相似度计算的ItemCF被记为ItemCF-IUF。ItemCF-IUF在准确率和召回率两个指标上和ItemCF相近,但它明显提高了推荐结果的覆盖率,降低了推荐结果的流行度,从这个意义上说,ItemCF-IUF确实改进了ItemCF的综合性能。 (2)、物品相似度的归一化Karypis在研究中发现如果将ItemCF的相似度矩阵按最大值归一化,可以提高推荐的准确度。其研究表明,如果已经得到了物品相似度矩阵w,那么可用如下公式得到归一化之后的相似度矩阵w':最终结果表明,归一化的好处不仅仅在于增加推荐的准确度,它还可以提高推荐的覆盖率和多样性。用这种相似度计算的ItemCF被记为ItemCF-Norm。
这里主要介绍从MongoDB同步数据到ODPS。ruby环境的搭建以及fluent_plugin_mongo_odps插件的安装。1.准备工作1.1安装环境要求Ruby 2.1以上Gem 2.4.5以上1.2 ruby的安装首先查看你的Linux系统是否安装了ruby可以用下面两个命令查询rpm -qa | grep ruby 或yum list | grep ruby1.3 安装一些依赖环境 执行下面的命令:yum install gcc-c++ patch readline readline-devel zlib zlib-devel libyaml-devel libffi-devel openssl-devel make bzip2 autoconf automake libtool bison iconv-devel wget tar 1.4 安装ruby步骤(1) cd ~/(2)wget https://ruby.taobao.org/mirrirs/ruby/ruby-2.2.3.tar.gz(3) tar –zxvf ruby-2.2.3.rar.gz(4) cd ruby-2.2.3(5) ./configure(6) make(7) make install 1.5 修改gem库源由于国内gem库源被墙了,访问有问题,所以将gem库换成淘宝的gem镜像源(1) 查看当前的源地址 输入:gem sources 默认:https://rubygems.org/(2) 删除默认的源地址输入:gem sources –r https://rubygems.org/注:默认的url地址后必须有”/”,否则删不掉。 (3) 添加淘宝的镜像库输入:gem sources -a https://ruby.taobao.org/注:国内使用淘 宝的源比较稳定,且安装或更新网速都比较快(4) 更新源的缓存输入:gem sources –u更新源的缓存后即完成了Ruby的gem源修改。最后输入 gem sources –l 2.安装Fluent-plugin-aliyun-odps通过gem直接下载插件输入:gem install fluent-plugin-aliyun-odps查找默认安装路径输入:whereis ruby(/usr/local/lib/ruby/gems/2.2.0/gems) 3.安装ruby driver执行下面的命令:(A Ruby driver for MongoDB)gem install mango 安装fluent-plugin-mongo 插件: install fluent-plugin-mongo fluent_mongo_odps.conf配置文件 type mongo_tail host 172.16.1.157 port 27017 database test collection mongo_small adapter mongo tag app.mongo #user root #password root # waiting time when there is no next document. default is 1s. wait_time 5 # Convert 'time'(BSON's time) to fluent time(Unix time). time_key time #disable_collection_check true type aliyun_odps aliyun_access_id OERGMhXn6H2mBkhk aliyun_access_key qnuSKMKoMcY5Va97GGFtL0nvlAoLZx aliyun_odps_endpoint http://service.odps.aliyun.com/api aliyun_odps_hub_endpoint http://dh.odps.aliyun.com buffer_chunk_limit 2m buffer_queue_limit 128 flush_interval 5s project dtstack_dev 在fluent的bin目录下启动fluent插件: –c fluent_mysql_odps.conf 执行命令,出现 Successfully import …. 表示同步数据成功 问题1:2016-05-27 11:12:47 +0800 [error]: unexpected error error_class=Fluent::ConfigError error=# 解决方法:这个问题就是导入的数据源mongo_tail只支持capped collection的数据,新建一个capped collection问题就解决了。
Spark与elasticsearch结合使用是一种常用的场景,小编在这里整理了一些Spark与ES结合使用的方法。一、 write data to elasticsearch利用elasticsearch Hadoop可以将任何的RDD保存到Elasticsearch,不过有个前提其内容可以翻译成文件。这意味着RDD需要一个Map/JavaBean/Scala case classScala在Scala中只需要以下几步: Spark Scala imports Elasticsearch-hadoop Scala imports Start Spark through its Scala API makeRDD index content(内容索引) index ES under spark/docs下面是一个例子: Scala用户可能会使用SEQ和→符号声明根对象(即JSON文件)而不是使用Map。而类似的,第一个符号的结果略有不同,不能相匹配的一个JSON文件:序列是一阶序列(换句话说,一个列表),←会创建一个Tuple(元组),或多或少是一个有序的,元素的定数。例如,一个列表的列表不能作为一个文件,因为它不能被映射到一个JSON对象;但是它可以在一个自由的使用。因此在上面的例子Map(K→V)代替SEQ(K→V)作为一种替代上面的隐式导入,elasticsearch-hadoop支持spark的Scala用户通过org.elasticsearch.spark.rdd包作为实用类允许显式方法调用EsSpark。此外,而不是地图(这是方便,但需要一个映射,每个实例,由于它们的结构的差异),使用一个case class: EsSpark importrs 定义一个Case class名叫Trip 利用Trip实例创建一个RDD 明确RDD的index通过EsSpark 例子: 对于指定documents的id(或者其他类似于TTL或时间戳的元数据),可以设置名字为es.mapping.id的映射。下面以前的实例,Elasticsearch利用filed的id作为documents的id.更新RDD的配置configuration(也可以在SparkConf上设置全局的属性,不建议这样做)Writing existing to Elasticsearch如果Rdd的数据已经在Json中,elasticsearch-hadoop允许直接索引而不需要任何转换,数据直接发送到Elasticsearch.这时候elasticsearch-hadoop希望RDD包含字符或者字节数组(string[]/byte[]),假设每个条目代表一个JSON文档。如果RDD没有正确的签名,这savejsontoes方法无法应用(在Scala中他们将不可用)。 Writing to dynamic/multi-resources当被写入ES的数据需要索引不同的buckets,可以利用es.resource.write,下面media的例子配置如下: 用于拆分数据的文档。任何字段都可以被声明(但要确保它在所有的文件中都是可用的) 保存每个对象根据其资源的模式,在这个例子的基础上media_type每个文档或者对象被写入,Elasticsearch Hadoop将提取media_type字段,使用它的值来确定目标资源。 Handling document metadataElasticsearch允许每个文档有自己的元数据(metadata),正如上面所解释的,通过各种映射选项可以自定义这些参数,以便他们的值是从他们的归属文档中提取。我们甚至可以包括/排除哪些部分数据被备份到Elasticsearch,在Spark中,Elasticsearch Hadoop扩展此功能允许将元数据提供的外部文档本身给pair RDDS用。另一方面,对于包含key-value元组的RDDS,metadata可以从作为文档源的key-value中提取。当有更多的Id需要被指定时,可以使用scala.collection.Map来接收 org.elasticsearch.spark.rdd.Metadata的key的类型:当有更多的Id需要被指定时,可以使用ava.util.Map来接收 org.elasticsearch.spark.rdd.Metadata的key的类型:二、 Reading data from elasticsearch读数据需要定义一个EsRDD,将数据流从ES读到Spark该方法可以被重载来指定一个额外的查询或配置图(overriding sparkconf):从Elasticsearch返回的文件,默认情况下,作为一个tuple2,包含第一个元素是文档ID和第二个元素实际文件通过Scala集合来表示,名字类似于Map[Sting,Any],其中key是字段名称和value是各自的值。 elasticsearch-hadoop自动转换spark内置类型作为Elasticsearch类型,如下表:SaprkSQL on support直接看下面的例子:
Spark概念介绍:spark应用程序在集群中以一系列独立的线程运行,通过驱动器程序(Driver Program)发起一系列的并行操作。SparkContext对象作为中间的连接对象,通过SparkContext对象连接集群。SparkContext对象可以连接集群管理器(YARN,Mesos.standalone等) 目前Spark集群支持以下集群管理模式:(1)本地模式(2)Mesos模式: 一种通用的集群管理模式,可以运行Hadoop Mapreduce和应用服务 (3)YARN模式:Hadoop2.0的资源管理模式 Spark的Http Server的启动过程:在SparkContext初始化的过程中创建SparkUI(包含一个bind函数)bind函数中的startJettyServer函数通过Connect启动JettyServer Spark WebUI的页面数据获取:当SparkUI进行初始化操作时,会添加监听(Listener)SparkListener(利用观察者模式),如果监听到Stage和task相关的事件发生,Listener就会收到通知,则对数据进行更新,页面的数据需要手动进行刷新
Spark集群搭建 一、环境说明1、机器:3台虚机(hadoop01/hadoop02/hadoop03)2、Linux版本:CentOS 6.53、JDK版本:1.84、Hadoop版本:hadoop-2.5.25、Spark版本:Spark-1.3.16、Scala版本:scala-2.10.6二、安装步骤1、安装Hadoop,这里不做具体讲解172.16.1.156 hadoop01172.16.1.157 hadoop02172.16.1.158 hadoop032、下载Spark如果是基于Hadoop部署spark,可以对应hadoop的版本下载spark下载地址:http://spark.apache.org/downloads.html3、下载Scala下载地址:http://www.scala-lang.org/download/2.10.6.html 4、安装Scala解压:tar -zxvf scala-2.10.6.tgz 5.安装spark解压:tar -zxvf spark-1.3.1-bin-hadoop2.4.tgz 6、在~/.bash_profile中配置环境变量QQ截图20160427162720.png 7、配置spark_env.sh(Spark运行的环境变量)修改spark_env.sh.template复制为spark_env.shmv spark_env.sh.template spark_env.sh配置以上环境变量export JAVA_HOME=/home/hadoop/jdk1.8export SPARK_MASTER_IP=spark01export SPARK_MASTER_PORT=7077export SPARK_WORKER_CORES=1export SPARK_WORKER_INSTANCES=1 8、复制hadoop01节点配置好的spark到其他节点 scp -r ~/spark-1.3.1-bin-hadoop2.4/ hadoop@hadoop02:~/ scp -r ~/spark-1.3.1-bin-hadoop2.4/ hadoop@hadoop03:~/ 9.spark的web管理界面 :http://172.16.1.156:8080/spark WEBUI界面:http://172.16.1.156:4040/jobs/ spark-shell启动: 到spark的bin目录下执行 ./spark-shell
Spark可以运行在各种集群管理器上,并通过集群管理器访问集群中的其他机器。Spark主要有三种集群管理器,如果只是想让spark运行起来,可以采用spark自带的独立集群管理器,采用独立部署的模式;如果是想让Spark部署在其他集群上,各应用共享集群的话,可以采取两种集群管理器:Hadoop Yarn 或 Apache Mesos。 一、独立集群管理器 Spark独立集群管理器提供的在集群上运行应用的简单方法。要使用集群启动脚本,按照以下步骤执行即可:1、将编译好的Spark发送到集群的其他节点相同的目录下,例如: /home/opt/spark2、设置集群的主节点和其他机器的SSH免密码登录3、编辑主节点的conf/slaves文件,添加上所有的工作节点的主机名4、在主节点上运行sbin/start-all.sh启动集群,可以在http://masternode:8080上看到集群管理界面5、要停止集群,在主节点上运行 sbin/stop-all.sh 二、Hadoop Yarn YARN是Hadoop2.0中引入的集群管理器,可以让多中数据处理框架运行在一个共享的资源池上,而且和Hadoop的分布式存储系统(HDFS)安装在同一个物理节点上。所以让Spark运行在配置了YARN的集群上是一个非常好的选择,这样当Spark程序运行在存储节点上的时候可以快速的访问HDFS中的数据。在Spark中使用YARN的步骤: 1.找到你的Hadoop的配置目录,然后把它设置问环境变量HADOOP_CONF_DIR。export HADOOP_CONF_DIR="..."然后采用如下方式提交作业spark-submit --master yarn yourapp 2、配置资源用量(1) --executor-memory 设置每个执行器的内存用量(2)--executor-cores 设置每个执行器进程从YARN中占用的核心数目(3)--num-wxecutors Spark应用会使用固定数量的执行器节点,默认为2 三、Apache Mesos Mesos是一个通用的集群管理器,既可以运行分析性负载又可以运行长期运行的服务。在Mesos上使用Spark,可以采用以下方式:spark-submit --master mesos://masternode:5050 yourapp 1、Mesos的调度模式Mesos的调度模式分为两种:粗粒度模式和细粒度模式粗粒度模式:只Spark会提前为每个执行器分配固定数量的CPU,而且在任务结束前不会释放这些资源。可以通过设置spark.mesos.coarse为true,开启粗粒度调度模式细粒度模式(默认):执行器进程占用的CPU核心数会在执行任务的过程中动态变化。 2、配置资源用量(1) --executor-memory 设置每个执行器的资源(2) --total-executor-cores 设置应用占用的核心数