Topic Classification using Latent Dirichlet Allocation (LDA) in the Spark ML Library

import org.apache.spark.sql.SparkSession
val sparkSession = SparkSession.builder
  .master("local")
  .appName("my-spark-app")
  .config("spark.some.config.option", "config-value")
  .getOrCreate()
val df = sparkSession.read.json("dbfs:/mnt/JSON10/JSON/sampleDoc.txt")
import org.apache.spark.sql.SparkSession
sparkSession: org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession@351b4b37
df: org.apache.spark.sql.DataFrame = [filename: string, id: string ... 1 more field]

display(df)
filename                      id               text
06930000002Uik0AAC_v003.txt   06930000002Uik0  Getting started is wonderful here
069300000033HZFAA2_v001.txt   069300000033HZF  re: december fy13 wonderfully explained
06930000001cVBkAAM_v001.txt   06930000001cVBk  fitness reimbursement ?? slide number 1

// Attempt to drop the _corrupt_record column before splitting into train/test sets.
// Because the JSON parsed cleanly, the column does not exist and the column-based drop fails below.
df.drop(df.col("_corrupt_record"))
val splits = df.randomSplit(Array(0.8, 0.2), seed = 11L)
val train = splits(0).cache()
val test = splits(1).cache()
org.apache.spark.sql.AnalysisException: Cannot resolve column name "_corrupt_record" among (filename, id, text);
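The exception is expected here: the sample file parsed without errors, so there is no _corrupt_record column to drop. A minimal sketch of a safer alternative is the String overload of drop, which is a no-op when the column is absent (cleaned_df is an illustrative name, not part of the original notebook):

// Sketch: the String overload of drop does not fail when the column is missing
val cleaned_df = df.drop("_corrupt_record")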

import org.apache.spark.ml.feature.RegexTokenizer
// Set params for RegexTokenizer
val tokenizer = new RegexTokenizer()
  .setPattern("[\\W_]+")
  .setMinTokenLength(4) // Filter away tokens with length < 4
  .setInputCol("text")
  .setOutputCol("tokens")
// Tokenize document
val tokenized_df = tokenizer.transform(df)
import org.apache.spark.ml.feature.RegexTokenizer
tokenizer: org.apache.spark.ml.feature.RegexTokenizer = regexTok_05b9b2c90cb1
tokenized_df: org.apache.spark.sql.DataFrame = [filename: string, id: string ... 2 more fields]

display(tokenized_df)
filename                      id               text                                       tokens
06930000002Uik0AAC_v003.txt   06930000002Uik0  Getting started is wonderful here          ["getting","started","wonderful","here"]
069300000033HZFAA2_v001.txt   069300000033HZF  re: december fy13 wonderfully explained    ["december","fy13","wonderfully","explained"]
06930000001cVBkAAM_v001.txt   06930000001cVBk  fitness reimbursement ?? slide number 1    ["fitness","reimbursement","slide","number"]

display(tokenized_df.select("tokens"))
["getting","started","wonderful","here"]
["december","fy13","wonderfully","explained"]
["fitness","reimbursement","slide","number"]

%sh wget http://ir.dcs.gla.ac.uk/resources/linguistic_utils/stop_words -O /tmp/stopwords
--2016-08-11 04:48:37-- http://ir.dcs.gla.ac.uk/resources/linguistic_utils/stop_words
Resolving ir.dcs.gla.ac.uk (ir.dcs.gla.ac.uk)... 130.209.240.253
Connecting to ir.dcs.gla.ac.uk (ir.dcs.gla.ac.uk)|130.209.240.253|:80... connected.
HTTP request sent, awaiting response... 200 OK
Length: 2237 (2.2K) [text/plain]
Saving to: '/tmp/stopwords'
0K .. 100% 350M=0s
2016-08-11 04:48:37 (350 MB/s) - '/tmp/stopwords' saved [2237/2237]

%fs cp file:/tmp/stopwords dbfs:/tmp/stopwords
res53: Boolean = true

val stopwords = sc.textFile("/tmp/stopwords").collect()
stopwords: Array[String] = Array(a, about, above, across, after, afterwards, again, against, all, almost, alone, along, already, also, although, always, am, among, amongst, amoungst, amount, an, and, another, any, anyhow, anyone, anything, anyway, anywhere, are, around, as, at, back, be, became, because, become, becomes, becoming, been, before, beforehand, behind, being, below, beside, besides, between, beyond, bill, both, bottom, but, by, call, can, cannot, cant, co, computer, con, could, couldnt, cry, de, describe, detail, do, done, down, due, during, each, eg, eight, either, eleven, else, elsewhere, empty, enough, etc, even, ever, every, everyone, everything, everywhere, except, few, fifteen, fify, fill, find, fire, first, five, for, former, formerly, forty, found, four, from, front, full, further, get, give, go, had, has, hasnt, have, he, hence, her, here, hereafter, hereby, herein, hereupon, hers, herself, him, himself, his, how, however, hundred, i, ie, if, in, inc, indeed, interest, into, is, it, its, itself, keep, last, latter, latterly, least, less, ltd, made, many, may, me, meanwhile, might, mill, mine, more, moreover, most, mostly, move, much, must, my, myself, name, namely, neither, never, nevertheless, next, nine, no, nobody, none, noone, nor, not, nothing, now, nowhere, of, off, often, on, once, one, only, onto, or, other, others, otherwise, our, ours, ourselves, out, over, own, part, per, perhaps, please, put, rather, re, same, see, seem, seemed, seeming, seems, serious, several, she, should, show, side, since, sincere, six, sixty, so, some, somehow, someone, something, sometime, sometimes, somewhere, still, such, system, take, ten, than, that, the, their, them, themselves, then, thence, there, thereafter, thereby, therefore, therein, thereupon, these, they, thick, thin, third, this, those, though, three, through, throughout, thru, thus, to, together, too, top, toward, towards, twelve, twenty, two, un, under, until, up, upon, us, very, via, was, we, well, were, what, whatever, when, whence, whenever, where, whereafter, whereas, whereby, wherein, whereupon, wherever, whether, which, while, whither, who, whoever, whole, whom, whose, why, will, with, within, without, would, yet, you, your, yours, yourself, yourselves)
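Downloading a stop-word list is optional: Spark also ships a default English list. A minimal sketch, assuming Spark 2.0+ where StopWordsRemover.loadDefaultStopWords is available (defaultStopwords is an illustrative name):

import org.apache.spark.ml.feature.StopWordsRemover
// Built-in English stop words, as an alternative to the downloaded list above
val defaultStopwords = StopWordsRemover.loadDefaultStopWords("english")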

import org.apache.spark.ml.feature.StopWordsRemover
// Set params for StopWordsRemover
val remover = new StopWordsRemover()
  .setStopWords(stopwords) // This parameter is optional
  .setInputCol("tokens")
  .setOutputCol("filtered")
// Create new DF with Stopwords removed
val filtered_df = remover.transform(tokenized_df)
import org.apache.spark.ml.feature.StopWordsRemover
remover: org.apache.spark.ml.feature.StopWordsRemover = stopWords_783ff1555eb7
filtered_df: org.apache.spark.sql.DataFrame = [filename: string, id: string ... 3 more fields]

display(filtered_df)
filename                      id               text                                       tokens                                           filtered
06930000002Uik0AAC_v003.txt   06930000002Uik0  Getting started is wonderful here          ["getting","started","wonderful","here"]         ["getting","started","wonderful"]
069300000033HZFAA2_v001.txt   069300000033HZF  re: december fy13 wonderfully explained    ["december","fy13","wonderfully","explained"]    ["december","fy13","wonderfully","explained"]
06930000001cVBkAAM_v001.txt   06930000001cVBk  fitness reimbursement ?? slide number 1    ["fitness","reimbursement","slide","number"]     ["fitness","reimbursement","slide","number"]

import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.sql.Row
import org.apache.spark.ml.feature.CountVectorizer
// Set params for CountVectorizer
val vectorizer = new CountVectorizer()
  .setInputCol("filtered")
  .setOutputCol("features")
  .fit(filtered_df)
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.sql.Row
import org.apache.spark.ml.feature.CountVectorizer
vectorizer: org.apache.spark.ml.feature.CountVectorizerModel = cntVec_663ad4846e39

vectorizer.transform(filtered_df).select("id", "text","features", "filtered").show()
+---------------+--------------------+--------------------+--------------------+
|             id|                text|            features|            filtered|
+---------------+--------------------+--------------------+--------------------+
|06930000002Uik0|Getting started i...|(11,[2,3,6],[1.0,...|[getting, started...|
|069300000033HZF|re: december fy13...|(11,[5,7,8,10],[1...|[december, fy13, ...|
|06930000001cVBk|fitness reimburse...|(11,[0,1,4,9],[1....|[fitness, reimbur...|
+---------------+--------------------+--------------------+--------------------+
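With only three tiny documents the default CountVectorizer settings are fine, but on a real corpus the vocabulary is usually bounded. A minimal sketch (boundedVectorizer and the parameter values are illustrative, not tuned):

// Sketch: cap the vocabulary size and require a minimum document frequency
val boundedVectorizer = new CountVectorizer()
  .setInputCol("filtered")
  .setOutputCol("features")
  .setVocabSize(10000) // keep at most 10,000 terms
  .setMinDF(5)         // a term must appear in at least 5 documents
  .fit(filtered_df)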

import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.mllib.linalg.Vectors
val countVectors = vectorizer.transform(filtered_df)
import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.mllib.linalg.Vectors
countVectors: org.apache.spark.sql.DataFrame = [filename: string, id: string ... 4 more fields]

countVectors.show
+--------------------+---------------+--------------------+--------------------+--------------------+--------------------+
|            filename|             id|                text|              tokens|            filtered|            features|
+--------------------+---------------+--------------------+--------------------+--------------------+--------------------+
|06930000002Uik0AA...|06930000002Uik0|Getting started i...|[getting, started...|[getting, started...|(11,[2,3,6],[1.0,...|
|069300000033HZFAA...|069300000033HZF|re: december fy13...|[december, fy13, ...|[december, fy13, ...|(11,[5,7,8,10],[1...|
|06930000001cVBkAA...|06930000001cVBk|fitness reimburse...|[fitness, reimbur...|[fitness, reimbur...|(11,[0,1,4,9],[1....|
+--------------------+---------------+--------------------+--------------------+--------------------+--------------------+
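The tokenize, stop-word removal, and vectorize steps can also be chained into a single Pipeline so the same preprocessing is applied consistently to new documents. A minimal sketch, assuming the tokenizer and remover defined above (countVectorizerStage, ldaPipeline, pipelineModel, and featurized are illustrative names):

import org.apache.spark.ml.{Pipeline, PipelineStage}
// CountVectorizer as an unfitted estimator stage, unlike the already-fitted vectorizer above
val countVectorizerStage = new CountVectorizer()
  .setInputCol("filtered")
  .setOutputCol("features")
val stages: Array[PipelineStage] = Array(tokenizer, remover, countVectorizerStage)
val ldaPipeline = new Pipeline().setStages(stages)
val pipelineModel = ldaPipeline.fit(df)
val featurized = pipelineModel.transform(df)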

import org.apache.spark.ml.clustering.{LDA, DistributedLDAModel}
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.types.{StructField, StructType}
val lda = new LDA()
  .setK(3)
  .setMaxIter(10)
  .setFeaturesCol("features")
import org.apache.spark.ml.clustering.{LDA, DistributedLDAModel}
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.types.{StructField, StructType}
lda: org.apache.spark.ml.clustering.LDA = lda_f977d0e6a90e
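Note that the DistributedLDAModel imported above is only produced when the expectation-maximization optimizer is selected; with the default "online" optimizer, fit returns a LocalLDAModel. A minimal sketch of the EM variant (emLda is an illustrative name):

// Sketch: same configuration, but with the EM optimizer so fit() yields a DistributedLDAModel
val emLda = new LDA()
  .setK(3)
  .setMaxIter(10)
  .setFeaturesCol("features")
  .setOptimizer("em")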

// Fit the LDA model on the vectorized documents, then score each document
val model = lda.fit(countVectors)
val transformed = model.transform(countVectors)
transformed: org.apache.spark.sql.DataFrame = [filename: string, id: string ... 5 more fields]

val ll = model.logLikelihood(countVectors)
val lp = model.logPerplexity(countVectors)
// describeTopics
val topics = model.describeTopics(10)
// Shows the result
topics.show(false)
transformed.show(false)
+-----+-------------------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|topic|termIndices                    |termWeights                                                                                                                                                                                                        |
+-----+-------------------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|0    |[2, 5, 7, 9, 4, 10, 1, 0, 3, 8]|[0.1062041440811267, 0.10583753784271348, 0.10442749933639424, 0.09682858305200294, 0.09351340983989168, 0.09131890650704068, 0.08714867387466924, 0.08070231248484354, 0.0802804983696842, 0.07701228167379913]  |
|1    |[1, 6, 2, 5, 0, 3, 8, 9, 4, 7] |[0.10195178268898618, 0.09823618268931927, 0.09637471483627483, 0.09537821409096363, 0.09317671877781553, 0.08903558963685265, 0.08890389418467605, 0.08631694410851917, 0.08621391878142007, 0.08438517259595037]|
|2    |[1, 9, 4, 2, 5, 10, 3, 8, 6, 7]|[0.10612398716703186, 0.09757374578910046, 0.09660179632115497, 0.09375920815595054, 0.09318237101826143, 0.09050611023493967, 0.08978021671904904, 0.08546034203857293, 0.08355203917046891, 0.08177244097296857]|
+-----+-------------------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+

+---------------------------+---------------+----------------------------------------+----------------------------------------+----------------------------------------+---------------------------------+------------------------------------------------------------+
|filename                   |id             |text                                    |tokens                                  |filtered                                |features                         |topicDistribution                                           |
+---------------------------+---------------+----------------------------------------+----------------------------------------+----------------------------------------+---------------------------------+------------------------------------------------------------+
|06930000002Uik0AAC_v003.txt|06930000002Uik0|Getting started is wonderful here       |[getting, started, wonderful, here]     |[getting, started, wonderful]           |(11,[2,3,6],[1.0,1.0,1.0])       |[0.10318703949282229,0.7938521983720901,0.10296076213508765]|
|069300000033HZFAA2_v001.txt|069300000033HZF|re: december fy13 wonderfully explained |[december, fy13, wonderfully, explained]|[december, fy13, wonderfully, explained]|(11,[5,7,8,10],[1.0,1.0,1.0,1.0])|[0.835109741017235,0.08292735466870861,0.08196290431405635] |
|06930000001cVBkAAM_v001.txt|06930000001cVBk|fitness reimbursement ?? slide number 1 |[fitness, reimbursement, slide, number] |[fitness, reimbursement, slide, number] |(11,[0,1,4,9],[1.0,1.0,1.0,1.0]) |[0.08429553958886034,0.08695919972860706,0.8287452606825326]|
+---------------------------+---------------+----------------------------------------+----------------------------------------+----------------------------------------+---------------------------------+------------------------------------------------------------+

ll: Double = -46.015729151104544
lp: Double = 4.183248263936846
topics: org.apache.spark.sql.DataFrame = [topic: int, termIndices: array<int> ... 1 more field]
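The termIndices above refer to positions in the CountVectorizer vocabulary, so mapping them back to words makes the topics readable. A minimal sketch, assuming the fitted vectorizer from earlier (vocab is an illustrative name):

// Map each topic's term indices back to vocabulary words
val vocab = vectorizer.vocabulary
topics.collect().foreach { row =>
  val topic = row.getInt(0)
  val termIndices = row.getAs[Seq[Int]]("termIndices")
  println(s"Topic $topic: " + termIndices.map(vocab(_)).mkString(", "))
}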
