From 5b9c444ac593d7df7921c0467cbfac48c8ba91be Mon Sep 17 00:00:00 2001 From: Fred Boniface Date: Wed, 24 Apr 2024 23:09:13 +0100 Subject: [PATCH] Improve resiliency of STOMP subscription --- vstp/subscribe.go | 62 +++++++++++++++++++++++++++++++++-------------- 1 file changed, 44 insertions(+), 18 deletions(-) diff --git a/vstp/subscribe.go b/vstp/subscribe.go index d1fd95b..5014709 100644 --- a/vstp/subscribe.go +++ b/vstp/subscribe.go @@ -1,37 +1,63 @@ package vstp import ( + "fmt" "time" "git.fjla.uk/owlboard/timetable-mgr/log" "git.fjla.uk/owlboard/timetable-mgr/messaging" "github.com/go-stomp/stomp/v3" + "go.uber.org/zap" ) func Subscribe() { - sub, err := messaging.Client.Subscribe("/topic/VSTP_ALL", stomp.AckAuto) - if err != nil { - log.Fatal("Unable to start subscription: " + err.Error()) + var sub *stomp.Subscription + var err error + retryCount := 0 + maxRetries := 5 + + for retryCount < maxRetries { + sub, err = messaging.Client.Subscribe("/topic/VSTP_ALL", stomp.AckAuto) + if err != nil { + log.Warn("Unable to start subscription", zap.Error(err)) + time.Sleep(10 * time.Second) + retryCount++ + continue + } + break } + + if sub == nil { + log.Fatal("Failed to subscribe to VSTP topic", zap.Int("attempts", maxRetries)) + } + log.Info("Subscription to VSTP topic successful, listening") go func() { - log.Debug("GOROUTINE: VSTP Message Handler Started") - defer func() { - if r := recover(); r != nil { - log.Warn("GOROUTINE: VSTP Message Handler Stopped") - time.Sleep(time.Second * 10) - log.Fatal("GOROUTINE: VSTP Message Handler Failed") - } - }() for { - msg := <-sub.C - if msg.Err != nil { - log.Error("STOMP Message Error: " + msg.Err.Error()) - } else { - log.Info("STOMP Message Received") - handle(msg) - } + func() { + defer func() { + if r := recover(); r != nil { + log.Warn("VSTP Message Handler Stopped, waiting for recovery") + time.Sleep(time.Second * 10) + } + }() + log.Info("VSTP Message handler started") + for { + msg := <-sub.C + if msg.Err != nil { + log.Error("STOMP Message Error", zap.Error(msg.Err)) + } else { + if msg != nil { + log.Info("STOMP Message Received") + fmt.Println(string(msg.Body)) + handle(msg) + } else { + log.Info("STOMP Message Empty") + } + } + } + }() } }()