Compare commits
No commits in common. "3d730054c075e40d701f7e64e3e997c6c7e1120b" and "77dc11a658a72bc558eed9af93603016657648d4" have entirely different histories.
3d730054c0
...
77dc11a658
|
@ -2,7 +2,6 @@ package background
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"math"
|
"math"
|
||||||
"os"
|
|
||||||
"runtime"
|
"runtime"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
@ -19,25 +18,23 @@ const frequency = 2 * time.Hour // Figure out a sensible frequency!
|
||||||
func InitTicker(cfg *helpers.Configuration, stop <-chan struct{}) {
|
func InitTicker(cfg *helpers.Configuration, stop <-chan struct{}) {
|
||||||
go runTicker(cfg, stop)
|
go runTicker(cfg, stop)
|
||||||
|
|
||||||
// Run goroutine logging ticker if env "perflog" is set to "on"
|
// Run goroutine logging ticker if runtime set to debug
|
||||||
if os.Getenv("perflog") == "on" {
|
if helpers.Runtime == "debug" {
|
||||||
go goroutineTicker(stop)
|
go goroutineTicker(stop)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Runs the ticker and handles tick events
|
// Runs the ticker and handles tick events
|
||||||
func runTicker(cfg *helpers.Configuration, stop <-chan struct{}) {
|
func runTicker(cfg *helpers.Configuration, stop <-chan struct{}) {
|
||||||
log.Info("Starting background task ticker", zap.Duration("frequency", frequency))
|
log.Msg.Sugar().Infof("Starting background ticker, runs every %s", frequency)
|
||||||
ticker := time.NewTicker(frequency)
|
ticker := time.NewTicker(frequency)
|
||||||
defer ticker.Stop()
|
defer ticker.Stop()
|
||||||
|
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case <-stop:
|
case <-stop:
|
||||||
log.Debug("Stopping background task ticker")
|
|
||||||
return
|
return
|
||||||
case <-ticker.C:
|
case <-ticker.C:
|
||||||
log.Debug("Running background tasks")
|
|
||||||
go cif.CheckCif(cfg)
|
go cif.CheckCif(cfg)
|
||||||
go corpus.CheckCorpus(cfg)
|
go corpus.CheckCorpus(cfg)
|
||||||
}
|
}
|
||||||
|
@ -46,7 +43,7 @@ func runTicker(cfg *helpers.Configuration, stop <-chan struct{}) {
|
||||||
|
|
||||||
// Starts a ticker that logs how many goroutines are running every two seconds
|
// Starts a ticker that logs how many goroutines are running every two seconds
|
||||||
func goroutineTicker(stop <-chan struct{}) {
|
func goroutineTicker(stop <-chan struct{}) {
|
||||||
log.Debug("Starting goroutine resource logging ticker")
|
log.Msg.Warn("Starting goroutine Tracker ticker - DEBUG USE ONLY")
|
||||||
ticker := time.NewTicker(1000 * time.Millisecond)
|
ticker := time.NewTicker(1000 * time.Millisecond)
|
||||||
defer ticker.Stop()
|
defer ticker.Stop()
|
||||||
for {
|
for {
|
||||||
|
@ -67,5 +64,5 @@ func debugLog() {
|
||||||
heapMem := float64(memStats.HeapAlloc) / (1024 * 1024)
|
heapMem := float64(memStats.HeapAlloc) / (1024 * 1024)
|
||||||
heapMemRound := math.Round(heapMem*100) / 100
|
heapMemRound := math.Round(heapMem*100) / 100
|
||||||
|
|
||||||
log.Debug("Performance", zap.Int("goroutine-count", goroutines), zap.Float64("heap-mem (MB)", heapMemRound))
|
log.Msg.Debug("Performance", zap.Int("goroutine-count", goroutines), zap.Float64("heap-mem (MB)", heapMemRound))
|
||||||
}
|
}
|
||||||
|
|
22
cif/check.go
22
cif/check.go
|
@ -14,43 +14,43 @@ import (
|
||||||
func CheckCif(cfg *helpers.Configuration) {
|
func CheckCif(cfg *helpers.Configuration) {
|
||||||
// Check that it is after 0600, if not then skip update
|
// Check that it is after 0600, if not then skip update
|
||||||
if time.Now().In(londonTimezone).Hour() <= dataAvailable {
|
if time.Now().In(londonTimezone).Hour() <= dataAvailable {
|
||||||
log.Info("Too early to update CIF data, not published until 0600")
|
log.Msg.Info("Too early to update CIF data, not published until 0600")
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
log.Debug("Checking age of CIF Data")
|
log.Msg.Info("Checking age of CIF Data")
|
||||||
|
|
||||||
// Load and read metadata from database
|
// Load and read metadata from database
|
||||||
metadata, err := dbAccess.GetCifMetadata()
|
metadata, err := dbAccess.GetCifMetadata()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Error("Unable to read last update time", zap.Error(err))
|
log.Msg.Error("Unable to read last update time", zap.Error(err))
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// If no metadata is found in DB, presume no CIF data exists
|
// If no metadata is found in DB, presume no CIF data exists
|
||||||
if metadata == nil {
|
if metadata == nil {
|
||||||
log.Info("Full CIF download required")
|
log.Msg.Info("Full CIF download required")
|
||||||
err := runCifFullDownload(cfg)
|
err := runCifFullDownload(cfg)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Error("Unable to run full CIF Update", zap.Error(err))
|
log.Msg.Error("Unable to run full CIF Update", zap.Error(err))
|
||||||
}
|
}
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// Check if last update was today
|
// Check if last update was today
|
||||||
if isSameToday(metadata.LastUpdate) {
|
if isSameToday(metadata.LastUpdate) {
|
||||||
log.Info("CIF Data has already been updated today, skipping")
|
log.Msg.Info("CIF Data has already been updated today, skipping")
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// Check how many days since last update, if more than 5, run full update, else run update
|
// Check how many days since last update, if more than 5, run full update, else run update
|
||||||
daysSinceLastUpdate := howManyDaysAgo(metadata.LastUpdate)
|
daysSinceLastUpdate := howManyDaysAgo(metadata.LastUpdate)
|
||||||
if daysSinceLastUpdate > 5 {
|
if daysSinceLastUpdate > 5 {
|
||||||
log.Debug("Full Update Requested due to time since last update", zap.Int("daysSinceLastUpdate", daysSinceLastUpdate))
|
log.Msg.Debug("Full Update Requested due to time since last update", zap.Int("daysSinceLastUpdate", daysSinceLastUpdate))
|
||||||
log.Info("Full CIF download required")
|
log.Msg.Info("Full CIF download required")
|
||||||
err := runCifFullDownload(cfg)
|
err := runCifFullDownload(cfg)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Error("Full CIF update failed", zap.Error(err))
|
log.Msg.Error("Unable to run full CIF Update", zap.Error(err))
|
||||||
}
|
}
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
@ -58,9 +58,9 @@ func CheckCif(cfg *helpers.Configuration) {
|
||||||
daysToUpdate := generateUpdateDays(daysSinceLastUpdate)
|
daysToUpdate := generateUpdateDays(daysSinceLastUpdate)
|
||||||
|
|
||||||
// Run the update
|
// Run the update
|
||||||
log.Info("CIF Update required", zap.Any("days to update", daysToUpdate))
|
log.Msg.Info("CIF Update required", zap.Any("days to update", daysToUpdate))
|
||||||
err = runCifUpdateDownload(cfg, metadata, daysToUpdate)
|
err = runCifUpdateDownload(cfg, metadata, daysToUpdate)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Error("Daily CIF update failed", zap.Error(err))
|
log.Msg.Error("Unable to run CIF update", zap.Error(err))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -7,7 +7,6 @@ import (
|
||||||
"git.fjla.uk/owlboard/go-types/pkg/upstreamApi"
|
"git.fjla.uk/owlboard/go-types/pkg/upstreamApi"
|
||||||
"git.fjla.uk/owlboard/timetable-mgr/helpers"
|
"git.fjla.uk/owlboard/timetable-mgr/helpers"
|
||||||
"git.fjla.uk/owlboard/timetable-mgr/log"
|
"git.fjla.uk/owlboard/timetable-mgr/log"
|
||||||
"go.uber.org/zap"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
func ConvertServiceType(input *upstreamApi.JsonScheduleV1, vstp bool) (*database.Service, error) {
|
func ConvertServiceType(input *upstreamApi.JsonScheduleV1, vstp bool) (*database.Service, error) {
|
||||||
|
@ -47,7 +46,7 @@ func parseSpeed(CIFSpeed *string) int32 {
|
||||||
|
|
||||||
speed, err := strconv.ParseInt(actualSpeed, 10, 32)
|
speed, err := strconv.ParseInt(actualSpeed, 10, 32)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Warn("Unable to parse speed", zap.String("input-value", *CIFSpeed))
|
log.Msg.Warn("Unable to parse speed: " + *CIFSpeed + ", returning 0")
|
||||||
return int32(0)
|
return int32(0)
|
||||||
}
|
}
|
||||||
return int32(speed)
|
return int32(speed)
|
||||||
|
|
|
@ -10,8 +10,13 @@ import (
|
||||||
|
|
||||||
// Fetches the day string for the provided date.
|
// Fetches the day string for the provided date.
|
||||||
func getDayString(t time.Time) string {
|
func getDayString(t time.Time) string {
|
||||||
time := t.In(londonTimezone)
|
london, err := time.LoadLocation("Europe/London")
|
||||||
day := time.Weekday()
|
if err != nil {
|
||||||
|
log.Msg.Error("Unable to load time zone info", zap.Error(err))
|
||||||
|
}
|
||||||
|
|
||||||
|
timeNow := t.In(london)
|
||||||
|
day := timeNow.Weekday()
|
||||||
|
|
||||||
dayStrings := [...]string{"sun", "mon", "tue", "wed", "thu", "fri", "sat"}
|
dayStrings := [...]string{"sun", "mon", "tue", "wed", "thu", "fri", "sat"}
|
||||||
|
|
||||||
|
@ -39,6 +44,7 @@ func isSameToday(t time.Time) bool {
|
||||||
|
|
||||||
// Returns how many days ago `t` was compared to today
|
// Returns how many days ago `t` was compared to today
|
||||||
func howManyDaysAgo(t time.Time) int {
|
func howManyDaysAgo(t time.Time) int {
|
||||||
|
log.Msg.Debug("Calculating how many days ago", zap.Time("Input time", t))
|
||||||
// Truncate both times to midnight in UTC timezone
|
// Truncate both times to midnight in UTC timezone
|
||||||
today := time.Now().UTC().Truncate(24 * time.Hour)
|
today := time.Now().UTC().Truncate(24 * time.Hour)
|
||||||
input := t.UTC().Truncate(24 * time.Hour)
|
input := t.UTC().Truncate(24 * time.Hour)
|
||||||
|
@ -70,7 +76,7 @@ func ParseCifDate(input *string, startOrEnd string) time.Time {
|
||||||
layout := "2006-01-02" // Layout of input
|
layout := "2006-01-02" // Layout of input
|
||||||
t, err := time.ParseInLocation(layout, *input, londonTimezone)
|
t, err := time.ParseInLocation(layout, *input, londonTimezone)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Error("Error parsing date string", zap.String("date string", *input), zap.Error(err))
|
log.Msg.Error("Error parsing date string", zap.String("date string", *input), zap.Error(err))
|
||||||
return time.Time{}
|
return time.Time{}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -79,11 +85,11 @@ func ParseCifDate(input *string, startOrEnd string) time.Time {
|
||||||
} else if startOrEnd == "end" {
|
} else if startOrEnd == "end" {
|
||||||
t = time.Date(t.Year(), t.Month(), t.Day(), 23, 59, 59, 0, londonTimezone)
|
t = time.Date(t.Year(), t.Month(), t.Day(), 23, 59, 59, 0, londonTimezone)
|
||||||
} else {
|
} else {
|
||||||
log.Error("Error parsing date string", zap.String("date string", *input), zap.Error(err))
|
log.Msg.Error("Error parsing date string", zap.String("date string", *input), zap.Error(err))
|
||||||
return time.Time{}
|
return time.Time{}
|
||||||
}
|
}
|
||||||
|
|
||||||
return t.UTC()
|
return t
|
||||||
}
|
}
|
||||||
|
|
||||||
// Parses CIF days_run field and converts to array of day strings
|
// Parses CIF days_run field and converts to array of day strings
|
||||||
|
|
|
@ -103,8 +103,6 @@ func TestParseCifDate(t *testing.T) {
|
||||||
|
|
||||||
for _, tc := range testCases {
|
for _, tc := range testCases {
|
||||||
result := ParseCifDate(&tc.dateString, tc.startOrEnd)
|
result := ParseCifDate(&tc.dateString, tc.startOrEnd)
|
||||||
result = result.In(londonTimezone)
|
|
||||||
//fmt.Println(tc.dateString, "|UTC: ", result.In(time.UTC), "|EU/Lon: ", result)
|
|
||||||
if result != tc.expect {
|
if result != tc.expect {
|
||||||
t.Errorf("For datestring %s, startOrEnd %s, expected %s, but got %s", tc.dateString, tc.startOrEnd, tc.expect.Format(layout), result.Format(layout))
|
t.Errorf("For datestring %s, startOrEnd %s, expected %s, but got %s", tc.dateString, tc.startOrEnd, tc.expect.Format(layout), result.Format(layout))
|
||||||
}
|
}
|
||||||
|
|
|
@ -33,19 +33,7 @@ func checkMetadata(oldMeta *dbAccess.CifMetadata, newMeta *upstreamApi.JsonTimet
|
||||||
|
|
||||||
// Evaluates whether the given time is after yesterday at 0600
|
// Evaluates whether the given time is after yesterday at 0600
|
||||||
func isAfterYesterdayAt0600(t time.Time) bool {
|
func isAfterYesterdayAt0600(t time.Time) bool {
|
||||||
yesterday0600 := time.Now().In(londonTimezone).AddDate(0, 0, -1)
|
yesterday0600 := time.Now().In(time.UTC).AddDate(0, 0, -1)
|
||||||
yesterday0600 = time.Date(yesterday0600.Year(), yesterday0600.Month(), yesterday0600.Day(), 6, 0, 0, 0, time.UTC)
|
yesterday0600 = time.Date(yesterday0600.Year(), yesterday0600.Month(), yesterday0600.Day(), 6, 0, 0, 0, time.UTC)
|
||||||
return t.After(yesterday0600)
|
return t.After(yesterday0600)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Accepts the JsonTimetableV1 struct which contains CIF File metadata and produces a DB Ready struct.
|
|
||||||
func generateMetadata(header *upstreamApi.JsonTimetableV1) *dbAccess.CifMetadata {
|
|
||||||
newMetadata := dbAccess.CifMetadata{
|
|
||||||
Doctype: dbAccess.Doctype,
|
|
||||||
LastTimestamp: header.Timestamp,
|
|
||||||
LastUpdate: time.Now().In(londonTimezone),
|
|
||||||
LastSequence: header.Metadata.Sequence,
|
|
||||||
}
|
|
||||||
|
|
||||||
return &newMetadata
|
|
||||||
}
|
|
||||||
|
|
14
cif/parse.go
14
cif/parse.go
|
@ -13,7 +13,7 @@ import (
|
||||||
// Accepts the CIF data as a stream and outputs parsed data
|
// Accepts the CIF data as a stream and outputs parsed data
|
||||||
func parseCifDataStream(dataStream io.ReadCloser) (*parsedData, error) {
|
func parseCifDataStream(dataStream io.ReadCloser) (*parsedData, error) {
|
||||||
defer dataStream.Close()
|
defer dataStream.Close()
|
||||||
log.Debug("Starting CIF Datastream parsing")
|
log.Msg.Debug("Starting CIF Datastream parsing")
|
||||||
if dataStream == nil {
|
if dataStream == nil {
|
||||||
return nil, errors.New("unable to parse nil pointer")
|
return nil, errors.New("unable to parse nil pointer")
|
||||||
}
|
}
|
||||||
|
@ -29,7 +29,7 @@ func parseCifDataStream(dataStream io.ReadCloser) (*parsedData, error) {
|
||||||
for decoder.More() {
|
for decoder.More() {
|
||||||
var obj map[string]json.RawMessage
|
var obj map[string]json.RawMessage
|
||||||
if err := decoder.Decode(&obj); err != nil {
|
if err := decoder.Decode(&obj); err != nil {
|
||||||
log.Error("Error decoding JSON String")
|
log.Msg.Error("Error decoding JSON String")
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -39,7 +39,7 @@ func parseCifDataStream(dataStream io.ReadCloser) (*parsedData, error) {
|
||||||
case "JsonTimetableV1":
|
case "JsonTimetableV1":
|
||||||
var timetable upstreamApi.JsonTimetableV1
|
var timetable upstreamApi.JsonTimetableV1
|
||||||
if err := json.Unmarshal(value, &timetable); err != nil {
|
if err := json.Unmarshal(value, &timetable); err != nil {
|
||||||
log.Error("Error decoding JSONTimetableV1 object", zap.Error(err))
|
log.Msg.Error("Error decoding JSONTimetableV1 object", zap.Error(err))
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
parsed.header = timetable
|
parsed.header = timetable
|
||||||
|
@ -53,17 +53,17 @@ func parseCifDataStream(dataStream io.ReadCloser) (*parsedData, error) {
|
||||||
case "JsonScheduleV1":
|
case "JsonScheduleV1":
|
||||||
var schedule upstreamApi.JsonScheduleV1
|
var schedule upstreamApi.JsonScheduleV1
|
||||||
if err := json.Unmarshal(value, &schedule); err != nil {
|
if err := json.Unmarshal(value, &schedule); err != nil {
|
||||||
log.Error("Error decoding JSONScheduleV1 object", zap.Error(err))
|
log.Msg.Error("Error decoding JSONScheduleV1 object", zap.Error(err))
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
parsed.sched = append(parsed.sched, schedule)
|
parsed.sched = append(parsed.sched, schedule)
|
||||||
case "EOF":
|
case "EOF":
|
||||||
log.Debug("Reached EOF")
|
log.Msg.Info("Reached EOF")
|
||||||
default:
|
default:
|
||||||
log.Warn("Unknown CIF Data type", zap.String("key", key))
|
log.Msg.Warn("Unknown CIF Data type", zap.String("key", key))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
log.Debug("CIF Parsing completed")
|
log.Msg.Debug("CIF Parsing completed")
|
||||||
return &parsed, nil
|
return &parsed, nil
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,6 +1,8 @@
|
||||||
package cif
|
package cif
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"time"
|
||||||
|
|
||||||
"git.fjla.uk/owlboard/go-types/pkg/database"
|
"git.fjla.uk/owlboard/go-types/pkg/database"
|
||||||
"git.fjla.uk/owlboard/go-types/pkg/upstreamApi"
|
"git.fjla.uk/owlboard/go-types/pkg/upstreamApi"
|
||||||
"git.fjla.uk/owlboard/timetable-mgr/dbAccess"
|
"git.fjla.uk/owlboard/timetable-mgr/dbAccess"
|
||||||
|
@ -10,8 +12,8 @@ import (
|
||||||
|
|
||||||
// Processes parsed CIF data and applies the data to the database
|
// Processes parsed CIF data and applies the data to the database
|
||||||
func processParsedCif(data *parsedData) error {
|
func processParsedCif(data *parsedData) error {
|
||||||
log.Debug("Starting CIF Processing")
|
log.Msg.Debug("Starting CIF Processing")
|
||||||
log.Info("Processing CIF Data", zap.Int("schedule-count", len(data.sched)))
|
log.Msg.Info("Processing CIF Data", zap.Int("schedule-count", len(data.sched)))
|
||||||
|
|
||||||
// Batch size for processing
|
// Batch size for processing
|
||||||
batchSize := 250 // Needs optimising for better RAM use. 1000 ~ 5.7GB, 500 ~ 5.5GB, 750 ~ 5.2GB
|
batchSize := 250 // Needs optimising for better RAM use. 1000 ~ 5.7GB, 500 ~ 5.5GB, 750 ~ 5.2GB
|
||||||
|
@ -32,7 +34,7 @@ func processParsedCif(data *parsedData) error {
|
||||||
if len(deleteBatch) > 0 {
|
if len(deleteBatch) > 0 {
|
||||||
err := doDeletions(deleteBatch)
|
err := doDeletions(deleteBatch)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Error("Error deleting CIF Entries", zap.Error(err))
|
log.Msg.Error("Error deleting CIF Entries", zap.Error(err))
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -54,13 +56,13 @@ func processParsedCif(data *parsedData) error {
|
||||||
if len(createBatch) > 0 {
|
if len(createBatch) > 0 {
|
||||||
err := doCreations(createBatch)
|
err := doCreations(createBatch)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Error("Error creating CIF Entries", zap.Error(err))
|
log.Msg.Error("Error creating CIF Entries", zap.Error(err))
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
log.Debug("CIF Processing complete")
|
log.Msg.Debug("CIF Processing complete")
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -68,7 +70,7 @@ func processParsedCif(data *parsedData) error {
|
||||||
func doDeletions(deletions []*upstreamApi.JsonScheduleV1) error {
|
func doDeletions(deletions []*upstreamApi.JsonScheduleV1) error {
|
||||||
defer func() {
|
defer func() {
|
||||||
if r := recover(); r != nil {
|
if r := recover(); r != nil {
|
||||||
log.Panic("Panic:", zap.Any("panic", r))
|
log.Msg.Panic("Panic:", zap.Any("panic", r))
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
deleteQueries := make([]database.DeleteQuery, 0)
|
deleteQueries := make([]database.DeleteQuery, 0)
|
||||||
|
@ -84,7 +86,7 @@ func doDeletions(deletions []*upstreamApi.JsonScheduleV1) error {
|
||||||
|
|
||||||
err := dbAccess.DeleteCifEntries(deleteQueries)
|
err := dbAccess.DeleteCifEntries(deleteQueries)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Error("Error deleting documents", zap.Error(err))
|
log.Msg.Error("Error deleting documents", zap.Error(err))
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -97,8 +99,7 @@ func doCreations(creations []*upstreamApi.JsonScheduleV1) error {
|
||||||
for _, item := range creations {
|
for _, item := range creations {
|
||||||
document, err := ConvertServiceType(item, false)
|
document, err := ConvertServiceType(item, false)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Error("Error converting JsonSchedule to Service type", zap.Error(err))
|
log.Msg.Error("Error converting JsonSchedule to Service type", zap.Error(err))
|
||||||
continue
|
|
||||||
}
|
}
|
||||||
|
|
||||||
createDocuments = append(createDocuments, *document)
|
createDocuments = append(createDocuments, *document)
|
||||||
|
@ -106,9 +107,21 @@ func doCreations(creations []*upstreamApi.JsonScheduleV1) error {
|
||||||
|
|
||||||
err := dbAccess.CreateCifEntries(createDocuments)
|
err := dbAccess.CreateCifEntries(createDocuments)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Error("Error creating documents", zap.Error(err))
|
log.Msg.Error("Error creating documents", zap.Error(err))
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Accepts the JsonTimetableV1 struct which contains CIF File metadata and produces a DB Ready struct.
|
||||||
|
func generateMetadata(header *upstreamApi.JsonTimetableV1) *dbAccess.CifMetadata {
|
||||||
|
newMetadata := dbAccess.CifMetadata{
|
||||||
|
Doctype: dbAccess.Doctype,
|
||||||
|
LastTimestamp: header.Timestamp,
|
||||||
|
LastUpdate: time.Now().In(londonTimezone),
|
||||||
|
LastSequence: header.Metadata.Sequence,
|
||||||
|
}
|
||||||
|
|
||||||
|
return &newMetadata
|
||||||
|
}
|
||||||
|
|
|
@ -56,27 +56,3 @@ func TestGenerateMetadata(t *testing.T) {
|
||||||
t.Errorf("LastUpdate: expected %s, got %s", expected.LastUpdate, result.LastUpdate)
|
t.Errorf("LastUpdate: expected %s, got %s", expected.LastUpdate, result.LastUpdate)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestIsAfterYesterdayAt0600(t *testing.T) {
|
|
||||||
yesterday0600 := time.Now().In(londonTimezone).AddDate(0, 0, -1).Truncate(24 * time.Hour).Add(6 * time.Hour)
|
|
||||||
|
|
||||||
testCases := []struct {
|
|
||||||
input time.Time
|
|
||||||
expect bool
|
|
||||||
}{
|
|
||||||
{yesterday0600.Add(-1 * time.Hour), false},
|
|
||||||
{yesterday0600.Add(-12 * time.Hour), false},
|
|
||||||
{yesterday0600.Add(-24 * time.Hour), false},
|
|
||||||
{yesterday0600.Add(1 * time.Microsecond), true},
|
|
||||||
{yesterday0600.Add(1 * time.Hour), true},
|
|
||||||
{yesterday0600.Add(12 * time.Hour), true},
|
|
||||||
{yesterday0600.Add(24 * time.Hour), true},
|
|
||||||
}
|
|
||||||
|
|
||||||
for _, tc := range testCases {
|
|
||||||
result := isAfterYesterdayAt0600(tc.input)
|
|
||||||
if result != tc.expect {
|
|
||||||
t.Errorf("For input %v, expected %t, but got %t", tc.input, tc.expect, result)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -13,22 +13,22 @@ import (
|
||||||
|
|
||||||
// Replaces all existing CIF Data with a new download
|
// Replaces all existing CIF Data with a new download
|
||||||
func runCifFullDownload(cfg *helpers.Configuration) error {
|
func runCifFullDownload(cfg *helpers.Configuration) error {
|
||||||
log.Info("Downloading all CIF Data")
|
log.Msg.Info("Downloading all CIF Data")
|
||||||
|
|
||||||
// Download CIF Data file
|
// Download CIF Data file
|
||||||
url, err := getUpdateUrl("full")
|
url, err := getUpdateUrl("full")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Error("Error getting download URL", zap.Error(err))
|
log.Msg.Error("Error getting download URL", zap.Error(err))
|
||||||
}
|
}
|
||||||
dataStream, err := nrod.NrodStream(url, cfg)
|
dataStream, err := nrod.NrodStream(url, cfg)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Error("Error downloading CIF data", zap.Error(err))
|
log.Msg.Error("Error downloading CIF data", zap.Error(err))
|
||||||
}
|
}
|
||||||
|
|
||||||
// Parse CIF file
|
// Parse CIF file
|
||||||
parsed, err := parseCifDataStream(dataStream)
|
parsed, err := parseCifDataStream(dataStream)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Error("Error parsing CIF data", zap.Error(err))
|
log.Msg.Error("Error parsing CIF data", zap.Error(err))
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -38,68 +38,56 @@ func runCifFullDownload(cfg *helpers.Configuration) error {
|
||||||
// Process CIF file
|
// Process CIF file
|
||||||
err = processParsedCif(parsed)
|
err = processParsedCif(parsed)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Error("Error processing CIF data", zap.Error(err))
|
log.Msg.Error("Error processing CIF data", zap.Error(err))
|
||||||
}
|
}
|
||||||
|
|
||||||
newMeta := generateMetadata(&parsed.header)
|
newMeta := generateMetadata(&parsed.header)
|
||||||
ok := dbAccess.PutCifMetadata(newMeta, fullUpdateType)
|
ok := dbAccess.PutCifMetadata(newMeta, fullUpdateType)
|
||||||
if !ok {
|
if !ok {
|
||||||
log.Warn("CIF Data updated, but metadata write failed")
|
log.Msg.Warn("CIF Data updated, but metadata write failed")
|
||||||
}
|
}
|
||||||
|
|
||||||
// Set parsed to nil to encourage garbage collection
|
|
||||||
parsed = nil
|
parsed = nil
|
||||||
|
|
||||||
// Clear out of date schedules
|
|
||||||
cutoff := time.Now().Add(-time.Hour * 24 * 7)
|
|
||||||
log.Debug("Attempting to remove outdated services", zap.Time("scheduleEnd before", cutoff))
|
|
||||||
count, err := dbAccess.RemoveOutdatedServices(cutoff)
|
|
||||||
if err != nil {
|
|
||||||
log.Warn("Out of date services not removed", zap.Error(err))
|
|
||||||
} else {
|
|
||||||
log.Info("Out of date services removed", zap.Int64("removal count", count))
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Runs a CIF Update for up to five days
|
// Runs a CIF Update for up to five days
|
||||||
func runCifUpdateDownload(cfg *helpers.Configuration, metadata *dbAccess.CifMetadata, days []time.Time) error {
|
func runCifUpdateDownload(cfg *helpers.Configuration, metadata *dbAccess.CifMetadata, days []time.Time) error {
|
||||||
log.Info("Downloading CIF Updates")
|
log.Msg.Info("Downloading CIF Updates")
|
||||||
|
|
||||||
// Loop over dates
|
// Loop over dates
|
||||||
for _, time := range days {
|
for _, time := range days {
|
||||||
log.Info("Downloading CIF File", zap.Time("CIF Data from", time))
|
log.Msg.Info("Downloading CIF File", zap.Time("CIF Data from", time))
|
||||||
|
|
||||||
// Download CIF data file
|
// Download CIF data file
|
||||||
data, err := fetchUpdate(time, cfg)
|
data, err := fetchUpdate(time, cfg)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Error("Error fetching CIF update", zap.Error(err))
|
log.Msg.Error("Error fetching CIF update", zap.Error(err))
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
// Parse CIF file
|
// Parse CIF file
|
||||||
parsed, err := parseCifDataStream(data)
|
parsed, err := parseCifDataStream(data)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Error("Error parsing CIF data", zap.Error(err))
|
log.Msg.Error("Error parsing CIF data", zap.Error(err))
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
// Check CIF Metadata
|
// Check CIF Metadata
|
||||||
log.Debug("Starting metadata checks")
|
log.Msg.Debug("Starting metadata checks")
|
||||||
|
|
||||||
reason, update := checkMetadata(metadata, &parsed.header)
|
reason, update := checkMetadata(metadata, &parsed.header)
|
||||||
if !update {
|
if !update {
|
||||||
log.Warn("Update file not processed", zap.String("reason", reason))
|
log.Msg.Warn("Update file not processed", zap.String("reason", reason))
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
log.Info("CIF Data is suitable for processing", zap.String("reason", reason))
|
log.Msg.Info("CIF Data is suitable for processing", zap.String("reason", reason))
|
||||||
|
|
||||||
// Process CIF file
|
// Process CIF file
|
||||||
err = processParsedCif(parsed)
|
err = processParsedCif(parsed)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Error("Error processing CIF data", zap.Error(err))
|
log.Msg.Error("Error processing CIF data", zap.Error(err))
|
||||||
}
|
}
|
||||||
|
|
||||||
metadata = generateMetadata(&parsed.header)
|
metadata = generateMetadata(&parsed.header)
|
||||||
|
@ -108,7 +96,7 @@ func runCifUpdateDownload(cfg *helpers.Configuration, metadata *dbAccess.CifMeta
|
||||||
|
|
||||||
ok := dbAccess.PutCifMetadata(metadata, dailyUpdateType)
|
ok := dbAccess.PutCifMetadata(metadata, dailyUpdateType)
|
||||||
if !ok {
|
if !ok {
|
||||||
log.Warn("CIF Data updated, but metadata write failed.")
|
log.Msg.Warn("CIF Data updated, but metadata write failed.")
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
|
|
|
@ -11,10 +11,10 @@ import (
|
||||||
|
|
||||||
// Checks if the CORPUS Data needs updating, and calls an updater function if needed.
|
// Checks if the CORPUS Data needs updating, and calls an updater function if needed.
|
||||||
func CheckCorpus(cfg *helpers.Configuration) {
|
func CheckCorpus(cfg *helpers.Configuration) {
|
||||||
log.Debug("Checking age of CORPUS Data")
|
log.Msg.Debug("Checking age of CORPUS Data")
|
||||||
utime, err := dbAccess.CheckUpdateTime(dbAccess.CorpusCollection)
|
utime, err := dbAccess.CheckUpdateTime(dbAccess.CorpusCollection)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Error("Error checking last CORPUS update", zap.Error(err))
|
log.Msg.Error("Error checking last CORPUS update", zap.Error(err))
|
||||||
}
|
}
|
||||||
|
|
||||||
lastUpdate := time.Unix(utime, 0)
|
lastUpdate := time.Unix(utime, 0)
|
||||||
|
@ -22,17 +22,17 @@ func CheckCorpus(cfg *helpers.Configuration) {
|
||||||
dataAge := currentTime.Sub(lastUpdate)
|
dataAge := currentTime.Sub(lastUpdate)
|
||||||
fortnight := 14 * 24 * time.Hour
|
fortnight := 14 * 24 * time.Hour
|
||||||
|
|
||||||
log.Debug("CORPUS Data", zap.Duration("Data Age", dataAge), zap.Duration("Max Age", 14*24*time.Hour))
|
log.Msg.Debug("CORPUS Data", zap.Duration("Data Age", dataAge), zap.Duration("Max Age", 14*24*time.Hour))
|
||||||
|
|
||||||
if dataAge >= fortnight {
|
if dataAge >= fortnight {
|
||||||
log.Info("CORPUS update required")
|
log.Msg.Info("CORPUS Data is more than two weeks old")
|
||||||
err := RunCorpusUpdate(cfg)
|
err := RunCorpusUpdate(cfg)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Warn("CORPUS Update did not run")
|
log.Msg.Warn("CORPUS Update did not run")
|
||||||
} else {
|
} else {
|
||||||
log.Info("CORPUS data has been updated")
|
log.Msg.Info("CORPUS data has been updated")
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
log.Info("CORPUS Data not stale, skipping updating")
|
log.Msg.Info("CORPUS Data is less than two weeks old, not updating")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -14,14 +14,14 @@ import (
|
||||||
func parseCorpusData(stream io.ReadCloser) (*[]database.CorpusEntry, error) {
|
func parseCorpusData(stream io.ReadCloser) (*[]database.CorpusEntry, error) {
|
||||||
defer stream.Close()
|
defer stream.Close()
|
||||||
|
|
||||||
log.Debug("Starting CORPUS Data parsing")
|
log.Msg.Debug("Starting CORPUS Data parsing")
|
||||||
|
|
||||||
var corpusEntries []database.CorpusEntry
|
var corpusEntries []database.CorpusEntry
|
||||||
decoder := json.NewDecoder(stream)
|
decoder := json.NewDecoder(stream)
|
||||||
|
|
||||||
// Expect an object at the root of the JSON stream
|
// Expect an object at the root of the JSON stream
|
||||||
if _, err := decoder.Token(); err != nil {
|
if _, err := decoder.Token(); err != nil {
|
||||||
log.Error("Error parsing CORPUS Data", zap.Error(err))
|
log.Msg.Error("Error parsing CORPUS Data", zap.Error(err))
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -29,19 +29,19 @@ func parseCorpusData(stream io.ReadCloser) (*[]database.CorpusEntry, error) {
|
||||||
for decoder.More() {
|
for decoder.More() {
|
||||||
// Decode the next JSON token
|
// Decode the next JSON token
|
||||||
if tok, err := decoder.Token(); err != nil {
|
if tok, err := decoder.Token(); err != nil {
|
||||||
log.Error("Error parsing CORPUS Data", zap.Error(err))
|
log.Msg.Error("Error parsing CORPUS Data", zap.Error(err))
|
||||||
return nil, err
|
return nil, err
|
||||||
} else if tok == "TIPLOCDATA" {
|
} else if tok == "TIPLOCDATA" {
|
||||||
// Found the "TIPLOCDATA" key, expect the associated array
|
// Found the "TIPLOCDATA" key, expect the associated array
|
||||||
if !decoder.More() {
|
if !decoder.More() {
|
||||||
err := errors.New("missing array after TIPLOCDATA key")
|
err := errors.New("missing array after TIPLOCDATA key")
|
||||||
log.Error("Error parsing CORPUS Data", zap.Error(err))
|
log.Msg.Error("Error parsing CORPUS Data", zap.Error(err))
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
// Start reading the array associated with the "TIPLOCDATA" key
|
// Start reading the array associated with the "TIPLOCDATA" key
|
||||||
if _, err := decoder.Token(); err != nil {
|
if _, err := decoder.Token(); err != nil {
|
||||||
log.Error("Error parsing CORPUS Data", zap.Error(err))
|
log.Msg.Error("Error parsing CORPUS Data", zap.Error(err))
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -49,7 +49,7 @@ func parseCorpusData(stream io.ReadCloser) (*[]database.CorpusEntry, error) {
|
||||||
for decoder.More() {
|
for decoder.More() {
|
||||||
var corpusEntry database.CorpusEntry
|
var corpusEntry database.CorpusEntry
|
||||||
if err := decoder.Decode(&corpusEntry); err != nil {
|
if err := decoder.Decode(&corpusEntry); err != nil {
|
||||||
log.Error("Error parsing CORPUS Data", zap.Error(err))
|
log.Msg.Error("Error parsing CORPUS Data", zap.Error(err))
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
corpusEntries = append(corpusEntries, corpusEntry)
|
corpusEntries = append(corpusEntries, corpusEntry)
|
||||||
|
@ -58,7 +58,7 @@ func parseCorpusData(stream io.ReadCloser) (*[]database.CorpusEntry, error) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
log.Debug("CORPUS parsing complete")
|
log.Msg.Debug("CORPUS parsing complete")
|
||||||
|
|
||||||
return &corpusEntries, nil
|
return &corpusEntries, nil
|
||||||
}
|
}
|
||||||
|
|
|
@ -12,13 +12,13 @@ import (
|
||||||
func RunCorpusUpdate(cfg *helpers.Configuration) error {
|
func RunCorpusUpdate(cfg *helpers.Configuration) error {
|
||||||
resp, err := nrod.NrodStream(url, cfg)
|
resp, err := nrod.NrodStream(url, cfg)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Error("Failed to fetch CORPUS data", zap.Error(err))
|
log.Msg.Error("Failed to fetch CORPUS data", zap.Error(err))
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
unsortedCorpusData, err := parseCorpusData(resp)
|
unsortedCorpusData, err := parseCorpusData(resp)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Error("Error parsing Corpus data", zap.Error(err))
|
log.Msg.Error("Error parsing Corpus data", zap.Error(err))
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -26,24 +26,24 @@ func RunCorpusUpdate(cfg *helpers.Configuration) error {
|
||||||
stationData := createStationEntries(corpusData)
|
stationData := createStationEntries(corpusData)
|
||||||
|
|
||||||
if err := dbAccess.DropCollection(dbAccess.CorpusCollection); err != nil {
|
if err := dbAccess.DropCollection(dbAccess.CorpusCollection); err != nil {
|
||||||
log.Warn("CORPUS data may be incomplete")
|
log.Msg.Warn("CORPUS data may be incomplete")
|
||||||
log.Error("Error dropping CORPUS Data", zap.Error(err))
|
log.Msg.Error("Error dropping CORPUS Data", zap.Error(err))
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
if err := dbAccess.DropCollection(dbAccess.StationsCollection); err != nil {
|
if err := dbAccess.DropCollection(dbAccess.StationsCollection); err != nil {
|
||||||
log.Warn("Stations data may be incomplete")
|
log.Msg.Warn("Stations data may be incomplete")
|
||||||
log.Error("Error dropping stations Data", zap.Error(err))
|
log.Msg.Error("Error dropping stations Data", zap.Error(err))
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := dbAccess.PutManyCorpus(corpusData); err != nil {
|
if err := dbAccess.PutManyCorpus(corpusData); err != nil {
|
||||||
log.Warn("CORPUS data may be incomplete")
|
log.Msg.Warn("CORPUS data may be incomplete")
|
||||||
log.Error("Error inserting CORPUS Data", zap.Error(err))
|
log.Msg.Error("Error inserting CORPUS Data", zap.Error(err))
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
if err := dbAccess.PutManyStations(stationData); err != nil {
|
if err := dbAccess.PutManyStations(stationData); err != nil {
|
||||||
log.Warn("Stations data may be incomplete")
|
log.Msg.Warn("Stations data may be incomplete")
|
||||||
log.Error("Error inserting stations data", zap.Error(err))
|
log.Msg.Error("Error inserting stations data", zap.Error(err))
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -28,9 +28,9 @@ func PushVersionToDb() {
|
||||||
coll := MongoClient.Database("owlboard").Collection("versions")
|
coll := MongoClient.Database("owlboard").Collection("versions")
|
||||||
_, err := coll.UpdateOne(context.TODO(), versionSelector, bson.M{"$set": version}, opts)
|
_, err := coll.UpdateOne(context.TODO(), versionSelector, bson.M{"$set": version}, opts)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Warn("Unable to push version to database: " + err.Error())
|
log.Msg.Warn("Unable to push version to database: " + err.Error())
|
||||||
} else {
|
} else {
|
||||||
log.Debug("Version up to date in Database")
|
log.Msg.Debug("Version up to date in Database")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -39,7 +39,7 @@ func PutOneService(data database.Service) bool {
|
||||||
coll := MongoClient.Database(databaseName).Collection(timetableCollection)
|
coll := MongoClient.Database(databaseName).Collection(timetableCollection)
|
||||||
_, err := coll.InsertOne(context.TODO(), data)
|
_, err := coll.InsertOne(context.TODO(), data)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Error("Unable to insert to database: " + err.Error())
|
log.Msg.Error("Unable to insert to database: " + err.Error())
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
return true
|
return true
|
||||||
|
@ -55,7 +55,7 @@ func DeleteOneService(data database.DeleteQuery) bool {
|
||||||
}
|
}
|
||||||
_, err := coll.DeleteOne(context.TODO(), filter)
|
_, err := coll.DeleteOne(context.TODO(), filter)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Error("Unable to delete service: " + err.Error())
|
log.Msg.Error("Unable to delete service: " + err.Error())
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
return true
|
return true
|
||||||
|
|
|
@ -36,10 +36,11 @@ func GetCifMetadata() (*CifMetadata, error) {
|
||||||
if errors.Is(err, mongo.ErrNoDocuments) {
|
if errors.Is(err, mongo.ErrNoDocuments) {
|
||||||
return nil, nil
|
return nil, nil
|
||||||
}
|
}
|
||||||
|
log.Msg.Error("Error fetching CIF Metadata")
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
log.Debug("Fetched CIF Metadata from database", zap.Any("Metadata", result))
|
log.Msg.Debug("Fetched CIF Metadata from database", zap.Any("Metadata", result))
|
||||||
|
|
||||||
return &result, nil
|
return &result, nil
|
||||||
}
|
}
|
||||||
|
@ -63,11 +64,11 @@ func PutCifMetadata(metadata *CifMetadata, lastUpdateType string) bool {
|
||||||
_, err := collection.UpdateOne(context.Background(), filter, update, options)
|
_, err := collection.UpdateOne(context.Background(), filter, update, options)
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Error("Error updating CIF Metadata", zap.Error(err))
|
log.Msg.Error("Error updating CIF Metadata", zap.Error(err))
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
log.Info("New CIF Metadata written", zap.Time("Update time", metadata.LastUpdate))
|
log.Msg.Info("New CIF Metadata written", zap.Time("Update time", metadata.LastUpdate))
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -75,15 +76,15 @@ func PutCifMetadata(metadata *CifMetadata, lastUpdateType string) bool {
|
||||||
func DeleteCifEntries(deletions []database.DeleteQuery) error {
|
func DeleteCifEntries(deletions []database.DeleteQuery) error {
|
||||||
defer func() {
|
defer func() {
|
||||||
if r := recover(); r != nil {
|
if r := recover(); r != nil {
|
||||||
log.Panic("Panic:", zap.Any("panic", r))
|
log.Msg.Panic("Panic:", zap.Any("panic", r))
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
// Skip if deletions is empty
|
// Skip if deletions is empty
|
||||||
if len(deletions) == 0 {
|
if len(deletions) == 0 {
|
||||||
log.Info("No deletions required")
|
log.Msg.Info("No deletions required")
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
log.Debug("Running deletions against database", zap.Int("count", len(deletions)))
|
log.Msg.Info("Running deletions against database", zap.Int("count", len(deletions)))
|
||||||
|
|
||||||
// Prepare deletion tasks
|
// Prepare deletion tasks
|
||||||
collection := MongoClient.Database(databaseName).Collection(timetableCollection)
|
collection := MongoClient.Database(databaseName).Collection(timetableCollection)
|
||||||
|
@ -102,6 +103,7 @@ func DeleteCifEntries(deletions []database.DeleteQuery) error {
|
||||||
|
|
||||||
_, err := collection.BulkWrite(context.Background(), bulkDeletions, bulkWriteOptions)
|
_, err := collection.BulkWrite(context.Background(), bulkDeletions, bulkWriteOptions)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
log.Msg.Error("Error deleting documents", zap.Error(err))
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -112,10 +114,10 @@ func DeleteCifEntries(deletions []database.DeleteQuery) error {
|
||||||
func CreateCifEntries(schedules []database.Service) error {
|
func CreateCifEntries(schedules []database.Service) error {
|
||||||
// Skip if deletions is empty
|
// Skip if deletions is empty
|
||||||
if len(schedules) == 0 {
|
if len(schedules) == 0 {
|
||||||
log.Info("No creations required")
|
log.Msg.Info("No creations required")
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
log.Debug("Running creations against database", zap.Int("count", len(schedules)))
|
log.Msg.Info("Running creations against database", zap.Int("count", len(schedules)))
|
||||||
|
|
||||||
collection := MongoClient.Database(databaseName).Collection(timetableCollection)
|
collection := MongoClient.Database(databaseName).Collection(timetableCollection)
|
||||||
|
|
||||||
|
@ -130,24 +132,9 @@ func CreateCifEntries(schedules []database.Service) error {
|
||||||
|
|
||||||
_, err := collection.BulkWrite(context.Background(), models, bulkWriteOptions)
|
_, err := collection.BulkWrite(context.Background(), models, bulkWriteOptions)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
log.Msg.Error("Error inserting documents", zap.Error(err))
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Removes any schedules which ended before 'cutoff'
|
|
||||||
func RemoveOutdatedServices(cutoff time.Time) (count int64, err error) {
|
|
||||||
// Define filter
|
|
||||||
filter := bson.M{"scheduleEndDate": bson.M{"$lt": cutoff}}
|
|
||||||
|
|
||||||
collection := MongoClient.Database(databaseName).Collection(timetableCollection)
|
|
||||||
|
|
||||||
res, err := collection.DeleteMany(context.Background(), filter)
|
|
||||||
if err != nil {
|
|
||||||
return // Automatically returns named values
|
|
||||||
}
|
|
||||||
|
|
||||||
count = res.DeletedCount
|
|
||||||
return // Automatically returns names values
|
|
||||||
}
|
|
||||||
|
|
|
@ -3,9 +3,9 @@ package dbAccess
|
||||||
import (
|
import (
|
||||||
"git.fjla.uk/owlboard/timetable-mgr/helpers"
|
"git.fjla.uk/owlboard/timetable-mgr/helpers"
|
||||||
"git.fjla.uk/owlboard/timetable-mgr/log"
|
"git.fjla.uk/owlboard/timetable-mgr/log"
|
||||||
"go.uber.org/zap"
|
|
||||||
|
|
||||||
"context"
|
"context"
|
||||||
|
"fmt"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"go.mongodb.org/mongo-driver/mongo"
|
"go.mongodb.org/mongo-driver/mongo"
|
||||||
|
@ -26,43 +26,19 @@ var bsonOpts = &options.BSONOptions{
|
||||||
UseJSONStructTags: true,
|
UseJSONStructTags: true,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Initialise the DB Connection
|
||||||
func InitDataAccess(cfg *helpers.Configuration) {
|
func InitDataAccess(cfg *helpers.Configuration) {
|
||||||
log.Debug("Starting database connection")
|
uri := getDbUri(cfg)
|
||||||
url := getDbUri(cfg)
|
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
|
||||||
|
defer cancel()
|
||||||
const maxRetries = 8
|
client, err := mongo.Connect(ctx, options.Client().ApplyURI(uri).SetBSONOptions(bsonOpts))
|
||||||
|
if err != nil {
|
||||||
for attempt := 1; attempt <= maxRetries; attempt++ {
|
fmt.Println(err)
|
||||||
log.Info("Attempting to connect to database", zap.Int("attempt", attempt), zap.Int("max-tries", maxRetries))
|
log.Msg.Fatal("Error connecting to database: " + err.Error())
|
||||||
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
|
} else {
|
||||||
defer cancel()
|
log.Msg.Info("Database connection successful")
|
||||||
|
|
||||||
client, err := mongo.Connect(ctx, options.Client().ApplyURI(url).SetBSONOptions(bsonOpts))
|
|
||||||
if err != nil {
|
|
||||||
log.Warn("Error connecting to database", zap.Int("attempt", attempt), zap.Int("max-tries", maxRetries))
|
|
||||||
cancel()
|
|
||||||
if attempt != maxRetries {
|
|
||||||
helpers.BackoffDelay(attempt)
|
|
||||||
}
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
err = client.Ping(ctx, nil)
|
|
||||||
if err != nil {
|
|
||||||
log.Warn("Error pinging database", zap.Int("attempt", attempt), zap.Int("max-tries", maxRetries))
|
|
||||||
cancel()
|
|
||||||
if attempt != maxRetries {
|
|
||||||
helpers.BackoffDelay(attempt)
|
|
||||||
}
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
MongoClient = client
|
|
||||||
log.Info("Database connection successful")
|
|
||||||
return
|
|
||||||
}
|
}
|
||||||
|
MongoClient = client
|
||||||
log.Fatal("Failed to connect to database on multiple attempts", zap.Int("attempts", maxRetries))
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Closes the connection to the database - used for cleanup functions
|
// Closes the connection to the database - used for cleanup functions
|
||||||
|
@ -71,9 +47,9 @@ func CloseMongoClient() {
|
||||||
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
|
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
|
||||||
defer cancel()
|
defer cancel()
|
||||||
if err := MongoClient.Disconnect(ctx); err != nil {
|
if err := MongoClient.Disconnect(ctx); err != nil {
|
||||||
log.Warn("Error disconnecting MongoDB client: " + err.Error())
|
log.Msg.Warn("Error disconnecting MongoDB client: " + err.Error())
|
||||||
} else {
|
} else {
|
||||||
log.Info("MongoDB client disconnected.")
|
log.Msg.Info("MongoDB client disconnected.")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -12,13 +12,13 @@ import (
|
||||||
|
|
||||||
// CAUTION: Drops the collection named in collectionName
|
// CAUTION: Drops the collection named in collectionName
|
||||||
func DropCollection(collectionName string) error {
|
func DropCollection(collectionName string) error {
|
||||||
log.Info("Dropping collection", zap.String("Collection Name", collectionName))
|
log.Msg.Info("Dropping collection", zap.String("Collection Name", collectionName))
|
||||||
database := MongoClient.Database(databaseName)
|
database := MongoClient.Database(databaseName)
|
||||||
collection := database.Collection(collectionName)
|
collection := database.Collection(collectionName)
|
||||||
|
|
||||||
err := collection.Drop(context.Background())
|
err := collection.Drop(context.Background())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Error("Error dropping collection", zap.String("Collection Name", collectionName), zap.Error(err))
|
log.Msg.Error("Error dropping collection", zap.String("Collection Name", collectionName), zap.Error(err))
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -45,7 +45,7 @@ func CheckUpdateTime(collectionName string) (int64, error) {
|
||||||
|
|
||||||
// Sets a new update time (unix timestamp) of the collection named in collectionName. The update time is calculated within the function.
|
// Sets a new update time (unix timestamp) of the collection named in collectionName. The update time is calculated within the function.
|
||||||
func SetUpdateTime(collectionName string) error {
|
func SetUpdateTime(collectionName string) error {
|
||||||
log.Info("Setting update time", zap.String("collection", collectionName))
|
log.Msg.Info("Setting update time", zap.String("collection", collectionName))
|
||||||
database := MongoClient.Database(databaseName)
|
database := MongoClient.Database(databaseName)
|
||||||
collection := database.Collection("meta")
|
collection := database.Collection("meta")
|
||||||
options := options.Update().SetUpsert(true)
|
options := options.Update().SetUpsert(true)
|
||||||
|
@ -65,7 +65,7 @@ func SetUpdateTime(collectionName string) error {
|
||||||
_, err := collection.UpdateOne(context.Background(), filter, update, options)
|
_, err := collection.UpdateOne(context.Background(), filter, update, options)
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Error("Error setting update time", zap.String("collection", collectionName), zap.Error(err))
|
log.Msg.Error("Error setting update time", zap.String("collection", collectionName), zap.Error(err))
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
|
|
|
@ -1,18 +0,0 @@
|
||||||
package helpers
|
|
||||||
|
|
||||||
import (
|
|
||||||
"math"
|
|
||||||
"time"
|
|
||||||
|
|
||||||
"git.fjla.uk/owlboard/timetable-mgr/log"
|
|
||||||
"go.uber.org/zap"
|
|
||||||
)
|
|
||||||
|
|
||||||
// Implements an exponential backoff strategy and sleeps for a duration calculated as 1 second to the power of (attempt - 1).
|
|
||||||
// The backoff time doubles with each attempt, starting from 1 second for the first attempt.
|
|
||||||
func BackoffDelay(attempt int) {
|
|
||||||
base := time.Second
|
|
||||||
backoff := base * time.Duration(math.Pow(2, float64(attempt-1)))
|
|
||||||
log.Info("Retry backoff", zap.Duration("delay", backoff))
|
|
||||||
time.Sleep(backoff)
|
|
||||||
}
|
|
79
log/log.go
79
log/log.go
|
@ -1,77 +1,36 @@
|
||||||
package log
|
package log
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"os"
|
|
||||||
|
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
"go.uber.org/zap/zapcore"
|
"go.uber.org/zap/zapcore"
|
||||||
|
|
||||||
|
"git.fjla.uk/owlboard/timetable-mgr/helpers"
|
||||||
)
|
)
|
||||||
|
|
||||||
// Use outside `package log`
|
var Msg *zap.Logger
|
||||||
// should be avoided.
|
|
||||||
var post *zap.Logger
|
|
||||||
|
|
||||||
// Initialises the logger
|
// Initialises the logger
|
||||||
func InitLogger() {
|
func init() {
|
||||||
var err error
|
var err error
|
||||||
mode := os.Getenv("runtime")
|
|
||||||
if mode == "" {
|
|
||||||
mode = "prod"
|
|
||||||
}
|
|
||||||
|
|
||||||
var level zapcore.Level
|
|
||||||
if mode == "debug" {
|
|
||||||
level = zap.DebugLevel
|
|
||||||
} else {
|
|
||||||
level = zap.InfoLevel
|
|
||||||
}
|
|
||||||
|
|
||||||
|
// Create a custom configuration with a human-readable "Console" encoder
|
||||||
config := zap.NewDevelopmentConfig()
|
config := zap.NewDevelopmentConfig()
|
||||||
config.DisableStacktrace = true
|
config.EncoderConfig.EncodeLevel = zapcore.CapitalColorLevelEncoder // Adds color to log levels
|
||||||
config.EncoderConfig.EncodeLevel = zapcore.CapitalColorLevelEncoder
|
|
||||||
config.Level = zap.NewAtomicLevelAt(level)
|
// Determine the log level based on the runtime mode
|
||||||
post, err = config.Build(zap.AddCallerSkip(1))
|
logLevel := zapcore.DebugLevel
|
||||||
|
if helpers.Runtime != "debug" {
|
||||||
|
logLevel = zapcore.InfoLevel
|
||||||
|
}
|
||||||
|
|
||||||
|
// Set the log level
|
||||||
|
config.Level = zap.NewAtomicLevelAt(logLevel)
|
||||||
|
|
||||||
|
Msg, err = config.Build() // Potential source of the error
|
||||||
if err != nil {
|
if err != nil {
|
||||||
panic("Failed to initialize logger: " + err.Error())
|
panic("Failed to initialize logger: " + err.Error())
|
||||||
}
|
}
|
||||||
|
|
||||||
defer post.Sync()
|
// Log the selected log level (optional, can be helpful for debugging)
|
||||||
|
Msg.Info("Log level set to: " + logLevel.String())
|
||||||
Info("Logger initialised", zap.String("level", level.String()), zap.String("runtime", mode))
|
|
||||||
}
|
|
||||||
|
|
||||||
// Logs message at info level
|
|
||||||
func Info(msg string, fields ...zap.Field) {
|
|
||||||
post.Info(msg, fields...)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Logs message at debug level
|
|
||||||
func Debug(msg string, fields ...zap.Field) {
|
|
||||||
post.Debug(msg, fields...)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Logs message at warn level
|
|
||||||
func Warn(msg string, fields ...zap.Field) {
|
|
||||||
post.Warn(msg, fields...)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Logs message at error level
|
|
||||||
func Error(msg string, fields ...zap.Field) {
|
|
||||||
post.Error(msg, fields...)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Logs message at fatal level then call os.exit(1)
|
|
||||||
func Fatal(msg string, fields ...zap.Field) {
|
|
||||||
post.Fatal(msg, fields...)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Logs message at panic level the panics
|
|
||||||
func Panic(msg string, fields ...zap.Field) {
|
|
||||||
post.Panic(msg, fields...)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Flushes log messages
|
|
||||||
func Cleanup() {
|
|
||||||
Info("Flushing log messages")
|
|
||||||
post.Sync()
|
|
||||||
}
|
}
|
||||||
|
|
50
main.go
50
main.go
|
@ -16,25 +16,19 @@ import (
|
||||||
"git.fjla.uk/owlboard/timetable-mgr/log"
|
"git.fjla.uk/owlboard/timetable-mgr/log"
|
||||||
"git.fjla.uk/owlboard/timetable-mgr/messaging"
|
"git.fjla.uk/owlboard/timetable-mgr/messaging"
|
||||||
"git.fjla.uk/owlboard/timetable-mgr/vstp"
|
"git.fjla.uk/owlboard/timetable-mgr/vstp"
|
||||||
"go.uber.org/zap"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
func init() {
|
|
||||||
printStartupBanner()
|
|
||||||
fmt.Printf("Version %s \n\n", helpers.Version)
|
|
||||||
}
|
|
||||||
|
|
||||||
func main() {
|
func main() {
|
||||||
log.InitLogger()
|
|
||||||
defer log.Cleanup()
|
|
||||||
log.Info("Initialising OwlBoard timetable-mgr", zap.String("version", helpers.Version))
|
|
||||||
cfg, err := helpers.LoadConfig()
|
cfg, err := helpers.LoadConfig()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Fatal("Unable to load configuration", zap.Error(err))
|
fmt.Println("Error loading configuration", err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
cfg.PrintConfig()
|
cfg.PrintConfig()
|
||||||
|
|
||||||
|
log.Msg.Info("Initialised OwlBoard timetable-mgr " + helpers.Version)
|
||||||
|
|
||||||
dbAccess.InitDataAccess(cfg)
|
dbAccess.InitDataAccess(cfg)
|
||||||
dbAccess.PushVersionToDb()
|
dbAccess.PushVersionToDb()
|
||||||
|
|
||||||
|
@ -66,50 +60,32 @@ func handleSignals(cfg *helpers.Configuration, stop chan<- struct{}) {
|
||||||
signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
|
signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
|
||||||
|
|
||||||
sig := <-sigChan
|
sig := <-sigChan
|
||||||
log.Warn("Signal received: " + sig.String())
|
log.Msg.Warn("Signal received: " + sig.String())
|
||||||
cleanup(cfg, stop)
|
cleanup(cfg, stop)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Cleans up open connections ready for a clean exit of the program
|
// Cleans up open connections ready for a clean exit of the program
|
||||||
func cleanup(cfg *helpers.Configuration, stop chan<- struct{}) {
|
func cleanup(cfg *helpers.Configuration, stop chan<- struct{}) {
|
||||||
log.Debug("Cleaning up open connections")
|
log.Msg.Debug("Cleaning up open connections")
|
||||||
if cfg.VstpOn {
|
if cfg.VstpOn {
|
||||||
if messaging.Client != nil {
|
if messaging.Client != nil {
|
||||||
log.Info("Closing STOMP Client")
|
log.Msg.Info("Closing STOMP Client")
|
||||||
messaging.Disconnect(messaging.Client)
|
messaging.Disconnect(messaging.Client)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if dbAccess.MongoClient != nil {
|
if dbAccess.MongoClient != nil {
|
||||||
log.Info("Closing MongoDB Client")
|
log.Msg.Info("Closing MongoDB Client")
|
||||||
dbAccess.CloseMongoClient()
|
dbAccess.CloseMongoClient()
|
||||||
}
|
}
|
||||||
log.Info("Signalling to other goroutines")
|
log.Msg.Info("Signalling to other goroutines")
|
||||||
close(stop)
|
close(stop)
|
||||||
|
|
||||||
log.Info("Program ready to exit")
|
log.Msg.Info("Program ready to exit")
|
||||||
|
if log.Msg != nil {
|
||||||
|
log.Msg.Sync()
|
||||||
|
}
|
||||||
|
|
||||||
time.Sleep(500 * time.Millisecond)
|
time.Sleep(500 * time.Millisecond)
|
||||||
|
|
||||||
log.Cleanup()
|
|
||||||
|
|
||||||
os.Exit(0)
|
os.Exit(0)
|
||||||
}
|
}
|
||||||
|
|
||||||
func printStartupBanner() {
|
|
||||||
art := `
|
|
||||||
___ _ ____ _
|
|
||||||
/ _ \__ _| | __ ) ___ __ _ _ __ __| |
|
|
||||||
| | | \ \ /\ / / | _ \ / _ \ / _' | '__/ _' |
|
|
||||||
| |_| |\ V V /| | |_) | (_) | (_| | | | (_| |
|
|
||||||
\___/ \_/\_/ |_|____/ \___/ \__,_|_| \__,_|
|
|
||||||
|
|
||||||
_ _ _ _ _
|
|
||||||
| |_(_)_ __ ___ ___| |_ __ _| |__ | | ___ _ __ ___ __ _ _ __
|
|
||||||
| __| | '_ ' _ \ / _ \ __/ _' | '_ \| |/ _ \_____| '_ ' _ \ / _' | '__|
|
|
||||||
| |_| | | | | | | __/ || (_| | |_) | | __/_____| | | | | | (_| | |
|
|
||||||
\__|_|_| |_| |_|\___|\__\__,_|_.__/|_|\___| |_| |_| |_|\__, |_|
|
|
||||||
|___/
|
|
||||||
`
|
|
||||||
|
|
||||||
fmt.Println(art)
|
|
||||||
}
|
|
||||||
|
|
|
@ -21,11 +21,11 @@ func dial(user, pass string) *stomp.Conn {
|
||||||
stomp.ConnOpt.Header("client-id", user+"-mq-client"),
|
stomp.ConnOpt.Header("client-id", user+"-mq-client"),
|
||||||
)
|
)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Fatal("Unable to connect to STOMP Client: " + err.Error())
|
log.Msg.Fatal("Unable to connect to STOMP Client: " + err.Error())
|
||||||
conn.MustDisconnect()
|
conn.MustDisconnect()
|
||||||
}
|
}
|
||||||
|
|
||||||
log.Info("Initialised STOMP Client")
|
log.Msg.Info("Initialised STOMP Client")
|
||||||
return conn
|
return conn
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -34,14 +34,14 @@ func dial(user, pass string) *stomp.Conn {
|
||||||
func Disconnect(conn *stomp.Conn) {
|
func Disconnect(conn *stomp.Conn) {
|
||||||
if conn != nil {
|
if conn != nil {
|
||||||
err := conn.Disconnect()
|
err := conn.Disconnect()
|
||||||
log.Warn("Disconnected STOMP Client")
|
log.Msg.Warn("Disconnected STOMP Client")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
conn.MustDisconnect()
|
conn.MustDisconnect()
|
||||||
log.Error("STOMP Disconnection failed, forced disconnect")
|
log.Msg.Error("STOMP Disconnection failed, forced disconnect")
|
||||||
}
|
}
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
log.Error("STOMP Disconnect failed, next connection attempt may fail")
|
log.Msg.Error("STOMP Disconnect failed, next connection attempt may fail")
|
||||||
}
|
}
|
||||||
|
|
||||||
// Register against the MQ Server and log each message for testing purposes
|
// Register against the MQ Server and log each message for testing purposes
|
||||||
|
|
|
@ -14,7 +14,7 @@ import (
|
||||||
|
|
||||||
// Downloads NROD Data and extracts if GZIP, returns a io.Reader
|
// Downloads NROD Data and extracts if GZIP, returns a io.Reader
|
||||||
func NrodStream(url string, cfg *helpers.Configuration) (io.ReadCloser, error) {
|
func NrodStream(url string, cfg *helpers.Configuration) (io.ReadCloser, error) {
|
||||||
log.Debug("Fetching NROD data stream", zap.String("Request URL", url))
|
log.Msg.Debug("Fetching NROD data stream", zap.String("Request URL", url))
|
||||||
|
|
||||||
client := http.Client{
|
client := http.Client{
|
||||||
Timeout: time.Second * 300,
|
Timeout: time.Second * 300,
|
||||||
|
@ -22,7 +22,7 @@ func NrodStream(url string, cfg *helpers.Configuration) (io.ReadCloser, error) {
|
||||||
|
|
||||||
req, err := http.NewRequest("GET", url, nil)
|
req, err := http.NewRequest("GET", url, nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Error("Error creating HTTP Request", zap.Error(err))
|
log.Msg.Error("Error creating HTTP Request", zap.Error(err))
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -30,13 +30,13 @@ func NrodStream(url string, cfg *helpers.Configuration) (io.ReadCloser, error) {
|
||||||
|
|
||||||
resp, err := client.Do(req)
|
resp, err := client.Do(req)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Error("Error carrying out HTTP Request", zap.Error(err), zap.Int("STATUS", resp.StatusCode))
|
log.Msg.Error("Error carrying out HTTP Request", zap.Error(err), zap.Int("STATUS", resp.StatusCode))
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
if resp.StatusCode != http.StatusOK {
|
if resp.StatusCode != http.StatusOK {
|
||||||
err := fmt.Errorf("unexpected status code: %d", resp.StatusCode)
|
err := fmt.Errorf("unexpected status code: %d", resp.StatusCode)
|
||||||
log.Error("Non-successful status code", zap.Error(err))
|
log.Msg.Error("Non-successful status code", zap.Error(err))
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -46,13 +46,13 @@ func NrodStream(url string, cfg *helpers.Configuration) (io.ReadCloser, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func NrodStreamExtract(resp *http.Response) (io.ReadCloser, error) {
|
func NrodStreamExtract(resp *http.Response) (io.ReadCloser, error) {
|
||||||
log.Debug("Extracting NROD Download")
|
log.Msg.Debug("Extracting NROD Download")
|
||||||
|
|
||||||
log.Debug("Content Type", zap.String("Content-Encoding", resp.Header.Get("Content-Encoding")))
|
log.Msg.Debug("Content Type", zap.String("Content-Encoding", resp.Header.Get("Content-Encoding")))
|
||||||
|
|
||||||
gzReader, err := gzip.NewReader(resp.Body)
|
gzReader, err := gzip.NewReader(resp.Body)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Warn("Unable to create GZIP Reader, data probably not gzipped")
|
log.Msg.Warn("Unable to create GZIP Reader, data probably not gzipped")
|
||||||
return resp.Body, err
|
return resp.Body, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -20,26 +20,26 @@ func processEntryType(entry database.Service) {
|
||||||
case "Delete":
|
case "Delete":
|
||||||
deleteEntry(entry)
|
deleteEntry(entry)
|
||||||
default:
|
default:
|
||||||
log.Warn("Unknown transaction type: " + entry.TransactionType)
|
log.Msg.Error("Unknown transaction type: " + entry.TransactionType)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func createEntry(entry database.Service) {
|
func createEntry(entry database.Service) {
|
||||||
log.Info("Entry Creation requested for: " + entry.TrainUid + " - " + entry.Headcode + " - " + entry.Operator)
|
log.Msg.Info("Entry Creation requested for: " + entry.TrainUid + " - " + entry.Headcode + " - " + entry.Operator)
|
||||||
status := dbAccess.PutOneService(entry)
|
status := dbAccess.PutOneService(entry)
|
||||||
if status {
|
if status {
|
||||||
log.Info("Database entry created")
|
log.Msg.Info("Database entry created")
|
||||||
} else {
|
} else {
|
||||||
log.Error("Database entry failed, skipped service")
|
log.Msg.Error("Database entry failed, skipped service")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func updateEntry(entry database.Service) {
|
func updateEntry(entry database.Service) {
|
||||||
log.Warn("Entry UPDATE requested for: " + entry.TrainUid + " - " + entry.Headcode + " - " + entry.Operator)
|
log.Msg.Info("Entry UPDATE requested for: " + entry.TrainUid + " - " + entry.Headcode + " - " + entry.Operator)
|
||||||
}
|
}
|
||||||
|
|
||||||
func deleteEntry(entry database.Service) {
|
func deleteEntry(entry database.Service) {
|
||||||
log.Info("Entry DELETE requested for: " + entry.TrainUid + " - " + entry.Headcode)
|
log.Msg.Info("Entry DELETE requested for: " + entry.TrainUid + " - " + entry.Headcode)
|
||||||
var deletionQuery = database.DeleteQuery{
|
var deletionQuery = database.DeleteQuery{
|
||||||
TrainUid: entry.TrainUid,
|
TrainUid: entry.TrainUid,
|
||||||
ScheduleStartDate: entry.ScheduleStartDate,
|
ScheduleStartDate: entry.ScheduleStartDate,
|
||||||
|
@ -47,9 +47,9 @@ func deleteEntry(entry database.Service) {
|
||||||
}
|
}
|
||||||
status := dbAccess.DeleteOneService(deletionQuery)
|
status := dbAccess.DeleteOneService(deletionQuery)
|
||||||
if status {
|
if status {
|
||||||
log.Info("Database entry deleted")
|
log.Msg.Info("Database entry deleted")
|
||||||
} else {
|
} else {
|
||||||
log.Error("Database deletion failed, skipped deletion")
|
log.Msg.Error("Database deletion failed, skipped deletion")
|
||||||
fmt.Printf("%+v\n", deletionQuery)
|
fmt.Printf("%+v\n", deletionQuery)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,16 +1,17 @@
|
||||||
package vstp
|
package vstp
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"fmt"
|
||||||
|
|
||||||
"git.fjla.uk/owlboard/timetable-mgr/log"
|
"git.fjla.uk/owlboard/timetable-mgr/log"
|
||||||
"github.com/go-stomp/stomp/v3"
|
"github.com/go-stomp/stomp/v3"
|
||||||
"go.uber.org/zap"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
var count uint64 = 0
|
var count uint64 = 0
|
||||||
|
|
||||||
func handle(msg *stomp.Message) {
|
func handle(msg *stomp.Message) {
|
||||||
count++
|
count++
|
||||||
log.Info("Message received", zap.Uint64("total since startup", count))
|
log.Msg.Info("Messages since started: " + fmt.Sprint(count))
|
||||||
schedule := unmarshalData(string(msg.Body))
|
schedule := unmarshalData(string(msg.Body))
|
||||||
processEntryType(schedule)
|
processEntryType(schedule)
|
||||||
}
|
}
|
||||||
|
|
|
@ -18,22 +18,23 @@ func unmarshalData(jsonData string) database.Service {
|
||||||
var schedule upstreamApi.MsgData
|
var schedule upstreamApi.MsgData
|
||||||
err := json.Unmarshal([]byte(jsonData), &schedule)
|
err := json.Unmarshal([]byte(jsonData), &schedule)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Error("Unable to unmarshal message body: " + err.Error())
|
log.Msg.Error("Unable to unmarshal message body: " + err.Error())
|
||||||
//return err
|
//return err
|
||||||
}
|
}
|
||||||
log.Debug("Unmarshalling Complete")
|
log.Msg.Debug("Unmarshalling Complete")
|
||||||
|
|
||||||
if schedule.Data.CIFMsg.ScheduleSegment == nil {
|
if schedule.Data.CIFMsg.ScheduleSegment == nil {
|
||||||
log.Warn("ScheduleSegment is nil")
|
log.Msg.Warn("ScheduleSegment is nil")
|
||||||
} else if len(schedule.Data.CIFMsg.ScheduleSegment) == 0 {
|
} else if len(schedule.Data.CIFMsg.ScheduleSegment) == 0 {
|
||||||
log.Warn("ScheduleSegment is empty")
|
log.Msg.Warn("ScheduleSegment is empty")
|
||||||
}
|
}
|
||||||
return formatData(&schedule.Data.CIFMsg)
|
return formatData(&schedule.Data.CIFMsg)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Transforms the upstreamApi.Schedule type into a database.Service type
|
// Transforms the upstreamApi.Schedule type into a database.Service type
|
||||||
func formatData(dataInput *upstreamApi.Schedule) database.Service {
|
func formatData(dataInput *upstreamApi.Schedule) database.Service {
|
||||||
log.Debug("ScheduleSegment length: " + fmt.Sprint(len(dataInput.ScheduleSegment)))
|
log.Msg.Debug("ScheduleSegment length: " + fmt.Sprint(len(dataInput.ScheduleSegment)))
|
||||||
|
log.Msg.Debug("Printing dataInput to console:")
|
||||||
|
|
||||||
var operator, headcode, powerType string
|
var operator, headcode, powerType string
|
||||||
var planSpeed int32
|
var planSpeed int32
|
||||||
|
@ -69,20 +70,20 @@ func formatData(dataInput *upstreamApi.Schedule) database.Service {
|
||||||
|
|
||||||
// Uses the map provided in 'helpers' to translate incorrect CIF speeds to their correct equivalent
|
// Uses the map provided in 'helpers' to translate incorrect CIF speeds to their correct equivalent
|
||||||
func parseSpeed(CIFSpeed string) int32 {
|
func parseSpeed(CIFSpeed string) int32 {
|
||||||
log.Debug("CIFSpeed Input: '" + CIFSpeed + "'")
|
log.Msg.Debug("CIFSpeed Input: '" + CIFSpeed + "'")
|
||||||
if CIFSpeed == "" {
|
if CIFSpeed == "" {
|
||||||
log.Debug("Speed data not provided")
|
log.Msg.Debug("Speed data not provided")
|
||||||
return int32(0)
|
return int32(0)
|
||||||
}
|
}
|
||||||
actualSpeed, exists := helpers.SpeedMap[CIFSpeed]
|
actualSpeed, exists := helpers.SpeedMap[CIFSpeed]
|
||||||
if !exists {
|
if !exists {
|
||||||
actualSpeed = CIFSpeed
|
actualSpeed = CIFSpeed
|
||||||
}
|
}
|
||||||
log.Debug("Corrected Speed: " + actualSpeed)
|
log.Msg.Debug("Corrected Speed: " + actualSpeed)
|
||||||
|
|
||||||
speed, err := strconv.ParseInt(actualSpeed, 10, 32)
|
speed, err := strconv.ParseInt(actualSpeed, 10, 32)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Warn("Unable to parse speed: " + CIFSpeed + ", returning 0")
|
log.Msg.Warn("Unable to parse speed: " + CIFSpeed + ", returning 0")
|
||||||
return int32(0)
|
return int32(0)
|
||||||
}
|
}
|
||||||
return int32(speed)
|
return int32(speed)
|
||||||
|
@ -90,10 +91,10 @@ func parseSpeed(CIFSpeed string) int32 {
|
||||||
|
|
||||||
// Converts the date string provided from the upstream API into a proper Date type and adds a time
|
// Converts the date string provided from the upstream API into a proper Date type and adds a time
|
||||||
func parseDate(dateString string, end bool) time.Time {
|
func parseDate(dateString string, end bool) time.Time {
|
||||||
log.Debug("Date Input: " + dateString)
|
log.Msg.Debug("Date Input: " + dateString)
|
||||||
date, err := time.Parse("2006-01-02", dateString)
|
date, err := time.Parse("2006-01-02", dateString)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Error("Unable to parse date: " + dateString)
|
log.Msg.Error("Unable to parse date: " + dateString)
|
||||||
return time.Time{}
|
return time.Time{}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -106,13 +107,13 @@ func parseDate(dateString string, end bool) time.Time {
|
||||||
}
|
}
|
||||||
|
|
||||||
dateWithTime := time.Date(date.Year(), date.Month(), date.Day(), hour, minute, second, nanosecond, location)
|
dateWithTime := time.Date(date.Year(), date.Month(), date.Day(), hour, minute, second, nanosecond, location)
|
||||||
log.Debug("Parsed date: " + dateWithTime.String())
|
log.Msg.Debug("Parsed date: " + dateWithTime.String())
|
||||||
return dateWithTime
|
return dateWithTime
|
||||||
}
|
}
|
||||||
|
|
||||||
// Converts the binary stype 'daysRun' field into an array of short days
|
// Converts the binary stype 'daysRun' field into an array of short days
|
||||||
func parseDaysRun(daysBinary string) []string {
|
func parseDaysRun(daysBinary string) []string {
|
||||||
log.Debug("daysRun Input: " + daysBinary)
|
log.Msg.Debug("daysRun Input: " + daysBinary)
|
||||||
shortDays := []string{"m", "t", "w", "th", "f", "s", "su"}
|
shortDays := []string{"m", "t", "w", "th", "f", "s", "su"}
|
||||||
var result []string
|
var result []string
|
||||||
for i, digit := range daysBinary {
|
for i, digit := range daysBinary {
|
||||||
|
|
|
@ -11,25 +11,25 @@ import (
|
||||||
func Subscribe() {
|
func Subscribe() {
|
||||||
sub, err := messaging.Client.Subscribe("/topic/VSTP_ALL", stomp.AckAuto)
|
sub, err := messaging.Client.Subscribe("/topic/VSTP_ALL", stomp.AckAuto)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Fatal("Unable to start subscription: " + err.Error())
|
log.Msg.Fatal("Unable to start subscription: " + err.Error())
|
||||||
}
|
}
|
||||||
log.Info("Subscription to VSTP topic successful, listening")
|
log.Msg.Info("Subscription to VSTP topic successful, listening")
|
||||||
|
|
||||||
go func() {
|
go func() {
|
||||||
log.Debug("GOROUTINE: VSTP Message Handler Started")
|
log.Msg.Debug("GOROUTINE: VSTP Message Handler Started")
|
||||||
defer func() {
|
defer func() {
|
||||||
if r := recover(); r != nil {
|
if r := recover(); r != nil {
|
||||||
log.Warn("GOROUTINE: VSTP Message Handler Stopped")
|
log.Msg.Warn("GOROUTINE: VSTP Message Handler Stopped")
|
||||||
time.Sleep(time.Second * 10)
|
time.Sleep(time.Second * 10)
|
||||||
log.Fatal("GOROUTINE: VSTP Message Handler Failed")
|
log.Msg.Fatal("GOROUTINE: VSTP Message Handler Failed")
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
for {
|
for {
|
||||||
msg := <-sub.C
|
msg := <-sub.C
|
||||||
if msg.Err != nil {
|
if msg.Err != nil {
|
||||||
log.Error("STOMP Message Error: " + msg.Err.Error())
|
log.Msg.Error("STOMP Message Error: " + msg.Err.Error())
|
||||||
} else {
|
} else {
|
||||||
log.Info("STOMP Message Received")
|
log.Msg.Info("STOMP Message Received")
|
||||||
handle(msg)
|
handle(msg)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue