Skip to content

Instantly share code, notes, and snippets.

@quiiver
Created May 5, 2014 17:38
Show Gist options
  • Save quiiver/9dac6221ee47d6180e6a to your computer and use it in GitHub Desktop.
Save quiiver/9dac6221ee47d6180e6a to your computer and use it in GitHub Desktop.
Scalding source for Elasticsearch
package com.twitter.scalding.sources
import cascading.tuple.Fields
import cascading.tap.Tap;
import org.elasticsearch.hadoop.cascading.EsTap
import com.twitter.scalding._
abstract class ElasticsearchSource extends Source {
val resource: String
val fields : Fields
val query : String
val host : String
val port : Int
val properties : Map[String,String]
lazy val props = {
val p = new java.util.Properties
p.setProperty("es.nodes", host)
if (properties.nonEmpty) {
properties.foreach(kv => p.setProperty(kv._1, kv._2))
}
p
}
override def createTap(readOrWrite : AccessMode)(implicit mode : Mode) : Tap[_,_,_] = readOrWrite match {
case Read => mode match {
case _ => new EsTap(host, port, resource, query, fields, props)
}
case Write => mode match {
case _ => new EsTap(host, port, resource, query, fields, props)
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment