Load Science Direct articles and create doc2vec vectors for each of them.
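Usage (a sketch; the script filename is hypothetical): submit with spark-submit, passing the path to a pre-cleaned corpus of (article_id, [tokens]) tuples, one tuple per line:

    spark-submit sd_doc2vec.py /data/home/ass198/rdd_corpus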
# coding: utf-8
# Author: Anup Sawant
# Purpose: Doc2vec vectors of Science Direct Articles
# Created: 9/21/2015
import sys
import random
import logging
import nltk
import gensim
import numpy as np
from pandas import DataFrame
from pyspark import SparkContext, SparkConf, StorageLevel
from ast import literal_eval as make_tuple
all_articles_path = '/data/home/sd_project/raw/articles.txt'
keyword_path = '/data/home/ass198/cleaned_keywords.txt'
# cleaned_corpus_path = "/data/home/ass198/rdd_corpus"
temp_save = '/data/home/ass198/temp_save'

punctuation = """.,?!:;(){}[]"""
stopwords = nltk.corpus.stopwords.words('english')
LabeledSentence = gensim.models.doc2vec.LabeledSentence

logging.basicConfig(format='%(asctime)s : %(levelname)s : %(message)s', level=logging.INFO)
def getSparkContext():
    conf = (SparkConf()
            .setMaster("local[40]")
            .setAppName("Science Direct Word2Vec")
            .set("spark.driver.maxResultSize", "30g"))
    # .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    # .set("spark.kryoserializer.buffer.max", "1900m")
    sc = SparkContext(conf=conf)
    return sc
def removeNonAscii(s):
    # Strip non-ASCII characters; treat None as an empty string.
    if s is not None:
        return "".join(i for i in s if ord(i) < 128)
    else:
        return ""
def detectLanguage(article_words):
    # Score each candidate language by how many of its stopwords occur in the
    # article; the language with the largest overlap wins.
    language_ratios = {}
    words_set = set(article_words)
    for language in nltk.corpus.stopwords.fileids():
        stopwords_set = set(nltk.corpus.stopwords.words(language))
        common_elements = words_set.intersection(stopwords_set)
        language_ratios[language] = len(common_elements)
    return max(language_ratios, key=language_ratios.get)
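# Example with a made-up token list: 'the', 'of', and 'on' overlap with NLTK's
# English stopword list more than with any other language's, so this returns
# 'english':
#   detectLanguage(['the', 'impact', 'of', 'climate', 'change', 'on', 'corals'])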
def getDataRDD():
    # Each input line is expected to look like "<article_id>\t<article_text>";
    # lines without a tab are dropped rather than raising an IndexError.
    sc = getSparkContext()
    corpus = sc.textFile(all_articles_path) \
        .map(lambda line: line.split('\t', 1)) \
        .filter(lambda record: len(record) == 2) \
        .map(lambda record: (str(removeNonAscii(record[0])), str(removeNonAscii(record[1])))) \
        .filter(lambda record: record[0] != "") \
        .filter(lambda record: record[1] != "")
    return corpus
def puncToWords(article):
    # Pad every punctuation mark with spaces so it becomes its own token.
    for c in punctuation:
        article = article.replace(c, ' %s ' % c)
    return article
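# Example with a made-up string (the extra whitespace is collapsed by the
# .split() in cleanRDDText below):
#   puncToWords("cells, proteins.")  ->  "cells ,  proteins . "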
def cleanRDDText(corpus):
    # Tokenize, lowercase, keep only English articles, and drop English stopwords.
    corpus = corpus.map(lambda record: (record[0], puncToWords(record[1]))) \
        .map(lambda record: (record[0], [word.lower() for word in record[1].split()])) \
        .filter(lambda record: detectLanguage(record[1]) == 'english') \
        .map(lambda record: (record[0], [word for word in record[1] if word not in stopwords]))
    return corpus
# Get one document vector per article from a trained gensim model.
def getVecs(model, corpus, size):
    vecs = [np.array(model.docvecs[z.tags[0]]).reshape((1, size)) for z in corpus]
    return np.concatenate(vecs)
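# Shape note: with N articles and size=400, the result is an N x 400 array,
# row-aligned with the iteration order of `corpus`.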
class GetArticles(object):
    def __init__(self, corpusRDD):
        self.corpusRDD = corpusRDD
        self.parr = range(corpusRDD.getNumPartitions())

    def readPartition(self, index):
        # Build a partition function that yields elements only from the
        # partition whose index matches `index`.
        def getPartitionElements(ind, iterator):
            if index == ind:
                for x in iterator:
                    yield x
        return getPartitionElements

    def __iter__(self):
        # Visit the partitions in a fresh random order on every pass,
        # yielding one LabeledSentence per article.
        random.shuffle(self.parr)
        for partition_id in self.parr:
            print 'Processing partition number %s' % partition_id
            partition_rdd = self.corpusRDD.mapPartitionsWithIndex(self.readPartition(partition_id), True)
            labeled_rdd = partition_rdd.map(lambda record: LabeledSentence(words=record[1], tags=['%s' % record[0]]))
            for article in labeled_rdd.collect():
                yield article
def getRddItr(corpusRDD):
    # Stream the RDD back to the driver one element at a time.
    return corpusRDD.toLocalIterator()
def tryGensim(cleaned_corpus_path):
    size = 400
    # corpusRDD = cleanRDDText(getDataRDD())
    sc = getSparkContext()
    corpusRDD = sc.textFile(cleaned_corpus_path).map(lambda record: make_tuple(record)) \
        .map(lambda record: LabeledSentence(words=record[1], tags=['%s' % record[0]]))
    # StorageLevel(True, False, False, False, 1) is DISK_ONLY.
    corpusRDD.persist(storageLevel=StorageLevel(True, False, False, False, 1))
    # Instantiate Distributed Memory (DM) and Distributed Bag of Words (DBOW) models.
    model_dm = gensim.models.Doc2Vec(min_count=8, window=10, size=size, sample=1e-3, negative=5, workers=40)
    model_dbow = gensim.models.Doc2Vec(min_count=8, window=10, size=size, sample=1e-3, negative=5, dm=0, workers=40)
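    # Parameter notes (gensim's pre-1.0 API, as this script uses): size is the
    # vector dimensionality, min_count drops rare words, window is the context
    # width, sample downsamples frequent words, negative enables negative
    # sampling, and dm=0 selects DBOW instead of the default DM architecture.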
    # articles = GetArticles(corpusRDD)
    # Build the vocabulary over all articles.
    model_dm.build_vocab(getRddItr(corpusRDD))
    model_dbow.build_vocab(getRddItr(corpusRDD))
    # Pass through the dataset multiple times. Shuffling the articles between
    # passes would improve accuracy, but toLocalIterator yields a fixed order;
    # the unused GetArticles class above appears to be an attempt at that.
    for epoch in range(2):
        # perm = np.random.permutation(np_labeledCorpus.shape[0])
        print "Training round : " + str(epoch)
        model_dm.train(getRddItr(corpusRDD))
        model_dbow.train(getRddItr(corpusRDD))
    doc_vecs_dm = getVecs(model_dm, getRddItr(corpusRDD), size)
    doc_vecs_dbow = getVecs(model_dbow, getRddItr(corpusRDD), size)
    # Stack the DM and DBOW vectors side by side, indexed by article id.
    df = DataFrame(np.hstack((doc_vecs_dm, doc_vecs_dbow)), index=[z.tags[0] for z in getRddItr(corpusRDD)])
    df.to_csv('/data/home/ass198/vecs.csv')
    # vecs = np.hstack((doc_vecs_dm, doc_vecs_dbow))
    # np.save("/data/home/ass198/vecs.npy", vecs)
def main(cleaned_corpus_path):
    tryGensim(cleaned_corpus_path)

if __name__ == '__main__':
    main(sys.argv[1])
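# Post-run sanity check (a sketch; assumes the CSV written above exists):
#   import pandas as pd
#   vecs = pd.read_csv('/data/home/ass198/vecs.csv', index_col=0)
#   print vecs.shape  # (num_articles, 800): 400 DM dims + 400 DBOW dims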