timetable-extension #1

Open
fred.boniface wants to merge 163 commits from timetable-extension into main
22 changed files with 193 additions and 172 deletions
Showing only changes of commit 91fd38104c - Show all commits

View File

@ -26,7 +26,7 @@ func InitTicker(cfg *helpers.Configuration, stop <-chan struct{}) {
// Runs the ticker and handles tick events
func runTicker(cfg *helpers.Configuration, stop <-chan struct{}) {
log.Msg.Sugar().Infof("Starting background ticker, runs every %s", frequency)
log.Info("Starting background ticker", zap.Duration("frequency", frequency))
ticker := time.NewTicker(frequency)
defer ticker.Stop()
@ -43,7 +43,7 @@ func runTicker(cfg *helpers.Configuration, stop <-chan struct{}) {
// Starts a ticker that logs how many goroutines are running every two seconds
func goroutineTicker(stop <-chan struct{}) {
log.Msg.Warn("Starting goroutine Tracker ticker - DEBUG USE ONLY")
log.Debug("Starting goroutine resource logging ticker")
ticker := time.NewTicker(1000 * time.Millisecond)
defer ticker.Stop()
for {
@ -64,5 +64,5 @@ func debugLog() {
heapMem := float64(memStats.HeapAlloc) / (1024 * 1024)
heapMemRound := math.Round(heapMem*100) / 100
log.Msg.Debug("Performance", zap.Int("goroutine-count", goroutines), zap.Float64("heap-mem (MB)", heapMemRound))
log.Debug("Performance", zap.Int("goroutine-count", goroutines), zap.Float64("heap-mem (MB)", heapMemRound))
}

View File

@ -14,43 +14,43 @@ import (
func CheckCif(cfg *helpers.Configuration) {
// Check that it is after 0600, if not then skip update
if time.Now().In(londonTimezone).Hour() <= dataAvailable {
log.Msg.Info("Too early to update CIF data, not published until 0600")
log.Info("Too early to update CIF data, not published until 0600")
return
}
log.Msg.Info("Checking age of CIF Data")
log.Info("Checking age of CIF Data")
// Load and read metadata from database
metadata, err := dbAccess.GetCifMetadata()
if err != nil {
log.Msg.Error("Unable to read last update time", zap.Error(err))
log.Error("Unable to read last update time", zap.Error(err))
return
}
// If no metadata is found in DB, presume no CIF data exists
if metadata == nil {
log.Msg.Info("Full CIF download required")
log.Info("Full CIF download required")
err := runCifFullDownload(cfg)
if err != nil {
log.Msg.Error("Unable to run full CIF Update", zap.Error(err))
log.Error("Unable to run full CIF Update", zap.Error(err))
}
return
}
// Check if last update was today
if isSameToday(metadata.LastUpdate) {
log.Msg.Info("CIF Data has already been updated today, skipping")
log.Info("CIF Data has already been updated today, skipping")
return
}
// Check how many days since last update, if more than 5, run full update, else run update
daysSinceLastUpdate := howManyDaysAgo(metadata.LastUpdate)
if daysSinceLastUpdate > 5 {
log.Msg.Debug("Full Update Requested due to time since last update", zap.Int("daysSinceLastUpdate", daysSinceLastUpdate))
log.Msg.Info("Full CIF download required")
log.Debug("Full Update Requested due to time since last update", zap.Int("daysSinceLastUpdate", daysSinceLastUpdate))
log.Info("Full CIF download required")
err := runCifFullDownload(cfg)
if err != nil {
log.Msg.Error("Unable to run full CIF Update", zap.Error(err))
log.Error("Unable to run full CIF Update", zap.Error(err))
}
return
}
@ -58,9 +58,9 @@ func CheckCif(cfg *helpers.Configuration) {
daysToUpdate := generateUpdateDays(daysSinceLastUpdate)
// Run the update
log.Msg.Info("CIF Update required", zap.Any("days to update", daysToUpdate))
log.Info("CIF Update required", zap.Any("days to update", daysToUpdate))
err = runCifUpdateDownload(cfg, metadata, daysToUpdate)
if err != nil {
log.Msg.Error("Unable to run CIF update", zap.Error(err))
log.Error("Unable to run CIF update", zap.Error(err))
}
}

View File

@ -7,6 +7,7 @@ import (
"git.fjla.uk/owlboard/go-types/pkg/upstreamApi"
"git.fjla.uk/owlboard/timetable-mgr/helpers"
"git.fjla.uk/owlboard/timetable-mgr/log"
"go.uber.org/zap"
)
func ConvertServiceType(input *upstreamApi.JsonScheduleV1, vstp bool) (*database.Service, error) {
@ -46,7 +47,7 @@ func parseSpeed(CIFSpeed *string) int32 {
speed, err := strconv.ParseInt(actualSpeed, 10, 32)
if err != nil {
log.Msg.Warn("Unable to parse speed: " + *CIFSpeed + ", returning 0")
log.Warn("Unable to parse speed", zap.String("input-value", *CIFSpeed))
return int32(0)
}
return int32(speed)

View File

@ -12,7 +12,7 @@ import (
func getDayString(t time.Time) string {
london, err := time.LoadLocation("Europe/London")
if err != nil {
log.Msg.Error("Unable to load time zone info", zap.Error(err))
log.Error("Unable to load time zone info", zap.Error(err))
}
timeNow := t.In(london)
@ -44,7 +44,7 @@ func isSameToday(t time.Time) bool {
// Returns how many days ago `t` was compared to today
func howManyDaysAgo(t time.Time) int {
log.Msg.Debug("Calculating how many days ago", zap.Time("Input time", t))
log.Debug("Calculating how many days ago", zap.Time("Input time", t))
// Truncate both times to midnight in UTC timezone
today := time.Now().UTC().Truncate(24 * time.Hour)
input := t.UTC().Truncate(24 * time.Hour)
@ -76,7 +76,7 @@ func ParseCifDate(input *string, startOrEnd string) time.Time {
layout := "2006-01-02" // Layout of input
t, err := time.ParseInLocation(layout, *input, londonTimezone)
if err != nil {
log.Msg.Error("Error parsing date string", zap.String("date string", *input), zap.Error(err))
log.Error("Error parsing date string", zap.String("date string", *input), zap.Error(err))
return time.Time{}
}
@ -85,7 +85,7 @@ func ParseCifDate(input *string, startOrEnd string) time.Time {
} else if startOrEnd == "end" {
t = time.Date(t.Year(), t.Month(), t.Day(), 23, 59, 59, 0, londonTimezone)
} else {
log.Msg.Error("Error parsing date string", zap.String("date string", *input), zap.Error(err))
log.Error("Error parsing date string", zap.String("date string", *input), zap.Error(err))
return time.Time{}
}

View File

@ -13,7 +13,7 @@ import (
// Accepts the CIF data as a stream and outputs parsed data
func parseCifDataStream(dataStream io.ReadCloser) (*parsedData, error) {
defer dataStream.Close()
log.Msg.Debug("Starting CIF Datastream parsing")
log.Debug("Starting CIF Datastream parsing")
if dataStream == nil {
return nil, errors.New("unable to parse nil pointer")
}
@ -29,7 +29,7 @@ func parseCifDataStream(dataStream io.ReadCloser) (*parsedData, error) {
for decoder.More() {
var obj map[string]json.RawMessage
if err := decoder.Decode(&obj); err != nil {
log.Msg.Error("Error decoding JSON String")
log.Error("Error decoding JSON String")
return nil, err
}
@ -39,7 +39,7 @@ func parseCifDataStream(dataStream io.ReadCloser) (*parsedData, error) {
case "JsonTimetableV1":
var timetable upstreamApi.JsonTimetableV1
if err := json.Unmarshal(value, &timetable); err != nil {
log.Msg.Error("Error decoding JSONTimetableV1 object", zap.Error(err))
log.Error("Error decoding JSONTimetableV1 object", zap.Error(err))
continue
}
parsed.header = timetable
@ -53,17 +53,17 @@ func parseCifDataStream(dataStream io.ReadCloser) (*parsedData, error) {
case "JsonScheduleV1":
var schedule upstreamApi.JsonScheduleV1
if err := json.Unmarshal(value, &schedule); err != nil {
log.Msg.Error("Error decoding JSONScheduleV1 object", zap.Error(err))
log.Error("Error decoding JSONScheduleV1 object", zap.Error(err))
continue
}
parsed.sched = append(parsed.sched, schedule)
case "EOF":
log.Msg.Info("Reached EOF")
log.Info("Reached EOF")
default:
log.Msg.Warn("Unknown CIF Data type", zap.String("key", key))
log.Warn("Unknown CIF Data type", zap.String("key", key))
}
}
}
log.Msg.Debug("CIF Parsing completed")
log.Debug("CIF Parsing completed")
return &parsed, nil
}

View File

@ -12,8 +12,8 @@ import (
// Processes parsed CIF data and applies the data to the database
func processParsedCif(data *parsedData) error {
log.Msg.Debug("Starting CIF Processing")
log.Msg.Info("Processing CIF Data", zap.Int("schedule-count", len(data.sched)))
log.Debug("Starting CIF Processing")
log.Info("Processing CIF Data", zap.Int("schedule-count", len(data.sched)))
// Batch size for processing
batchSize := 250 // Needs optimising for better RAM use. 1000 ~ 5.7GB, 500 ~ 5.5GB, 750 ~ 5.2GB
@ -34,7 +34,7 @@ func processParsedCif(data *parsedData) error {
if len(deleteBatch) > 0 {
err := doDeletions(deleteBatch)
if err != nil {
log.Msg.Error("Error deleting CIF Entries", zap.Error(err))
log.Error("Error deleting CIF Entries", zap.Error(err))
return err
}
}
@ -56,13 +56,13 @@ func processParsedCif(data *parsedData) error {
if len(createBatch) > 0 {
err := doCreations(createBatch)
if err != nil {
log.Msg.Error("Error creating CIF Entries", zap.Error(err))
log.Error("Error creating CIF Entries", zap.Error(err))
return err
}
}
}
log.Msg.Debug("CIF Processing complete")
log.Debug("CIF Processing complete")
return nil
}
@ -70,7 +70,7 @@ func processParsedCif(data *parsedData) error {
func doDeletions(deletions []*upstreamApi.JsonScheduleV1) error {
defer func() {
if r := recover(); r != nil {
log.Msg.Panic("Panic:", zap.Any("panic", r))
log.Panic("Panic:", zap.Any("panic", r))
}
}()
deleteQueries := make([]database.DeleteQuery, 0)
@ -86,7 +86,7 @@ func doDeletions(deletions []*upstreamApi.JsonScheduleV1) error {
err := dbAccess.DeleteCifEntries(deleteQueries)
if err != nil {
log.Msg.Error("Error deleting documents", zap.Error(err))
log.Error("Error deleting documents", zap.Error(err))
return err
}
@ -99,7 +99,7 @@ func doCreations(creations []*upstreamApi.JsonScheduleV1) error {
for _, item := range creations {
document, err := ConvertServiceType(item, false)
if err != nil {
log.Msg.Error("Error converting JsonSchedule to Service type", zap.Error(err))
log.Error("Error converting JsonSchedule to Service type", zap.Error(err))
}
createDocuments = append(createDocuments, *document)
@ -107,7 +107,7 @@ func doCreations(creations []*upstreamApi.JsonScheduleV1) error {
err := dbAccess.CreateCifEntries(createDocuments)
if err != nil {
log.Msg.Error("Error creating documents", zap.Error(err))
log.Error("Error creating documents", zap.Error(err))
return err
}

View File

@ -13,22 +13,22 @@ import (
// Replaces all existing CIF Data with a new download
func runCifFullDownload(cfg *helpers.Configuration) error {
log.Msg.Info("Downloading all CIF Data")
log.Info("Downloading all CIF Data")
// Download CIF Data file
url, err := getUpdateUrl("full")
if err != nil {
log.Msg.Error("Error getting download URL", zap.Error(err))
log.Error("Error getting download URL", zap.Error(err))
}
dataStream, err := nrod.NrodStream(url, cfg)
if err != nil {
log.Msg.Error("Error downloading CIF data", zap.Error(err))
log.Error("Error downloading CIF data", zap.Error(err))
}
// Parse CIF file
parsed, err := parseCifDataStream(dataStream)
if err != nil {
log.Msg.Error("Error parsing CIF data", zap.Error(err))
log.Error("Error parsing CIF data", zap.Error(err))
return err
}
@ -38,13 +38,13 @@ func runCifFullDownload(cfg *helpers.Configuration) error {
// Process CIF file
err = processParsedCif(parsed)
if err != nil {
log.Msg.Error("Error processing CIF data", zap.Error(err))
log.Error("Error processing CIF data", zap.Error(err))
}
newMeta := generateMetadata(&parsed.header)
ok := dbAccess.PutCifMetadata(newMeta, fullUpdateType)
if !ok {
log.Msg.Warn("CIF Data updated, but metadata write failed")
log.Warn("CIF Data updated, but metadata write failed")
}
parsed = nil
@ -53,41 +53,41 @@ func runCifFullDownload(cfg *helpers.Configuration) error {
// Runs a CIF Update for up to five days
func runCifUpdateDownload(cfg *helpers.Configuration, metadata *dbAccess.CifMetadata, days []time.Time) error {
log.Msg.Info("Downloading CIF Updates")
log.Info("Downloading CIF Updates")
// Loop over dates
for _, time := range days {
log.Msg.Info("Downloading CIF File", zap.Time("CIF Data from", time))
log.Info("Downloading CIF File", zap.Time("CIF Data from", time))
// Download CIF data file
data, err := fetchUpdate(time, cfg)
if err != nil {
log.Msg.Error("Error fetching CIF update", zap.Error(err))
log.Error("Error fetching CIF update", zap.Error(err))
return err
}
// Parse CIF file
parsed, err := parseCifDataStream(data)
if err != nil {
log.Msg.Error("Error parsing CIF data", zap.Error(err))
log.Error("Error parsing CIF data", zap.Error(err))
return err
}
// Check CIF Metadata
log.Msg.Debug("Starting metadata checks")
log.Debug("Starting metadata checks")
reason, update := checkMetadata(metadata, &parsed.header)
if !update {
log.Msg.Warn("Update file not processed", zap.String("reason", reason))
log.Warn("Update file not processed", zap.String("reason", reason))
continue
}
log.Msg.Info("CIF Data is suitable for processing", zap.String("reason", reason))
log.Info("CIF Data is suitable for processing", zap.String("reason", reason))
// Process CIF file
err = processParsedCif(parsed)
if err != nil {
log.Msg.Error("Error processing CIF data", zap.Error(err))
log.Error("Error processing CIF data", zap.Error(err))
}
metadata = generateMetadata(&parsed.header)
@ -96,7 +96,7 @@ func runCifUpdateDownload(cfg *helpers.Configuration, metadata *dbAccess.CifMeta
ok := dbAccess.PutCifMetadata(metadata, dailyUpdateType)
if !ok {
log.Msg.Warn("CIF Data updated, but metadata write failed.")
log.Warn("CIF Data updated, but metadata write failed.")
}
return nil

View File

@ -11,10 +11,10 @@ import (
// Checks if the CORPUS Data needs updating, and calls an updater function if needed.
func CheckCorpus(cfg *helpers.Configuration) {
log.Msg.Debug("Checking age of CORPUS Data")
log.Debug("Checking age of CORPUS Data")
utime, err := dbAccess.CheckUpdateTime(dbAccess.CorpusCollection)
if err != nil {
log.Msg.Error("Error checking last CORPUS update", zap.Error(err))
log.Error("Error checking last CORPUS update", zap.Error(err))
}
lastUpdate := time.Unix(utime, 0)
@ -22,17 +22,17 @@ func CheckCorpus(cfg *helpers.Configuration) {
dataAge := currentTime.Sub(lastUpdate)
fortnight := 14 * 24 * time.Hour
log.Msg.Debug("CORPUS Data", zap.Duration("Data Age", dataAge), zap.Duration("Max Age", 14*24*time.Hour))
log.Debug("CORPUS Data", zap.Duration("Data Age", dataAge), zap.Duration("Max Age", 14*24*time.Hour))
if dataAge >= fortnight {
log.Msg.Info("CORPUS Data is more than two weeks old")
log.Info("CORPUS Data is more than two weeks old")
err := RunCorpusUpdate(cfg)
if err != nil {
log.Msg.Warn("CORPUS Update did not run")
log.Warn("CORPUS Update did not run")
} else {
log.Msg.Info("CORPUS data has been updated")
log.Info("CORPUS data has been updated")
}
} else {
log.Msg.Info("CORPUS Data is less than two weeks old, not updating")
log.Info("CORPUS Data is less than two weeks old, not updating")
}
}

View File

@ -14,14 +14,14 @@ import (
func parseCorpusData(stream io.ReadCloser) (*[]database.CorpusEntry, error) {
defer stream.Close()
log.Msg.Debug("Starting CORPUS Data parsing")
log.Debug("Starting CORPUS Data parsing")
var corpusEntries []database.CorpusEntry
decoder := json.NewDecoder(stream)
// Expect an object at the root of the JSON stream
if _, err := decoder.Token(); err != nil {
log.Msg.Error("Error parsing CORPUS Data", zap.Error(err))
log.Error("Error parsing CORPUS Data", zap.Error(err))
return nil, err
}
@ -29,19 +29,19 @@ func parseCorpusData(stream io.ReadCloser) (*[]database.CorpusEntry, error) {
for decoder.More() {
// Decode the next JSON token
if tok, err := decoder.Token(); err != nil {
log.Msg.Error("Error parsing CORPUS Data", zap.Error(err))
log.Error("Error parsing CORPUS Data", zap.Error(err))
return nil, err
} else if tok == "TIPLOCDATA" {
// Found the "TIPLOCDATA" key, expect the associated array
if !decoder.More() {
err := errors.New("missing array after TIPLOCDATA key")
log.Msg.Error("Error parsing CORPUS Data", zap.Error(err))
log.Error("Error parsing CORPUS Data", zap.Error(err))
return nil, err
}
// Start reading the array associated with the "TIPLOCDATA" key
if _, err := decoder.Token(); err != nil {
log.Msg.Error("Error parsing CORPUS Data", zap.Error(err))
log.Error("Error parsing CORPUS Data", zap.Error(err))
return nil, err
}
@ -49,7 +49,7 @@ func parseCorpusData(stream io.ReadCloser) (*[]database.CorpusEntry, error) {
for decoder.More() {
var corpusEntry database.CorpusEntry
if err := decoder.Decode(&corpusEntry); err != nil {
log.Msg.Error("Error parsing CORPUS Data", zap.Error(err))
log.Error("Error parsing CORPUS Data", zap.Error(err))
return nil, err
}
corpusEntries = append(corpusEntries, corpusEntry)
@ -58,7 +58,7 @@ func parseCorpusData(stream io.ReadCloser) (*[]database.CorpusEntry, error) {
}
}
log.Msg.Debug("CORPUS parsing complete")
log.Debug("CORPUS parsing complete")
return &corpusEntries, nil
}

View File

@ -12,13 +12,13 @@ import (
func RunCorpusUpdate(cfg *helpers.Configuration) error {
resp, err := nrod.NrodStream(url, cfg)
if err != nil {
log.Msg.Error("Failed to fetch CORPUS data", zap.Error(err))
log.Error("Failed to fetch CORPUS data", zap.Error(err))
return err
}
unsortedCorpusData, err := parseCorpusData(resp)
if err != nil {
log.Msg.Error("Error parsing Corpus data", zap.Error(err))
log.Error("Error parsing Corpus data", zap.Error(err))
return err
}
@ -26,24 +26,24 @@ func RunCorpusUpdate(cfg *helpers.Configuration) error {
stationData := createStationEntries(corpusData)
if err := dbAccess.DropCollection(dbAccess.CorpusCollection); err != nil {
log.Msg.Warn("CORPUS data may be incomplete")
log.Msg.Error("Error dropping CORPUS Data", zap.Error(err))
log.Warn("CORPUS data may be incomplete")
log.Error("Error dropping CORPUS Data", zap.Error(err))
return err
}
if err := dbAccess.DropCollection(dbAccess.StationsCollection); err != nil {
log.Msg.Warn("Stations data may be incomplete")
log.Msg.Error("Error dropping stations Data", zap.Error(err))
log.Warn("Stations data may be incomplete")
log.Error("Error dropping stations Data", zap.Error(err))
return err
}
if err := dbAccess.PutManyCorpus(corpusData); err != nil {
log.Msg.Warn("CORPUS data may be incomplete")
log.Msg.Error("Error inserting CORPUS Data", zap.Error(err))
log.Warn("CORPUS data may be incomplete")
log.Error("Error inserting CORPUS Data", zap.Error(err))
return err
}
if err := dbAccess.PutManyStations(stationData); err != nil {
log.Msg.Warn("Stations data may be incomplete")
log.Msg.Error("Error inserting stations data", zap.Error(err))
log.Warn("Stations data may be incomplete")
log.Error("Error inserting stations data", zap.Error(err))
return err
}

View File

@ -28,9 +28,9 @@ func PushVersionToDb() {
coll := MongoClient.Database("owlboard").Collection("versions")
_, err := coll.UpdateOne(context.TODO(), versionSelector, bson.M{"$set": version}, opts)
if err != nil {
log.Msg.Warn("Unable to push version to database: " + err.Error())
log.Warn("Unable to push version to database: " + err.Error())
} else {
log.Msg.Debug("Version up to date in Database")
log.Debug("Version up to date in Database")
}
}
@ -39,7 +39,7 @@ func PutOneService(data database.Service) bool {
coll := MongoClient.Database(databaseName).Collection(timetableCollection)
_, err := coll.InsertOne(context.TODO(), data)
if err != nil {
log.Msg.Error("Unable to insert to database: " + err.Error())
log.Error("Unable to insert to database: " + err.Error())
return false
}
return true
@ -55,7 +55,7 @@ func DeleteOneService(data database.DeleteQuery) bool {
}
_, err := coll.DeleteOne(context.TODO(), filter)
if err != nil {
log.Msg.Error("Unable to delete service: " + err.Error())
log.Error("Unable to delete service: " + err.Error())
return false
}
return true

View File

@ -36,11 +36,11 @@ func GetCifMetadata() (*CifMetadata, error) {
if errors.Is(err, mongo.ErrNoDocuments) {
return nil, nil
}
log.Msg.Error("Error fetching CIF Metadata")
log.Error("Error fetching CIF Metadata")
return nil, err
}
log.Msg.Debug("Fetched CIF Metadata from database", zap.Any("Metadata", result))
log.Debug("Fetched CIF Metadata from database", zap.Any("Metadata", result))
return &result, nil
}
@ -64,11 +64,11 @@ func PutCifMetadata(metadata *CifMetadata, lastUpdateType string) bool {
_, err := collection.UpdateOne(context.Background(), filter, update, options)
if err != nil {
log.Msg.Error("Error updating CIF Metadata", zap.Error(err))
log.Error("Error updating CIF Metadata", zap.Error(err))
return false
}
log.Msg.Info("New CIF Metadata written", zap.Time("Update time", metadata.LastUpdate))
log.Info("New CIF Metadata written", zap.Time("Update time", metadata.LastUpdate))
return true
}
@ -76,15 +76,15 @@ func PutCifMetadata(metadata *CifMetadata, lastUpdateType string) bool {
func DeleteCifEntries(deletions []database.DeleteQuery) error {
defer func() {
if r := recover(); r != nil {
log.Msg.Panic("Panic:", zap.Any("panic", r))
log.Panic("Panic:", zap.Any("panic", r))
}
}()
// Skip if deletions is empty
if len(deletions) == 0 {
log.Msg.Info("No deletions required")
log.Info("No deletions required")
return nil
}
log.Msg.Info("Running deletions against database", zap.Int("count", len(deletions)))
log.Info("Running deletions against database", zap.Int("count", len(deletions)))
// Prepare deletion tasks
collection := MongoClient.Database(databaseName).Collection(timetableCollection)
@ -103,7 +103,7 @@ func DeleteCifEntries(deletions []database.DeleteQuery) error {
_, err := collection.BulkWrite(context.Background(), bulkDeletions, bulkWriteOptions)
if err != nil {
log.Msg.Error("Error deleting documents", zap.Error(err))
log.Error("Error deleting documents", zap.Error(err))
return err
}
@ -114,10 +114,10 @@ func DeleteCifEntries(deletions []database.DeleteQuery) error {
func CreateCifEntries(schedules []database.Service) error {
// Skip if deletions is empty
if len(schedules) == 0 {
log.Msg.Info("No creations required")
log.Info("No creations required")
return nil
}
log.Msg.Info("Running creations against database", zap.Int("count", len(schedules)))
log.Info("Running creations against database", zap.Int("count", len(schedules)))
collection := MongoClient.Database(databaseName).Collection(timetableCollection)
@ -132,7 +132,7 @@ func CreateCifEntries(schedules []database.Service) error {
_, err := collection.BulkWrite(context.Background(), models, bulkWriteOptions)
if err != nil {
log.Msg.Error("Error inserting documents", zap.Error(err))
log.Error("Error inserting documents", zap.Error(err))
return err
}

View File

@ -34,9 +34,9 @@ func InitDataAccess(cfg *helpers.Configuration) {
client, err := mongo.Connect(ctx, options.Client().ApplyURI(uri).SetBSONOptions(bsonOpts))
if err != nil {
fmt.Println(err)
log.Msg.Fatal("Error connecting to database: " + err.Error())
log.Fatal("Error connecting to database: " + err.Error())
} else {
log.Msg.Info("Database connection successful")
log.Info("Database connection successful")
}
MongoClient = client
}
@ -47,9 +47,9 @@ func CloseMongoClient() {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
if err := MongoClient.Disconnect(ctx); err != nil {
log.Msg.Warn("Error disconnecting MongoDB client: " + err.Error())
log.Warn("Error disconnecting MongoDB client: " + err.Error())
} else {
log.Msg.Info("MongoDB client disconnected.")
log.Info("MongoDB client disconnected.")
}
}
}

View File

@ -12,13 +12,13 @@ import (
// CAUTION: Drops the collection named in collectionName
func DropCollection(collectionName string) error {
log.Msg.Info("Dropping collection", zap.String("Collection Name", collectionName))
log.Info("Dropping collection", zap.String("Collection Name", collectionName))
database := MongoClient.Database(databaseName)
collection := database.Collection(collectionName)
err := collection.Drop(context.Background())
if err != nil {
log.Msg.Error("Error dropping collection", zap.String("Collection Name", collectionName), zap.Error(err))
log.Error("Error dropping collection", zap.String("Collection Name", collectionName), zap.Error(err))
return err
}
@ -45,7 +45,7 @@ func CheckUpdateTime(collectionName string) (int64, error) {
// Sets a new update time (unix timestamp) of the collection named in collectionName. The update time is calculated within the function.
func SetUpdateTime(collectionName string) error {
log.Msg.Info("Setting update time", zap.String("collection", collectionName))
log.Info("Setting update time", zap.String("collection", collectionName))
database := MongoClient.Database(databaseName)
collection := database.Collection("meta")
options := options.Update().SetUpsert(true)
@ -65,7 +65,7 @@ func SetUpdateTime(collectionName string) error {
_, err := collection.UpdateOne(context.Background(), filter, update, options)
if err != nil {
log.Msg.Error("Error setting update time", zap.String("collection", collectionName), zap.Error(err))
log.Error("Error setting update time", zap.String("collection", collectionName), zap.Error(err))
return err
}
return nil

View File

@ -1,36 +1,63 @@
package log
import (
"os"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
"git.fjla.uk/owlboard/timetable-mgr/helpers"
)
var Msg *zap.Logger
// Logger object is exported to facilitate migration
// to the single logger instance. Use outside `package log`
// should be avoided.
var post *zap.Logger
// Initialises the logger
func init() {
var err error
// Create a custom configuration with a human-readable "Console" encoder
config := zap.NewDevelopmentConfig()
config.EncoderConfig.EncodeLevel = zapcore.CapitalColorLevelEncoder // Adds color to log levels
// Determine the log level based on the runtime mode
logLevel := zapcore.DebugLevel
if helpers.Runtime != "debug" {
logLevel = zapcore.InfoLevel
mode := os.Getenv("runtime")
if mode == "" {
mode = "prod"
}
// Set the log level
config.Level = zap.NewAtomicLevelAt(logLevel)
logLevel := zapcore.InfoLevel
if mode == "debug" {
logLevel = zapcore.DebugLevel
}
Msg, err = config.Build() // Potential source of the error
config := zap.NewDevelopmentConfig()
config.EncoderConfig.EncodeLevel = zapcore.CapitalColorLevelEncoder
config.Level = zap.NewAtomicLevelAt(logLevel)
post, err = config.Build(zap.AddCallerSkip(1))
if err != nil {
panic("Failed to initialize logger: " + err.Error())
}
// Log the selected log level (optional, can be helpful for debugging)
Msg.Info("Log level set to: " + logLevel.String())
defer post.Sync()
Info("Logger initialised", zap.String("level", logLevel.String()), zap.String("runtime", mode))
}
func Info(msg string, fields ...zap.Field) {
post.Info(msg, fields...)
}
func Debug(msg string, fields ...zap.Field) {
post.Debug(msg, fields...)
}
func Warn(msg string, fields ...zap.Field) {
post.Warn(msg, fields...)
}
func Error(msg string, fields ...zap.Field) {
post.Error(msg, fields...)
}
func Fatal(msg string, fields ...zap.Field) {
post.Fatal(msg, fields...)
}
func Panic(msg string, fields ...zap.Field) {
post.Panic(msg, fields...)
}

23
main.go
View File

@ -1,7 +1,6 @@
package main
import (
"fmt"
"os"
"os/signal"
"syscall"
@ -16,19 +15,18 @@ import (
"git.fjla.uk/owlboard/timetable-mgr/log"
"git.fjla.uk/owlboard/timetable-mgr/messaging"
"git.fjla.uk/owlboard/timetable-mgr/vstp"
"go.uber.org/zap"
)
func main() {
log.Info("Initialising OwlBoard timetable-mgr", zap.String("version", helpers.Version))
cfg, err := helpers.LoadConfig()
if err != nil {
fmt.Println("Error loading configuration", err)
log.Fatal("Unable to load configuration", zap.Error(err))
return
}
cfg.PrintConfig()
log.Msg.Info("Initialised OwlBoard timetable-mgr " + helpers.Version)
dbAccess.InitDataAccess(cfg)
dbAccess.PushVersionToDb()
@ -60,30 +58,27 @@ func handleSignals(cfg *helpers.Configuration, stop chan<- struct{}) {
signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
sig := <-sigChan
log.Msg.Warn("Signal received: " + sig.String())
log.Warn("Signal received: " + sig.String())
cleanup(cfg, stop)
}
// Cleans up open connections ready for a clean exit of the program
func cleanup(cfg *helpers.Configuration, stop chan<- struct{}) {
log.Msg.Debug("Cleaning up open connections")
log.Debug("Cleaning up open connections")
if cfg.VstpOn {
if messaging.Client != nil {
log.Msg.Info("Closing STOMP Client")
log.Info("Closing STOMP Client")
messaging.Disconnect(messaging.Client)
}
}
if dbAccess.MongoClient != nil {
log.Msg.Info("Closing MongoDB Client")
log.Info("Closing MongoDB Client")
dbAccess.CloseMongoClient()
}
log.Msg.Info("Signalling to other goroutines")
log.Info("Signalling to other goroutines")
close(stop)
log.Msg.Info("Program ready to exit")
if log.Msg != nil {
log.Msg.Sync()
}
log.Info("Program ready to exit")
time.Sleep(500 * time.Millisecond)

View File

@ -21,11 +21,11 @@ func dial(user, pass string) *stomp.Conn {
stomp.ConnOpt.Header("client-id", user+"-mq-client"),
)
if err != nil {
log.Msg.Fatal("Unable to connect to STOMP Client: " + err.Error())
log.Fatal("Unable to connect to STOMP Client: " + err.Error())
conn.MustDisconnect()
}
log.Msg.Info("Initialised STOMP Client")
log.Info("Initialised STOMP Client")
return conn
}
@ -34,14 +34,14 @@ func dial(user, pass string) *stomp.Conn {
func Disconnect(conn *stomp.Conn) {
if conn != nil {
err := conn.Disconnect()
log.Msg.Warn("Disconnected STOMP Client")
log.Warn("Disconnected STOMP Client")
if err != nil {
conn.MustDisconnect()
log.Msg.Error("STOMP Disconnection failed, forced disconnect")
log.Error("STOMP Disconnection failed, forced disconnect")
}
return
}
log.Msg.Error("STOMP Disconnect failed, next connection attempt may fail")
log.Error("STOMP Disconnect failed, next connection attempt may fail")
}
// Register against the MQ Server and log each message for testing purposes

View File

@ -14,7 +14,7 @@ import (
// Downloads NROD Data and extracts if GZIP, returns a io.Reader
func NrodStream(url string, cfg *helpers.Configuration) (io.ReadCloser, error) {
log.Msg.Debug("Fetching NROD data stream", zap.String("Request URL", url))
log.Debug("Fetching NROD data stream", zap.String("Request URL", url))
client := http.Client{
Timeout: time.Second * 300,
@ -22,7 +22,7 @@ func NrodStream(url string, cfg *helpers.Configuration) (io.ReadCloser, error) {
req, err := http.NewRequest("GET", url, nil)
if err != nil {
log.Msg.Error("Error creating HTTP Request", zap.Error(err))
log.Error("Error creating HTTP Request", zap.Error(err))
return nil, err
}
@ -30,13 +30,13 @@ func NrodStream(url string, cfg *helpers.Configuration) (io.ReadCloser, error) {
resp, err := client.Do(req)
if err != nil {
log.Msg.Error("Error carrying out HTTP Request", zap.Error(err), zap.Int("STATUS", resp.StatusCode))
log.Error("Error carrying out HTTP Request", zap.Error(err), zap.Int("STATUS", resp.StatusCode))
return nil, err
}
if resp.StatusCode != http.StatusOK {
err := fmt.Errorf("unexpected status code: %d", resp.StatusCode)
log.Msg.Error("Non-successful status code", zap.Error(err))
log.Error("Non-successful status code", zap.Error(err))
return nil, err
}
@ -46,13 +46,13 @@ func NrodStream(url string, cfg *helpers.Configuration) (io.ReadCloser, error) {
}
func NrodStreamExtract(resp *http.Response) (io.ReadCloser, error) {
log.Msg.Debug("Extracting NROD Download")
log.Debug("Extracting NROD Download")
log.Msg.Debug("Content Type", zap.String("Content-Encoding", resp.Header.Get("Content-Encoding")))
log.Debug("Content Type", zap.String("Content-Encoding", resp.Header.Get("Content-Encoding")))
gzReader, err := gzip.NewReader(resp.Body)
if err != nil {
log.Msg.Warn("Unable to create GZIP Reader, data probably not gzipped")
log.Warn("Unable to create GZIP Reader, data probably not gzipped")
return resp.Body, err
}

View File

@ -20,26 +20,26 @@ func processEntryType(entry database.Service) {
case "Delete":
deleteEntry(entry)
default:
log.Msg.Error("Unknown transaction type: " + entry.TransactionType)
log.Warn("Unknown transaction type: " + entry.TransactionType)
}
}
func createEntry(entry database.Service) {
log.Msg.Info("Entry Creation requested for: " + entry.TrainUid + " - " + entry.Headcode + " - " + entry.Operator)
log.Info("Entry Creation requested for: " + entry.TrainUid + " - " + entry.Headcode + " - " + entry.Operator)
status := dbAccess.PutOneService(entry)
if status {
log.Msg.Info("Database entry created")
log.Info("Database entry created")
} else {
log.Msg.Error("Database entry failed, skipped service")
log.Error("Database entry failed, skipped service")
}
}
func updateEntry(entry database.Service) {
log.Msg.Info("Entry UPDATE requested for: " + entry.TrainUid + " - " + entry.Headcode + " - " + entry.Operator)
log.Warn("Entry UPDATE requested for: " + entry.TrainUid + " - " + entry.Headcode + " - " + entry.Operator)
}
func deleteEntry(entry database.Service) {
log.Msg.Info("Entry DELETE requested for: " + entry.TrainUid + " - " + entry.Headcode)
log.Info("Entry DELETE requested for: " + entry.TrainUid + " - " + entry.Headcode)
var deletionQuery = database.DeleteQuery{
TrainUid: entry.TrainUid,
ScheduleStartDate: entry.ScheduleStartDate,
@ -47,9 +47,9 @@ func deleteEntry(entry database.Service) {
}
status := dbAccess.DeleteOneService(deletionQuery)
if status {
log.Msg.Info("Database entry deleted")
log.Info("Database entry deleted")
} else {
log.Msg.Error("Database deletion failed, skipped deletion")
log.Error("Database deletion failed, skipped deletion")
fmt.Printf("%+v\n", deletionQuery)
}
}

View File

@ -1,17 +1,16 @@
package vstp
import (
"fmt"
"git.fjla.uk/owlboard/timetable-mgr/log"
"github.com/go-stomp/stomp/v3"
"go.uber.org/zap"
)
var count uint64 = 0
func handle(msg *stomp.Message) {
count++
log.Msg.Info("Messages since started: " + fmt.Sprint(count))
log.Info("Message received", zap.Uint64("total since startup", count))
schedule := unmarshalData(string(msg.Body))
processEntryType(schedule)
}

View File

@ -18,23 +18,22 @@ func unmarshalData(jsonData string) database.Service {
var schedule upstreamApi.MsgData
err := json.Unmarshal([]byte(jsonData), &schedule)
if err != nil {
log.Msg.Error("Unable to unmarshal message body: " + err.Error())
log.Error("Unable to unmarshal message body: " + err.Error())
//return err
}
log.Msg.Debug("Unmarshalling Complete")
log.Debug("Unmarshalling Complete")
if schedule.Data.CIFMsg.ScheduleSegment == nil {
log.Msg.Warn("ScheduleSegment is nil")
log.Warn("ScheduleSegment is nil")
} else if len(schedule.Data.CIFMsg.ScheduleSegment) == 0 {
log.Msg.Warn("ScheduleSegment is empty")
log.Warn("ScheduleSegment is empty")
}
return formatData(&schedule.Data.CIFMsg)
}
// Transforms the upstreamApi.Schedule type into a database.Service type
func formatData(dataInput *upstreamApi.Schedule) database.Service {
log.Msg.Debug("ScheduleSegment length: " + fmt.Sprint(len(dataInput.ScheduleSegment)))
log.Msg.Debug("Printing dataInput to console:")
log.Debug("ScheduleSegment length: " + fmt.Sprint(len(dataInput.ScheduleSegment)))
var operator, headcode, powerType string
var planSpeed int32
@ -70,20 +69,20 @@ func formatData(dataInput *upstreamApi.Schedule) database.Service {
// Uses the map provided in 'helpers' to translate incorrect CIF speeds to their correct equivalent
func parseSpeed(CIFSpeed string) int32 {
log.Msg.Debug("CIFSpeed Input: '" + CIFSpeed + "'")
log.Debug("CIFSpeed Input: '" + CIFSpeed + "'")
if CIFSpeed == "" {
log.Msg.Debug("Speed data not provided")
log.Debug("Speed data not provided")
return int32(0)
}
actualSpeed, exists := helpers.SpeedMap[CIFSpeed]
if !exists {
actualSpeed = CIFSpeed
}
log.Msg.Debug("Corrected Speed: " + actualSpeed)
log.Debug("Corrected Speed: " + actualSpeed)
speed, err := strconv.ParseInt(actualSpeed, 10, 32)
if err != nil {
log.Msg.Warn("Unable to parse speed: " + CIFSpeed + ", returning 0")
log.Warn("Unable to parse speed: " + CIFSpeed + ", returning 0")
return int32(0)
}
return int32(speed)
@ -91,10 +90,10 @@ func parseSpeed(CIFSpeed string) int32 {
// Converts the date string provided from the upstream API into a proper Date type and adds a time
func parseDate(dateString string, end bool) time.Time {
log.Msg.Debug("Date Input: " + dateString)
log.Debug("Date Input: " + dateString)
date, err := time.Parse("2006-01-02", dateString)
if err != nil {
log.Msg.Error("Unable to parse date: " + dateString)
log.Error("Unable to parse date: " + dateString)
return time.Time{}
}
@ -107,13 +106,13 @@ func parseDate(dateString string, end bool) time.Time {
}
dateWithTime := time.Date(date.Year(), date.Month(), date.Day(), hour, minute, second, nanosecond, location)
log.Msg.Debug("Parsed date: " + dateWithTime.String())
log.Debug("Parsed date: " + dateWithTime.String())
return dateWithTime
}
// Converts the binary stype 'daysRun' field into an array of short days
func parseDaysRun(daysBinary string) []string {
log.Msg.Debug("daysRun Input: " + daysBinary)
log.Debug("daysRun Input: " + daysBinary)
shortDays := []string{"m", "t", "w", "th", "f", "s", "su"}
var result []string
for i, digit := range daysBinary {

View File

@ -11,25 +11,25 @@ import (
func Subscribe() {
sub, err := messaging.Client.Subscribe("/topic/VSTP_ALL", stomp.AckAuto)
if err != nil {
log.Msg.Fatal("Unable to start subscription: " + err.Error())
log.Fatal("Unable to start subscription: " + err.Error())
}
log.Msg.Info("Subscription to VSTP topic successful, listening")
log.Info("Subscription to VSTP topic successful, listening")
go func() {
log.Msg.Debug("GOROUTINE: VSTP Message Handler Started")
log.Debug("GOROUTINE: VSTP Message Handler Started")
defer func() {
if r := recover(); r != nil {
log.Msg.Warn("GOROUTINE: VSTP Message Handler Stopped")
log.Warn("GOROUTINE: VSTP Message Handler Stopped")
time.Sleep(time.Second * 10)
log.Msg.Fatal("GOROUTINE: VSTP Message Handler Failed")
log.Fatal("GOROUTINE: VSTP Message Handler Failed")
}
}()
for {
msg := <-sub.C
if msg.Err != nil {
log.Msg.Error("STOMP Message Error: " + msg.Err.Error())
log.Error("STOMP Message Error: " + msg.Err.Error())
} else {
log.Msg.Info("STOMP Message Received")
log.Info("STOMP Message Received")
handle(msg)
}
}