Compare commits

...

10 Commits

26 changed files with 359 additions and 206 deletions

View File

@ -2,6 +2,7 @@ package background
import (
"math"
"os"
"runtime"
"time"
@ -18,23 +19,25 @@ const frequency = 2 * time.Hour // Figure out a sensible frequency!
func InitTicker(cfg *helpers.Configuration, stop <-chan struct{}) {
go runTicker(cfg, stop)
// Run goroutine logging ticker if runtime set to debug
if helpers.Runtime == "debug" {
// Run goroutine logging ticker if env "perflog" is set to "on"
if os.Getenv("perflog") == "on" {
go goroutineTicker(stop)
}
}
// Runs the ticker and handles tick events
func runTicker(cfg *helpers.Configuration, stop <-chan struct{}) {
log.Msg.Sugar().Infof("Starting background ticker, runs every %s", frequency)
log.Info("Starting background task ticker", zap.Duration("frequency", frequency))
ticker := time.NewTicker(frequency)
defer ticker.Stop()
for {
select {
case <-stop:
log.Debug("Stopping background task ticker")
return
case <-ticker.C:
log.Debug("Running background tasks")
go cif.CheckCif(cfg)
go corpus.CheckCorpus(cfg)
}
@ -43,7 +46,7 @@ func runTicker(cfg *helpers.Configuration, stop <-chan struct{}) {
// Starts a ticker that logs how many goroutines are running every second
func goroutineTicker(stop <-chan struct{}) {
log.Msg.Warn("Starting goroutine Tracker ticker - DEBUG USE ONLY")
log.Debug("Starting goroutine resource logging ticker")
ticker := time.NewTicker(1000 * time.Millisecond)
defer ticker.Stop()
for {
@ -64,5 +67,5 @@ func debugLog() {
heapMem := float64(memStats.HeapAlloc) / (1024 * 1024)
heapMemRound := math.Round(heapMem*100) / 100
log.Msg.Debug("Performance", zap.Int("goroutine-count", goroutines), zap.Float64("heap-mem (MB)", heapMemRound))
log.Debug("Performance", zap.Int("goroutine-count", goroutines), zap.Float64("heap-mem (MB)", heapMemRound))
}
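For reference, a standalone sketch of the performance snapshot this ticker logs: heap bytes converted to MB and rounded to two decimal places, plus the goroutine count. zap is swapped for fmt so it runs on its own, and the ReadMemStats/NumGoroutine calls are my assumption of how goroutines and memStats are populated, since that part of debugLog sits outside the hunk.

package main

import (
	"fmt"
	"math"
	"runtime"
)

func main() {
	var memStats runtime.MemStats
	runtime.ReadMemStats(&memStats)
	goroutines := runtime.NumGoroutine()
	// Same conversion as debugLog above: heap bytes to MB, rounded to 2 decimal places.
	heapMem := float64(memStats.HeapAlloc) / (1024 * 1024)
	heapMemRound := math.Round(heapMem*100) / 100
	fmt.Printf("goroutine-count=%d heap-mem=%.2fMB\n", goroutines, heapMemRound)
}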

View File

@ -14,43 +14,43 @@ import (
func CheckCif(cfg *helpers.Configuration) {
// Check that it is after 0600, if not then skip update
if time.Now().In(londonTimezone).Hour() <= dataAvailable {
log.Msg.Info("Too early to update CIF data, not published until 0600")
log.Info("Too early to update CIF data, not published until 0600")
return
}
log.Msg.Info("Checking age of CIF Data")
log.Debug("Checking age of CIF Data")
// Load and read metadata from database
metadata, err := dbAccess.GetCifMetadata()
if err != nil {
log.Msg.Error("Unable to read last update time", zap.Error(err))
log.Error("Unable to read last update time", zap.Error(err))
return
}
// If no metadata is found in DB, presume no CIF data exists
if metadata == nil {
log.Msg.Info("Full CIF download required")
log.Info("Full CIF download required")
err := runCifFullDownload(cfg)
if err != nil {
log.Msg.Error("Unable to run full CIF Update", zap.Error(err))
log.Error("Unable to run full CIF Update", zap.Error(err))
}
return
}
// Check if last update was today
if isSameToday(metadata.LastUpdate) {
log.Msg.Info("CIF Data has already been updated today, skipping")
log.Info("CIF Data has already been updated today, skipping")
return
}
// Check how many days since last update, if more than 5, run full update, else run update
daysSinceLastUpdate := howManyDaysAgo(metadata.LastUpdate)
if daysSinceLastUpdate > 5 {
log.Msg.Debug("Full Update Requested due to time since last update", zap.Int("daysSinceLastUpdate", daysSinceLastUpdate))
log.Msg.Info("Full CIF download required")
log.Debug("Full Update Requested due to time since last update", zap.Int("daysSinceLastUpdate", daysSinceLastUpdate))
log.Info("Full CIF download required")
err := runCifFullDownload(cfg)
if err != nil {
log.Msg.Error("Unable to run full CIF Update", zap.Error(err))
log.Error("Full CIF update failed", zap.Error(err))
}
return
}
@ -58,9 +58,9 @@ func CheckCif(cfg *helpers.Configuration) {
daysToUpdate := generateUpdateDays(daysSinceLastUpdate)
// Run the update
log.Msg.Info("CIF Update required", zap.Any("days to update", daysToUpdate))
log.Info("CIF Update required", zap.Any("days to update", daysToUpdate))
err = runCifUpdateDownload(cfg, metadata, daysToUpdate)
if err != nil {
log.Msg.Error("Unable to run CIF update", zap.Error(err))
log.Error("Daily CIF update failed", zap.Error(err))
}
}

View File

@ -7,6 +7,7 @@ import (
"git.fjla.uk/owlboard/go-types/pkg/upstreamApi"
"git.fjla.uk/owlboard/timetable-mgr/helpers"
"git.fjla.uk/owlboard/timetable-mgr/log"
"go.uber.org/zap"
)
func ConvertServiceType(input *upstreamApi.JsonScheduleV1, vstp bool) (*database.Service, error) {
@ -46,7 +47,7 @@ func parseSpeed(CIFSpeed *string) int32 {
speed, err := strconv.ParseInt(actualSpeed, 10, 32)
if err != nil {
log.Msg.Warn("Unable to parse speed: " + *CIFSpeed + ", returning 0")
log.Warn("Unable to parse speed", zap.String("input-value", *CIFSpeed))
return int32(0)
}
return int32(speed)

View File

@ -10,13 +10,8 @@ import (
// Fetches the day string for the provided date.
func getDayString(t time.Time) string {
london, err := time.LoadLocation("Europe/London")
if err != nil {
log.Msg.Error("Unable to load time zone info", zap.Error(err))
}
timeNow := t.In(london)
day := timeNow.Weekday()
localTime := t.In(londonTimezone)
day := localTime.Weekday()
dayStrings := [...]string{"sun", "mon", "tue", "wed", "thu", "fri", "sat"}
@ -44,7 +39,6 @@ func isSameToday(t time.Time) bool {
// Returns how many days ago `t` was compared to today
func howManyDaysAgo(t time.Time) int {
log.Msg.Debug("Calculating how many days ago", zap.Time("Input time", t))
// Truncate both times to midnight in UTC timezone
today := time.Now().UTC().Truncate(24 * time.Hour)
input := t.UTC().Truncate(24 * time.Hour)
@ -76,7 +70,7 @@ func ParseCifDate(input *string, startOrEnd string) time.Time {
layout := "2006-01-02" // Layout of input
t, err := time.ParseInLocation(layout, *input, londonTimezone)
if err != nil {
log.Msg.Error("Error parsing date string", zap.String("date string", *input), zap.Error(err))
log.Error("Error parsing date string", zap.String("date string", *input), zap.Error(err))
return time.Time{}
}
@ -85,11 +79,11 @@ func ParseCifDate(input *string, startOrEnd string) time.Time {
} else if startOrEnd == "end" {
t = time.Date(t.Year(), t.Month(), t.Day(), 23, 59, 59, 0, londonTimezone)
} else {
log.Msg.Error("Error parsing date string", zap.String("date string", *input), zap.Error(err))
log.Error("Error parsing date string", zap.String("date string", *input), zap.Error(err))
return time.Time{}
}
return t
return t.UTC()
}
// Parses CIF days_run field and converts to array of day strings
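A minimal sketch of the timezone effect behind the new return t.UTC(): for a BST date, midnight in London is 23:00 UTC the previous day, which is why the test below converts results back with .In(londonTimezone) before comparing. The date used here is purely illustrative.

package main

import (
	"fmt"
	"time"
)

func main() {
	london, err := time.LoadLocation("Europe/London")
	if err != nil {
		panic(err)
	}
	// Midnight London on a summer date is 23:00 UTC the previous day;
	// returning t.UTC() changes the printed value but not the instant.
	start := time.Date(2024, time.June, 1, 0, 0, 0, 0, london)
	fmt.Println(start)       // 2024-06-01 00:00:00 +0100 BST
	fmt.Println(start.UTC()) // 2024-05-31 23:00:00 +0000 UTC
}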

View File

@ -103,6 +103,8 @@ func TestParseCifDate(t *testing.T) {
for _, tc := range testCases {
result := ParseCifDate(&tc.dateString, tc.startOrEnd)
result = result.In(londonTimezone)
//fmt.Println(tc.dateString, "|UTC: ", result.In(time.UTC), "|EU/Lon: ", result)
if result != tc.expect {
t.Errorf("For datestring %s, startOrEnd %s, expected %s, but got %s", tc.dateString, tc.startOrEnd, tc.expect.Format(layout), result.Format(layout))
}

View File

@ -33,7 +33,19 @@ func checkMetadata(oldMeta *dbAccess.CifMetadata, newMeta *upstreamApi.JsonTimet
// Evaluates whether the given time is after yesterday at 0600
func isAfterYesterdayAt0600(t time.Time) bool {
yesterday0600 := time.Now().In(time.UTC).AddDate(0, 0, -1)
yesterday0600 := time.Now().In(londonTimezone).AddDate(0, 0, -1)
yesterday0600 = time.Date(yesterday0600.Year(), yesterday0600.Month(), yesterday0600.Day(), 6, 0, 0, 0, time.UTC)
return t.After(yesterday0600)
}
// Accepts the JsonTimetableV1 struct which contains CIF File metadata and produces a DB Ready struct.
func generateMetadata(header *upstreamApi.JsonTimetableV1) *dbAccess.CifMetadata {
newMetadata := dbAccess.CifMetadata{
Doctype: dbAccess.Doctype,
LastTimestamp: header.Timestamp,
LastUpdate: time.Now().In(londonTimezone),
LastSequence: header.Metadata.Sequence,
}
return &newMetadata
}

View File

@ -56,3 +56,27 @@ func TestGenerateMetadata(t *testing.T) {
t.Errorf("LastUpdate: expected %s, got %s", expected.LastUpdate, result.LastUpdate)
}
}
func TestIsAfterYesterdayAt0600(t *testing.T) {
yesterday0600 := time.Now().In(londonTimezone).AddDate(0, 0, -1).Truncate(24 * time.Hour).Add(6 * time.Hour)
testCases := []struct {
input time.Time
expect bool
}{
{yesterday0600.Add(-1 * time.Hour), false},
{yesterday0600.Add(-12 * time.Hour), false},
{yesterday0600.Add(-24 * time.Hour), false},
{yesterday0600.Add(1 * time.Microsecond), true},
{yesterday0600.Add(1 * time.Hour), true},
{yesterday0600.Add(12 * time.Hour), true},
{yesterday0600.Add(24 * time.Hour), true},
}
for _, tc := range testCases {
result := isAfterYesterdayAt0600(tc.input)
if result != tc.expect {
t.Errorf("For input %v, expected %t, but got %t", tc.input, tc.expect, result)
}
}
}

View File

@ -13,7 +13,7 @@ import (
// Accepts the CIF data as a stream and outputs parsed data
func parseCifDataStream(dataStream io.ReadCloser) (*parsedData, error) {
defer dataStream.Close()
log.Msg.Debug("Starting CIF Datastream parsing")
log.Debug("Starting CIF Datastream parsing")
if dataStream == nil {
return nil, errors.New("unable to parse nil pointer")
}
@ -29,7 +29,7 @@ func parseCifDataStream(dataStream io.ReadCloser) (*parsedData, error) {
for decoder.More() {
var obj map[string]json.RawMessage
if err := decoder.Decode(&obj); err != nil {
log.Msg.Error("Error decoding JSON String")
log.Error("Error decoding JSON String")
return nil, err
}
@ -39,7 +39,7 @@ func parseCifDataStream(dataStream io.ReadCloser) (*parsedData, error) {
case "JsonTimetableV1":
var timetable upstreamApi.JsonTimetableV1
if err := json.Unmarshal(value, &timetable); err != nil {
log.Msg.Error("Error decoding JSONTimetableV1 object", zap.Error(err))
log.Error("Error decoding JSONTimetableV1 object", zap.Error(err))
continue
}
parsed.header = timetable
@ -53,17 +53,17 @@ func parseCifDataStream(dataStream io.ReadCloser) (*parsedData, error) {
case "JsonScheduleV1":
var schedule upstreamApi.JsonScheduleV1
if err := json.Unmarshal(value, &schedule); err != nil {
log.Msg.Error("Error decoding JSONScheduleV1 object", zap.Error(err))
log.Error("Error decoding JSONScheduleV1 object", zap.Error(err))
continue
}
parsed.sched = append(parsed.sched, schedule)
case "EOF":
log.Msg.Info("Reached EOF")
log.Debug("Reached EOF")
default:
log.Msg.Warn("Unknown CIF Data type", zap.String("key", key))
log.Warn("Unknown CIF Data type", zap.String("key", key))
}
}
}
log.Msg.Debug("CIF Parsing completed")
log.Debug("CIF Parsing completed")
return &parsed, nil
}

View File

@ -1,8 +1,6 @@
package cif
import (
"time"
"git.fjla.uk/owlboard/go-types/pkg/database"
"git.fjla.uk/owlboard/go-types/pkg/upstreamApi"
"git.fjla.uk/owlboard/timetable-mgr/dbAccess"
@ -12,8 +10,8 @@ import (
// Processes parsed CIF data and applies the data to the database
func processParsedCif(data *parsedData) error {
log.Msg.Debug("Starting CIF Processing")
log.Msg.Info("Processing CIF Data", zap.Int("schedule-count", len(data.sched)))
log.Debug("Starting CIF Processing")
log.Info("Processing CIF Data", zap.Int("schedule-count", len(data.sched)))
// Batch size for processing
batchSize := 250 // Needs optimising for better RAM use. 1000 ~ 5.7GB, 500 ~ 5.5GB, 750 ~ 5.2GB
@ -34,7 +32,7 @@ func processParsedCif(data *parsedData) error {
if len(deleteBatch) > 0 {
err := doDeletions(deleteBatch)
if err != nil {
log.Msg.Error("Error deleting CIF Entries", zap.Error(err))
log.Error("Error deleting CIF Entries", zap.Error(err))
return err
}
}
@ -56,13 +54,13 @@ func processParsedCif(data *parsedData) error {
if len(createBatch) > 0 {
err := doCreations(createBatch)
if err != nil {
log.Msg.Error("Error creating CIF Entries", zap.Error(err))
log.Error("Error creating CIF Entries", zap.Error(err))
return err
}
}
}
log.Msg.Debug("CIF Processing complete")
log.Debug("CIF Processing complete")
return nil
}
@ -70,7 +68,7 @@ func processParsedCif(data *parsedData) error {
func doDeletions(deletions []*upstreamApi.JsonScheduleV1) error {
defer func() {
if r := recover(); r != nil {
log.Msg.Panic("Panic:", zap.Any("panic", r))
log.Panic("Panic:", zap.Any("panic", r))
}
}()
deleteQueries := make([]database.DeleteQuery, 0)
@ -86,7 +84,7 @@ func doDeletions(deletions []*upstreamApi.JsonScheduleV1) error {
err := dbAccess.DeleteCifEntries(deleteQueries)
if err != nil {
log.Msg.Error("Error deleting documents", zap.Error(err))
log.Error("Error deleting documents", zap.Error(err))
return err
}
@ -99,7 +97,8 @@ func doCreations(creations []*upstreamApi.JsonScheduleV1) error {
for _, item := range creations {
document, err := ConvertServiceType(item, false)
if err != nil {
log.Msg.Error("Error converting JsonSchedule to Service type", zap.Error(err))
log.Error("Error converting JsonSchedule to Service type", zap.Error(err))
continue
}
createDocuments = append(createDocuments, *document)
@ -107,21 +106,9 @@ func doCreations(creations []*upstreamApi.JsonScheduleV1) error {
err := dbAccess.CreateCifEntries(createDocuments)
if err != nil {
log.Msg.Error("Error creating documents", zap.Error(err))
log.Error("Error creating documents", zap.Error(err))
return err
}
return nil
}
// Accepts the JsonTimetableV1 struct which contains CIF File metadata and produces a DB Ready struct.
func generateMetadata(header *upstreamApi.JsonTimetableV1) *dbAccess.CifMetadata {
newMetadata := dbAccess.CifMetadata{
Doctype: dbAccess.Doctype,
LastTimestamp: header.Timestamp,
LastUpdate: time.Now().In(londonTimezone),
LastSequence: header.Metadata.Sequence,
}
return &newMetadata
}

View File

@ -13,22 +13,22 @@ import (
// Replaces all existing CIF Data with a new download
func runCifFullDownload(cfg *helpers.Configuration) error {
log.Msg.Info("Downloading all CIF Data")
log.Info("Downloading all CIF Data")
// Download CIF Data file
url, err := getUpdateUrl("full")
if err != nil {
log.Msg.Error("Error getting download URL", zap.Error(err))
log.Error("Error getting download URL", zap.Error(err))
}
dataStream, err := nrod.NrodStream(url, cfg)
if err != nil {
log.Msg.Error("Error downloading CIF data", zap.Error(err))
log.Error("Error downloading CIF data", zap.Error(err))
}
// Parse CIF file
parsed, err := parseCifDataStream(dataStream)
if err != nil {
log.Msg.Error("Error parsing CIF data", zap.Error(err))
log.Error("Error parsing CIF data", zap.Error(err))
return err
}
@ -38,56 +38,68 @@ func runCifFullDownload(cfg *helpers.Configuration) error {
// Process CIF file
err = processParsedCif(parsed)
if err != nil {
log.Msg.Error("Error processing CIF data", zap.Error(err))
log.Error("Error processing CIF data", zap.Error(err))
}
newMeta := generateMetadata(&parsed.header)
ok := dbAccess.PutCifMetadata(newMeta, fullUpdateType)
if !ok {
log.Msg.Warn("CIF Data updated, but metadata write failed")
log.Warn("CIF Data updated, but metadata write failed")
}
// Set parsed to nil to encourage garbage collection
parsed = nil
// Clear out of date schedules
cutoff := time.Now().Add(-time.Hour * 24 * 7)
log.Debug("Attempting to remove outdated services", zap.Time("scheduleEnd before", cutoff))
count, err := dbAccess.RemoveOutdatedServices(cutoff)
if err != nil {
log.Warn("Out of date services not removed", zap.Error(err))
} else {
log.Info("Out of date services removed", zap.Int64("removal count", count))
}
return nil
}
// Runs a CIF Update for up to five days
func runCifUpdateDownload(cfg *helpers.Configuration, metadata *dbAccess.CifMetadata, days []time.Time) error {
log.Msg.Info("Downloading CIF Updates")
log.Info("Downloading CIF Updates")
// Loop over dates
for _, time := range days {
log.Msg.Info("Downloading CIF File", zap.Time("CIF Data from", time))
log.Info("Downloading CIF File", zap.Time("CIF Data from", time))
// Download CIF data file
data, err := fetchUpdate(time, cfg)
if err != nil {
log.Msg.Error("Error fetching CIF update", zap.Error(err))
log.Error("Error fetching CIF update", zap.Error(err))
return err
}
// Parse CIF file
parsed, err := parseCifDataStream(data)
if err != nil {
log.Msg.Error("Error parsing CIF data", zap.Error(err))
log.Error("Error parsing CIF data", zap.Error(err))
return err
}
// Check CIF Metadata
log.Msg.Debug("Starting metadata checks")
log.Debug("Starting metadata checks")
reason, update := checkMetadata(metadata, &parsed.header)
if !update {
log.Msg.Warn("Update file not processed", zap.String("reason", reason))
log.Warn("Update file not processed", zap.String("reason", reason))
continue
}
log.Msg.Info("CIF Data is suitable for processing", zap.String("reason", reason))
log.Info("CIF Data is suitable for processing", zap.String("reason", reason))
// Process CIF file
err = processParsedCif(parsed)
if err != nil {
log.Msg.Error("Error processing CIF data", zap.Error(err))
log.Error("Error processing CIF data", zap.Error(err))
}
metadata = generateMetadata(&parsed.header)
@ -96,7 +108,7 @@ func runCifUpdateDownload(cfg *helpers.Configuration, metadata *dbAccess.CifMeta
ok := dbAccess.PutCifMetadata(metadata, dailyUpdateType)
if !ok {
log.Msg.Warn("CIF Data updated, but metadata write failed.")
log.Warn("CIF Data updated, but metadata write failed.")
}
return nil

View File

@ -11,10 +11,10 @@ import (
// Checks if the CORPUS Data needs updating, and calls an updater function if needed.
func CheckCorpus(cfg *helpers.Configuration) {
log.Msg.Debug("Checking age of CORPUS Data")
log.Debug("Checking age of CORPUS Data")
utime, err := dbAccess.CheckUpdateTime(dbAccess.CorpusCollection)
if err != nil {
log.Msg.Error("Error checking last CORPUS update", zap.Error(err))
log.Error("Error checking last CORPUS update", zap.Error(err))
}
lastUpdate := time.Unix(utime, 0)
@ -22,17 +22,17 @@ func CheckCorpus(cfg *helpers.Configuration) {
dataAge := currentTime.Sub(lastUpdate)
fortnight := 14 * 24 * time.Hour
log.Msg.Debug("CORPUS Data", zap.Duration("Data Age", dataAge), zap.Duration("Max Age", 14*24*time.Hour))
log.Debug("CORPUS Data", zap.Duration("Data Age", dataAge), zap.Duration("Max Age", 14*24*time.Hour))
if dataAge >= fortnight {
log.Msg.Info("CORPUS Data is more than two weeks old")
log.Info("CORPUS update required")
err := RunCorpusUpdate(cfg)
if err != nil {
log.Msg.Warn("CORPUS Update did not run")
log.Warn("CORPUS Update did not run")
} else {
log.Msg.Info("CORPUS data has been updated")
log.Info("CORPUS data has been updated")
}
} else {
log.Msg.Info("CORPUS Data is less than two weeks old, not updating")
log.Info("CORPUS Data not stale, skipping updating")
}
}

View File

@ -14,14 +14,14 @@ import (
func parseCorpusData(stream io.ReadCloser) (*[]database.CorpusEntry, error) {
defer stream.Close()
log.Msg.Debug("Starting CORPUS Data parsing")
log.Debug("Starting CORPUS Data parsing")
var corpusEntries []database.CorpusEntry
decoder := json.NewDecoder(stream)
// Expect an object at the root of the JSON stream
if _, err := decoder.Token(); err != nil {
log.Msg.Error("Error parsing CORPUS Data", zap.Error(err))
log.Error("Error parsing CORPUS Data", zap.Error(err))
return nil, err
}
@ -29,19 +29,19 @@ func parseCorpusData(stream io.ReadCloser) (*[]database.CorpusEntry, error) {
for decoder.More() {
// Decode the next JSON token
if tok, err := decoder.Token(); err != nil {
log.Msg.Error("Error parsing CORPUS Data", zap.Error(err))
log.Error("Error parsing CORPUS Data", zap.Error(err))
return nil, err
} else if tok == "TIPLOCDATA" {
// Found the "TIPLOCDATA" key, expect the associated array
if !decoder.More() {
err := errors.New("missing array after TIPLOCDATA key")
log.Msg.Error("Error parsing CORPUS Data", zap.Error(err))
log.Error("Error parsing CORPUS Data", zap.Error(err))
return nil, err
}
// Start reading the array associated with the "TIPLOCDATA" key
if _, err := decoder.Token(); err != nil {
log.Msg.Error("Error parsing CORPUS Data", zap.Error(err))
log.Error("Error parsing CORPUS Data", zap.Error(err))
return nil, err
}
@ -49,7 +49,7 @@ func parseCorpusData(stream io.ReadCloser) (*[]database.CorpusEntry, error) {
for decoder.More() {
var corpusEntry database.CorpusEntry
if err := decoder.Decode(&corpusEntry); err != nil {
log.Msg.Error("Error parsing CORPUS Data", zap.Error(err))
log.Error("Error parsing CORPUS Data", zap.Error(err))
return nil, err
}
corpusEntries = append(corpusEntries, corpusEntry)
@ -58,7 +58,7 @@ func parseCorpusData(stream io.ReadCloser) (*[]database.CorpusEntry, error) {
}
}
log.Msg.Debug("CORPUS parsing complete")
log.Debug("CORPUS parsing complete")
return &corpusEntries, nil
}

View File

@ -12,13 +12,13 @@ import (
func RunCorpusUpdate(cfg *helpers.Configuration) error {
resp, err := nrod.NrodStream(url, cfg)
if err != nil {
log.Msg.Error("Failed to fetch CORPUS data", zap.Error(err))
log.Error("Failed to fetch CORPUS data", zap.Error(err))
return err
}
unsortedCorpusData, err := parseCorpusData(resp)
if err != nil {
log.Msg.Error("Error parsing Corpus data", zap.Error(err))
log.Error("Error parsing Corpus data", zap.Error(err))
return err
}
@ -26,24 +26,24 @@ func RunCorpusUpdate(cfg *helpers.Configuration) error {
stationData := createStationEntries(corpusData)
if err := dbAccess.DropCollection(dbAccess.CorpusCollection); err != nil {
log.Msg.Warn("CORPUS data may be incomplete")
log.Msg.Error("Error dropping CORPUS Data", zap.Error(err))
log.Warn("CORPUS data may be incomplete")
log.Error("Error dropping CORPUS Data", zap.Error(err))
return err
}
if err := dbAccess.DropCollection(dbAccess.StationsCollection); err != nil {
log.Msg.Warn("Stations data may be incomplete")
log.Msg.Error("Error dropping stations Data", zap.Error(err))
log.Warn("Stations data may be incomplete")
log.Error("Error dropping stations Data", zap.Error(err))
return err
}
if err := dbAccess.PutManyCorpus(corpusData); err != nil {
log.Msg.Warn("CORPUS data may be incomplete")
log.Msg.Error("Error inserting CORPUS Data", zap.Error(err))
log.Warn("CORPUS data may be incomplete")
log.Error("Error inserting CORPUS Data", zap.Error(err))
return err
}
if err := dbAccess.PutManyStations(stationData); err != nil {
log.Msg.Warn("Stations data may be incomplete")
log.Msg.Error("Error inserting stations data", zap.Error(err))
log.Warn("Stations data may be incomplete")
log.Error("Error inserting stations data", zap.Error(err))
return err
}

View File

@ -28,9 +28,9 @@ func PushVersionToDb() {
coll := MongoClient.Database("owlboard").Collection("versions")
_, err := coll.UpdateOne(context.TODO(), versionSelector, bson.M{"$set": version}, opts)
if err != nil {
log.Msg.Warn("Unable to push version to database: " + err.Error())
log.Warn("Unable to push version to database: " + err.Error())
} else {
log.Msg.Debug("Version up to date in Database")
log.Debug("Version up to date in Database")
}
}
@ -39,7 +39,7 @@ func PutOneService(data database.Service) bool {
coll := MongoClient.Database(databaseName).Collection(timetableCollection)
_, err := coll.InsertOne(context.TODO(), data)
if err != nil {
log.Msg.Error("Unable to insert to database: " + err.Error())
log.Error("Unable to insert to database: " + err.Error())
return false
}
return true
@ -55,7 +55,7 @@ func DeleteOneService(data database.DeleteQuery) bool {
}
_, err := coll.DeleteOne(context.TODO(), filter)
if err != nil {
log.Msg.Error("Unable to delete service: " + err.Error())
log.Error("Unable to delete service: " + err.Error())
return false
}
return true

View File

@ -36,11 +36,10 @@ func GetCifMetadata() (*CifMetadata, error) {
if errors.Is(err, mongo.ErrNoDocuments) {
return nil, nil
}
log.Msg.Error("Error fetching CIF Metadata")
return nil, err
}
log.Msg.Debug("Fetched CIF Metadata from database", zap.Any("Metadata", result))
log.Debug("Fetched CIF Metadata from database", zap.Any("Metadata", result))
return &result, nil
}
@ -64,11 +63,11 @@ func PutCifMetadata(metadata *CifMetadata, lastUpdateType string) bool {
_, err := collection.UpdateOne(context.Background(), filter, update, options)
if err != nil {
log.Msg.Error("Error updating CIF Metadata", zap.Error(err))
log.Error("Error updating CIF Metadata", zap.Error(err))
return false
}
log.Msg.Info("New CIF Metadata written", zap.Time("Update time", metadata.LastUpdate))
log.Info("New CIF Metadata written", zap.Time("Update time", metadata.LastUpdate))
return true
}
@ -76,15 +75,15 @@ func PutCifMetadata(metadata *CifMetadata, lastUpdateType string) bool {
func DeleteCifEntries(deletions []database.DeleteQuery) error {
defer func() {
if r := recover(); r != nil {
log.Msg.Panic("Panic:", zap.Any("panic", r))
log.Panic("Panic:", zap.Any("panic", r))
}
}()
// Skip if deletions is empty
if len(deletions) == 0 {
log.Msg.Info("No deletions required")
log.Info("No deletions required")
return nil
}
log.Msg.Info("Running deletions against database", zap.Int("count", len(deletions)))
log.Debug("Running deletions against database", zap.Int("count", len(deletions)))
// Prepare deletion tasks
collection := MongoClient.Database(databaseName).Collection(timetableCollection)
@ -103,7 +102,6 @@ func DeleteCifEntries(deletions []database.DeleteQuery) error {
_, err := collection.BulkWrite(context.Background(), bulkDeletions, bulkWriteOptions)
if err != nil {
log.Msg.Error("Error deleting documents", zap.Error(err))
return err
}
@ -114,10 +112,10 @@ func DeleteCifEntries(deletions []database.DeleteQuery) error {
func CreateCifEntries(schedules []database.Service) error {
// Skip if schedules is empty
if len(schedules) == 0 {
log.Msg.Info("No creations required")
log.Info("No creations required")
return nil
}
log.Msg.Info("Running creations against database", zap.Int("count", len(schedules)))
log.Debug("Running creations against database", zap.Int("count", len(schedules)))
collection := MongoClient.Database(databaseName).Collection(timetableCollection)
@ -132,9 +130,24 @@ func CreateCifEntries(schedules []database.Service) error {
_, err := collection.BulkWrite(context.Background(), models, bulkWriteOptions)
if err != nil {
log.Msg.Error("Error inserting documents", zap.Error(err))
return err
}
return nil
}
// Removes any schedules which ended before 'cutoff'
func RemoveOutdatedServices(cutoff time.Time) (count int64, err error) {
// Define filter
filter := bson.M{"scheduleEndDate": bson.M{"$lt": cutoff}}
collection := MongoClient.Database(databaseName).Collection(timetableCollection)
res, err := collection.DeleteMany(context.Background(), filter)
if err != nil {
return // Automatically returns named values
}
count = res.DeletedCount
return // Automatically returns named values
}
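Purely illustrative, a standalone look at the filter RemoveOutdatedServices issues via DeleteMany, using the same 7-day cutoff the full-download path passes in; nothing here talks to a database.

package main

import (
	"fmt"
	"time"

	"go.mongodb.org/mongo-driver/bson"
)

func main() {
	// Same cutoff runCifFullDownload now passes in: schedules whose
	// scheduleEndDate is more than 7 days in the past are removed.
	cutoff := time.Now().Add(-time.Hour * 24 * 7)
	filter := bson.M{"scheduleEndDate": bson.M{"$lt": cutoff}}
	fmt.Printf("%+v\n", filter)
}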

View File

@ -3,9 +3,9 @@ package dbAccess
import (
"git.fjla.uk/owlboard/timetable-mgr/helpers"
"git.fjla.uk/owlboard/timetable-mgr/log"
"go.uber.org/zap"
"context"
"fmt"
"time"
"go.mongodb.org/mongo-driver/mongo"
@ -26,19 +26,43 @@ var bsonOpts = &options.BSONOptions{
UseJSONStructTags: true,
}
// Initialise the DB Connection
func InitDataAccess(cfg *helpers.Configuration) {
uri := getDbUri(cfg)
log.Debug("Starting database connection")
url := getDbUri(cfg)
const maxRetries = 8
for attempt := 1; attempt <= maxRetries; attempt++ {
log.Info("Attempting to connect to database", zap.Int("attempt", attempt), zap.Int("max-tries", maxRetries))
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
client, err := mongo.Connect(ctx, options.Client().ApplyURI(uri).SetBSONOptions(bsonOpts))
client, err := mongo.Connect(ctx, options.Client().ApplyURI(url).SetBSONOptions(bsonOpts))
if err != nil {
fmt.Println(err)
log.Msg.Fatal("Error connecting to database: " + err.Error())
} else {
log.Msg.Info("Database connection successful")
log.Warn("Error connecting to database", zap.Int("attempt", attempt), zap.Int("max-tries", maxRetries))
cancel()
if attempt != maxRetries {
helpers.BackoffDelay(attempt)
}
continue
}
err = client.Ping(ctx, nil)
if err != nil {
log.Warn("Error pinging database", zap.Int("attempt", attempt), zap.Int("max-tries", maxRetries))
cancel()
if attempt != maxRetries {
helpers.BackoffDelay(attempt)
}
continue
}
MongoClient = client
log.Info("Database connection successful")
return
}
log.Fatal("Failed to connect to database on multiple attempts", zap.Int("attempts", maxRetries))
}
// Closes the connection to the database - used for cleanup functions
@ -47,9 +71,9 @@ func CloseMongoClient() {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
if err := MongoClient.Disconnect(ctx); err != nil {
log.Msg.Warn("Error disconnecting MongoDB client: " + err.Error())
log.Warn("Error disconnecting MongoDB client: " + err.Error())
} else {
log.Msg.Info("MongoDB client disconnected.")
log.Info("MongoDB client disconnected.")
}
}
}

View File

@ -12,13 +12,13 @@ import (
// CAUTION: Drops the collection named in collectionName
func DropCollection(collectionName string) error {
log.Msg.Info("Dropping collection", zap.String("Collection Name", collectionName))
log.Info("Dropping collection", zap.String("Collection Name", collectionName))
database := MongoClient.Database(databaseName)
collection := database.Collection(collectionName)
err := collection.Drop(context.Background())
if err != nil {
log.Msg.Error("Error dropping collection", zap.String("Collection Name", collectionName), zap.Error(err))
log.Error("Error dropping collection", zap.String("Collection Name", collectionName), zap.Error(err))
return err
}
@ -45,7 +45,7 @@ func CheckUpdateTime(collectionName string) (int64, error) {
// Sets a new update time (unix timestamp) of the collection named in collectionName. The update time is calculated within the function.
func SetUpdateTime(collectionName string) error {
log.Msg.Info("Setting update time", zap.String("collection", collectionName))
log.Info("Setting update time", zap.String("collection", collectionName))
database := MongoClient.Database(databaseName)
collection := database.Collection("meta")
options := options.Update().SetUpsert(true)
@ -65,7 +65,7 @@ func SetUpdateTime(collectionName string) error {
_, err := collection.UpdateOne(context.Background(), filter, update, options)
if err != nil {
log.Msg.Error("Error setting update time", zap.String("collection", collectionName), zap.Error(err))
log.Error("Error setting update time", zap.String("collection", collectionName), zap.Error(err))
return err
}
return nil

helpers/backoff.go (new file)
View File

@ -0,0 +1,18 @@
package helpers
import (
"math"
"time"
"git.fjla.uk/owlboard/timetable-mgr/log"
"go.uber.org/zap"
)
// Implements an exponential backoff strategy and sleeps for a duration of 1 second multiplied by 2 to the power of (attempt - 1).
// The backoff time doubles with each attempt, starting from 1 second for the first attempt.
func BackoffDelay(attempt int) {
base := time.Second
backoff := base * time.Duration(math.Pow(2, float64(attempt-1)))
log.Info("Retry backoff", zap.Duration("delay", backoff))
time.Sleep(backoff)
}
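A quick check of the delay schedule this gives the connection retry loop in InitDataAccess, printed rather than slept so it runs instantly; maxRetries = 8 as in the diff, and the last attempt never sleeps because the loop skips the delay when attempt == maxRetries.

package main

import (
	"fmt"
	"math"
	"time"
)

func main() {
	const maxRetries = 8 // same limit as InitDataAccess
	var total time.Duration
	for attempt := 1; attempt < maxRetries; attempt++ {
		// Same formula as BackoffDelay: 1s * 2^(attempt-1).
		backoff := time.Second * time.Duration(math.Pow(2, float64(attempt-1)))
		total += backoff
		fmt.Printf("delay after attempt %d: %v\n", attempt, backoff)
	}
	fmt.Println("worst-case total wait:", total) // 1s+2s+4s+...+64s = 2m7s
}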

View File

@ -1,36 +1,77 @@
package log
import (
"os"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
"git.fjla.uk/owlboard/timetable-mgr/helpers"
)
var Msg *zap.Logger
// Use outside `package log`
// should be avoided.
var post *zap.Logger
// Initialises the logger
func init() {
func InitLogger() {
var err error
// Create a custom configuration with a human-readable "Console" encoder
config := zap.NewDevelopmentConfig()
config.EncoderConfig.EncodeLevel = zapcore.CapitalColorLevelEncoder // Adds color to log levels
// Determine the log level based on the runtime mode
logLevel := zapcore.DebugLevel
if helpers.Runtime != "debug" {
logLevel = zapcore.InfoLevel
mode := os.Getenv("runtime")
if mode == "" {
mode = "prod"
}
// Set the log level
config.Level = zap.NewAtomicLevelAt(logLevel)
var level zapcore.Level
if mode == "debug" {
level = zap.DebugLevel
} else {
level = zap.InfoLevel
}
Msg, err = config.Build() // Potential source of the error
config := zap.NewDevelopmentConfig()
config.DisableStacktrace = true
config.EncoderConfig.EncodeLevel = zapcore.CapitalColorLevelEncoder
config.Level = zap.NewAtomicLevelAt(level)
post, err = config.Build(zap.AddCallerSkip(1))
if err != nil {
panic("Failed to initialize logger: " + err.Error())
}
// Log the selected log level (optional, can be helpful for debugging)
Msg.Info("Log level set to: " + logLevel.String())
defer post.Sync()
Info("Logger initialised", zap.String("level", level.String()), zap.String("runtime", mode))
}
// Logs message at info level
func Info(msg string, fields ...zap.Field) {
post.Info(msg, fields...)
}
// Logs message at debug level
func Debug(msg string, fields ...zap.Field) {
post.Debug(msg, fields...)
}
// Logs message at warn level
func Warn(msg string, fields ...zap.Field) {
post.Warn(msg, fields...)
}
// Logs message at error level
func Error(msg string, fields ...zap.Field) {
post.Error(msg, fields...)
}
// Logs message at fatal level then calls os.Exit(1)
func Fatal(msg string, fields ...zap.Field) {
post.Fatal(msg, fields...)
}
// Logs message at panic level then panics
func Panic(msg string, fields ...zap.Field) {
post.Panic(msg, fields...)
}
// Flushes log messages
func Cleanup() {
Info("Flushing log messages")
post.Sync()
}
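In short, callers move from the exported log.Msg zap instance to these package-level wrappers. A minimal usage sketch of the new surface, grounded in the functions above; the message and field contents are illustrative only.

package main

import (
	"git.fjla.uk/owlboard/timetable-mgr/log"
	"go.uber.org/zap"
)

func main() {
	// InitLogger replaces the old package init(); it must run before any log
	// call, and Cleanup flushes buffered entries on shutdown.
	log.InitLogger()
	defer log.Cleanup()

	// Old style: log.Msg.Info(...); new style: package-level wrappers with structured fields.
	log.Info("CIF update starting", zap.String("trigger", "example"))
	log.Debug("only emitted when the runtime env var is set to debug")
}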

main.go
View File

@ -16,19 +16,25 @@ import (
"git.fjla.uk/owlboard/timetable-mgr/log"
"git.fjla.uk/owlboard/timetable-mgr/messaging"
"git.fjla.uk/owlboard/timetable-mgr/vstp"
"go.uber.org/zap"
)
func main() {
func init() {
printStartupBanner()
fmt.Printf("Version %s \n\n", helpers.Version)
}
func main() {
log.InitLogger()
defer log.Cleanup()
log.Info("Initialising OwlBoard timetable-mgr", zap.String("version", helpers.Version))
cfg, err := helpers.LoadConfig()
if err != nil {
fmt.Println("Error loading configuration", err)
log.Fatal("Unable to load configuration", zap.Error(err))
return
}
cfg.PrintConfig()
log.Msg.Info("Initialised OwlBoard timetable-mgr " + helpers.Version)
dbAccess.InitDataAccess(cfg)
dbAccess.PushVersionToDb()
@ -60,32 +66,50 @@ func handleSignals(cfg *helpers.Configuration, stop chan<- struct{}) {
signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
sig := <-sigChan
log.Msg.Warn("Signal received: " + sig.String())
log.Warn("Signal received: " + sig.String())
cleanup(cfg, stop)
}
// Cleans up open connections ready for a clean exit of the program
func cleanup(cfg *helpers.Configuration, stop chan<- struct{}) {
log.Msg.Debug("Cleaning up open connections")
log.Debug("Cleaning up open connections")
if cfg.VstpOn {
if messaging.Client != nil {
log.Msg.Info("Closing STOMP Client")
log.Info("Closing STOMP Client")
messaging.Disconnect(messaging.Client)
}
}
if dbAccess.MongoClient != nil {
log.Msg.Info("Closing MongoDB Client")
log.Info("Closing MongoDB Client")
dbAccess.CloseMongoClient()
}
log.Msg.Info("Signalling to other goroutines")
log.Info("Signalling to other goroutines")
close(stop)
log.Msg.Info("Program ready to exit")
if log.Msg != nil {
log.Msg.Sync()
}
log.Info("Program ready to exit")
time.Sleep(500 * time.Millisecond)
log.Cleanup()
os.Exit(0)
}
func printStartupBanner() {
art := `
___ _ ____ _
/ _ \__ _| | __ ) ___ __ _ _ __ __| |
| | | \ \ /\ / / | _ \ / _ \ / _' | '__/ _' |
| |_| |\ V V /| | |_) | (_) | (_| | | | (_| |
\___/ \_/\_/ |_|____/ \___/ \__,_|_| \__,_|
_ _ _ _ _
| |_(_)_ __ ___ ___| |_ __ _| |__ | | ___ _ __ ___ __ _ _ __
| __| | '_ ' _ \ / _ \ __/ _' | '_ \| |/ _ \_____| '_ ' _ \ / _' | '__|
| |_| | | | | | | __/ || (_| | |_) | | __/_____| | | | | | (_| | |
\__|_|_| |_| |_|\___|\__\__,_|_.__/|_|\___| |_| |_| |_|\__, |_|
|___/
`
fmt.Println(art)
}

View File

@ -21,11 +21,11 @@ func dial(user, pass string) *stomp.Conn {
stomp.ConnOpt.Header("client-id", user+"-mq-client"),
)
if err != nil {
log.Msg.Fatal("Unable to connect to STOMP Client: " + err.Error())
log.Fatal("Unable to connect to STOMP Client: " + err.Error())
conn.MustDisconnect()
}
log.Msg.Info("Initialised STOMP Client")
log.Info("Initialised STOMP Client")
return conn
}
@ -34,14 +34,14 @@ func dial(user, pass string) *stomp.Conn {
func Disconnect(conn *stomp.Conn) {
if conn != nil {
err := conn.Disconnect()
log.Msg.Warn("Disconnected STOMP Client")
log.Warn("Disconnected STOMP Client")
if err != nil {
conn.MustDisconnect()
log.Msg.Error("STOMP Disconnection failed, forced disconnect")
log.Error("STOMP Disconnection failed, forced disconnect")
}
return
}
log.Msg.Error("STOMP Disconnect failed, next connection attempt may fail")
log.Error("STOMP Disconnect failed, next connection attempt may fail")
}
// Register against the MQ Server and log each message for testing purposes

View File

@ -14,7 +14,7 @@ import (
// Downloads NROD Data and extracts if GZIP, returns an io.ReadCloser
func NrodStream(url string, cfg *helpers.Configuration) (io.ReadCloser, error) {
log.Msg.Debug("Fetching NROD data stream", zap.String("Request URL", url))
log.Debug("Fetching NROD data stream", zap.String("Request URL", url))
client := http.Client{
Timeout: time.Second * 300,
@ -22,7 +22,7 @@ func NrodStream(url string, cfg *helpers.Configuration) (io.ReadCloser, error) {
req, err := http.NewRequest("GET", url, nil)
if err != nil {
log.Msg.Error("Error creating HTTP Request", zap.Error(err))
log.Error("Error creating HTTP Request", zap.Error(err))
return nil, err
}
@ -30,13 +30,13 @@ func NrodStream(url string, cfg *helpers.Configuration) (io.ReadCloser, error) {
resp, err := client.Do(req)
if err != nil {
log.Msg.Error("Error carrying out HTTP Request", zap.Error(err), zap.Int("STATUS", resp.StatusCode))
log.Error("Error carrying out HTTP Request", zap.Error(err), zap.Int("STATUS", resp.StatusCode))
return nil, err
}
if resp.StatusCode != http.StatusOK {
err := fmt.Errorf("unexpected status code: %d", resp.StatusCode)
log.Msg.Error("Non-successful status code", zap.Error(err))
log.Error("Non-successful status code", zap.Error(err))
return nil, err
}
@ -46,13 +46,13 @@ func NrodStream(url string, cfg *helpers.Configuration) (io.ReadCloser, error) {
}
func NrodStreamExtract(resp *http.Response) (io.ReadCloser, error) {
log.Msg.Debug("Extracting NROD Download")
log.Debug("Extracting NROD Download")
log.Msg.Debug("Content Type", zap.String("Content-Encoding", resp.Header.Get("Content-Encoding")))
log.Debug("Content Type", zap.String("Content-Encoding", resp.Header.Get("Content-Encoding")))
gzReader, err := gzip.NewReader(resp.Body)
if err != nil {
log.Msg.Warn("Unable to create GZIP Reader, data probably not gzipped")
log.Warn("Unable to create GZIP Reader, data probably not gzipped")
return resp.Body, err
}

View File

@ -20,26 +20,26 @@ func processEntryType(entry database.Service) {
case "Delete":
deleteEntry(entry)
default:
log.Msg.Error("Unknown transaction type: " + entry.TransactionType)
log.Warn("Unknown transaction type: " + entry.TransactionType)
}
}
func createEntry(entry database.Service) {
log.Msg.Info("Entry Creation requested for: " + entry.TrainUid + " - " + entry.Headcode + " - " + entry.Operator)
log.Info("Entry Creation requested for: " + entry.TrainUid + " - " + entry.Headcode + " - " + entry.Operator)
status := dbAccess.PutOneService(entry)
if status {
log.Msg.Info("Database entry created")
log.Info("Database entry created")
} else {
log.Msg.Error("Database entry failed, skipped service")
log.Error("Database entry failed, skipped service")
}
}
func updateEntry(entry database.Service) {
log.Msg.Info("Entry UPDATE requested for: " + entry.TrainUid + " - " + entry.Headcode + " - " + entry.Operator)
log.Warn("Entry UPDATE requested for: " + entry.TrainUid + " - " + entry.Headcode + " - " + entry.Operator)
}
func deleteEntry(entry database.Service) {
log.Msg.Info("Entry DELETE requested for: " + entry.TrainUid + " - " + entry.Headcode)
log.Info("Entry DELETE requested for: " + entry.TrainUid + " - " + entry.Headcode)
var deletionQuery = database.DeleteQuery{
TrainUid: entry.TrainUid,
ScheduleStartDate: entry.ScheduleStartDate,
@ -47,9 +47,9 @@ func deleteEntry(entry database.Service) {
}
status := dbAccess.DeleteOneService(deletionQuery)
if status {
log.Msg.Info("Database entry deleted")
log.Info("Database entry deleted")
} else {
log.Msg.Error("Database deletion failed, skipped deletion")
log.Error("Database deletion failed, skipped deletion")
fmt.Printf("%+v\n", deletionQuery)
}
}

View File

@ -1,17 +1,16 @@
package vstp
import (
"fmt"
"git.fjla.uk/owlboard/timetable-mgr/log"
"github.com/go-stomp/stomp/v3"
"go.uber.org/zap"
)
var count uint64 = 0
func handle(msg *stomp.Message) {
count++
log.Msg.Info("Messages since started: " + fmt.Sprint(count))
log.Info("Message received", zap.Uint64("total since startup", count))
schedule := unmarshalData(string(msg.Body))
processEntryType(schedule)
}

View File

@ -18,23 +18,22 @@ func unmarshalData(jsonData string) database.Service {
var schedule upstreamApi.MsgData
err := json.Unmarshal([]byte(jsonData), &schedule)
if err != nil {
log.Msg.Error("Unable to unmarshal message body: " + err.Error())
log.Error("Unable to unmarshal message body: " + err.Error())
//return err
}
log.Msg.Debug("Unmarshalling Complete")
log.Debug("Unmarshalling Complete")
if schedule.Data.CIFMsg.ScheduleSegment == nil {
log.Msg.Warn("ScheduleSegment is nil")
log.Warn("ScheduleSegment is nil")
} else if len(schedule.Data.CIFMsg.ScheduleSegment) == 0 {
log.Msg.Warn("ScheduleSegment is empty")
log.Warn("ScheduleSegment is empty")
}
return formatData(&schedule.Data.CIFMsg)
}
// Transforms the upstreamApi.Schedule type into a database.Service type
func formatData(dataInput *upstreamApi.Schedule) database.Service {
log.Msg.Debug("ScheduleSegment length: " + fmt.Sprint(len(dataInput.ScheduleSegment)))
log.Msg.Debug("Printing dataInput to console:")
log.Debug("ScheduleSegment length: " + fmt.Sprint(len(dataInput.ScheduleSegment)))
var operator, headcode, powerType string
var planSpeed int32
@ -70,20 +69,20 @@ func formatData(dataInput *upstreamApi.Schedule) database.Service {
// Uses the map provided in 'helpers' to translate incorrect CIF speeds to their correct equivalent
func parseSpeed(CIFSpeed string) int32 {
log.Msg.Debug("CIFSpeed Input: '" + CIFSpeed + "'")
log.Debug("CIFSpeed Input: '" + CIFSpeed + "'")
if CIFSpeed == "" {
log.Msg.Debug("Speed data not provided")
log.Debug("Speed data not provided")
return int32(0)
}
actualSpeed, exists := helpers.SpeedMap[CIFSpeed]
if !exists {
actualSpeed = CIFSpeed
}
log.Msg.Debug("Corrected Speed: " + actualSpeed)
log.Debug("Corrected Speed: " + actualSpeed)
speed, err := strconv.ParseInt(actualSpeed, 10, 32)
if err != nil {
log.Msg.Warn("Unable to parse speed: " + CIFSpeed + ", returning 0")
log.Warn("Unable to parse speed: " + CIFSpeed + ", returning 0")
return int32(0)
}
return int32(speed)
@ -91,10 +90,10 @@ func parseSpeed(CIFSpeed string) int32 {
// Converts the date string provided from the upstream API into a proper Date type and adds a time
func parseDate(dateString string, end bool) time.Time {
log.Msg.Debug("Date Input: " + dateString)
log.Debug("Date Input: " + dateString)
date, err := time.Parse("2006-01-02", dateString)
if err != nil {
log.Msg.Error("Unable to parse date: " + dateString)
log.Error("Unable to parse date: " + dateString)
return time.Time{}
}
@ -107,13 +106,13 @@ func parseDate(dateString string, end bool) time.Time {
}
dateWithTime := time.Date(date.Year(), date.Month(), date.Day(), hour, minute, second, nanosecond, location)
log.Msg.Debug("Parsed date: " + dateWithTime.String())
log.Debug("Parsed date: " + dateWithTime.String())
return dateWithTime
}
// Converts the binary-style 'daysRun' field into an array of short days
func parseDaysRun(daysBinary string) []string {
log.Msg.Debug("daysRun Input: " + daysBinary)
log.Debug("daysRun Input: " + daysBinary)
shortDays := []string{"m", "t", "w", "th", "f", "s", "su"}
var result []string
for i, digit := range daysBinary {
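For context, a self-contained sketch of the daysRun conversion described above. The loop body is cut off by the hunk, so the '1' comparison is an assumption based on the comment and the shortDays slice.

package main

import "fmt"

// Mirrors the documented daysRun conversion: each '1' in the binary string
// selects the matching short day name. The '1' check is an assumption; the
// loop body is truncated in the diff above.
func parseDaysRun(daysBinary string) []string {
	shortDays := []string{"m", "t", "w", "th", "f", "s", "su"}
	var result []string
	for i, digit := range daysBinary {
		if digit == '1' && i < len(shortDays) {
			result = append(result, shortDays[i])
		}
	}
	return result
}

func main() {
	fmt.Println(parseDaysRun("1111100")) // [m t w th f]
}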

View File

@ -11,25 +11,25 @@ import (
func Subscribe() {
sub, err := messaging.Client.Subscribe("/topic/VSTP_ALL", stomp.AckAuto)
if err != nil {
log.Msg.Fatal("Unable to start subscription: " + err.Error())
log.Fatal("Unable to start subscription: " + err.Error())
}
log.Msg.Info("Subscription to VSTP topic successful, listening")
log.Info("Subscription to VSTP topic successful, listening")
go func() {
log.Msg.Debug("GOROUTINE: VSTP Message Handler Started")
log.Debug("GOROUTINE: VSTP Message Handler Started")
defer func() {
if r := recover(); r != nil {
log.Msg.Warn("GOROUTINE: VSTP Message Handler Stopped")
log.Warn("GOROUTINE: VSTP Message Handler Stopped")
time.Sleep(time.Second * 10)
log.Msg.Fatal("GOROUTINE: VSTP Message Handler Failed")
log.Fatal("GOROUTINE: VSTP Message Handler Failed")
}
}()
for {
msg := <-sub.C
if msg.Err != nil {
log.Msg.Error("STOMP Message Error: " + msg.Err.Error())
log.Error("STOMP Message Error: " + msg.Err.Error())
} else {
log.Msg.Info("STOMP Message Received")
log.Info("STOMP Message Received")
handle(msg)
}
}