Skip to content

Instantly share code, notes, and snippets.

@tgodfrey
Last active June 5, 2020 15:30
Show Gist options
  • Save tgodfrey/1a67753d51cb202ca8eb04b933cec924 to your computer and use it in GitHub Desktop.
Save tgodfrey/1a67753d51cb202ca8eb04b933cec924 to your computer and use it in GitHub Desktop.
Elixir Module for working with MessageBus
defmodule MyApp.Redix do
@pool_size 5
# How long, in seconds, to keep messages in the backlog
@max_backlog_age 604800
# How many messages may be kept in the global backlog
@max_global_backlog_size 2000
# How many messages may be kep in the per-channel backlog
@max_backlog_size 1000
@global_id_key "__mb_global_id_n"
@global_backlog_key "__mb_global_backlog_n"
# Lua script to publish new messages on Redis. This script comes from
# the MessageBus source:
# https://github.com/SamSaffron/message_bus/blob/master/lib/message_bus/backends/redis.rb
@lua_publish """
local start_payload = ARGV[1]
local max_backlog_age = ARGV[2]
local max_backlog_size = tonumber(ARGV[3])
local max_global_backlog_size = tonumber(ARGV[4])
local channel = ARGV[5]
local global_id_key = KEYS[1]
local backlog_id_key = KEYS[2]
local backlog_key = KEYS[3]
local global_backlog_key = KEYS[4]
local redis_channel_name = KEYS[5]
local global_id = redis.call("INCR", global_id_key)
local backlog_id = redis.call("INCR", backlog_id_key)
local payload = string.format("%i|%i|%s", global_id, backlog_id, start_payload)
local global_backlog_message = string.format("%i|%s", backlog_id, channel)
redis.call("ZADD", backlog_key, backlog_id, payload)
redis.call("EXPIRE", backlog_key, max_backlog_age)
redis.call("ZADD", global_backlog_key, global_id, global_backlog_message)
redis.call("EXPIRE", global_backlog_key, max_backlog_age)
redis.call("PUBLISH", redis_channel_name, payload)
redis.call("EXPIRE", backlog_id_key, max_backlog_age)
if backlog_id > max_backlog_size then
redis.call("ZREMRANGEBYSCORE", backlog_key, 1, backlog_id - max_backlog_size)
end
if global_id > max_global_backlog_size then
redis.call("ZREMRANGEBYSCORE", global_backlog_key, 1, global_id - max_global_backlog_size)
end
return backlog_id
"""
def child_spec(_args) do
# Specs for the Redix connections.
children =
for i <- 0..(@pool_size - 1) do
Supervisor.child_spec({Redix, name: :"redix_#{i}"}, id: {Redix, i})
end
%{
id: RedixSupervisor,
type: :supervisor,
start: {Supervisor, :start_link, [children, [strategy: :one_for_one]]}
}
end
def command(command) do
Redix.command(:"redix_#{random_index()}", command)
end
defp random_index() do
rem(System.unique_integer([:positive]), @pool_size)
end
def publish(channel, message) do
# MessageBus requires the message to be the value for the attribute "data"
data = Jason.encode!(%{data: message})
# Comments below are the corresponding variable names in the Lua script
publish_args = [
encode_without_ids(channel, data), # start_payload
@max_backlog_age, # max_backlog_age
@max_backlog_size, # max_backlog_size
@max_global_backlog_size, # max_global_backlog_size
channel # channel
]
publish_keys = [
@global_id_key, # global_id_key
backlog_id_key(channel), # backlog_id_key
backlog_key(channel), # backlog_key
@global_backlog_key, # global_backlog_key
redis_channel_name() # redis_channel_name
]
cached_eval(@lua_publish, publish_keys, publish_args)
end
defp encode_without_ids(channel, data) do
String.replace(channel, "|", "$$123$$") <> "|" <> data
end
defp backlog_id_key(channel) do
"__mb_backlog_id_n_#{channel}"
end
defp backlog_key(channel) do
"__mb_backlog_n_#{channel}"
end
defp redis_channel_name() do
db = Application.get_env(:my_app, MyAppWeb.Endpoint)[:redis_db] || 0
"_message_bus_#{db}"
end
# Redis has two commands for executing a Lua script, EVAL and EVALSHA. When a
# script is executed on a Redis server, it store the script, and uses the SHA1
# of the script as its key. This means that if you know what the hash is,
# you can execute the script, without actually having the text of the script. The
# following function copies the technique in MessageBus, where the function first
# attempts to use EVALSHA to execute the script by passing in the SHA1 for the
# script. If that fails with a NOSCRIPT error, the script is passed instead.
defp cached_eval(script, keys, args) do
script_sha1 = :crypto.hash(:sha, @lua_publish)
# Commands are send to Redis as a list of strings, one each for the command,
# and each argument or key or any other value which is to be passed to that
# command. List.flatten/1 is used to eliminate the nested lists, but
# preserve their values.
evalsha_cmd = List.flatten(["EVALSHA", script_sha1, 5, keys, args])
eval_cmd = List.flatten(["EVAL", script, 5, keys, args])
case MyApp.Redix.command(evalsha_cmd) do
{:ok, backlog_id} -> {:ok, backlog_id}
{
:error,
%Redix.Error{message: "NOSCRIPT No matching script. Please use EVAL."}
}
-> MyApp.Redix.command(eval_cmd)
end
end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment