Skip to content

Instantly share code, notes, and snippets.

@francisdb
Last active October 16, 2017 12:19
Show Gist options
  • Save francisdb/a31100c18129f1bab75efe87945afffa to your computer and use it in GitHub Desktop.
Save francisdb/a31100c18129f1bab75efe87945afffa to your computer and use it in GitHub Desktop.
Akka streams utilities
import akka.NotUsed
import akka.stream.scaladsl.{Flow, Source} `
import akka.stream.{Attributes, FlowShape, Inlet, Outlet}
import akka.stream.Attributes._
import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler, OutHandler}
import scala.collection.immutable.Seq
object AkkaStreams {

  /**
   * Lifts a Flow to be able to be applied to a flow of Sequences.
   *
   * Every incoming sequence is turned into its own `Source`, run through `flow`, and the
   * resulting elements are collected back into a single sequence. Exactly one output
   * sequence is emitted per input sequence: if the input is empty, or `flow` drops all of
   * its elements, an empty sequence is emitted rather than nothing at all.
   *
   * @param flow the element-level flow to apply to the contents of each sequence
   * @tparam I element type of the incoming sequences
   * @tparam O element type of the outgoing sequences
   * @return a flow that maps each `Seq[I]` to the `Seq[O]` produced by `flow`
   */
  def liftToSeq[I, O](flow: Flow[I, O, _]): Flow[Seq[I], Seq[O], NotUsed] =
    Flow[Seq[I]].flatMapConcat { elements =>
      // fold always emits its accumulator on completion, so an empty inner stream still
      // yields one (empty) result; grouped(...) would emit nothing in that case.
      Source(elements).via(flow).fold[Seq[O]](Vector.empty[O])(_ :+ _)
    }

  /** Example element-level flow: maps each string to its length. */
  val stringLength: Flow[String, Int, NotUsed] = Flow[String].map(_.length)

  /** Example of lifting: maps each sequence of strings to the sequence of their lengths. */
  val stringLengthSeq: Flow[Seq[String], Seq[Int], NotUsed] = liftToSeq(stringLength)
}
/**
 * Graph stage that batches neighbouring elements sharing the same key into one `Seq`.
 *
 * Only consecutive elements are compared, so the incoming stream is expected to be ordered
 * by key; a group is emitted as soon as an element with a different key arrives, and the
 * last group is emitted when upstream finishes.
 *
 * @param p the key extractor
 */
final case class GroupAdjacent[T, A](p: T => A) extends GraphStage[FlowShape[T, Seq[T]]] {

  private val in = Inlet[T]("GroupedBySequential.in")
  private val out = Outlet[Seq[T]]("GroupedBySequential.out")

  override val shape = FlowShape.of(in, out)

  override protected val initialAttributes: Attributes = name("groupedBySequential")

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) with InHandler with OutHandler {

      // Key of the group currently being collected; None until the first element arrives.
      private var lastKey: Option[A] = None

      // TODO should we set a size hint on the builder?
      private val buf = Vector.newBuilder[T]

      override def onPush(): Unit = {
        val element = grab(in)
        val key = p(element)
        lastKey match {
          case Some(previous) if previous != key =>
            // Key changed: hand the finished group downstream and start a new one
            // with the element that triggered the change.
            lastKey = Some(key)
            val finishedGroup = buf.result()
            buf.clear()
            buf += element
            push(out, finishedGroup)

          case Some(_) =>
            // Same key as before: keep collecting and ask for the next element.
            buf += element
            pull(in)

          case None =>
            // Very first element: it defines the key of the first group.
            lastKey = Some(key)
            buf += element
            pull(in)
        }
      }

      // Downstream demand is simply forwarded upstream.
      override def onPull(): Unit = pull(in)

      override def onUpstreamFinish(): Unit = {
        // Upstream is done, so whatever is still buffered forms the final group.
        val remaining = buf.result()
        if (remaining.nonEmpty) {
          buf.clear()
          // emit rather than push: out might not be ready to receive yet
          emit(out, remaining)
        }
        completeStage()
      }

      setHandlers(in, out, this)
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment