Improve resiliency of STOMP subscription

This commit is contained in:
Fred Boniface 2024-04-24 23:09:13 +01:00
parent f3745da86e
commit 5b9c444ac5
1 changed files with 44 additions and 18 deletions

View File

@ -1,37 +1,63 @@
package vstp package vstp
import ( import (
"fmt"
"time" "time"
"git.fjla.uk/owlboard/timetable-mgr/log" "git.fjla.uk/owlboard/timetable-mgr/log"
"git.fjla.uk/owlboard/timetable-mgr/messaging" "git.fjla.uk/owlboard/timetable-mgr/messaging"
"github.com/go-stomp/stomp/v3" "github.com/go-stomp/stomp/v3"
"go.uber.org/zap"
) )
func Subscribe() { func Subscribe() {
sub, err := messaging.Client.Subscribe("/topic/VSTP_ALL", stomp.AckAuto) var sub *stomp.Subscription
if err != nil { var err error
log.Fatal("Unable to start subscription: " + err.Error()) retryCount := 0
maxRetries := 5
for retryCount < maxRetries {
sub, err = messaging.Client.Subscribe("/topic/VSTP_ALL", stomp.AckAuto)
if err != nil {
log.Warn("Unable to start subscription", zap.Error(err))
time.Sleep(10 * time.Second)
retryCount++
continue
}
break
} }
if sub == nil {
log.Fatal("Failed to subscribe to VSTP topic", zap.Int("attempts", maxRetries))
}
log.Info("Subscription to VSTP topic successful, listening") log.Info("Subscription to VSTP topic successful, listening")
go func() { go func() {
log.Debug("GOROUTINE: VSTP Message Handler Started")
defer func() {
if r := recover(); r != nil {
log.Warn("GOROUTINE: VSTP Message Handler Stopped")
time.Sleep(time.Second * 10)
log.Fatal("GOROUTINE: VSTP Message Handler Failed")
}
}()
for { for {
msg := <-sub.C func() {
if msg.Err != nil { defer func() {
log.Error("STOMP Message Error: " + msg.Err.Error()) if r := recover(); r != nil {
} else { log.Warn("VSTP Message Handler Stopped, waiting for recovery")
log.Info("STOMP Message Received") time.Sleep(time.Second * 10)
handle(msg) }
} }()
log.Info("VSTP Message handler started")
for {
msg := <-sub.C
if msg.Err != nil {
log.Error("STOMP Message Error", zap.Error(msg.Err))
} else {
if msg != nil {
log.Info("STOMP Message Received")
fmt.Println(string(msg.Body))
handle(msg)
} else {
log.Info("STOMP Message Empty")
}
}
}
}()
} }
}() }()