@omendezmorales
Created January 14, 2019 18:56
Created on Cognitive Class Labs
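The notebook below loads `notebook.log` into an RDD, keeps the lines that contain `INFO`, and counts them. As a minimal sketch of that filter-then-count pattern, the following plain-Python stand-in needs no Spark installation. It is not the PySpark API: the sample log lines are made up for illustration, and the helper names `keep_lines` and `count_lines` are hypothetical.

```python
# Plain-Python stand-in for the notebook's RDD filter + count (no Spark needed).
# The sample lines are invented for illustration; they are not taken from notebook.log.
sample_log = [
    "15/10/14 14:29:21 INFO SparkContext: Running Spark version 1.4.1",
    "15/10/14 14:29:22 WARN NativeCodeLoader: Unable to load native-hadoop library",
    "15/10/14 14:29:23 INFO Slf4jLogger: Slf4jLogger started",
    "15/10/14 14:29:24 ERROR Executor: org.apache.spark.SparkException in task 0.0",
]


def keep_lines(lines, needle):
    """Lazily yield the lines containing `needle` (plays the role of a filter transformation)."""
    return (line for line in lines if needle in line)


def count_lines(lines):
    """Consume the iterable and count its items (plays the role of a count action)."""
    return sum(1 for _ in lines)


# Nothing is evaluated until count_lines consumes the generator,
# loosely mirroring how a transformation waits for an action.
info_count = count_lines(keep_lines(sample_log, "INFO"))

# The substring test is case-sensitive: "spark" does not match "Spark".
spark_count = count_lines(keep_lines(sample_log, "spark"))

print(info_count, spark_count)
```

In the notebook itself the corresponding calls are `logFile.filter(lambda line: "INFO" in line)` followed by `.count()`. Note that the argument to `filter` has to be a callable taking one line.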
{
"cells": [
{
"cell_type": "markdown",
"metadata": {
"deletable": true,
"editable": true
},
"source": [
"<a href=\"https://cognitiveclass.ai\"><img src = \"https://ibm.box.com/shared/static/9gegpsmnsoo25ikkbl4qzlvlyjbgxs5x.png\" width = 400> </a>"
]
},
{
"cell_type": "markdown",
"metadata": {
"deletable": true,
"editable": true
},
"source": [
"## Analyzing a log file\n",
"\n",
"First, let's download the data that we will be working with in this lab."
]
},
{
"cell_type": "code",
"execution_count": 1,
"metadata": {
"collapsed": false
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Data Downloaded!\n"
]
}
],
"source": [
"# download the data from the IBM server\n",
"# this may take ~30 seconds depending on your internet speed\n",
"!wget --quiet https://ibm.box.com/shared/static/j8skrriqeqw66f51iyz911zyqai64j2g.zip\n",
"print(\"Data Downloaded!\")"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": []
},
{
"cell_type": "code",
"execution_count": 2,
"metadata": {
"collapsed": false
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Data Extracted!\n"
]
}
],
"source": [
"# unzip the archive's contents into the \"resources\" directory\n",
"# extraction runs locally and may take a few seconds\n",
"!unzip -q -o -d /resources/jupyterlab/labs/BD0211EN/ j8skrriqeqw66f51iyz911zyqai64j2g.zip\n",
"print(\"Data Extracted!\")"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": []
},
{
"cell_type": "code",
"execution_count": 6,
"metadata": {
"collapsed": false
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
".\n",
"..\n",
".DS_Store\n",
"README.md\n",
"followers.txt\n",
"notebook.log\n",
"nyctaxi.csv\n",
"nyctaxi100.csv\n",
"nyctaxisub.csv\n",
"nycweather.csv\n",
"pom.xml\n",
"taxistreams.py\n",
"users.txt\n"
]
}
],
"source": [
"# list the extracted files\n",
"!ls -1ha /resources/jupyterlab/labs/BD0211EN/LabData/\n",
"#!cat /resources/jupyterlab/labs/BD0211EN/LabData/notebook.log"
]
},
{
"cell_type": "code",
"execution_count": 16,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"['__add__',\n",
" '__class__',\n",
" '__delattr__',\n",
" '__dict__',\n",
" '__dir__',\n",
" '__doc__',\n",
" '__eq__',\n",
" '__format__',\n",
" '__ge__',\n",
" '__getattribute__',\n",
" '__getnewargs__',\n",
" '__gt__',\n",
" '__hash__',\n",
" '__init__',\n",
" '__init_subclass__',\n",
" '__le__',\n",
" '__lt__',\n",
" '__module__',\n",
" '__ne__',\n",
" '__new__',\n",
" '__reduce__',\n",
" '__reduce_ex__',\n",
" '__repr__',\n",
" '__setattr__',\n",
" '__sizeof__',\n",
" '__str__',\n",
" '__subclasshook__',\n",
" '__weakref__',\n",
" '_computeFractionForSampleSize',\n",
" '_defaultReducePartitions',\n",
" '_id',\n",
" '_jrdd',\n",
" '_jrdd_deserializer',\n",
" '_memory_limit',\n",
" '_pickled',\n",
" '_reserialize',\n",
" '_to_java_object_rdd',\n",
" 'aggregate',\n",
" 'aggregateByKey',\n",
" 'cache',\n",
" 'cartesian',\n",
" 'checkpoint',\n",
" 'coalesce',\n",
" 'cogroup',\n",
" 'collect',\n",
" 'collectAsMap',\n",
" 'combineByKey',\n",
" 'context',\n",
" 'count',\n",
" 'countApprox',\n",
" 'countApproxDistinct',\n",
" 'countByKey',\n",
" 'countByValue',\n",
" 'ctx',\n",
" 'distinct',\n",
" 'filter',\n",
" 'first',\n",
" 'flatMap',\n",
" 'flatMapValues',\n",
" 'fold',\n",
" 'foldByKey',\n",
" 'foreach',\n",
" 'foreachPartition',\n",
" 'fullOuterJoin',\n",
" 'getCheckpointFile',\n",
" 'getNumPartitions',\n",
" 'getStorageLevel',\n",
" 'glom',\n",
" 'groupBy',\n",
" 'groupByKey',\n",
" 'groupWith',\n",
" 'histogram',\n",
" 'id',\n",
" 'intersection',\n",
" 'isCheckpointed',\n",
" 'isEmpty',\n",
" 'isLocallyCheckpointed',\n",
" 'is_cached',\n",
" 'is_checkpointed',\n",
" 'join',\n",
" 'keyBy',\n",
" 'keys',\n",
" 'leftOuterJoin',\n",
" 'localCheckpoint',\n",
" 'lookup',\n",
" 'map',\n",
" 'mapPartitions',\n",
" 'mapPartitionsWithIndex',\n",
" 'mapPartitionsWithSplit',\n",
" 'mapValues',\n",
" 'max',\n",
" 'mean',\n",
" 'meanApprox',\n",
" 'min',\n",
" 'name',\n",
" 'partitionBy',\n",
" 'partitioner',\n",
" 'persist',\n",
" 'pipe',\n",
" 'randomSplit',\n",
" 'reduce',\n",
" 'reduceByKey',\n",
" 'reduceByKeyLocally',\n",
" 'repartition',\n",
" 'repartitionAndSortWithinPartitions',\n",
" 'rightOuterJoin',\n",
" 'sample',\n",
" 'sampleByKey',\n",
" 'sampleStdev',\n",
" 'sampleVariance',\n",
" 'saveAsHadoopDataset',\n",
" 'saveAsHadoopFile',\n",
" 'saveAsNewAPIHadoopDataset',\n",
" 'saveAsNewAPIHadoopFile',\n",
" 'saveAsPickleFile',\n",
" 'saveAsSequenceFile',\n",
" 'saveAsTextFile',\n",
" 'setName',\n",
" 'sortBy',\n",
" 'sortByKey',\n",
" 'stats',\n",
" 'stdev',\n",
" 'subtract',\n",
" 'subtractByKey',\n",
" 'sum',\n",
" 'sumApprox',\n",
" 'take',\n",
" 'takeOrdered',\n",
" 'takeSample',\n",
" 'toDF',\n",
" 'toDebugString',\n",
" 'toLocalIterator',\n",
" 'top',\n",
" 'treeAggregate',\n",
" 'treeReduce',\n",
" 'union',\n",
" 'unpersist',\n",
" 'values',\n",
" 'variance',\n",
" 'zip',\n",
" 'zipWithIndex',\n",
" 'zipWithUniqueId']"
]
},
"execution_count": 16,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"dir(logFile)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": []
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": []
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": []
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": []
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Now, let's create an RDD by loading the same log file that is analyzed in the Scala version of this lab."
]
},
{
"cell_type": "code",
"execution_count": 7,
"metadata": {
"collapsed": true,
"deletable": true,
"editable": true
},
"outputs": [],
"source": [
"logFile = sc.textFile(\"/resources/jupyterlab/labs/BD0211EN/LabData/notebook.log\")"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": []
},
{
"cell_type": "markdown",
"metadata": {
"deletable": true,
"editable": true
},
"source": [
"### <span style=\"color: red\">YOUR TURN:</span> \n",
"\n",
"#### In the cell below, filter the RDD to keep only the lines that contain INFO"
]
},
{
"cell_type": "code",
"execution_count": 21,
"metadata": {
"collapsed": true,
"deletable": true,
"editable": true
},
"outputs": [],
"source": [
"# WRITE YOUR CODE BELOW\n",
"filtered = logFile.filter(lambda line: \"INFO\" in line)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": []
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": []
},
{
"cell_type": "code",
"execution_count": 22,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"13438"
]
},
"execution_count": 22,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"filtered.count()"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": []
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": []
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": []
},
{
"cell_type": "code",
"execution_count": 14,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"['__add__',\n",
" '__class__',\n",
" '__delattr__',\n",
" '__dict__',\n",
" '__dir__',\n",
" '__doc__',\n",
" '__eq__',\n",
" '__format__',\n",
" '__ge__',\n",
" '__getattribute__',\n",
" '__getnewargs__',\n",
" '__gt__',\n",
" '__hash__',\n",
" '__init__',\n",
" '__init_subclass__',\n",
" '__le__',\n",
" '__lt__',\n",
" '__module__',\n",
" '__ne__',\n",
" '__new__',\n",
" '__reduce__',\n",
" '__reduce_ex__',\n",
" '__repr__',\n",
" '__setattr__',\n",
" '__sizeof__',\n",
" '__str__',\n",
" '__subclasshook__',\n",
" '__weakref__',\n",
" '_bypass_serializer',\n",
" '_computeFractionForSampleSize',\n",
" '_defaultReducePartitions',\n",
" '_id',\n",
" '_is_pipelinable',\n",
" '_jrdd',\n",
" '_jrdd_deserializer',\n",
" '_jrdd_val',\n",
" '_memory_limit',\n",
" '_pickled',\n",
" '_prev_jrdd',\n",
" '_prev_jrdd_deserializer',\n",
" '_reserialize',\n",
" '_to_java_object_rdd',\n",
" 'aggregate',\n",
" 'aggregateByKey',\n",
" 'cache',\n",
" 'cartesian',\n",
" 'checkpoint',\n",
" 'coalesce',\n",
" 'cogroup',\n",
" 'collect',\n",
" 'collectAsMap',\n",
" 'combineByKey',\n",
" 'context',\n",
" 'count',\n",
" 'countApprox',\n",
" 'countApproxDistinct',\n",
" 'countByKey',\n",
" 'countByValue',\n",
" 'ctx',\n",
" 'distinct',\n",
" 'filter',\n",
" 'first',\n",
" 'flatMap',\n",
" 'flatMapValues',\n",
" 'fold',\n",
" 'foldByKey',\n",
" 'foreach',\n",
" 'foreachPartition',\n",
" 'fullOuterJoin',\n",
" 'func',\n",
" 'getCheckpointFile',\n",
" 'getNumPartitions',\n",
" 'getStorageLevel',\n",
" 'glom',\n",
" 'groupBy',\n",
" 'groupByKey',\n",
" 'groupWith',\n",
" 'histogram',\n",
" 'id',\n",
" 'intersection',\n",
" 'isCheckpointed',\n",
" 'isEmpty',\n",
" 'isLocallyCheckpointed',\n",
" 'is_cached',\n",
" 'is_checkpointed',\n",
" 'join',\n",
" 'keyBy',\n",
" 'keys',\n",
" 'leftOuterJoin',\n",
" 'localCheckpoint',\n",
" 'lookup',\n",
" 'map',\n",
" 'mapPartitions',\n",
" 'mapPartitionsWithIndex',\n",
" 'mapPartitionsWithSplit',\n",
" 'mapValues',\n",
" 'max',\n",
" 'mean',\n",
" 'meanApprox',\n",
" 'min',\n",
" 'name',\n",
" 'partitionBy',\n",
" 'partitioner',\n",
" 'persist',\n",
" 'pipe',\n",
" 'preservesPartitioning',\n",
" 'prev',\n",
" 'randomSplit',\n",
" 'reduce',\n",
" 'reduceByKey',\n",
" 'reduceByKeyLocally',\n",
" 'repartition',\n",
" 'repartitionAndSortWithinPartitions',\n",
" 'rightOuterJoin',\n",
" 'sample',\n",
" 'sampleByKey',\n",
" 'sampleStdev',\n",
" 'sampleVariance',\n",
" 'saveAsHadoopDataset',\n",
" 'saveAsHadoopFile',\n",
" 'saveAsNewAPIHadoopDataset',\n",
" 'saveAsNewAPIHadoopFile',\n",
" 'saveAsPickleFile',\n",
" 'saveAsSequenceFile',\n",
" 'saveAsTextFile',\n",
" 'setName',\n",
" 'sortBy',\n",
" 'sortByKey',\n",
" 'stats',\n",
" 'stdev',\n",
" 'subtract',\n",
" 'subtractByKey',\n",
" 'sum',\n",
" 'sumApprox',\n",
" 'take',\n",
" 'takeOrdered',\n",
" 'takeSample',\n",
" 'toDF',\n",
" 'toDebugString',\n",
" 'toLocalIterator',\n",
" 'top',\n",
" 'treeAggregate',\n",
" 'treeReduce',\n",
" 'union',\n",
" 'unpersist',\n",
" 'values',\n",
" 'variance',\n",
" 'zip',\n",
" 'zipWithIndex',\n",
" 'zipWithUniqueId']"
]
},
"execution_count": 14,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"dir(filtered)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": []
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": []
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": []
},
{
"cell_type": "code",
"execution_count": 11,
"metadata": {},
"outputs": [
{
"ename": "Py4JJavaError",
"evalue": "An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.\n: org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 0.0 failed 1 times, most recent failure: Lost task 1.0 in stage 0.0 (TID 1, localhost, executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):\n File \"/home/jupyterlab/spark-2.3.1/python/lib/pyspark.zip/pyspark/worker.py\", line 230, in main\n process()\n File \"/home/jupyterlab/spark-2.3.1/python/lib/pyspark.zip/pyspark/worker.py\", line 225, in process\n serializer.dump_stream(func(split_index, iterator), outfile)\n File \"/home/jupyterlab/spark-2.3.1/python/lib/pyspark.zip/pyspark/serializers.py\", line 372, in dump_stream\n vs = list(itertools.islice(iterator, batch))\n File \"/home/jupyterlab/spark-2.3.1/python/lib/pyspark.zip/pyspark/util.py\", line 55, in wrapper\n return f(*args, **kwargs)\nTypeError: 'str' object is not callable\n\n\tat org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:298)\n\tat org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:438)\n\tat org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:421)\n\tat org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:252)\n\tat org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)\n\tat scala.collection.Iterator$class.foreach(Iterator.scala:893)\n\tat org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)\n\tat scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)\n\tat scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)\n\tat scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)\n\tat scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)\n\tat org.apache.spark.InterruptibleIterator.to(InterruptibleIterator.scala:28)\n\tat 
scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)\n\tat org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28)\n\tat scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289)\n\tat org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28)\n\tat org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12.apply(RDD.scala:939)\n\tat org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12.apply(RDD.scala:939)\n\tat org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2074)\n\tat org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2074)\n\tat org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)\n\tat org.apache.spark.scheduler.Task.run(Task.scala:109)\n\tat org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)\n\tat java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)\n\tat java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)\n\tat java.lang.Thread.run(Thread.java:748)\n\nDriver stacktrace:\n\tat org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1602)\n\tat org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1590)\n\tat org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1589)\n\tat scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)\n\tat scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)\n\tat org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1589)\n\tat org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)\n\tat org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)\n\tat scala.Option.foreach(Option.scala:257)\n\tat 
org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:831)\n\tat org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1823)\n\tat org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1772)\n\tat org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1761)\n\tat org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)\n\tat org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:642)\n\tat org.apache.spark.SparkContext.runJob(SparkContext.scala:2034)\n\tat org.apache.spark.SparkContext.runJob(SparkContext.scala:2055)\n\tat org.apache.spark.SparkContext.runJob(SparkContext.scala:2074)\n\tat org.apache.spark.SparkContext.runJob(SparkContext.scala:2099)\n\tat org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:939)\n\tat org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)\n\tat org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)\n\tat org.apache.spark.rdd.RDD.withScope(RDD.scala:363)\n\tat org.apache.spark.rdd.RDD.collect(RDD.scala:938)\n\tat org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:162)\n\tat org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala)\n\tat sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)\n\tat sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)\n\tat sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)\n\tat java.lang.reflect.Method.invoke(Method.java:498)\n\tat py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)\n\tat py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)\n\tat py4j.Gateway.invoke(Gateway.java:282)\n\tat py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)\n\tat py4j.commands.CallCommand.execute(CallCommand.java:79)\n\tat py4j.GatewayConnection.run(GatewayConnection.java:238)\n\tat 
java.lang.Thread.run(Thread.java:748)\nCaused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):\n File \"/home/jupyterlab/spark-2.3.1/python/lib/pyspark.zip/pyspark/worker.py\", line 230, in main\n process()\n File \"/home/jupyterlab/spark-2.3.1/python/lib/pyspark.zip/pyspark/worker.py\", line 225, in process\n serializer.dump_stream(func(split_index, iterator), outfile)\n File \"/home/jupyterlab/spark-2.3.1/python/lib/pyspark.zip/pyspark/serializers.py\", line 372, in dump_stream\n vs = list(itertools.islice(iterator, batch))\n File \"/home/jupyterlab/spark-2.3.1/python/lib/pyspark.zip/pyspark/util.py\", line 55, in wrapper\n return f(*args, **kwargs)\nTypeError: 'str' object is not callable\n\n\tat org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:298)\n\tat org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:438)\n\tat org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:421)\n\tat org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:252)\n\tat org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)\n\tat scala.collection.Iterator$class.foreach(Iterator.scala:893)\n\tat org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)\n\tat scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)\n\tat scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)\n\tat scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)\n\tat scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)\n\tat org.apache.spark.InterruptibleIterator.to(InterruptibleIterator.scala:28)\n\tat scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)\n\tat org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28)\n\tat scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289)\n\tat 
org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28)\n\tat org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12.apply(RDD.scala:939)\n\tat org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12.apply(RDD.scala:939)\n\tat org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2074)\n\tat org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2074)\n\tat org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)\n\tat org.apache.spark.scheduler.Task.run(Task.scala:109)\n\tat org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)\n\tat java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)\n\tat java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)\n\t... 1 more\n",
"output_type": "error",
"traceback": [
"\u001b[0;31m---------------------------------------------------------------------------\u001b[0m",
"\u001b[0;31mPy4JJavaError\u001b[0m Traceback (most recent call last)",
"\u001b[0;32m<ipython-input-11-21dab2cd9579>\u001b[0m in \u001b[0;36m<module>\u001b[0;34m\u001b[0m\n\u001b[0;32m----> 1\u001b[0;31m \u001b[0mfiltered\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mcollect\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m",
"\u001b[0;32m~/spark-2.3.1/python/pyspark/rdd.py\u001b[0m in \u001b[0;36mcollect\u001b[0;34m(self)\u001b[0m\n\u001b[1;32m 832\u001b[0m \"\"\"\n\u001b[1;32m 833\u001b[0m \u001b[0;32mwith\u001b[0m \u001b[0mSCCallSiteSync\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mself\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mcontext\u001b[0m\u001b[0;34m)\u001b[0m \u001b[0;32mas\u001b[0m \u001b[0mcss\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m--> 834\u001b[0;31m \u001b[0msock_info\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0mself\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mctx\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0m_jvm\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mPythonRDD\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mcollectAndServe\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mself\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0m_jrdd\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mrdd\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m 835\u001b[0m \u001b[0;32mreturn\u001b[0m \u001b[0mlist\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0m_load_from_socket\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0msock_info\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0mself\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0m_jrdd_deserializer\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 836\u001b[0m \u001b[0;34m\u001b[0m\u001b[0m\n",
"\u001b[0;32m~/spark-2.3.1/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py\u001b[0m in \u001b[0;36m__call__\u001b[0;34m(self, *args)\u001b[0m\n\u001b[1;32m 1255\u001b[0m \u001b[0manswer\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0mself\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mgateway_client\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0msend_command\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mcommand\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 1256\u001b[0m return_value = get_return_value(\n\u001b[0;32m-> 1257\u001b[0;31m answer, self.gateway_client, self.target_id, self.name)\n\u001b[0m\u001b[1;32m 1258\u001b[0m \u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 1259\u001b[0m \u001b[0;32mfor\u001b[0m \u001b[0mtemp_arg\u001b[0m \u001b[0;32min\u001b[0m \u001b[0mtemp_args\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n",
"\u001b[0;32m~/spark-2.3.1/python/pyspark/sql/utils.py\u001b[0m in \u001b[0;36mdeco\u001b[0;34m(*a, **kw)\u001b[0m\n\u001b[1;32m 61\u001b[0m \u001b[0;32mdef\u001b[0m \u001b[0mdeco\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m*\u001b[0m\u001b[0ma\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0;34m**\u001b[0m\u001b[0mkw\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 62\u001b[0m \u001b[0;32mtry\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m---> 63\u001b[0;31m \u001b[0;32mreturn\u001b[0m \u001b[0mf\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m*\u001b[0m\u001b[0ma\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0;34m**\u001b[0m\u001b[0mkw\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m 64\u001b[0m \u001b[0;32mexcept\u001b[0m \u001b[0mpy4j\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mprotocol\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mPy4JJavaError\u001b[0m \u001b[0;32mas\u001b[0m \u001b[0me\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 65\u001b[0m \u001b[0ms\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0me\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mjava_exception\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mtoString\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n",
"\u001b[0;32m~/spark-2.3.1/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py\u001b[0m in \u001b[0;36mget_return_value\u001b[0;34m(answer, gateway_client, target_id, name)\u001b[0m\n\u001b[1;32m 326\u001b[0m raise Py4JJavaError(\n\u001b[1;32m 327\u001b[0m \u001b[0;34m\"An error occurred while calling {0}{1}{2}.\\n\"\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m--> 328\u001b[0;31m format(target_id, \".\", name), value)\n\u001b[0m\u001b[1;32m 329\u001b[0m \u001b[0;32melse\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 330\u001b[0m raise Py4JError(\n",
"\u001b[0;31mPy4JJavaError\u001b[0m: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.\n: org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 0.0 failed 1 times, most recent failure: Lost task 1.0 in stage 0.0 (TID 1, localhost, executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):\n File \"/home/jupyterlab/spark-2.3.1/python/lib/pyspark.zip/pyspark/worker.py\", line 230, in main\n process()\n File \"/home/jupyterlab/spark-2.3.1/python/lib/pyspark.zip/pyspark/worker.py\", line 225, in process\n serializer.dump_stream(func(split_index, iterator), outfile)\n File \"/home/jupyterlab/spark-2.3.1/python/lib/pyspark.zip/pyspark/serializers.py\", line 372, in dump_stream\n vs = list(itertools.islice(iterator, batch))\n File \"/home/jupyterlab/spark-2.3.1/python/lib/pyspark.zip/pyspark/util.py\", line 55, in wrapper\n return f(*args, **kwargs)\nTypeError: 'str' object is not callable\n\n\tat org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:298)\n\tat org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:438)\n\tat org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:421)\n\tat org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:252)\n\tat org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)\n\tat scala.collection.Iterator$class.foreach(Iterator.scala:893)\n\tat org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)\n\tat scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)\n\tat scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)\n\tat scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)\n\tat scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)\n\tat 
org.apache.spark.InterruptibleIterator.to(InterruptibleIterator.scala:28)\n\tat scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)\n\tat org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28)\n\tat scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289)\n\tat org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28)\n\tat org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12.apply(RDD.scala:939)\n\tat org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12.apply(RDD.scala:939)\n\tat org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2074)\n\tat org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2074)\n\tat org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)\n\tat org.apache.spark.scheduler.Task.run(Task.scala:109)\n\tat org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)\n\tat java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)\n\tat java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)\n\tat java.lang.Thread.run(Thread.java:748)\n\nDriver stacktrace:\n\tat org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1602)\n\tat org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1590)\n\tat org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1589)\n\tat scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)\n\tat scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)\n\tat org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1589)\n\tat org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)\n\tat org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)\n\tat 
scala.Option.foreach(Option.scala:257)\n\tat org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:831)\n\tat org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1823)\n\tat org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1772)\n\tat org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1761)\n\tat org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)\n\tat org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:642)\n\tat org.apache.spark.SparkContext.runJob(SparkContext.scala:2034)\n\tat org.apache.spark.SparkContext.runJob(SparkContext.scala:2055)\n\tat org.apache.spark.SparkContext.runJob(SparkContext.scala:2074)\n\tat org.apache.spark.SparkContext.runJob(SparkContext.scala:2099)\n\tat org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:939)\n\tat org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)\n\tat org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)\n\tat org.apache.spark.rdd.RDD.withScope(RDD.scala:363)\n\tat org.apache.spark.rdd.RDD.collect(RDD.scala:938)\n\tat org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:162)\n\tat org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala)\n\tat sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)\n\tat sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)\n\tat sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)\n\tat java.lang.reflect.Method.invoke(Method.java:498)\n\tat py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)\n\tat py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)\n\tat py4j.Gateway.invoke(Gateway.java:282)\n\tat py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)\n\tat py4j.commands.CallCommand.execute(CallCommand.java:79)\n\tat 
py4j.GatewayConnection.run(GatewayConnection.java:238)\n\tat java.lang.Thread.run(Thread.java:748)\nCaused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):\n File \"/home/jupyterlab/spark-2.3.1/python/lib/pyspark.zip/pyspark/worker.py\", line 230, in main\n process()\n File \"/home/jupyterlab/spark-2.3.1/python/lib/pyspark.zip/pyspark/worker.py\", line 225, in process\n serializer.dump_stream(func(split_index, iterator), outfile)\n File \"/home/jupyterlab/spark-2.3.1/python/lib/pyspark.zip/pyspark/serializers.py\", line 372, in dump_stream\n vs = list(itertools.islice(iterator, batch))\n File \"/home/jupyterlab/spark-2.3.1/python/lib/pyspark.zip/pyspark/util.py\", line 55, in wrapper\n return f(*args, **kwargs)\nTypeError: 'str' object is not callable\n\n\tat org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:298)\n\tat org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:438)\n\tat org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:421)\n\tat org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:252)\n\tat org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)\n\tat scala.collection.Iterator$class.foreach(Iterator.scala:893)\n\tat org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)\n\tat scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)\n\tat scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)\n\tat scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)\n\tat scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)\n\tat org.apache.spark.InterruptibleIterator.to(InterruptibleIterator.scala:28)\n\tat scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)\n\tat org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28)\n\tat 
scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289)\n\tat org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28)\n\tat org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12.apply(RDD.scala:939)\n\tat org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12.apply(RDD.scala:939)\n\tat org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2074)\n\tat org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2074)\n\tat org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)\n\tat org.apache.spark.scheduler.Task.run(Task.scala:109)\n\tat org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)\n\tat java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)\n\tat java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)\n\t... 1 more\n"
]
}
],
"source": []
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": []
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": []
},
{
"cell_type": "markdown",
"metadata": {
"deletable": true,
"editable": true
},
"source": [
"Highlight text for answer:\n",
"\n",
"<textarea rows=\"3\" cols=\"80\" style=\"color: white\">\n",
"info = logFile.filter(lambda line: \"INFO\" in line)\n",
"</textarea>"
]
},
{
"cell_type": "markdown",
"metadata": {
"deletable": true,
"editable": true
},
"source": [
"#### Count the lines:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": []
},
{
"cell_type": "markdown",
"metadata": {
"deletable": true,
"editable": true
},
"source": [
"Highlight text for answer:\n",
"\n",
"<textarea rows=\"3\" cols=\"80\" style=\"color: white\">\n",
"info.count()\n",
"</textarea>"
]
},
{
"cell_type": "markdown",
"metadata": {
"deletable": true,
"editable": true
},
"source": [
"#### Count the lines that contain \"spark\" by combining a transformation and an action."
]
},
{
"cell_type": "code",
"execution_count": 36,
"metadata": {
"collapsed": false,
"deletable": true,
"editable": true
},
"outputs": [],
"source": [
"# WRITE YOUR CODE BELOW\n",
"lines_spark = logFile.filter(lambda line: \"spark\" in line)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": []
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": []
},
{
"cell_type": "markdown",
"metadata": {
"deletable": true,
"editable": true
},
"source": [
"Highlight text for answer:\n",
"\n",
"<textarea rows=\"3\" cols=\"80\" style=\"color: white\">\n",
"info.filter(lambda line: \"spark\" in line).count()\n",
"</textarea>"
]
},
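{
"cell_type": "markdown",
"metadata": {},
"source": [
"A minimal plain-Python sketch of the same filter-then-count pattern, assuming a small hypothetical list `sketch_log` in place of the `logFile` RDD. It does not use Spark and only illustrates what the chained transformation and action compute."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Hypothetical log lines standing in for the logFile RDD.\n",
"sketch_log = ['INFO starting spark', 'WARN low memory', 'INFO spark done']\n",
"\n",
"# Transformation followed by an action, written as one expression:\n",
"# keep the lines containing 'spark', then count them.\n",
"sketch_spark_count = len([line for line in sketch_log if 'spark' in line])\n",
"\n",
"print(sketch_spark_count)"
]
},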
{
"cell_type": "markdown",
"metadata": {
"deletable": true,
"editable": true
},
"source": [
"#### Fetch those lines as an array of Strings"
]
},
{
"cell_type": "code",
"execution_count": 34,
"metadata": {
"collapsed": false,
"deletable": true,
"editable": true
},
"outputs": [],
"source": [
"# WRITE YOUR CODE BELOW\n",
"# Use a new name so that the `info` RDD is not overwritten by a plain list\n",
"spark_lines = logFile.filter(lambda line: \"spark\" in line).collect()"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": []
},
{
"cell_type": "code",
"execution_count": 28,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"['__add__',\n",
" '__class__',\n",
" '__contains__',\n",
" '__delattr__',\n",
" '__delitem__',\n",
" '__dir__',\n",
" '__doc__',\n",
" '__eq__',\n",
" '__format__',\n",
" '__ge__',\n",
" '__getattribute__',\n",
" '__getitem__',\n",
" '__gt__',\n",
" '__hash__',\n",
" '__iadd__',\n",
" '__imul__',\n",
" '__init__',\n",
" '__init_subclass__',\n",
" '__iter__',\n",
" '__le__',\n",
" '__len__',\n",
" '__lt__',\n",
" '__mul__',\n",
" '__ne__',\n",
" '__new__',\n",
" '__reduce__',\n",
" '__reduce_ex__',\n",
" '__repr__',\n",
" '__reversed__',\n",
" '__rmul__',\n",
" '__setattr__',\n",
" '__setitem__',\n",
" '__sizeof__',\n",
" '__str__',\n",
" '__subclasshook__',\n",
" 'append',\n",
" 'clear',\n",
" 'copy',\n",
" 'count',\n",
" 'extend',\n",
" 'index',\n",
" 'insert',\n",
" 'pop',\n",
" 'remove',\n",
" 'reverse',\n",
" 'sort']"
]
},
"execution_count": 28,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"dir(an_array)"
]
},
{
"cell_type": "code",
"execution_count": 32,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"<method-wrapper '__len__' of list object at 0x7f6f11a15408>"
]
},
"execution_count": 32,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"an_array.__len__"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": []
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": []
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": []
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": []
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": []
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": []
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": []
},
{
"cell_type": "markdown",
"metadata": {
"deletable": true,
"editable": true
},
"source": [
"Highlight text for answer:\n",
"\n",
"<textarea rows=\"3\" cols=\"80\" style=\"color: white\">\n",
"info.filter(lambda line: \"spark\" in line).collect()\n",
"</textarea>"
]
},
{
"cell_type": "markdown",
"metadata": {
"deletable": true,
"editable": true
},
"source": [
"View the lineage graph of an RDD (the chain of RDDs it was derived from) using this command:"
]
},
{
"cell_type": "code",
"execution_count": 37,
"metadata": {
"collapsed": false,
"deletable": true,
"editable": true
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"b'(2) PythonRDD[11] at RDD at PythonRDD.scala:49 []\\n | /resources/jupyterlab/labs/BD0211EN/LabData/notebook.log MapPartitionsRDD[1] at textFile at NativeMethodAccessorImpl.java:0 []\\n | /resources/jupyterlab/labs/BD0211EN/LabData/notebook.log HadoopRDD[0] at textFile at NativeMethodAccessorImpl.java:0 []'\n"
]
}
],
"source": [
"print(lines_spark.toDebugString())"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": []
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": []
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": []
},
{
"cell_type": "markdown",
"metadata": {
"deletable": true,
"editable": true
},
"source": [
"## Joining RDDs\n",
"\n",
"Next, you are going to create RDDs for the same README and POM files that we used in the Scala version."
]
},
{
"cell_type": "code",
"execution_count": 38,
"metadata": {
"collapsed": true,
"deletable": true,
"editable": true
},
"outputs": [],
"source": [
"readmeFile = sc.textFile(\"/resources/jupyterlab/labs/BD0211EN/LabData/README.md\")\n",
"pomFile = sc.textFile(\"/resources/jupyterlab/labs/BD0211EN/LabData/pom.xml\")"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": []
},
{
"cell_type": "markdown",
"metadata": {
"deletable": true,
"editable": true
},
"source": [
"How many lines in each file contain the keyword \"Spark\"?"
]
},
{
"cell_type": "code",
"execution_count": 39,
"metadata": {
"collapsed": false,
"deletable": true,
"editable": true
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"18\n",
"2\n"
]
}
],
"source": [
"print(readmeFile.filter(lambda line: \"Spark\" in line).count())\n",
"print(pomFile.filter(lambda line: \"Spark\" in line).count())"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": []
},
{
"cell_type": "markdown",
"metadata": {
"deletable": true,
"editable": true
},
"source": [
"Now do a WordCount on each RDD so that the results are (K,V) pairs of (word, count)."
]
},
{
"cell_type": "code",
"execution_count": 41,
"metadata": {
"collapsed": false,
"deletable": true,
"editable": true
},
"outputs": [],
"source": [
"readmeCount = readmeFile. \\\n",
" flatMap(lambda line: line.split(\" \")). \\\n",
" map(lambda word: (word, 1)). \\\n",
" reduceByKey(lambda a, b: a + b)\n",
" \n",
"pomCount = pomFile. \\\n",
" flatMap(lambda line: line.split(\" \")). \\\n",
" map(lambda word: (word, 1)). \\\n",
" reduceByKey(lambda a, b: a + b)"
]
},
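{
"cell_type": "markdown",
"metadata": {},
"source": [
"A minimal plain-Python sketch of what the `flatMap`, `map` and `reduceByKey` steps compute, assuming a small hypothetical list `sketch_lines` in place of an RDD. It does not use Spark and only illustrates the semantics of each step."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from collections import defaultdict\n",
"\n",
"# Hypothetical sample input standing in for an RDD of text lines.\n",
"sketch_lines = ['to be or', 'not to be']\n",
"\n",
"# flatMap: split every line into words and flatten the result into one list.\n",
"sketch_words = [word for line in sketch_lines for word in line.split(' ')]\n",
"\n",
"# map: pair every word with an initial count of 1.\n",
"sketch_pairs = [(word, 1) for word in sketch_words]\n",
"\n",
"# reduceByKey: add up the counts of all pairs that share the same key.\n",
"sketch_counts = defaultdict(int)\n",
"for word, n in sketch_pairs:\n",
"    sketch_counts[word] += n\n",
"\n",
"print(sorted(sketch_counts.items()))"
]
},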
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": []
},
{
"cell_type": "markdown",
"metadata": {
"deletable": true,
"editable": true
},
"source": [
"To see the contents of either of them as a list, just call the collect function on it."
]
},
{
"cell_type": "code",
"execution_count": 42,
"metadata": {
"collapsed": false,
"deletable": true,
"editable": true
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Readme Count\n",
"\n",
"[('', 43), ('Spark is a fast and general cluster computing system for Big Data. It provides', 1), ('and Spark Streaming for stream processing.', 1), ('guide, on the [project web page](http://spark.apache.org/documentation.html)', 1), ('## Building Spark', 1), ('Spark is built using [Apache Maven](http://maven.apache.org/).', 1), (' build/mvn -DskipTests clean package', 1), ('Try the following command, which should return 1000:', 1), (' scala> sc.parallelize(1 to 1000).count()', 1), ('## Interactive Python Shell', 1), (' ./bin/pyspark', 1), ('And run the following command, which should also return 1000:', 1), ('Spark also comes with several sample programs in the `examples` directory.', 1), ('To run one of them, use `./bin/run-example <class> [params]`. For example:', 1), (' ./bin/run-example SparkPi', 1), ('will run the Pi example locally.', 1), ('You can set the MASTER environment variable when running examples to submit', 1), ('examples to a cluster. This can be a mesos:// or spark:// URL,', 1), ('can also use an abbreviated class name if the class is in the `examples`', 1), ('package. For instance:', 1), (' MASTER=spark://host:7077 ./bin/run-example SparkPi', 1), ('Many of the example programs print usage help if no params are given.', 1), ('Testing first requires [building Spark](#building-spark). Once Spark is built, tests', 1), ('can be run using:', 1), ('[run tests for a module, or individual tests](https://cwiki.apache.org/confluence/display/SPARK/Useful+Developer+Tools).', 1), ('## A Note About Hadoop Versions', 1), ('storage systems. 
Because the protocols have changed in different versions of', 1), ('Hadoop, you must build Spark against the same version that your cluster runs.', 1), ('[\"Specifying the Hadoop Version\"](http://spark.apache.org/docs/latest/building-spark.html#specifying-the-hadoop-version)', 1), ('for detailed guidance on building for a particular distribution of Hadoop, including', 1), ('distribution.', 1), ('## Configuration', 1), ('Please refer to the [Configuration Guide](http://spark.apache.org/docs/latest/configuration.html)', 1), ('in the online documentation for an overview on how to configure Spark.', 1), ('# Apache Spark', 1), ('high-level APIs in Scala, Java, Python, and R, and an optimized engine that', 1), ('supports general computation graphs for data analysis. It also supports a', 1), ('rich set of higher-level tools including Spark SQL for SQL and DataFrames,', 1), ('MLlib for machine learning, GraphX for graph processing,', 1), ('<http://spark.apache.org/>', 1), ('## Online Documentation', 1), ('You can find the latest Spark documentation, including a programming', 1), ('and [project wiki](https://cwiki.apache.org/confluence/display/SPARK).', 1), ('This README file only contains basic setup instructions.', 1), ('To build Spark and its example programs, run:', 1), ('(You do not need to do this if you downloaded a pre-built package.)', 1), ('More detailed documentation is available from the project site, at', 1), ('[\"Building Spark\"](http://spark.apache.org/docs/latest/building-spark.html).', 1), ('## Interactive Scala Shell', 1), ('The easiest way to start using Spark is through the Scala shell:', 1), (' ./bin/spark-shell', 1), ('Alternatively, if you prefer Python, you can use the Python shell:', 1), (' >>> sc.parallelize(range(1000)).count()', 1), ('## Example Programs', 1), ('\"yarn\" to run on YARN, and \"local\" to run', 1), ('locally with one thread, or \"local[N]\" to run locally with N threads. 
You', 1), ('## Running Tests', 1), (' ./dev/run-tests', 1), ('Please see the guidance on how to', 1), ('Spark uses the Hadoop core library to talk to HDFS and other Hadoop-supported', 1), ('Please refer to the build documentation at', 1), ('building for particular Hive and Hive Thriftserver distributions. See also', 1), ('[\"Third Party Hadoop Distributions\"](http://spark.apache.org/docs/latest/hadoop-third-party-distributions.html)', 1), ('for guidance on building a Spark application that works with a particular', 1)]\n"
]
}
],
"source": [
"print(\"Readme Count\\n\")\n",
"print(readmeCount.collect())"
]
},
{
"cell_type": "code",
"execution_count": 43,
"metadata": {
"collapsed": false,
"deletable": true,
"editable": true
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Pom Count\n",
"\n",
"[('<?xml version=\"1.0\" encoding=\"UTF-8\"?>', 1), (' ~ Licensed to the Apache Software Foundation (ASF) under one or more', 1), (' ~ contributor license agreements. See the NOTICE file distributed with', 1), (' ~ The ASF licenses this file to You under the Apache License, Version 2.0', 1), (' http://www.apache.org/licenses/LICENSE-2.0', 1), (' ~ distributed under the License is distributed on an \"AS IS\" BASIS,', 1), (' ~ limitations under the License.', 1), (' -->', 1), ('', 841), ('<project xmlns=\"http://maven.apache.org/POM/4.0.0\" xmlns:xsi=\"http://www.w3.org/2001/XMLSchema-instance\" xsi:schemaLocation=\"http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd\">', 1), (' <modelVersion>4.0.0</modelVersion>', 1), (' <parent>', 1), (' <groupId>org.apache.spark</groupId>', 2), (' <artifactId>spark-parent_2.10</artifactId>', 1), (' <version>1.6.0-SNAPSHOT</version>', 1), (' <properties>', 1), (' <sbt.project.name>examples</sbt.project.name>', 1), (' </properties>', 1), (' <packaging>jar</packaging>', 1), (' <dependencies>', 1), (' <dependency>', 24), ('<version>${project.version}</version>', 11), (' </dependency>', 24), ('<artifactId>spark-streaming_${scala.binary.version}</artifactId>', 1), ('<artifactId>spark-bagel_${scala.binary.version}</artifactId>', 1), ('<artifactId>spark-hive_${scala.binary.version}</artifactId>', 1), ('<artifactId>spark-graphx_${scala.binary.version}</artifactId>', 1), ('<artifactId>spark-streaming-flume_${scala.binary.version}</artifactId>', 1), ('<exclusions>', 6), (' <artifactId>protobuf-java</artifactId>', 1), (' <!-- SPARK-4455 -->', 4), (' <groupId>org.apache.hbase</groupId>', 5), (' <artifactId>hbase-annotations</artifactId>', 4), (' <artifactId>jruby-complete</artifactId>', 1), ('<artifactId>hbase-protocol</artifactId>', 1), ('<artifactId>hbase-common</artifactId>', 1), (' <exclusion>', 1), (' <artifactId>netty</artifactId>', 1), (' <artifactId>hadoop-core</artifactId>', 1), (' 
<artifactId>hadoop-mapreduce-client-core</artifactId>', 1), (' <artifactId>hadoop-annotations</artifactId>', 1), (' <artifactId>commons-math</artifactId>', 1), (' <groupId>com.sun.jersey</groupId>', 4), (' <artifactId>jersey-core</artifactId>', 2), (' <groupId>org.slf4j</groupId>', 1), (' <artifactId>slf4j-api</artifactId>', 1), (' <artifactId>commons-io</artifactId>', 1), ('<scope>test</scope>', 2), ('<artifactId>commons-math3</artifactId>', 1), ('<groupId>com.twitter</groupId>', 1), ('<groupId>org.scalacheck</groupId>', 1), ('<artifactId>cassandra-all</artifactId>', 1), ('<version>1.2.6</version>', 1), (' <groupId>com.googlecode.concurrentlinkedhashmap</groupId>', 1), (' <artifactId>commons-cli</artifactId>', 1), (' <groupId>commons-codec</groupId>', 1), (' <groupId>commons-lang</groupId>', 1), (' <artifactId>commons-lang</artifactId>', 1), (' <groupId>commons-logging</groupId>', 1), (' <artifactId>commons-logging</artifactId>', 1), (' <artifactId>netty</artifactId>', 1), (' <groupId>jline</groupId>', 1), (' <groupId>org.apache.cassandra.deps</groupId>', 1), (' <artifactId>avro</artifactId>', 1), ('<groupId>com.github.scopt</groupId>', 1), ('<artifactId>scopt_${scala.binary.version}</artifactId>', 1), ('<version>3.2.0</version>', 1), ('them to be provided.', 1), (' </dependencies>', 1), (' <build>', 1), (' <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>', 1), (' <testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>', 1), ('<plugin>', 3), (' <groupId>org.apache.maven.plugins</groupId>', 3), (' <artifactId>maven-deploy-plugin</artifactId>', 1), (' <skip>true</skip>', 2), (' </configuration>', 3), ('</plugin>', 3), (' <artifactId>maven-shade-plugin</artifactId>', 1), (' <shadedArtifactAttached>false</shadedArtifactAttached>', 1), (' <outputFile>${project.build.directory}/scala-${scala.binary.version}/spark-examples-${project.version}-hadoop${hadoop.version}.jar</outputFile>', 1), (' 
<artifactSet>', 1), ('<includes>', 1), (' </artifactSet>', 1), ('<filter>', 1), (' <artifact>*:*</artifact>', 1), (' <exclude>META-INF/*.DSA</exclude>', 1), (' <exclude>META-INF/*.RSA</exclude>', 1), (' </excludes>', 1), ('</filter>', 1), (' </filters>', 1), ('<transformer implementation=\"org.apache.maven.plugins.shade.resource.ServicesResourceTransformer\" />', 1), ('</transformer>', 2), ('<transformer implementation=\"org.apache.maven.plugins.shade.resource.DontIncludeResourceTransformer\">', 1), (' <resource>log4j.properties</resource>', 1), (' </build>', 1), ('<dependencies>', 1), (' <artifactId>spark-streaming-kinesis-asl_${scala.binary.version}</artifactId>', 1), ('</dependencies>', 1), (' </profile>', 6), (' <flume.deps.scope>provided</flume.deps.scope>', 1), (' <hadoop.deps.scope>provided</hadoop.deps.scope>', 1), ('<id>hbase-provided</id>', 1), (' <hbase.deps.scope>provided</hbase.deps.scope>', 1), ('<id>parquet-provided</id>', 1), (' <parquet.deps.scope>provided</parquet.deps.scope>', 1), (' </profiles>', 1), ('<!--', 1), (' ~ this work for additional information regarding copyright ownership.', 1), (' ~ (the \"License\"); you may not use this file except in compliance with', 1), (' ~ the License. 
You may obtain a copy of the License at', 1), (' ~', 3), (' ~ Unless required by applicable law or agreed to in writing, software', 1), (' ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.', 1), (' ~ See the License for the specific language governing permissions and', 1), (' <relativePath>../pom.xml</relativePath>', 1), (' </parent>', 1), (' <groupId>org.apache.spark</groupId>', 1), (' <artifactId>spark-examples_2.10</artifactId>', 1), (' <name>Spark Project Examples</name>', 1), (' <url>http://spark.apache.org/</url>', 1), ('<groupId>org.apache.spark</groupId>', 11), ('<artifactId>spark-core_${scala.binary.version}</artifactId>', 1), ('<scope>provided</scope>', 8), ('<artifactId>spark-mllib_${scala.binary.version}</artifactId>', 1), ('<artifactId>spark-streaming-twitter_${scala.binary.version}</artifactId>', 1), ('<artifactId>spark-streaming-mqtt_${scala.binary.version}</artifactId>', 1), ('<artifactId>spark-streaming-zeromq_${scala.binary.version}</artifactId>', 1), (' <exclusion>', 34), (' <groupId>org.spark-project.protobuf</groupId>', 1), (' </exclusion>', 34), ('</exclusions>', 5), ('<artifactId>spark-streaming-kafka_${scala.binary.version}</artifactId>', 1), ('<groupId>org.apache.hbase</groupId>', 7), ('<artifactId>hbase-testing-util</artifactId>', 1), ('<version>${hbase.version}</version>', 7), ('<scope>${hbase.deps.scope}</scope>', 6), (' <groupId>org.jruby</groupId>', 1), ('<artifactId>hbase-client</artifactId>', 1), (' <groupId>io.netty</groupId>', 1), (' </exclusion>', 1), (' </exclusions>', 1), ('<artifactId>hbase-server</artifactId>', 1), (' <groupId>org.apache.hadoop</groupId>', 7), (' <artifactId>hadoop-client</artifactId>', 1), (' <artifactId>hadoop-mapreduce-client-jobclient</artifactId>', 1), (' <artifactId>hadoop-auth</artifactId>', 1), (' <artifactId>hadoop-hdfs</artifactId>', 1), (' <artifactId>hbase-hadoop1-compat</artifactId>', 1), (' <groupId>org.apache.commons</groupId>', 2), (' 
<artifactId>jersey-server</artifactId>', 1), (' <artifactId>jersey-json</artifactId>', 1), (' <!-- hbase uses v2.4, which is better, but ...-->', 1), (' <groupId>commons-io</groupId>', 1), ('<artifactId>hbase-hadoop-compat</artifactId>', 2), ('<type>test-jar</type>', 1), ('<groupId>org.apache.commons</groupId>', 1), ('<artifactId>algebird-core_${scala.binary.version}</artifactId>', 1), ('<version>0.9.0</version>', 1), ('<artifactId>scalacheck_${scala.binary.version}</artifactId>', 1), ('<groupId>org.apache.cassandra</groupId>', 1), (' <groupId>com.google.guava</groupId>', 1), (' <artifactId>guava</artifactId>', 1), (' <artifactId>concurrentlinkedhashmap-lru</artifactId>', 1), (' <groupId>com.ning</groupId>', 1), (' <artifactId>compress-lzf</artifactId>', 1), (' <groupId>commons-cli</groupId>', 1), (' <artifactId>commons-codec</artifactId>', 1), (' <groupId>io.netty</groupId>', 1), (' <artifactId>jline</artifactId>', 1), (' <groupId>net.jpountz.lz4</groupId>', 1), (' <artifactId>lz4</artifactId>', 1), (' <artifactId>commons-math3</artifactId>', 1), (' <groupId>org.apache.thrift</groupId>', 1), (' <artifactId>libthrift</artifactId>', 1), (' <!--', 1), ('The following dependencies are already present in the Spark assembly, so we want to force', 1), (' -->', 1), ('<groupId>org.scala-lang</groupId>', 1), ('<artifactId>scala-library</artifactId>', 1), (' <plugins>', 1), (' <configuration>', 3), (' <artifactId>maven-install-plugin</artifactId>', 1), (' <include>*:*</include>', 1), ('</includes>', 1), (' <filters>', 1), (' <excludes>', 1), (' <exclude>META-INF/*.SF</exclude>', 1), (' <transformers>', 1), ('<transformer implementation=\"org.apache.maven.plugins.shade.resource.AppendingTransformer\">', 1), (' <resource>reference.conf</resource>', 1), (' </transformers>', 1), (' </plugins>', 1), (' <profiles>', 1), (' <profile>', 6), ('<id>kinesis-asl</id>', 1), (' <dependency>', 1), (' <version>${project.version}</version>', 1), (' </dependency>', 1), (' <!-- Profiles that 
disable inclusion of certain dependencies. -->', 1), ('<id>flume-provided</id>', 1), ('<properties>', 5), ('</properties>', 5), ('<id>hadoop-provided</id>', 1), ('<id>hive-provided</id>', 1), (' <hive.deps.scope>provided</hive.deps.scope>', 1), ('</project>', 1)]\n"
]
}
],
"source": [
"print(\"Pom Count\\n\")\n",
"print(pomCount.collect())"
]
},
{
"cell_type": "markdown",
"metadata": {
"deletable": true,
"editable": true
},
"source": [
"The join function combines two datasets of (K,V) and (K,W) pairs into a dataset of (K, (V,W)) pairs, keeping only the keys that appear in both. Let's join these two counts together."
]
},
{
"cell_type": "code",
"execution_count": 44,
"metadata": {
"collapsed": false,
"deletable": true,
"editable": true
},
"outputs": [],
"source": [
"joined = readmeCount.join(pomCount)"
]
},
{
"cell_type": "markdown",
"metadata": {
"deletable": true,
"editable": true
},
"source": [
"Print the value to the console"
]
},
{
"cell_type": "code",
"execution_count": 45,
"metadata": {
"collapsed": false,
"deletable": true,
"editable": true
},
"outputs": [
{
"data": {
"text/plain": [
"[('', (43, 841))]"
]
},
"execution_count": 45,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"joined.collect()"
]
},
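{
"cell_type": "markdown",
"metadata": {},
"source": [
"The short result above is expected: `join` is an inner join, so a key survives only if it occurs in both RDDs. Here is a minimal plain-Python sketch of that behaviour, assuming two small hypothetical lists (`sketch_left`, `sketch_right`) in place of the pair RDDs and assuming each key occurs at most once per side."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Hypothetical (key, value) lists standing in for two pair RDDs.\n",
"sketch_left = [('spark', 2), ('hadoop', 1)]\n",
"sketch_right = [('spark', 5), ('maven', 3)]\n",
"\n",
"# Index the right side by key (this sketch assumes unique keys per side).\n",
"sketch_right_by_key = dict(sketch_right)\n",
"\n",
"# Inner join: keep only the keys found on both sides and pair up their values.\n",
"sketch_joined = [(k, (v, sketch_right_by_key[k]))\n",
"                 for k, v in sketch_left if k in sketch_right_by_key]\n",
"\n",
"print(sketch_joined)"
]
},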
{
"cell_type": "markdown",
"metadata": {
"deletable": true,
"editable": true
},
"source": [
"Let's add the two values of each key together to get the total count."
]
},
{
"cell_type": "code",
"execution_count": 46,
"metadata": {
"collapsed": true,
"deletable": true,
"editable": true
},
"outputs": [],
"source": [
"joinedSum = joined.map(lambda k: (k[0], (k[1][0]+k[1][1])))"
]
},
{
"cell_type": "markdown",
"metadata": {
"deletable": true,
"editable": true
},
"source": [
"To check that it is correct, print the first five elements of the joined and the joinedSum RDDs."
]
},
{
"cell_type": "code",
"execution_count": 47,
"metadata": {
"collapsed": false,
"deletable": true,
"editable": true
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Joined Individual\n",
"\n",
"[('', (43, 841))]\n",
"\n",
"\n",
"Joined Sum\n",
"\n",
"[('', 884)]\n"
]
}
],
"source": [
"print(\"Joined Individual\\n\")\n",
"print(joined.take(5))\n",
"\n",
"print(\"\\n\\nJoined Sum\\n\")\n",
"print(joinedSum.take(5))"
]
},
{
"cell_type": "markdown",
"metadata": {
"deletable": true,
"editable": true
},
"source": [
"## Shared variables\n",
"\n",
"Normally, when a function passed to a Spark operation (such as map or reduce) is executed on a remote cluster node, it works on separate copies of all the variables used in the function. These variables are copied to each machine, and no updates to the variables on the remote machine are propagated back to the driver program. Supporting general, read-write shared variables across tasks would be inefficient. However, Spark does provide two limited types of shared variables for two common usage patterns: broadcast variables and accumulators.\n",
"\n",
"### Broadcast variables\n",
"\n",
"Broadcast variables are useful when you have a large dataset that you want to use on all the worker nodes. The read-only variable is cached on each machine instead of a copy being shipped with every task. Spark actions are executed through a set of stages, separated by distributed “shuffle” operations. Spark automatically broadcasts the common data needed by tasks within each stage.\n",
"\n",
"\n",
"Read more here: [http://spark.apache.org/docs/latest/programming-guide.html#broadcast-variables](http://spark.apache.org/docs/latest/programming-guide.html#broadcast-variables)\n",
"\n",
"Create a broadcast variable. Type in:"
]
},
{
"cell_type": "code",
"execution_count": 48,
"metadata": {
"collapsed": false,
"deletable": true,
"editable": true
},
"outputs": [],
"source": [
"broadcastVar = sc.broadcast([1,2,3])"
]
},
{
"cell_type": "markdown",
"metadata": {
"deletable": true,
"editable": true
},
"source": [
"To get the value, type in:"
]
},
{
"cell_type": "code",
"execution_count": 49,
"metadata": {
"collapsed": false,
"deletable": true,
"editable": true
},
"outputs": [
{
"data": {
"text/plain": [
"[1, 2, 3]"
]
},
"execution_count": 49,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"broadcastVar.value"
]
},
{
"cell_type": "markdown",
"metadata": {
"deletable": true,
"editable": true
},
"source": [
"### Accumulators\n",
"\n",
"Accumulators are variables that can only be added to through an associative operation. They are used to implement counters and sums efficiently in parallel. Spark natively supports numeric type accumulators and standard mutable collections. Programmers can extend these for new types. Only the driver can read the value of an accumulator. The workers can only add to it.\n",
"\n",
"Create the accumulator variable. Type in:"
]
},
{
"cell_type": "code",
"execution_count": 50,
"metadata": {
"collapsed": true,
"deletable": true,
"editable": true
},
"outputs": [],
"source": [
"accum = sc.accumulator(0)"
]
},
{
"cell_type": "markdown",
"metadata": {
"deletable": true,
"editable": true
},
"source": [
"Next, parallelize a list of four integers and define a function that adds each integer value to the accumulator variable. Type in:"
]
},
{
"cell_type": "code",
"execution_count": 51,
"metadata": {
"collapsed": true,
"deletable": true,
"editable": true
},
"outputs": [],
"source": [
"rdd = sc.parallelize([1,2,3,4])\n",
"def f(x):\n",
" global accum\n",
" accum += x"
]
},
{
"cell_type": "markdown",
"metadata": {
"deletable": true,
"editable": true
},
"source": [
"Next, iterate through each element of the RDD and apply the function f to it:"
]
},
{
"cell_type": "code",
"execution_count": 52,
"metadata": {
"collapsed": true,
"deletable": true,
"editable": true
},
"outputs": [],
"source": [
"rdd.foreach(f)"
]
},
{
"cell_type": "markdown",
"metadata": {
"deletable": true,
"editable": true
},
"source": [
"To get the current value of the accumulator variable, type in:"
]
},
{
"cell_type": "code",
"execution_count": 53,
"metadata": {
"collapsed": false,
"deletable": true,
"editable": true
},
"outputs": [
{
"data": {
"text/plain": [
"10"
]
},
"execution_count": 53,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"accum.value"
]
},
{
"cell_type": "markdown",
"metadata": {
"deletable": true,
"editable": true
},
"source": [
"You should get a value of 10.\n",
"\n",
"This command can only be invoked on the driver side. The worker nodes can only increment the accumulator."
]
},
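{
"cell_type": "markdown",
"metadata": {},
"source": [
"A plain-Python sketch of why the accumulator's operation has to be associative, assuming a hypothetical split of the same four integers into two partitions (`sketch_partitions`). Spark's real partitioning and merge order may differ, which is exactly why the order must not matter. The sketch does not use Spark."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from functools import reduce\n",
"from operator import add\n",
"\n",
"# Hypothetical split of the four integers across two partitions.\n",
"sketch_partitions = [[1, 2], [3, 4]]\n",
"\n",
"# Each task adds up the elements of its own partition independently.\n",
"sketch_partials = [reduce(add, part, 0) for part in sketch_partitions]\n",
"\n",
"# The partial results are then merged; because addition is associative\n",
"# and commutative, the merge order does not change the total.\n",
"sketch_total = reduce(add, sketch_partials, 0)\n",
"sketch_total_reversed = reduce(add, reversed(sketch_partials), 0)\n",
"\n",
"print(sketch_total, sketch_total_reversed)"
]
},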
{
"cell_type": "markdown",
"metadata": {
"deletable": true,
"editable": true
},
"source": [
"\n",
"## Key-value pairs\n",
"\n",
"You have already seen a bit about key-value pairs in the Joining RDDs section.\n",
"\n",
"Create a key-value pair of two characters. Type in:"
]
},
{
"cell_type": "code",
"execution_count": 54,
"metadata": {
"collapsed": true,
"deletable": true,
"editable": true
},
"outputs": [],
"source": [
"pair = ('a', 'b')"
]
},
{
"cell_type": "markdown",
"metadata": {
"deletable": true,
"editable": true
},
"source": [
"To access the first element of the pair, use index [0]; use index [1] for the second element."
]
},
{
"cell_type": "code",
"execution_count": 55,
"metadata": {
"collapsed": false,
"deletable": true,
"editable": true
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"a\n",
"b\n"
]
}
],
"source": [
"print(pair[0])\n",
"print(pair[1])"
]
},
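{
"cell_type": "markdown",
"metadata": {},
"source": [
"As a small aside, a pair can also be unpacked into named variables, and a list of pairs converts directly into a dictionary. The names below (`sketch_key`, `sketch_value`, `sketch_lookup`) are hypothetical and chosen only for this illustration."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Unpack a pair into two named variables instead of indexing it.\n",
"sketch_key, sketch_value = ('a', 'b')\n",
"\n",
"# A list of pairs converts directly to a dictionary keyed by the first element.\n",
"sketch_lookup = dict([('a', 'b'), ('c', 'd')])\n",
"\n",
"print(sketch_key, sketch_value, sketch_lookup['c'])"
]
},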
{
"cell_type": "markdown",
"metadata": {},
"source": [
"<div class=\"alert alert-success alertsuccess\" style=\"margin-top: 20px\">\n",
"**Tip**: Enjoyed using Jupyter notebooks with Spark? Get yourself a free \n",
" <a href=\"http://cocl.us/DSX_on_Cloud\">IBM Cloud</a> account where you can use Data Science Experience notebooks\n",
" and have *two* Spark executors for free!\n",
"</div>"
]
},
{
"cell_type": "markdown",
"metadata": {
"deletable": true,
"editable": true
},
"source": [
"### Summary\n",
"Having completed this exercise, you should now be able to describe Spark’s primary data abstraction, work with Resilient Distributed Dataset (RDD) operations, and utilize shared variables and key-value pairs."
]
},
{
"cell_type": "markdown",
"metadata": {
"deletable": true,
"editable": true
},
"source": [
"This notebook is part of the free course on **Cognitive Class** called *Spark Fundamentals I*. If you accessed this notebook outside the course, you can take this free, self-paced course online by going to: http://cocl.us/Spark_Fundamentals_I"
]
},
{
"cell_type": "markdown",
"metadata": {
"deletable": true,
"editable": true
},
"source": [
"### About the Authors: \n",
"Hi! It's [Alex Aklson](https://www.linkedin.com/in/aklson/), one of the authors of this notebook. I hope you found this lab educational! There is much more to learn about Spark but you are well on your way. Feel free to connect with me if you have any questions.\n",
"<hr>"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.6.6"
},
"widgets": {
"state": {},
"version": "1.1.2"
}
},
"nbformat": 4,
"nbformat_minor": 2
}