Machine Learning with Spark 8: Text Processing (spark-shell)

Summary: ![](http://img3.douban.com/lpic/s28277325.jpg) Notes on natural language processing with [Machine Learning with Spark](http://book.douban.com/subject/26593179/): feature extraction, TF-IDF weighting, and text classification.


Machine Learning with Spark

Natural language processing (NLP)

  • Feature extraction
  • Modeling
  • Machine learning

TF-IDF (term frequency – inverse document frequency)

  • Term weighting: assign each word a weight based on its term frequency
  • Feature hashing: use a hash function to map features to vector indices (see the formula below and the sketch in section 3.1)
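
For reference, the weighting MLlib implements (a smoothed variant, per the mllib IDF documentation) is

tf-idf(t, d) = tf(t, d) × log((N + 1) / (df(t) + 1))

where tf(t, d) is the count of term t in document d, N is the number of documents, and df(t) is the number of documents containing t.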

0 Environment setup

The corpus is the 20 Newsgroups "bydate" split (11,314 training and 7,532 test documents, organized in one directory per topic), commonly mirrored at http://qwone.com/~jason/20Newsgroups/.

tar xfvz 20news-bydate.tar.gz

export SPARK_HOME=/Users/erichan/Garden/spark-1.5.1-bin-hadoop2.6
cd $SPARK_HOME
bin/spark-shell --name my_mlib --packages org.jblas:jblas:1.2.4 --driver-memory 4G --executor-memory 4G --driver-cores 2

1 Feature extraction

val PATH = "/Users/erichan/sourcecode/book/Spark机器学习/20news-bydate"
val path = PATH+"/20news-bydate-train/*"
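// wholeTextFiles reads each file as a single record: (path, content)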
val rdd = sc.wholeTextFiles(path)
println(rdd.count)

11314

Inspect the newsgroup topics (the topic is the name of each file's parent directory):

val newsgroups = rdd.map { case (file, text) => file.split("/").takeRight(2).head }
val countByGroup = newsgroups.map(n => (n, 1)).reduceByKey(_ + _).collect.sortBy(-_._2).mkString("\n")
println(countByGroup)

(rec.sport.hockey,600)
(soc.religion.christian,599)
(rec.motorcycles,598)
(rec.sport.baseball,597)
(sci.crypt,595)
(rec.autos,594)
(sci.med,594)
(comp.windows.x,593)
(sci.space,593)
(sci.electronics,591)
(comp.os.ms-windows.misc,591)
(comp.sys.ibm.pc.hardware,590)
(misc.forsale,585)
(comp.graphics,584)
(comp.sys.mac.hardware,578)
(talk.politics.mideast,564)
(talk.politics.guns,546)
(alt.atheism,480)
(talk.politics.misc,465)
(talk.religion.misc,377)

2 Modeling

2.1 Tokenization

val text = rdd.map { case (file, text) => text }
val whiteSpaceSplit = text.flatMap(t => t.split(" ").map(_.toLowerCase))
println(whiteSpaceSplit.distinct.count)
println(whiteSpaceSplit.sample(true, 0.3, 42).take(100).mkString(","))

402978
from:,mathew,mathew,faq:,faq:,atheist,resources
summary:,music,--,fiction,,mantis,consultants,,uk.
supersedes:,290
archive-name:,1.0
,,,,,,,,,,,,,,,,,,,organizations
,organizations
,,,,,,,,,,,,,,,,stickers,and,and,the,from,from,in,to:,to:,ffrf,,256-8900
evolution,designs
evolution,a,stick,cars,,written
inside.,fish,us.
write,evolution,,,,,,,bay,can,get,get,,to,the
price,is,of,the,the,so,on.,and,foote.,,atheist,pp.,0-910309-26-4,,,atrocities,,foote:,aap.,,the

2.2 Improved tokenization

The whitespace split above leaves punctuation, numbers, and empty tokens behind; split on non-word characters instead:

val nonWordSplit = text.flatMap(t => t.split("""\W+""").map(_.toLowerCase))
println(nonWordSplit.distinct.count)
println(nonWordSplit.distinct.sample(true, 0.3, 42).take(100).mkString(","))
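// keep tokens with no digits: the pattern matches strings made entirely of non-digit characters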
val regex = """[^0-9]*""".r
val filterNumbers = nonWordSplit.filter(token => regex.pattern.matcher(token).matches)
println(filterNumbers.distinct.count)
println(filterNumbers.distinct.sample(true, 0.3, 42).take(100).mkString(","))

2.3 Removing stop words

val tokenCounts = filterNumbers.map(t => (t, 1)).reduceByKey(_ + _)
val orderingDesc = Ordering.by[(String, Int), Int](_._2)
//println(tokenCounts.top(20)(orderingDesc).mkString("\n"))

val stopwords = Set(
    "the","a","an","of","or","in","for","by","on","but", "is", "not", "with", "as", "was", "if",
    "they", "are", "this", "and", "it", "have", "from", "at", "my", "be", "that", "to"
)
val tokenCountsFilteredStopwords = tokenCounts.filter { case (k, v) => !stopwords.contains(k) }
//println(tokenCountsFilteredStopwords.top(20)(orderingDesc).mkString("\n"))

val tokenCountsFilteredSize = tokenCountsFilteredStopwords.filter { case (k, v) => k.size >= 2 }
println(tokenCountsFilteredSize.top(20)(orderingDesc).mkString("\n"))

2.4 Removing rare words

// ordered by negated count, so top(n) returns the *least* frequent tokens
val orderingAsc = Ordering.by[(String, Int), Int](-_._2)
//println(tokenCountsFilteredSize.top(20)(orderingAsc).mkString("\n"))

val rareTokens = tokenCounts.filter{ case (k, v) => v < 2 }.map { case (k, v) => k }.collect.toSet
val tokenCountsFilteredAll = tokenCountsFilteredSize.filter { case (k, v) => !rareTokens.contains(k) }
println(tokenCountsFilteredAll.top(20)(orderingAsc).mkString("\n"))

// combine the splitting and all three filters above into one tokenizer
def tokenize(line: String): Seq[String] = {
    line.split("""\W+""")
        .map(_.toLowerCase)
        .filter(token => regex.pattern.matcher(token).matches)
        .filterNot(token => stopwords.contains(token))
        .filterNot(token => rareTokens.contains(token))
        .filter(token => token.size >= 2)
        .toSeq
}

//println(text.flatMap(doc => tokenize(doc)).distinct.count)
val tokens = text.map(doc => tokenize(doc))
println(tokens.first.take(20))

2.5 Stemming

  • A standard NLP technique: reduce words ("walking", "walks") to a base form ("walk")
  • Widely used in search engines; common implementations include (see the toy sketch below):

    • NLTK
    • OpenNLP
    • Lucene
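
This chapter does not apply stemming. As a rough illustration only (a hypothetical toy suffix-stripper, not a real Porter stemmer such as Lucene provides), a step like the following could be appended to tokenize:

// Toy suffix-stripping "stemmer", for illustration only: drop a few common
// English suffixes when enough of the word remains. Real projects should use
// a proper stemmer from NLTK, OpenNLP, or Lucene.
def toyStem(token: String): String = {
  val suffixes = Seq("ing", "ed", "es", "s")
  suffixes.find(s => token.endsWith(s) && token.length > s.length + 2)
    .map(s => token.dropRight(s.length))
    .getOrElse(token)
}

val stemmedTokens = tokens.map(_.map(toyStem))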

3 Training the model

3.1 HashingTF (feature hashing)

import org.apache.spark.mllib.linalg.{ SparseVector => SV }
import org.apache.spark.mllib.feature.HashingTF
import org.apache.spark.mllib.feature.IDF
// set the dimensionality of TF-IDF vectors to 2^18
val dim = math.pow(2, 18).toInt
val hashingTF = new HashingTF(dim)
val tf = hashingTF.transform(tokens)
tf.cache
val v = tf.first.asInstanceOf[SV]
println(v.size)
println(v.values.size)
println(v.values.take(10).toSeq)
println(v.indices.take(10).toSeq)

262144
706
WrappedArray(1.0, 1.0, 1.0, 1.0, 2.0, 1.0, 1.0, 2.0, 1.0, 1.0)
WrappedArray(313, 713, 871, 1202, 1203, 1209, 1795, 1862, 3115, 3166)
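
The full vectors have dimension 262144 (2^18), but only 706 entries are non-zero for the first document. HashingTF assigns each token its index by hashing; a minimal sketch of the idea (the real implementation in org.apache.spark.mllib.feature.HashingTF uses the term's hash code modulo the dimension):

// Sketch of the hashing trick: hash the term and take a non-negative modulo
// of the dimension to get a vector index. Distinct terms may collide, but
// collisions are rare when dim (2^18) far exceeds the active vocabulary.
def termIndex(term: String, dim: Int): Int = {
  val raw = term.hashCode % dim
  if (raw < 0) raw + dim else raw
}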

fit & transform: fit computes the IDF weights over the whole corpus; transform re-weights each TF vector. Note the indices below are unchanged, only the values are rescaled.

val idf = new IDF().fit(tf)
val tfidf = idf.transform(tf)
val v2 = tfidf.first.asInstanceOf[SV]
println(v2.values.size)
println(v2.values.take(10).toSeq)
println(v2.indices.take(10).toSeq)

706
WrappedArray(2.3869085659322193, 4.670445463955571, 6.561295835827856, 4.597686109673142, 8.932700215224111, 5.750365619611528, 2.1871123786150006, 5.520408782213984, 3.4312512246662714, 1.7430324343790569)
WrappedArray(313, 713, 871, 1202, 1203, 1209, 1795, 1862, 3115, 3166)

3.2 Analyzing the weights

val minMaxVals = tfidf.map { v =>
    val sv = v.asInstanceOf[SV]
    (sv.values.min, sv.values.max)
}
val globalMinMax = minMaxVals.reduce { case ((min1, max1), (min2, max2)) =>
    (math.min(min1, min2), math.max(max1, max2))
}
println(globalMinMax)

globalMinMax: (Double, Double) = (0.0,66155.39470409753)

Frequent words get low TF-IDF weights:

val common = sc.parallelize(Seq(Seq("you", "do", "we")))
val tfCommon = hashingTF.transform(common)
val tfidfCommon = idf.transform(tfCommon)
val commonVector = tfidfCommon.first.asInstanceOf[SV]
println(commonVector.values.toSeq)

WrappedArray(0.9965359935704624, 1.3348773448236835, 0.5457486182039175)

Rare words get high TF-IDF weights:

val uncommon = sc.parallelize(Seq(Seq("telescope", "legislation", "investment")))
val tfUncommon = hashingTF.transform(uncommon)
val tfidfUncommon = idf.transform(tfUncommon)
val uncommonVector = tfidfUncommon.first.asInstanceOf[SV]
println(uncommonVector.values.toSeq)

WrappedArray(5.3265513728351666, 5.308532867332488, 5.483736956357579)

4 Using the model

4.1 Cosine similarity

import breeze.linalg._
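// note: SparseVector below is breeze.linalg.SparseVector; the mllib class was aliased to SV earlier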

val hockeyText = rdd.filter { case (file, text) => file.contains("hockey") }
val hockeyTF = hockeyText.mapValues(doc => hashingTF.transform(tokenize(doc)))
val hockeyTfIdf = idf.transform(hockeyTF.map(_._2))

val hockey1 = hockeyTfIdf.sample(true, 0.1, 42).first.asInstanceOf[SV]
val breeze1 = new SparseVector(hockey1.indices, hockey1.values, hockey1.size)

val hockey2 = hockeyTfIdf.sample(true, 0.1, 43).first.asInstanceOf[SV]
val breeze2 = new SparseVector(hockey2.indices, hockey2.values, hockey2.size)
val cosineSim = breeze1.dot(breeze2) / (norm(breeze1) * norm(breeze2))
println(cosineSim)

cosineSim: Double = 0.060250114361164626

val graphicsText = rdd.filter { case (file, text) => file.contains("comp.graphics") }
val graphicsTF = graphicsText.mapValues(doc => hashingTF.transform(tokenize(doc)))
val graphicsTfIdf = idf.transform(graphicsTF.map(_._2))
val graphics = graphicsTfIdf.sample(true, 0.1, 42).first.asInstanceOf[SV]
val breezeGraphics = new SparseVector(graphics.indices, graphics.values, graphics.size)
val cosineSim2 = breeze1.dot(breezeGraphics) / (norm(breeze1) * norm(breezeGraphics))
println(cosineSim2)

cosineSim2: Double = 0.004664850323792852

val baseballText = rdd.filter { case (file, text) => file.contains("baseball") }
val baseballTF = baseballText.mapValues(doc => hashingTF.transform(tokenize(doc)))
val baseballTfIdf = idf.transform(baseballTF.map(_._2))
val baseball = baseballTfIdf.sample(true, 0.1, 42).first.asInstanceOf[SV]
val breezeBaseball = new SparseVector(baseball.indices, baseball.values, baseball.size)
val cosineSim3 = breeze1.dot(breezeBaseball) / (norm(breeze1) * norm(breezeBaseball))
println(cosineSim3)

0.05047395039466008
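
The three comparisons above repeat the same conversion and formula; a small helper (a refactoring sketch of the code above) keeps it in one place:

// cosine similarity between two mllib sparse vectors via Breeze:
// dot(a, b) / (||a|| * ||b||)
def cosineSimilarity(a: SV, b: SV): Double = {
  val ba = new SparseVector(a.indices, a.values, a.size)
  val bb = new SparseVector(b.indices, b.values, b.size)
  ba.dot(bb) / (norm(ba) * norm(bb))
}

println(cosineSimilarity(hockey1, baseball))  // same value as cosineSim3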

4.2 Learning the mapping from words to topics

Build the multiclass label mapping:
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.classification.NaiveBayes
import org.apache.spark.mllib.evaluation.MulticlassMetrics

val newsgroupsMap = newsgroups.distinct.collect().zipWithIndex.toMap
val zipped = newsgroups.zip(tfidf)
val train = zipped.map { case (topic, vector) => LabeledPoint(newsgroupsMap(topic), vector) }
train.cache
Train a naive Bayes model:
val model = NaiveBayes.train(train, lambda = 0.1)
Load the test dataset:
val testPath = PATH+"/20news-bydate-test/*"
val testRDD = sc.wholeTextFiles(testPath)
val testLabels = testRDD.map { case (file, text) =>
    val topic = file.split("/").takeRight(2).head
    newsgroupsMap(topic)
}
val testTf = testRDD.map { case (file, text) => hashingTF.transform(tokenize(text)) }
val testTfIdf = idf.transform(testTf)
val zippedTest = testLabels.zip(testTfIdf)
val test = zippedTest.map { case (topic, vector) => LabeledPoint(topic, vector) }
Compute accuracy and the multiclass weighted F-measure:
val predictionAndLabel = test.map(p => (model.predict(p.features), p.label))
val accuracy = 1.0 * predictionAndLabel.filter(x => x._1 == x._2).count() / test.count()
println(accuracy)

0.7915560276155071

val metrics = new MulticlassMetrics(predictionAndLabel)
println(metrics.weightedFMeasure)

0.7810675969031116
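
To see which newsgroups the model confuses most, MulticlassMetrics also exposes per-class scores via labels and fMeasure(label):

// per-class F-measure, lowest (most confused) topics first
val topicByIndex = newsgroupsMap.map(_.swap)  // index -> topic name
metrics.labels
  .map(l => (topicByIndex(l.toInt), metrics.fMeasure(l)))
  .sortBy(_._2)
  .foreach(println)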

5 Evaluation

As a baseline, train the same classifier on raw whitespace-split tokens, with no filtering and no TF-IDF weighting:

val rawTokens = rdd.map { case (file, text) => text.split(" ") }
val rawTF = rawTokens.map(doc => hashingTF.transform(doc))
val rawTrain = newsgroups.zip(rawTF).map { case (topic, vector) => LabeledPoint(newsgroupsMap(topic), vector) }
val rawModel = NaiveBayes.train(rawTrain, lambda = 0.1)
val rawTestTF = testRDD.map { case (file, text) => hashingTF.transform(text.split(" ")) }
val rawZippedTest = testLabels.zip(rawTestTF)
val rawTest = rawZippedTest.map { case (topic, vector) => LabeledPoint(topic, vector) }
val rawPredictionAndLabel = rawTest.map(p => (rawModel.predict(p.features), p.label))
val rawAccuracy = 1.0 * rawPredictionAndLabel.filter(x => x._1 == x._2).count() / rawTest.count()
println(rawAccuracy)

0.7648698884758365

val rawMetrics = new MulticlassMetrics(rawPredictionAndLabel)
println(rawMetrics.weightedFMeasure)

0.7653320418573546

Tokenization, stop-word and rare-word removal, and TF-IDF weighting together lift accuracy from 0.765 to 0.792 and the weighted F-measure from 0.765 to 0.781.

6 The Word2Vec model

Word2Vec (distributed vector representations): represents each word as a dense vector; MLlib implements the skip-gram model.

6.1 Training

import org.apache.spark.mllib.feature.Word2Vec
val word2vec = new Word2Vec()
word2vec.setSeed(42) // we do this to generate the same results each time
val word2vecModel = word2vec.fit(tokens)
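
The defaults (assuming MLlib 1.x: vector size 100, minimum word count 5) can be adjusted before fitting:

val word2vecTuned = new Word2Vec()
  .setSeed(42)
  .setVectorSize(100)  // embedding dimensionality (the default)
  .setMinCount(5)      // drop words appearing fewer than 5 times (the default)
val tunedModel = word2vecTuned.fit(tokens)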

6.2 Using the model

The 20 words most similar to "hockey":
word2vecModel.findSynonyms("hockey", 20).foreach(println)

(sport,1.4818968962277133)
(ecac,1.467546566194254)
(hispanic,1.4166835301985194)
(glens,1.4061103042432825)
(woofers,1.3810090447028116)
(tournament,1.3148823031671586)
(champs,1.3133863003013941)
(boxscores,1.307735040384543)
(aargh,1.274986851270267)
(ahl,1.265165428167253)
(playoff,1.2645991118770572)
(ncaa,1.2383382015648046)
(pool,1.2261154635870224)
(champion,1.2119919989539134)
(filinuk,1.2062208620660915)
(olympic,1.2026738930160243)
(motorcycles,1.2008032355579679)
(yankees,1.1989755767973371)
(calder,1.194001886835493)
(homeruns,1.1800625883573932)

The 20 words most similar to "legislation":

word2vecModel.findSynonyms("legislation", 20).foreach(println)

(accommodates,0.9918184454068688)
(briefed,0.9256758135452989)
(amended,0.9105987267173344)
(telephony,0.8679173760123956)
(pitted,0.8609974033962533)
(aclu,0.8605885863332372)
(licensee,0.8493930472487975)
(agency,0.836706135804648)
(policies,0.8337986602365566)
(senate,0.8327312936220903)
(businesses,0.8291191155630467)
(permit,0.8266658804181389)
(cpsr,0.8231228090944367)
(cooperation,0.8195562469006543)
(surveillance,0.8134342524628756)
(congress,0.8132899468772855)
(restricted,0.8115013134507126)
(procure,0.8096839595766356)
(inquiry,0.8086297702914405)
(industry,0.8077900093754752)

The neighbours of "legislation" are topically coherent: aclu (the American Civil Liberties Union), senate, congress, surveillance, and inquiry all come from the same legal and policy vocabulary.
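
A vector for an individual word can also be retrieved directly via Word2VecModel.transform (this throws an exception for out-of-vocabulary words):

val hockeyVector = word2vecModel.transform("hockey")
println(hockeyVector.size)  // 100 with the default vector size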