165 lines
4.5 KiB
Go
165 lines
4.5 KiB
Go
package main
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"github.com/redis/go-redis/v9"
|
|
"time"
|
|
)
|
|
|
|
// Task: Send each challenge to the "challenge" channel.
|
|
// Each team is subscribed to the "challenge" channel and enters the challenge upon receiving.
|
|
|
|
const (
|
|
// the name of the PubSub channel
|
|
pubsubChan = "challenge"
|
|
)
|
|
|
|
// Res is the result of reading the pubsub channel
|
|
type Res struct {
|
|
result string
|
|
err error
|
|
}
|
|
|
|
// Team manages a team's subscription
|
|
// Each team uses its own client to subscribe to the "challenge" channel
|
|
type Team struct {
|
|
name string
|
|
client *redis.Client
|
|
channel *redis.PubSub
|
|
}
|
|
|
|
// getTeams scans the database for "team:*" keys
|
|
// and returns a slice of Team structs with names filled from the keys
|
|
func getTeams(client *redis.Client) []Team {
|
|
ctx := context.Background()
|
|
teams := make([]Team, 3)
|
|
teamsets := make([]string, 0, 3)
|
|
keys := make([]string, 0, 3)
|
|
var cursor uint64
|
|
for {
|
|
// Scan returns a slice of matches. The count may or may not be reached
|
|
// in the first call to Scan, so the code needs to call Scan in a loop and
|
|
// append the found keys to the teamsets slice until the cursor "returns to 0".
|
|
var err error
|
|
keys, cursor, err = client.Scan(ctx, cursor, "team:*", 3).Result()
|
|
if err != nil {
|
|
break
|
|
}
|
|
teamsets = append(teamsets, keys...)
|
|
if cursor == 0 {
|
|
break
|
|
}
|
|
}
|
|
// Lazily assume that the scan has returned 3 team sets
|
|
for i := 0; i < 3; i++ {
|
|
teams[i].name = teamsets[i]
|
|
// each team uses its own client
|
|
teams[i].client = newClient(dbconn, 0)
|
|
}
|
|
return teams
|
|
}
|
|
|
|
// subscribe subscribes to the "challenge" channel
|
|
// and waits for the subscription to be completed
|
|
func (team *Team) subscribe() error {
|
|
ctx := context.Background()
|
|
// Subscribe to the "challenge" channel
|
|
pubSub := team.client.Subscribe(ctx, pubsubChan)
|
|
|
|
// The first Subscribe() call creates the channel.
|
|
// Until that point, any attempt to publish something fails.
|
|
reply, err := pubSub.Receive(ctx)
|
|
if err != nil {
|
|
return fmt.Errorf("subscribing to channel '%s' failed: %w", pubsubChan, err)
|
|
}
|
|
// Expected response type is "*Subscription". Otherwise, something failed.
|
|
switch reply.(type) {
|
|
case *redis.Subscription:
|
|
// Success!
|
|
case *redis.Message:
|
|
// The channel is already active and contains messages, hence also a success
|
|
case *redis.Pong:
|
|
// letL's call it a success
|
|
default:
|
|
return fmt.Errorf("subscribing to a channel failed: received a reply of type %T, expected: *redis.Subscription", reply)
|
|
}
|
|
|
|
team.channel = pubSub
|
|
|
|
fmt.Printf("%s subscribed to channel '%s'\n", team.name, pubsubChan)
|
|
return nil
|
|
}
|
|
|
|
// receive receives messages from the "challenge" channel.
|
|
// It starts a goroutine that reads from the pubsub channel until
|
|
// the channel is closed or the context is done.
|
|
func (team *Team) receive(ctx context.Context, resChan chan<- Res) {
|
|
ch := team.channel.Channel()
|
|
defer close(resChan)
|
|
for {
|
|
select {
|
|
case msg, ok := <-ch:
|
|
if !ok {
|
|
// The pubsub channel has been closed
|
|
return
|
|
}
|
|
resChan <- Res{fmt.Sprintf("%s received challenge '%s'", team.name, msg.Payload), nil}
|
|
case <-ctx.Done():
|
|
resChan <- Res{"", ctx.Err()}
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
// publish publishes the challenge to the "challenge" channel
|
|
func publish(client *redis.Client, challenge string) error {
|
|
ctx := context.Background()
|
|
fmt.Printf("publishing challenge '%s'\n", challenge)
|
|
return client.Publish(ctx, pubsubChan, challenge).Err()
|
|
}
|
|
|
|
// pubsub subscribes to the "challenge" channel, publishes the challenges,
|
|
// and receives the published messages.
|
|
func pubsub(client *redis.Client) (err error) {
|
|
ctx := context.Background()
|
|
|
|
// Step 1: subscribe each team
|
|
teams := getTeams(client)
|
|
for i := 0; i < 3; i++ {
|
|
err = teams[i].subscribe()
|
|
if err != nil {
|
|
return fmt.Errorf("subscribing failed: %w", err)
|
|
}
|
|
}
|
|
|
|
// Step 2: publish challenges
|
|
// Read the challenges from the sorted set "challenges" and publish them
|
|
for i := int64(0); i < 5; i++ {
|
|
challenge := client.ZRange(ctx, "challenges", i, i).Val()[0]
|
|
err = publish(client, challenge)
|
|
if err != nil {
|
|
return fmt.Errorf("cannot publish challenge %s: %w", challenge, err)
|
|
}
|
|
}
|
|
// Close the channel after one second, to terminate the receive loops.
|
|
time.AfterFunc(time.Second, func() {
|
|
teams[0].channel.Close()
|
|
fmt.Println(`PubSub channel "challenges" closed`)
|
|
})
|
|
|
|
// Step 3: receive published messages
|
|
rch := make(chan Res)
|
|
for i := 0; i < 3; i++ {
|
|
go teams[i].receive(ctx, rch)
|
|
}
|
|
for msg := range rch {
|
|
if msg.err != nil {
|
|
return fmt.Errorf("cannot receive challenge: %w", msg.err)
|
|
}
|
|
fmt.Println(msg.result)
|
|
}
|
|
|
|
return nil
|
|
}
|