Skip to content

Instantly share code, notes, and snippets.

@artisonian
Last active March 21, 2024 20:13
Show Gist options
  • Star 40 You must be signed in to star a gist
  • Fork 8 You must be signed in to fork a gist
  • Save artisonian/3836281 to your computer and use it in GitHub Desktop.
Save artisonian/3836281 to your computer and use it in GitHub Desktop.
go-eventsource
eventsource
go-eventsource
client/client

go-eventsource

A demo of server-sent events (i.e., EventSource).

Hat tip to athoune/broadcast-event-source for an example of SSE in Go.

Example with cURL:

curl -X POST -H "Content-Type: application/json" http://localhost:8888/update -d '{"sender": "cURL", "text":"Hello"}'

Example with Go client:

go run client.go
package main
import (
"bytes"
"encoding/json"
"fmt"
"io/ioutil"
"log"
"net/http"
"time"
)
// main posts one JSON message to the local event server's /update endpoint
// and prints the server's reply. Any failure, including a non-2xx reply,
// terminates the program with a logged error.
func main() {
	payload, err := json.Marshal(struct{ Sender, Text string }{
		Sender: "Go client",
		Text:   fmt.Sprintf("The time is %s", time.Now().Format(time.Kitchen)),
	})
	check(err)

	// The package-level http.Post uses http.DefaultClient, which has no
	// timeout; bound the request so a stalled server cannot hang the client.
	client := &http.Client{Timeout: 10 * time.Second}
	resp, err := client.Post("http://localhost:8888/update", "application/json", bytes.NewReader(payload))
	check(err)
	defer resp.Body.Close()

	body, err := ioutil.ReadAll(resp.Body)
	check(err)

	// A non-2xx reply carries an error message in the body; report it as a
	// failure instead of printing it as if the update had succeeded.
	if resp.StatusCode < 200 || resp.StatusCode > 299 {
		log.Fatalf("server returned %s: %s", resp.Status, bytes.TrimSpace(body))
	}
	fmt.Println(string(body))
}
// check aborts the program via log.Fatalln when err is non-nil and is a
// no-op otherwise.
func check(err error) {
	if err == nil {
		return
	}
	log.Fatalln(err)
}
package main
import (
"encoding/json"
"fmt"
"log"
"net/http"
"sync"
)
// Message is a chat message submitted by a client: who sent it and what
// they said.
type Message struct {
	Sender string `json:"sender"`
	Text   string `json:"text"`
}

// String renders the message as "<sender>: <text>".
func (m Message) String() string {
	// All operands are strings, so fmt.Sprint inserts no extra spaces.
	return fmt.Sprint(m.Sender, ": ", m.Text)
}
// Response is the JSON acknowledgement sent back to a client.
type Response struct {
	Success bool   `json:"success"`
	Message string `json:"message"`
}

// String returns the response encoded as JSON. If encoding fails the error
// is logged and the empty string is returned.
func (r Response) String() string {
	encoded, err := json.Marshal(r)
	if err == nil {
		return string(encoded)
	}
	log.Println("could not marshal response:", err)
	return ""
}
// Broker fans messages out to subscribed clients. Each subscriber is
// represented by its own channel; the embedded mutex is taken by Subscribe,
// Unsubscribe and Publish before they touch the subscriber set.
type Broker struct {
	sync.Mutex

	// subscribers is used as a set: the key is the subscriber's channel.
	subscribers map[chan Message]bool
}
// Subscribe registers a new client with the broker and returns the
// unbuffered channel on which that client will receive published messages.
func (b *Broker) Subscribe() chan Message {
	sub := make(chan Message)

	b.Lock()
	defer b.Unlock()

	log.Println("subscribing to broker")
	b.subscribers[sub] = true
	return sub
}
// Unsubscribe removes ch from the broker and closes it.
//
// The call is idempotent: if ch is not currently subscribed (it was already
// unsubscribed, or was never returned by Subscribe) nothing is closed.
// Closing unconditionally would panic on a second call and would close
// channels the broker does not own.
func (b *Broker) Unsubscribe(ch chan Message) {
	b.Lock()
	defer b.Unlock()

	log.Println("unsubscribing from broker")
	if _, ok := b.subscribers[ch]; !ok {
		return
	}
	// Remove before closing so the set never holds a closed channel.
	delete(b.subscribers, ch)
	close(ch)
}
// Publish delivers msg to every subscribed client. The broker lock is held
// for the whole fan-out and each send blocks until that subscriber has
// received the message.
func (b *Broker) Publish(msg Message) {
	b.Lock()
	defer b.Unlock()

	count := len(b.subscribers)
	log.Printf("Publishing to %d subscribers\n", count)
	for sub := range b.subscribers {
		sub <- msg
	}
}
// NewBroker returns a Broker with an empty, ready-to-use subscriber set.
func NewBroker() *Broker {
	b := new(Broker)
	b.subscribers = map[chan Message]bool{}
	return b
}
// messageHandler returns an HTTP handler that accepts a JSON-encoded Message
// via POST and publishes it to every subscriber of b.
//
// Responses:
//   - 405 Method Not Allowed if the method is not POST
//   - 415 Unsupported Media Type if Content-Type does not start with
//     "application/json"
//   - 400 Bad Request if the body cannot be decoded into a Message
//   - 200 with a JSON-encoded Response when the Accept header is exactly
//     "application/json", otherwise 200 with the plain text "OK"
func messageHandler(b *Broker) func(w http.ResponseWriter, r *http.Request) {
	const jsonType = "application/json"

	return func(w http.ResponseWriter, r *http.Request) {
		if r.Method != http.MethodPost {
			w.Header().Set("Allow", http.MethodPost)
			http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
			return
		}

		// Compare by prefix so parameters such as "; charset=utf-8" are
		// accepted. The length guard is essential: slicing a missing or
		// short header value unconditionally panics.
		ct := r.Header.Get("Content-Type")
		if len(ct) < len(jsonType) || ct[:len(jsonType)] != jsonType {
			http.Error(w, "Content-Type must be application/json", http.StatusUnsupportedMediaType)
			return
		}

		var m Message
		if err := json.NewDecoder(r.Body).Decode(&m); err != nil {
			// An undecodable body is the client's mistake, not a server fault.
			http.Error(w, err.Error(), http.StatusBadRequest)
			return
		}

		b.Publish(m)

		if r.Header.Get("Accept") == jsonType {
			w.Header().Set("Content-Type", jsonType)
			fmt.Fprint(w, Response{Success: true, Message: "OK"})
			return
		}
		fmt.Fprintln(w, "OK")
	}
}
// ServeHTTP streams published messages to the client as server-sent events.
// It subscribes to the broker, writes each message as a "data:" event, and
// returns when the request context is cancelled or a write fails. It
// answers 500 if the ResponseWriter cannot flush.
func (b *Broker) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	f, ok := w.(http.Flusher)
	if !ok {
		http.Error(w, "streaming unsupported", http.StatusInternalServerError)
		return
	}

	hdr := w.Header()
	hdr.Set("Content-Type", "text/event-stream")
	hdr.Set("Cache-Control", "no-cache")
	hdr.Set("Connection", "keep-alive")

	ch := b.Subscribe()
	defer func() {
		// Publish holds the broker lock while sending on this unbuffered
		// channel, and Unsubscribe needs the same lock. Once this handler
		// stops receiving, a Publish already in flight would block forever
		// and Unsubscribe could never run. Keep draining until Unsubscribe
		// closes the channel, which also ends this goroutine.
		go func() {
			for range ch {
			}
		}()
		b.Unsubscribe(ch)
	}()

	// Send the headers immediately so the client sees the stream open
	// without waiting for the first message.
	f.Flush()

	ctx := r.Context()
	for {
		select {
		case msg := <-ch:
			// NOTE(review): a message containing a newline is not split into
			// multiple "data:" lines, so it would break the event framing.
			if _, err := fmt.Fprintf(w, "data: %s\n\n", msg); err != nil {
				// The client is gone; stop instead of writing into the void.
				return
			}
			f.Flush()
		case <-ctx.Done():
			return
		}
	}
}
// main wires up the routes and serves on localhost:8888:
//
//	/events  server-sent event stream (the broker itself)
//	/update  endpoint for posting messages
//	/        static files from the current directory
//
// It panics if the listener cannot be started or stops with an error.
func main() {
	broker := NewBroker()

	http.Handle("/events", broker)
	http.HandleFunc("/update", messageHandler(broker))
	http.Handle("/", http.FileServer(http.Dir(".")))

	if err := http.ListenAndServe("localhost:8888", nil); err != nil {
		panic(err)
	}
}
<!DOCTYPE html>
<!-- Single-page chat client: posts messages to /update and lists messages
     received from the /events server-sent event stream. -->
<html lang="en">
  <head>
    <meta charset="utf-8" />
    <title>EventSource Demo</title>
    <style>
      * {
        box-sizing: border-box;
      }
      body {
        background-color: whitesmoke;
        font: 100%/1.5 system-ui, -apple-system, sans-serif;
        padding: 1rem 3rem;
      }
      .container {
        max-width: 45rem;
        margin: 0 auto;
        padding: 1.25rem;
        background-color: white;
        border: 1px solid lightgray;
      }
      /* Vertical rhythm: space between consecutive children only. */
      .container > * + * {
        margin-top: 1.5rem;
      }
      h1 {
        margin-top: 0;
        font-size: 2rem;
        font-weight: normal;
        color: gray;
      }
      #chat [type="text"] {
        width: 100%;
        padding: 0.25rem 0;
        border-width: 0 0 2px 0;
        border-style: solid;
        border-color: dimgray;
        font-size: 1.25rem;
        transition: all 0.3s cubic-bezier(0, 0, 0.3, 1);
      }
      #chat [type="text"]:focus {
        outline: none;
        border-color: dodgerblue;
      }
      #messages {
        max-height: 30rem;
        overflow-y: auto;
      }
    </style>
  </head>
  <body>
    <div class="container">
      <h1>EventSource Demo</h1>
      <form id="chat">
        <input type="hidden" name="sender" />
        <input
          type="text"
          name="message"
          placeholder="Send a message..."
          autofocus
        />
      </form>
      <div id="messages"></div>
    </div>
    <script type="module">
      const form = document.querySelector("#chat");
      const messages = document.querySelector("#messages");

      // Random sender id for this page load: "u-" plus up to seven
      // base-32 characters. slice(2, 9) replaces the deprecated substr(2, 7).
      form.sender.value = `u-${Math.random()
        .toString(32)
        .slice(2, 9)}`;

      form.addEventListener("submit", async function(event) {
        event.preventDefault();
        const input = event.target.message;
        const text = input.value.trim();
        if (!text) return;
        try {
          const res = await fetch("/update", {
            method: "POST",
            headers: {
              "Content-Type": "application/json"
            },
            body: JSON.stringify({ text, sender: event.target.sender.value })
          });
          if (!res.ok) {
            console.error("could not send message", res.statusText);
            return;
          }
          console.log(await res.text());
          input.value = "";
        } catch (err) {
          // fetch rejects on network failure; without this the rejection
          // would be unhandled and the typed text silently kept.
          console.error("could not send message", err);
        }
      });

      const source = new EventSource("/events");
      source.addEventListener("open", function(event) {
        console.log("eventsource connection open");
      });
      source.addEventListener("error", function(event) {
        // readyState 0 is CONNECTING: the browser is retrying by itself.
        if (event.target.readyState === 0) {
          console.log("reconnecting to eventsource");
        } else {
          console.error("eventsource error");
        }
      });
      source.addEventListener("message", function(event) {
        const el = document.createElement("div");
        // The server sends "<sender>: <text>". Relabel only our own
        // messages, and only the leading sender part, so the id is never
        // rewritten inside someone else's text.
        const ownPrefix = `${form.sender.value}: `;
        el.textContent = event.data.startsWith(ownPrefix)
          ? `You said: ${event.data.slice(ownPrefix.length)}`
          : event.data;
        messages.prepend(el);
      });
    </script>
  </body>
</html>
@artisonian
Copy link
Author

Note: index.html, style.css, and app.js are meant to be served from the static directory.

@artisonian
Copy link
Author

Hat tip to athoune/broadcast-event-source for code to flush a ResponseWriter.

@artisonian
Copy link
Author

I've added message broadcasting. Here's an example using cURL:

$ curl -X POST -H "Content-Type: application/json" http://localhost:8888/update -d '{"text":"Hello from cURL"}'

@artisonian
Copy link
Author

Hat tip to Nick for JSON response encoding.

@owickstrom
Copy link

Very nice! Hopefully I'll be using parts of this for a Go lab. :) Thanks.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment