@ibuenros
Created June 29, 2014 17:12
Spark productionizing utilities developed by Ooyala, shown at Spark Summit 2014
//==================================================================
// SPARK INSTRUMENTATION
//==================================================================
import com.codahale.metrics.{MetricRegistry, Meter, Gauge}
import org.apache.spark.{SparkEnv, Accumulator}
import org.apache.spark.metrics.source.Source
import org.joda.time.DateTime
import scala.collection.mutable

/** Instrumentation for Spark based on accumulators.
 *
 * Usage:
 *   val instrumentation = new SparkInstrumentation("example.metrics")
 *   val numReqs = sc.accumulator(0L)
 *   instrumentation.source.registerDailyAccumulator(numReqs, "numReqs")
 *   instrumentation.register()
 *
 * Will create and report the following metrics:
 * - Gauge with total number of requests (daily)
 * - Meter with rate of requests
 *
 * @param prefix prefix for all metrics that will be reported by this Instrumentation
 */
class SparkInstrumentation(prefix: String) extends Serializable {
  val accumulators = mutable.Set[Accumulator[Long]]()

  private class InstrumentationSource(prefix: String) extends Source {
    val metricRegistry = new MetricRegistry
    val sourceName = prefix
    val oldgauges = mutable.Map[String, Long]()
    val oldtimes = mutable.Map[String, DateTime]()
    val meters = mutable.Map[String, Meter]()

    /** Computes metrics based on accumulator. Gauge never resets.
     *
     * @param a Metrics will be derived from this accumulator
     * @param name Name of the metrics
     */
    def registerAccumulator(a: Accumulator[Long], name: String) {
      oldgauges += (name -> 0L)
      meters += (name -> metricRegistry.meter(name + "-rate"))
      metricRegistry.register(MetricRegistry.name(name),
        new Gauge[Long] {
          override def getValue: Long = {
            meters(name).mark(a.value - oldgauges(name))
            oldgauges(name) = a.value
            a.value
          }
        })
    }

    /** Computes metrics based on accumulator. Gauge resets at the end of the day.
     *
     * @param a Metrics will be derived from this accumulator
     * @param name Name of the metrics
     */
    def registerDailyAccumulator(a: Accumulator[Long], name: String) {
      oldgauges += (name -> 0L)
      meters += (name -> metricRegistry.meter(name + "-rate"))
      oldtimes += (name -> DateTime.now)
      metricRegistry.register(MetricRegistry.name(name),
        new Gauge[Long] {
          override def getValue: Long = {
            meters(name).mark(a.value - oldgauges(name))
            val now = DateTime.now
            if (now.getDayOfMonth != oldtimes(name).getDayOfMonth) {
              a.setValue(0L)
            }
            oldtimes(name) = now
            oldgauges(name) = a.value
            a.value
          }
        })
    }
  }

  val source = new InstrumentationSource(prefix)

  /** Register the Instrumentation with Spark so the metrics are reported to any provided Sink. */
  def register() {
    SparkEnv.get.metricsSystem.registerSource(source)
  }
}
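// --- Usage sketch (not part of the original gist) ---------------------------
// A minimal driver wiring SparkInstrumentation into a job, assuming the
// package/visibility tweaks discussed in the comments below where needed.
// The app name, metric prefix, and data below are illustrative only, and the
// metrics still need a sink configured in metrics.properties to go anywhere.
import org.apache.spark.{SparkConf, SparkContext}

object SparkInstrumentationExample {
  def main(args: Array[String]) {
    val sc = new SparkContext(new SparkConf().setAppName("instrumentation-example"))

    val instrumentation = new SparkInstrumentation("example.metrics")
    val numReqs = sc.accumulator(0L)                       // driver-side accumulator
    instrumentation.source.registerDailyAccumulator(numReqs, "numReqs")
    instrumentation.register()                             // registers the source with SparkEnv

    // Increment the accumulator inside actions/transformations; the gauge and
    // the "numReqs-rate" meter are computed from it whenever the sink polls.
    sc.parallelize(1 to 1000).foreach(_ => numReqs += 1)

    sc.stop()
  }
}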
//============================================
// STREAMING LAUNCHER / SERVER
//============================================
import scalax.io.Resource
import org.apache.spark.deploy.yarn.{Client, ClientArguments}
import org.apache.spark.{Logging, SparkConf}
import org.apache.hadoop.yarn.api.records.{ApplicationReport, YarnApplicationState, ApplicationId}
import org.apache.hadoop.yarn.client.ClientRMProxy
import org.apache.hadoop.yarn.api.ApplicationClientProtocol
import org.apache.hadoop.yarn.api.protocolrecords.{GetApplicationsResponse, GetApplicationsRequest}
import org.eclipse.jetty.server.{Request, Handler, Server}
import org.eclipse.jetty.server.handler.{HandlerList, ContextHandler, AbstractHandler}
import javax.servlet.http.{HttpServletResponse, HttpServletRequest}
import com.lambdaworks.jacks.JacksMapper
import scala.annotation.tailrec
import org.eclipse.jetty.util.thread.QueuedThreadPool
import scala.util.{Failure, Success, Try}

/** Local launcher client for streaming applications in YARN.
 *
 * Extends the usual Spark YARN client for streaming applications. This class should not be called by the user;
 * instead, the StreamingClient object is the entry point for launching applications.
 *
 * @param args User supplied arguments
 * @param sparkConf Spark Configuration
 */
class StreamingClient(args: ClientArguments, sparkConf: SparkConf) extends Client(args, sparkConf) {
  import scala.collection.JavaConversions._
  import scala.collection.JavaConverters._

  var appIdOption: Option[ApplicationId] = None
  val clientHttp = new ClientHttp(this)
  val launcherArgs = args

  /** Connects to or launches application in YARN cluster.
   *
   * 1. Search for existing application by the same name.
   * 2. If found, monitor existing application.
   * 3. If not found, launch new application with this name, and monitor.
   */
  override def run() {
    sparkConf.set("spark.yarn.report.interval", "10000")
    clientHttp.bind()
    println("Using yarn at " + yarnConf.getRaw("fs.defaultFS"))
    val pidFile = System.getenv("SPARK_LAUNCHER_PID_DIR") match {
      case "" => None
      case x => Some(Resource.fromFile("%s/%s.appInfo".format(x, args.appName)))
    }
    val instances = getSparkApplications(
      setAsJavaSet(Set("SPARK")),
      java.util.EnumSet.of(YarnApplicationState.RUNNING, YarnApplicationState.ACCEPTED,
        YarnApplicationState.NEW, YarnApplicationState.NEW_SAVING, YarnApplicationState.SUBMITTED)
    ).flatMap { report =>
      if (report.getName == args.appName) {
        Some(report.getApplicationId)
      } else {
        None
      }
    }.toList
    if (instances.size > 0) {
      println("Application already running. Monitoring old application.")
      init(yarnConf)
      start()
      appIdOption = Some(instances.head)
      monitorApplication(appIdOption.get)
      System.exit(0)
    } else {
      //super.stop()
      println("Application not found.")
      appIdOption = Some(runApp())
      if (!appIdOption.isDefined) {
        println("Application didn't start correctly")
        System.exit(1)
      }
      if (pidFile.isDefined) {
        pidFile.get.write("%s %s".format(appIdOption.get.toString, args.userArgs.mkString(" ")))
        System.exit(0)
      }
      monitorApplication(appIdOption.get)
      System.exit(0)
    }
  }

  /** Gets the list of Spark applications in the YARN cluster. */
  def getSparkApplications(applicationTypes: java.util.Set[String],
      applicationStates: java.util.EnumSet[YarnApplicationState]): java.util.List[ApplicationReport] = {
    setConfig(yarnConf)
    val rmClient = ClientRMProxy.createRMProxy(getConfig, classOf[ApplicationClientProtocol])
    val request: GetApplicationsRequest = GetApplicationsRequest.newInstance(applicationTypes, applicationStates)
    val response: GetApplicationsResponse = rmClient.getApplications(request)
    response.getApplicationList
  }
}
/** Entry point for the streaming launcher client.
 *
 * Usage:
 *   java -cp /etc/hadoop/conf:AppJar.jar:spark-assembly.jar org.apache.spark.yarn.StreamingClient --jar AppJar.jar
 *     --addJars /jars/config.jar --class ooyala.app.MainClass --arg arg1 --arg arg2 --name MyApp
 */
object StreamingClient {
  def main(argStrings: Array[String]) {
    // Set an env variable indicating we are running in YARN mode.
    // Note: any env variable with the SPARK_ prefix gets propagated to all (remote) processes -
    // see Client#setupLaunchEnv().
    System.setProperty("SPARK_YARN_MODE", "true")
    val sparkConf = new SparkConf()
    val args = new ClientArguments(argStrings, sparkConf)
    new StreamingClient(args, sparkConf).run()
  }
}
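// --- Launch sketch (not part of the original gist) ---------------------------
// StreamingClient.main can also be invoked programmatically; the jar, class,
// and app name below are illustrative. If SPARK_LAUNCHER_PID_DIR is set in the
// environment, the launcher writes "<applicationId> <userArgs>" to
// $SPARK_LAUNCHER_PID_DIR/<appName>.appInfo and exits instead of monitoring;
// re-running with the same --name finds and monitors the existing application
// rather than launching a second copy.
//
//   StreamingClient.main(Array(
//     "--jar", "AppJar.jar",
//     "--class", "ooyala.app.MainClass",
//     "--name", "MyApp",
//     "--arg", "arg1"))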
/** Starts an HTTP server for the launcher client.
 *
 * Allows checking the health of the application launcher by querying the /healthz route.
 *
 * @param launcher Server will track status of this launcher client.
 */
class ClientHttp(val launcher: StreamingClient) extends Logging {
  val port = 8081
  var boundPort: Option[Int] = None
  var server: Option[Server] = None

  val handlers = Seq[(String, Handler)](
    ("/healthz", healthHandler)
  )

  /** /healthz route handler
   *
   * Reports health of the launcher and publishes information about the application.
   */
  def healthHandler: Handler = {
    new AbstractHandler {
      override def handle(target: String, baseRequest: Request, request: HttpServletRequest,
          response: HttpServletResponse): Unit = {
        response.setContentType("application/json")
        if (!launcher.appIdOption.isDefined) {
          response.setStatus(HttpServletResponse.SC_NOT_FOUND)
          val res = Map(
            "LauncherStatus" -> "Application Not Found"
          )
          baseRequest.setHandled(true)
          response.getWriter.println(JacksMapper.writeValueAsString(res))
          return
        }
        val report = launcher.getApplicationReport(launcher.appIdOption.get)
        response.setStatus(HttpServletResponse.SC_OK)
        baseRequest.setHandled(true)
        val res = Map(
          "LauncherStatus" -> "Online",
          "YarnCluster" -> launcher.yarnConf.getRaw("fs.defaultFS"),
          "ApplicationId" -> launcher.appIdOption.get.toString,
          "ApplicationStatus" -> report.getYarnApplicationState,
          "StartedAt" -> report.getStartTime.toString,
          "TrackingURL" -> report.getTrackingUrl.toString,
          "ApplicationName" -> launcher.launcherArgs.appName
        )
        response.getWriter.println(JacksMapper.writeValueAsString(res))
      }
    }
  }

  /**
   * Attempts to start a Jetty server at the supplied ip:port which uses the supplied handlers.
   *
   * If the desired port number is contended, continues incrementing ports until a free port is
   * found. Returns the chosen port and the jetty Server object.
   */
  def startJettyServer(ip: String, port: Int, handlers: Seq[(String, Handler)]): (Server, Int) = {
    val handlersToRegister = handlers.map { case (path, handler) =>
      val contextHandler = new ContextHandler(path)
      contextHandler.setAllowNullPathInfo(true)
      contextHandler.setHandler(handler)
      contextHandler
    }

    val handlerList = new HandlerList
    handlerList.setHandlers(handlersToRegister.toArray)

    @tailrec
    def connect(currentPort: Int): (Server, Int) = {
      val server = new Server(currentPort)
      val pool = new QueuedThreadPool
      pool.setDaemon(true)
      server.setThreadPool(pool)
      server.setHandler(handlerList)
      Try {
        server.start()
      } match {
        case s: Success[_] =>
          (server, server.getConnectors.head.getLocalPort)
        case f: Failure[_] =>
          server.stop()
          logInfo("Failed to create UI at port %s. Trying again.".format(currentPort))
          logInfo("Error was: " + f.toString)
          connect((currentPort + 1) % 65536)
      }
    }

    connect(port)
  }

  def bind() {
    try {
      val (srv, usedPort) = startJettyServer("0.0.0.0", port, handlers)
      logInfo("Started Streaming Launcher UI at port %d".format(usedPort))
      server = Some(srv)
      boundPort = Some(usedPort)
    } catch {
      case e: Exception =>
        logError("Failed to create Spark JettyUtils", e)
        System.exit(1)
    }
  }
}
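// --- Health-check sketch (not part of the original gist) ---------------------
// The /healthz route can be polled by an external supervisor. A minimal Scala
// check, assuming the launcher is reachable on localhost:8081 (the default
// port above; ClientHttp may have bound a higher port if 8081 was taken):
//
//   val health = scala.io.Source.fromURL("http://localhost:8081/healthz").mkString
//   // e.g. {"LauncherStatus":"Online","ApplicationId":"application_XXXXXXXXXXXXX_0042",...}
//   println(health)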
//===============================================
// DATADOG SINK
//===============================================
import com.codahale.metrics.MetricRegistry
// Requires com.clipperz.metrics-datadog artifact (not in Maven)
// Compile from https://github.com/clipperz/metrics-datadog
import com.codahale.metrics.reporting.{DatadogReporter, HttpTransport}
import java.util.concurrent.TimeUnit
import java.util.Properties
import org.apache.spark.SecurityManager // Spark's SecurityManager, not java.lang.SecurityManager
import org.apache.spark.metrics.sink.Sink

/** Sink to report metrics to Datadog */
class DatadogSink(val property: Properties, val registry: MetricRegistry,
    securityMgr: SecurityManager) extends Sink {
  val DD_KEY_PERIOD = "period"
  val DD_DEFAULT_PERIOD = 10L
  val DD_KEY_UNIT = "unit"
  val DD_DEFAULT_UNIT = TimeUnit.SECONDS
  val DD_API_KEY = "apikey"
  val DD_KEY_HOST = "host"
  val DD_DEFAULT_HOST = ""

  def propertyToOption(prop: String) = Option(property.getProperty(prop))

  if (!propertyToOption(DD_API_KEY).isDefined) {
    throw new Exception("Datadog sink requires 'apikey' property.")
  }

  val pollPeriod = propertyToOption(DD_KEY_PERIOD).map(_.toLong)
    .getOrElse(DD_DEFAULT_PERIOD)

  val pollUnit = propertyToOption(DD_KEY_UNIT).map(u => TimeUnit.valueOf(u.toUpperCase))
    .getOrElse(DD_DEFAULT_UNIT)

  val host = propertyToOption(DD_KEY_HOST).getOrElse(DD_DEFAULT_HOST)
  val apikey = propertyToOption(DD_API_KEY).get

  val transport = new HttpTransport("app.datadoghq.com", apikey)
  val reporter = DatadogReporter.forRegistry(registry)
    .convertRatesTo(TimeUnit.SECONDS)
    .convertDurationsTo(TimeUnit.MILLISECONDS)
    .build(transport, host)

  override def start {
    reporter.start(pollPeriod, pollUnit)
  }

  override def stop {
    reporter.stop()
  }
}
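// --- Sink configuration sketch (not part of the original gist) ---------------
// Spark's Sink trait is package-private, so this class has to live under an
// org.apache.spark package to compile (mirroring the Source placement discussed
// in the comments below). A minimal, illustrative metrics.properties entry,
// assuming the class ends up at org.apache.spark.metrics.sink.DatadogSink and
// using placeholder values for apikey and host:
//
//   *.sink.datadog.class=org.apache.spark.metrics.sink.DatadogSink
//   *.sink.datadog.apikey=YOUR_DATADOG_API_KEY
//   *.sink.datadog.period=10
//   *.sink.datadog.unit=seconds
//   *.sink.datadog.host=my-spark-driver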
@binarymechanic

What package do you add this to? I tried org.apache.spark.metrics.source and got:

Error:(80, 7) private class InstrumentationSource escapes its defining scope as part of type SparkInstrumentation.this.InstrumentationSource
val source = new InstrumentationSource(prefix)
^

thanks

@kernel164

  1. Placed SparkInstrumentation in package org.apache.spark.metrics.source.
  2. Removed the private keyword from "private class InstrumentationSource" (see the sketch below).
  3. Added provided "org.eclipse.jetty:jetty-servlet:8.1.14.v20131031" to my build.gradle to fix this issue - https://www.mail-archive.com/user@spark.apache.org/msg25445.html
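
A minimal sketch of steps 1 and 2 (only the layout is shown; the method bodies stay exactly as in the gist):

    package org.apache.spark.metrics.source

    class SparkInstrumentation(prefix: String) extends Serializable {
      // "private" removed so the type of `source` no longer escapes its defining scope
      class InstrumentationSource(prefix: String) extends Source {
        // ... unchanged from the gist ...
      }

      val source = new InstrumentationSource(prefix)

      def register() {
        SparkEnv.get.metricsSystem.registerSource(source)
      }
    }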

@binarymechanic

@kernel164 - awesome - thanks for the additional info

@harleensahni

@ibuenros Is there any particular license attached to this file, or is it completely free to use without restriction? It looks really useful for some Spark monitoring we're intending to do for one of our projects. Thanks.

@mcwhitak

Sadly, in 2016 this is still the best solution for user-defined accumulator metrics.

@KenpachiRules

KenpachiRules commented May 8, 2018

I tried mimicking the custom metrics class and registered it, but sadly the metrics never get written to the Graphite sink. This is Spark Structured Streaming, so I am not sure whether there is a limitation there, but I believe there shouldn't be. Attaching code snippets.

package com.hari.spark.kafka.struct.streaming

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.Dataset
import org.apache.spark.Accumulator
import com.codahale.metrics._
import org.apache.spark.sql.Encoders
import org.apache.spark.sql.Encoder
import org.apache.spark.sql.types.StructType

object ReadWriteKafka {

  def main(args: Array[String]): Unit = {
    val sparkSession = SparkSession.builder.appName("TestGrafana").getOrCreate

    import sparkSession.implicits._
    val rowsPassed = sparkSession.sparkContext.accumulator[Long](0, "rowsPassed")
    CustomRowsProcessedMetrics.regiserCustomMetrics("rowsPassed", rowsPassed)
    val readFromKafka = sparkSession.readStream.format("kafka")
      .option("kafka.bootstrap.servers", "10.65.137.104:9092")
      .option("subscribe", "GrafanaSource")
      .option("enable.auto.commit", "false")
      .option("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
      .option("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
      .option("failOnDataLoss", "false")
      .option("auto.offset.reset", "latest")
      .load
    // perform same transformation
    //implicit val employeeEncoder: Encoder[(String, String)] = Encoders.tuple(Encoders.STRING, Encoders.STRING)
    val keyValueTuples = readFromKafka.selectExpr("CAST(key as STRING)", "CAST(value as STRING)").as[(String, String)]
    val rowProcessedStore = countRowsProcessed(rowsPassed,
      keyValueTuples.filter(tup => tup._2.contains("ichigo")).map(tup => (tup._1, tup._2.toUpperCase)))
    rowProcessedStore.printSchema
    val writeToKafka = rowProcessedStore.writeStream.format("kafka")
      .option("kafka.bootstrap.servers", "10.65.137.104:9092")
      .option("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
      .option("checkpointLocation", "/Hari/hdfs/staging1/")
      .option("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
      .option("topic", "GrafanaTarget")
      .start
    writeToKafka.awaitTermination
  }

  def countRowsProcessed(acc: Accumulator[Long], ds: Dataset[(String, String)]): Dataset[(String, String)] = {
    implicit val employeeEncoder: Encoder[(String, String)] = Encoders.tuple(Encoders.STRING, Encoders.STRING)
    ds.map { x =>
      acc += 1
      x
    }.toDF("key", "value").as[(String, String)]
  }

}


import com.codahale.metrics.{ MetricRegistry, Meter, Gauge }
import org.apache.spark.Accumulator
import scala.collection.mutable.Map
import org.apache.spark.SparkEnv
import org.apache.spark.metrics.source.CustomMetrics

package com.hari.spark.kafka.struct.streaming {

  object CustomRowsProcessedMetrics extends Serializable {

    def regiserCustomMetrics(metricsName: String, acc: Accumulator[Long]) {
      import org.apache.spark.metrics.source.CustomMetrics
      val customMets = new CustomMetrics(metricsName, Map[String, Long](), Map[String, Meter]())
      customMets.regGaugeAndMeter(acc, metricsName)
      SparkEnv.get.metricsSystem.registerSource(customMets)
      val test = SparkEnv.get.metricsSystem.getSourcesByName(metricsName)
      println(test)
    }

  }
}

package org.apache.spark.metrics.source {

  class CustomMetrics(metricsName: String, gauge1: Map[String, Long], metrics: Map[String, Meter]) extends Source {

    def metricRegistry = new MetricRegistry()
    def sourceName = metricsName
    // update the metrics with time series

    def regGaugeAndMeter(acc: Accumulator[Long], metricsName: String): Unit = {
      import com.codahale.metrics.{ MetricRegistry, Meter, Gauge }
      gauge1 += (metricsName -> 0L)
      metrics += (metricsName -> metricRegistry.meter(metricsName + "-rate"))
      metricRegistry.register(
        MetricRegistry.name(metricsName),
        new Gauge[Long] {
          override def getValue: Long = {
            metrics(metricsName).mark(acc.value - gauge1(metricsName))
            gauge1(metricsName) = acc.value
            println("The incremented values are  --->  " + acc.value)
            return acc.value
          }
        })
    }

  }

}
