@anupsavvy
Created October 9, 2015 16:39
Load Science Direct articles and create doc2vec vectors for each of them.
#coding:utf-8
# Author: Anup Sawant
# Purpose: Doc2vec vectors of Science Direct Articles
# Created: 9/21/2015
import sys
import os
import pandas as pd
from pandas import Series, DataFrame
from pyspark import SparkContext,SparkConf,StorageLevel
from pyspark.mllib.feature import Word2Vec
import pickle
import nltk
import gensim, logging
from gensim.models import Doc2Vec
from ast import literal_eval as make_tuple
import random
import numpy as np
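# NOTE: this script is written for Python 2 and an older (pre-1.0) gensim Doc2Vec API
# (LabeledSentence, the size= parameter, and train() without explicit epochs).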
all_articles_path = '/data/home/sd_project/raw/articles.txt'
keyword_path = '/data/home/ass198/cleaned_keywords.txt'
# cleaned_corpus_path = "/data/home/ass198/rdd_corpus"
temp_save = '/data/home/ass198/temp_save'
punctuation = """.,?!:;(){}[]"""
stopwords = nltk.corpus.stopwords.words('english')
LabeledSentence = gensim.models.doc2vec.LabeledSentence
logging.basicConfig(format='%(asctime)s : %(levelname)s : %(message)s', level=logging.INFO)

def getSparkContext():
    conf = (SparkConf()
            .setMaster("local[40]")
            .setAppName("Science Direct Word2Vec")
            .set("spark.driver.maxResultSize", "30g"))
            # .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
            # .set("spark.kryoserializer.buffer.max", "1900m"))
    sc = SparkContext(conf=conf)
    return sc

def removeNonAscii(s):
    if s is not None:
        return "".join(i for i in s if ord(i) < 128)
    else:
        return ""

def detectLanguage(article_words):
    # Score each candidate language by how many of its stopwords appear in the article.
    language_ratios = {}
    for language in nltk.corpus.stopwords.fileids():
        stopwords_set = set(nltk.corpus.stopwords.words(language))
        words_set = set(article_words)
        common_elements = words_set.intersection(stopwords_set)
        language_ratios[language] = len(common_elements)
    return max(language_ratios, key=language_ratios.get)

def getDataRDD():
    sc = getSparkContext()
    corpus = sc.textFile(all_articles_path) \
        .map(lambda line: line.split('\t', 1)) \
        .map(lambda record: (str(removeNonAscii(record[0])), str(removeNonAscii(record[1])))) \
        .filter(lambda record: record[0] != "") \
        .filter(lambda record: record[1] != "")
    return corpus

def puncToWords(article):
    # Pad each punctuation mark with spaces so it becomes its own token.
    for c in punctuation:
        article = article.replace(c, ' %s ' % c)
    return article

def cleanRDDText(corpus):
    corpus = corpus.map(lambda record: (record[0], puncToWords(record[1]))) \
        .map(lambda record: (record[0], [word.lower() for word in record[1].split()])) \
        .filter(lambda record: detectLanguage(record[1]) == 'english') \
        .map(lambda record: (record[0], [word for word in record[1] if word not in stopwords]))
    return corpus

# get vectors from gensim models
def getVecs(model, corpus, size):
    vecs = [np.array(model.docvecs[z.tags[0]]).reshape((1, size)) for z in corpus]
    return np.concatenate(vecs)

class GetArticles(object):
    """Iterate over articles partition by partition, shuffling partition order on each pass."""
    def __init__(self, corpusRDD):
        self.corpusRDD = corpusRDD
        self.parr = range(corpusRDD.getNumPartitions())  # partition ids used by __iter__

    def readPartition(self, index):
        def getPartitionElements(ind, iterator):
            if index == ind:
                for x in iterator:
                    yield x
        return getPartitionElements

    def __iter__(self):
        random.shuffle(self.parr)
        for partition_id in self.parr:
            print 'Processing partition number %s' % partition_id
            partition_rdd = self.corpusRDD.mapPartitionsWithIndex(self.readPartition(partition_id), True)
            labeled_rdd = partition_rdd.map(lambda record: LabeledSentence(words=record[1], tags=['%s' % record[0]]))
            for article in labeled_rdd.collect():
                yield article

def getRddItr(corpusRDD):
    return corpusRDD.toLocalIterator()

def tryGensim(cleaned_corpus_path):
    size = 400
    # corpusRDD = cleanRDDText(getDataRDD())
    sc = getSparkContext()
    corpusRDD = sc.textFile(cleaned_corpus_path).map(lambda record: make_tuple(record)) \
        .map(lambda record: LabeledSentence(words=record[1], tags=['%s' % record[0]]))
    # persist to disk only (equivalent to StorageLevel.DISK_ONLY)
    corpusRDD.persist(storageLevel=StorageLevel(True, False, False, False, 1))
    # instantiate Distributed Memory and Distributed Bag of Words models
    model_dm = gensim.models.Doc2Vec(min_count=8, window=10, size=size, sample=1e-3, negative=5, workers=40)
    model_dbow = gensim.models.Doc2Vec(min_count=8, window=10, size=size, sample=1e-3, negative=5, dm=0, workers=40)
    # articles = GetArticles(corpusRDD)
    # build vocab over all articles
    model_dm.build_vocab(getRddItr(corpusRDD))
    model_dbow.build_vocab(getRddItr(corpusRDD))
    # pass through the dataset multiple times,
    # shuffling the articles each time to improve accuracy.
    for epoch in range(2):
        # perm = np.random.permutation(np_labeledCorpus.shape[0])
        print "Training round : " + str(epoch)
        model_dm.train(getRddItr(corpusRDD))
        model_dbow.train(getRddItr(corpusRDD))
    doc_vecs_dm = getVecs(model_dm, getRddItr(corpusRDD), size)
    doc_vecs_dbow = getVecs(model_dbow, getRddItr(corpusRDD), size)
    df = DataFrame(np.hstack((doc_vecs_dm, doc_vecs_dbow)), index=[z.tags[0] for z in getRddItr(corpusRDD)])
    df.to_csv('/data/home/ass198/vecs.csv')
    # vecs = np.hstack((doc_vecs_dm, doc_vecs_dbow))
    # np.save("/data/home/ass198/vecs.npy", vecs)

def main(cleaned_corpus_path):
    tryGensim(cleaned_corpus_path)


if __name__ == '__main__':
    main(sys.argv[1])
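
For reference, a minimal sketch of how the script might be driven and its output inspected, assuming it is saved under a hypothetical name such as sd_doc2vec.py and launched with spark-submit; the output path and the 400 DM + 400 DBOW column layout follow from tryGensim above.

# Hypothetical usage sketch (not part of the gist); the script name and corpus path are placeholders.
#   spark-submit sd_doc2vec.py /path/to/cleaned_corpus.txt
#
# Inspect the resulting document vectors: one row per article id (the Doc2Vec tag),
# with 400 DM dimensions followed by 400 DBOW dimensions.
import pandas as pd

vecs = pd.read_csv('/data/home/ass198/vecs.csv', index_col=0)
print vecs.shape       # expected: (number of articles, 800)
print vecs.index[:5]   # first few article ids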