Skip to content

Instantly share code, notes, and snippets.

Last active October 14, 2022 10:32
Show Gist options
  • Save johnynek/689199b4ac49364e7c94abef996ae59f to your computer and use it in GitHub Desktop.
Save johnynek/689199b4ac49364e7c94abef996ae59f to your computer and use it in GitHub Desktop.
merge sorted streams with fs2
import fs2.{Chunk, Stream, Pull}
import cats.collections.Heap
import cats.implicits._
/**
 * Combines several fs2 streams into one by keeping their stream legs in a heap.
 *
 * NOTE(review): this listing appears to be truncated. Several branch bodies, the
 * receivers of some method chains and all closing braces are not present, so it
 * does not compile as shown. The comments below describe only what the visible
 * lines do; anything inferred is marked as an assumption.
 */
object SortMerge {
/**
 * Takes a list of streams and returns a single stream with the same effect and
 * element type.
 *
 * Presumably every input stream is already sorted according to `Ordering[A]` and
 * the result is their sorted merge; TODO confirm against the complete source.
 *
 * @tparam F effect type of the streams
 * @tparam A element type; an `Ordering[A]` must be available
 * @param streams the streams to combine
 * @return the combined stream
 */
def sortMerge[F[_], A: Ordering](streams: List[Stream[F, A]]): Stream[F, A] = {
// Order on stream legs; the Heap used below is keyed on Stream.StepLeg[F, A].
implicit val ord: cats.Order[Stream.StepLeg[F, A]] =
new cats.Order[Stream.StepLeg[F, A]] {
// the element ordering supplied through the context bound on A
val ordA = implicitly[Ordering[A]]
// Compares two legs by looking at their current head chunks.
def compare(left: Stream.StepLeg[F, A], right: Stream.StepLeg[F, A]): Int = {
if (left.head.isEmpty) {
// prefer to step so we don't skip items
// (two empty-headed legs compare equal; otherwise the empty-headed left leg sorts first)
if (right.head.isEmpty) 0 else -1
else if (right.head.isEmpty) {
// we need to step so we don't misorder items
// NOTE(review): the value returned by this branch is missing from this listing.
else {
// neither are empty just compare the head, right.head(0))
// NOTE(review): the comment above looks fused with the tail of a lost code line;
// presumably the first elements of both heads are compared with ordA. Confirm.
// Takes the next leg off the heap and emits from it; ends the pull once the heap is empty.
def go(heap: Heap[Stream.StepLeg[F, A]]): Pull[F, A, Unit] =
heap.pop match {
case Some((sl, rest)) =>
if (sl.head.nonEmpty) {
// emit the first element of the leg's current chunk ...
Pull.output1(sl.head(0)) >> {
// ... then put the leg back on the heap with that element dropped
val nextSl = sl.setHead(sl.head.drop(1))
val nextHeap = rest.add(nextSl)
// NOTE(review): the continuation (presumably a recursive call with nextHeap) is not visible.
else {
// this chunk is done
// NOTE(review): the receiver of .flatMap is missing; presumably it steps the leg
// to its next chunk and yields an Option of the next leg. Confirm.
.flatMap {
case Some(nextSl) =>
// the leg produced another chunk, so it goes back on the heap
val nextHeap = rest.add(nextSl)
case None =>
// this leg is exhausted
// the heap is empty: nothing left to emit
case None => Pull.done
// Builds a heap from a list of legs.
// NOTE(review): the body of heapOf is not visible in this listing.
def heapOf(ls: List[Stream.StepLeg[F, A]]): Heap[Stream.StepLeg[F, A]] =
// Pull that emits nothing and yields the initial heap of legs.
val heap: Pull[F, fs2.INothing, Heap[Stream.StepLeg[F, A]]] =
// NOTE(review): the receiver of .map is missing. `ls.flatten` is passed to heapOf,
// so `ls` is presumably a list of optional legs with the absent ones dropped. Confirm.
.map { ls => heapOf(ls.flatten) }
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment