Skip to content

Instantly share code, notes, and snippets.

@msbarry
Forked from quiiver/ElasticsearchSource.scala
Created June 16, 2017 19:11
Show Gist options
  • Save msbarry/8e16529d4c4960710f22778bb2af615a to your computer and use it in GitHub Desktop.
Save msbarry/8e16529d4c4960710f22778bb2af615a to your computer and use it in GitHub Desktop.
Scalding source for Elasticsearch
package com.twitter.scalding.sources
import cascading.tuple.Fields
import cascading.tap.Tap;
import org.elasticsearch.hadoop.cascading.EsTap
import com.twitter.scalding._
/**
 * Scalding `Source` whose taps are elasticsearch-hadoop `EsTap` instances.
 *
 * Subclasses provide the abstract members below; they are forwarded to the
 * `EsTap` constructor as-is. The same tap configuration is used for both
 * reading and writing.
 */
abstract class ElasticsearchSource extends Source {

  /** Resource argument forwarded to `EsTap`. */
  val resource: String

  /** Cascading fields forwarded to `EsTap`. */
  val fields: Fields

  /** Query argument forwarded to `EsTap`. */
  val query: String

  /** Host forwarded to `EsTap` and also stored under the "es.nodes" setting. */
  val host: String

  /** Port forwarded to `EsTap`. */
  val port: Int

  /** Extra settings copied into the tap's `java.util.Properties`. */
  val properties: Map[String, String]

  /**
   * Settings object handed to every tap created by this source.
   *
   * "es.nodes" is seeded with `host` first; the entries of `properties` are
   * written afterwards, so an "es.nodes" key in `properties` replaces it.
   * Evaluated lazily so the abstract members are read only on first use.
   */
  lazy val props: java.util.Properties = {
    val settings = new java.util.Properties
    settings.setProperty("es.nodes", host)
    // Iterating an empty map does nothing, so no emptiness check is required.
    properties.foreach { case (key, value) => settings.setProperty(key, value) }
    settings
  }

  /**
   * Builds the Elasticsearch tap for this source.
   *
   * @param readOrWrite requested access mode; `Read` and `Write` produce an
   *                    identically configured tap
   * @param mode        implicit Scalding mode; it does not influence which tap is built
   * @return a new `EsTap` built from this source's members and `props`
   */
  override def createTap(readOrWrite: AccessMode)(implicit mode: Mode): Tap[_, _, _] =
    readOrWrite match {
      case Read | Write => new EsTap(host, port, resource, query, fields, props)
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment