learning_go/testing/go-code-samples/get-started-with-redis/pubsub.go
2025-12-20 17:00:05 +03:00

165 lines
4.5 KiB
Go

package main
import (
"context"
"fmt"
"github.com/redis/go-redis/v9"
"time"
)
// Task: Send each challenge to the "challenge" channel.
// Each team is subscribed to the "challenge" channel and enters the challenge upon receiving.
const (
// the name of the PubSub channel
pubsubChan = "challenge"
)
// Res is the result of reading the pubsub channel
type Res struct {
result string
err error
}
// Team manages a team's subscription
// Each team uses its own client to subscribe to the "challenge" channel
type Team struct {
name string
client *redis.Client
channel *redis.PubSub
}
// getTeams scans the database for "team:*" keys
// and returns a slice of Team structs with names filled from the keys
func getTeams(client *redis.Client) []Team {
ctx := context.Background()
teams := make([]Team, 3)
teamsets := make([]string, 0, 3)
keys := make([]string, 0, 3)
var cursor uint64
for {
// Scan returns a slice of matches. The count may or may not be reached
// in the first call to Scan, so the code needs to call Scan in a loop and
// append the found keys to the teamsets slice until the cursor "returns to 0".
var err error
keys, cursor, err = client.Scan(ctx, cursor, "team:*", 3).Result()
if err != nil {
break
}
teamsets = append(teamsets, keys...)
if cursor == 0 {
break
}
}
// Lazily assume that the scan has returned 3 team sets
for i := 0; i < 3; i++ {
teams[i].name = teamsets[i]
// each team uses its own client
teams[i].client = newClient(dbconn, 0)
}
return teams
}
// subscribe subscribes to the "challenge" channel
// and waits for the subscription to be completed
func (team *Team) subscribe() error {
ctx := context.Background()
// Subscribe to the "challenge" channel
pubSub := team.client.Subscribe(ctx, pubsubChan)
// The first Subscribe() call creates the channel.
// Until that point, any attempt to publish something fails.
reply, err := pubSub.Receive(ctx)
if err != nil {
return fmt.Errorf("subscribing to channel '%s' failed: %w", pubsubChan, err)
}
// Expected response type is "*Subscription". Otherwise, something failed.
switch reply.(type) {
case *redis.Subscription:
// Success!
case *redis.Message:
// The channel is already active and contains messages, hence also a success
case *redis.Pong:
// letL's call it a success
default:
return fmt.Errorf("subscribing to a channel failed: received a reply of type %T, expected: *redis.Subscription", reply)
}
team.channel = pubSub
fmt.Printf("%s subscribed to channel '%s'\n", team.name, pubsubChan)
return nil
}
// receive receives messages from the "challenge" channel.
// It starts a goroutine that reads from the pubsub channel until
// the channel is closed or the context is done.
func (team *Team) receive(ctx context.Context, resChan chan<- Res) {
ch := team.channel.Channel()
defer close(resChan)
for {
select {
case msg, ok := <-ch:
if !ok {
// The pubsub channel has been closed
return
}
resChan <- Res{fmt.Sprintf("%s received challenge '%s'", team.name, msg.Payload), nil}
case <-ctx.Done():
resChan <- Res{"", ctx.Err()}
return
}
}
}
// publish publishes the challenge to the "challenge" channel
func publish(client *redis.Client, challenge string) error {
ctx := context.Background()
fmt.Printf("publishing challenge '%s'\n", challenge)
return client.Publish(ctx, pubsubChan, challenge).Err()
}
// pubsub subscribes to the "challenge" channel, publishes the challenges,
// and receives the published messages.
func pubsub(client *redis.Client) (err error) {
ctx := context.Background()
// Step 1: subscribe each team
teams := getTeams(client)
for i := 0; i < 3; i++ {
err = teams[i].subscribe()
if err != nil {
return fmt.Errorf("subscribing failed: %w", err)
}
}
// Step 2: publish challenges
// Read the challenges from the sorted set "challenges" and publish them
for i := int64(0); i < 5; i++ {
challenge := client.ZRange(ctx, "challenges", i, i).Val()[0]
err = publish(client, challenge)
if err != nil {
return fmt.Errorf("cannot publish challenge %s: %w", challenge, err)
}
}
// Close the channel after one second, to terminate the receive loops.
time.AfterFunc(time.Second, func() {
teams[0].channel.Close()
fmt.Println(`PubSub channel "challenges" closed`)
})
// Step 3: receive published messages
rch := make(chan Res)
for i := 0; i < 3; i++ {
go teams[i].receive(ctx, rch)
}
for msg := range rch {
if msg.err != nil {
return fmt.Errorf("cannot receive challenge: %w", msg.err)
}
fmt.Println(msg.result)
}
return nil
}