Skip to content

Instantly share code, notes, and snippets.

@zachdaniel
Last active May 7, 2024 17:20
Show Gist options
  • Save zachdaniel/d5ab06a9d2362fceeb6d27c37b206e28 to your computer and use it in GitHub Desktop.
Save zachdaniel/d5ab06a9d2362fceeb6d27c37b206e28 to your computer and use it in GitHub Desktop.
A small demo to show how you might, given a stream, do a "fan out", processing different elements in separate streams. Powered by simple primitives like `Stream.resource` and `spawn_link`. Open in Livebook: https://livebook.dev/run?url=https%3A%2F%2Fgist.github.com%2Fzachdaniel%2Fd5ab06a9d2362fceeb6d27c37b206e28
<!-- livebook:{"persist_outputs":true} -->
# Distribute
## Section
A small toy to show how you might, given a stream, do a "fan out", processing different elements in separate streams. Powered by simple primitives like `Stream.resource` and `spawn_link`.
```elixir
defmodule Distribute do
@moduledoc """
Documentation for `Distribute`.
"""
def distribute(enum, key_fun, stream_funs, opts \\ []) do
stream_ref = make_ref()
collect? = Keyword.get(opts, :collect?, true)
Stream.transform(
enum,
fn -> %{} end,
fn i, pids ->
stream_id = key_fun.(i)
case Map.fetch(pids, stream_id) do
{:ok, {pid, _ref}} ->
send(pid, {:stream_distribute_push, stream_ref, i})
{[], pids}
:error ->
worker_ref = make_ref()
pid =
distributed_worker(
self(),
worker_ref,
stream_ref,
stream_funs[stream_id] || (& &1),
collect?
)
send(pid, {:stream_distribute_push, stream_ref, i})
{[], Map.put(pids, stream_id, {pid, worker_ref})}
end
end,
fn pids ->
if collect? do
{pids
|> Stream.flat_map(fn {_, {pid, worker_ref}} ->
send(pid, {:stream_distribute_stop, stream_ref})
Stream.resource(
fn -> nil end,
fn
:stop ->
{:halt, nil}
nil ->
receive do
{:stream_distribute_pull, ^worker_ref, ^stream_ref, :item, item} ->
{[item], nil}
{:stream_distribute_pull, ^worker_ref, ^stream_ref, :done} ->
{receive_all_pulls(worker_ref, stream_ref), :stop}
after
0 ->
{[], nil}
end
end,
fn nil -> :ok end
)
end), nil}
else
{[], nil}
end
end,
fn _ -> :ok end
)
end
defp distributed_worker(parent, worker_ref, stream_ref, stream_fun, collect?) do
spawn_link(fn ->
stream =
Stream.resource(
fn -> nil end,
fn
:stop ->
{:halt, nil}
nil ->
receive do
{:stream_distribute_push, ^stream_ref, item} ->
{[item], nil}
{:stream_distribute_stop, ^stream_ref} ->
{receive_all_pushes(stream_ref), :stop}
after
0 ->
{[], nil}
end
end,
fn nil -> :ok end
)
stream = stream_fun.(stream)
if collect? do
Enum.each(
stream,
&send(parent, {:stream_distribute_pull, worker_ref, stream_ref, :item, &1})
)
send(parent, {:stream_distribute_pull, worker_ref, stream_ref, :done})
[]
else
Stream.run(stream)
send(parent, {:stream_distribute_pull, worker_ref, stream_ref, :done})
end
end)
end
defp receive_all_pulls(worker_ref, stream_ref, acc \\ []) do
receive do
{:stream_distribute_pull, ^worker_ref, ^stream_ref, :item, item} ->
[item | acc]
after
0 -> acc
end
end
defp receive_all_pushes(stream_ref, acc \\ []) do
receive do
{:stream_distribute_push, ^stream_ref, item} ->
[item | acc]
after
0 -> acc
end
end
end
```
<!-- livebook:{"output":true} -->
```
{:module, Distribute, <<70, 79, 82, 49, 0, 0, 26, ...>>, {:receive_all_pushes, 2}}
```
```elixir
processed =
[{:a, 1}, {:a, 2}, {:b, 1}, {:b, 2}]
|> Distribute.distribute(&elem(&1, 0), %{
a: fn stream -> Stream.map(stream, &{:a, elem(&1, 1), self()}) end,
b: fn stream -> Stream.map(stream, &{:b, elem(&1, 1), self()}) end
})
|> Enum.to_list()
|> IO.inspect()
{as, bs} = Enum.split_with(processed, &(elem(&1, 0) == :a))
a_pids = as |> Enum.map(&elem(&1, 2)) |> Enum.uniq() |> IO.inspect()
b_pids = bs |> Enum.map(&elem(&1, 2)) |> Enum.uniq() |> IO.inspect()
a_pids != b_pids
```
<!-- livebook:{"output":true} -->
```
[
{:a, 1, #PID<0.201.0>},
{:a, 2, #PID<0.201.0>},
{:b, 1, #PID<0.202.0>},
{:b, 2, #PID<0.202.0>}
]
[#PID<0.201.0>]
[#PID<0.202.0>]
```
<!-- livebook:{"output":true} -->
```
true
```
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment