Reorganise repo
This commit is contained in:
124
dbAccess/access.go
Normal file
124
dbAccess/access.go
Normal file
@@ -0,0 +1,124 @@
|
||||
package dbAccess
|
||||
|
||||
import (
|
||||
"context"
|
||||
"time"
|
||||
|
||||
"git.fjla.uk/owlboard/go-types/pkg/database"
|
||||
"git.fjla.uk/owlboard/timetable-mgr/helpers"
|
||||
"git.fjla.uk/owlboard/timetable-mgr/log"
|
||||
"go.mongodb.org/mongo-driver/bson"
|
||||
"go.mongodb.org/mongo-driver/mongo/options"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
const timetableCollection string = "timetable"
|
||||
|
||||
// CAUTION: Drops the collection named in collectionName
|
||||
func DropCollection(collectionName string) error {
|
||||
log.Msg.Info("Dropping collection", zap.String("Collection Name", collectionName))
|
||||
database := MongoClient.Database(databaseName)
|
||||
collection := database.Collection(collectionName)
|
||||
|
||||
err := collection.Drop(context.Background())
|
||||
if err != nil {
|
||||
log.Msg.Error("Error dropping collection", zap.String("Collection Name", collectionName), zap.Error(err))
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Checks the update time (unix timestamp) of the collection named in collectionName, uses 'type: collection' entries in the meta collection
|
||||
func CheckUpdateTime(collectionName string) (int64, error) {
|
||||
database := MongoClient.Database(databaseName)
|
||||
collection := database.Collection(metaCollection)
|
||||
filter := bson.D{
|
||||
{Key: "target", Value: collectionName},
|
||||
{Key: "type", Value: "collection"},
|
||||
}
|
||||
var result struct {
|
||||
Updated int64 `bson:"updated"`
|
||||
}
|
||||
err := collection.FindOne(context.Background(), filter).Decode(&result)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
return result.Updated, nil
|
||||
}
|
||||
|
||||
// Sets a new update time (unix timestamp) of the collection named in collectionName. The update time is calculated within the function.
|
||||
func SetUpdateTime(collectionName string) error {
|
||||
log.Msg.Info("Setting update time", zap.String("collection", collectionName))
|
||||
database := MongoClient.Database(databaseName)
|
||||
collection := database.Collection("meta")
|
||||
options := options.Update().SetUpsert(true)
|
||||
updateTime := time.Now().Unix()
|
||||
filter := bson.M{
|
||||
"target": collectionName,
|
||||
"type": "collection",
|
||||
}
|
||||
update := bson.M{
|
||||
"$set": bson.M{
|
||||
"updated": updateTime,
|
||||
"target": collectionName,
|
||||
"type": "collection",
|
||||
},
|
||||
}
|
||||
_, err := collection.UpdateOne(context.Background(), filter, update, options)
|
||||
|
||||
if err != nil {
|
||||
log.Msg.Error("Error setting update time", zap.String("collection", collectionName), zap.Error(err))
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Pushes the current version of this application to the data base 'versions' collection.
|
||||
// Currently uses the old name of mq-client
|
||||
func PushVersionToDb() {
|
||||
version := database.Version{
|
||||
Target: "mq-client",
|
||||
Component: "mq-client",
|
||||
Version: helpers.Version,
|
||||
}
|
||||
versionSelector := database.VersionSelector{
|
||||
Target: "mq-client",
|
||||
Component: "mq-client",
|
||||
}
|
||||
opts := options.Update().SetUpsert(true)
|
||||
coll := MongoClient.Database("owlboard").Collection("versions")
|
||||
_, err := coll.UpdateOne(context.TODO(), versionSelector, bson.M{"$set": version}, opts)
|
||||
if err != nil {
|
||||
log.Msg.Warn("Unable to push version to database: " + err.Error())
|
||||
} else {
|
||||
log.Msg.Debug("Version up to date in Database")
|
||||
}
|
||||
}
|
||||
|
||||
// Puts one item of the type `database.Service` to the database, used by the VSTP package which receives services one at a time
|
||||
func PutOneService(data database.Service) bool {
|
||||
coll := MongoClient.Database(databaseName).Collection(timetableCollection)
|
||||
_, err := coll.InsertOne(context.TODO(), data)
|
||||
if err != nil {
|
||||
log.Msg.Error("Unable to insert to database: " + err.Error())
|
||||
return false
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
// Deletes one service from the database.
|
||||
func DeleteOneService(data database.DeleteQuery) bool {
|
||||
coll := MongoClient.Database(databaseName).Collection(timetableCollection)
|
||||
var filter = bson.D{
|
||||
{Key: "trainUid", Value: data.TrainUid},
|
||||
{Key: "stpIndicator", Value: data.StpIndicator},
|
||||
{Key: "scheduleStartDate", Value: data.ScheduleStartDate},
|
||||
}
|
||||
_, err := coll.DeleteOne(context.TODO(), filter)
|
||||
if err != nil {
|
||||
log.Msg.Error("Unable to delete service: " + err.Error())
|
||||
return false
|
||||
}
|
||||
return true
|
||||
}
|
||||
93
dbAccess/cif.go
Normal file
93
dbAccess/cif.go
Normal file
@@ -0,0 +1,93 @@
|
||||
package dbAccess
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"time"
|
||||
|
||||
"git.fjla.uk/owlboard/go-types/pkg/database"
|
||||
"git.fjla.uk/owlboard/timetable-mgr/log"
|
||||
"go.mongodb.org/mongo-driver/bson"
|
||||
"go.mongodb.org/mongo-driver/mongo"
|
||||
"go.mongodb.org/mongo-driver/mongo/options"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
const Doctype = "CifMetadata"
|
||||
|
||||
// The type describing the CifMetadata 'type' in the database.
|
||||
// This type will be moved to owlboard/go-types
|
||||
type CifMetadata struct {
|
||||
Doctype string `json:"type"`
|
||||
LastUpdate time.Time `json:"lastUpdate"`
|
||||
LastTimestamp int64 `json:"lastTimestamp"`
|
||||
LastSequence int64 `json:"lastSequence"`
|
||||
}
|
||||
|
||||
// Fetches the CifMetadata from the database, returns nil if no metadata exists - before first initialisation for example.
|
||||
func GetCifMetadata() (*CifMetadata, error) {
|
||||
database := MongoClient.Database(databaseName)
|
||||
collection := database.Collection(metaCollection)
|
||||
filter := bson.M{"type": Doctype}
|
||||
var result CifMetadata
|
||||
err := collection.FindOne(context.Background(), filter).Decode(&result)
|
||||
if err != nil {
|
||||
if errors.Is(err, mongo.ErrNoDocuments) {
|
||||
return nil, nil
|
||||
}
|
||||
log.Msg.Error("Error fetching CIF Metadata")
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &result, nil
|
||||
}
|
||||
|
||||
// Uses upsert to Insert/Update the CifMetadata in the database
|
||||
func PutCifMetadata(metadata CifMetadata) bool {
|
||||
database := MongoClient.Database(databaseName)
|
||||
collection := database.Collection(metaCollection)
|
||||
options := options.Update().SetUpsert(true)
|
||||
filter := bson.M{"type": Doctype}
|
||||
update := bson.M{
|
||||
"type": Doctype,
|
||||
"LastUpdate": metadata.LastUpdate,
|
||||
"LastTimestamp": metadata.LastTimestamp,
|
||||
"LastSequence": metadata.LastSequence,
|
||||
}
|
||||
|
||||
_, err := collection.UpdateOne(context.Background(), filter, update, options)
|
||||
|
||||
if err != nil {
|
||||
log.Msg.Error("Error updating CIF Metadata", zap.Error(err))
|
||||
return false
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
func DeleteCifEntries(deletions []database.DeleteQuery) error {
|
||||
// Prepare deletion tasks
|
||||
bulkDeletions := make([]mongo.WriteModel, 0, len(deletions))
|
||||
|
||||
for _, deleteQuery := range deletions {
|
||||
filter := bson.M{
|
||||
"trainUid": deleteQuery.TrainUid,
|
||||
"scheduleStartDate": deleteQuery.ScheduleStartDate,
|
||||
"stpIndicator": deleteQuery.StpIndicator,
|
||||
}
|
||||
bulkDeletions = append(bulkDeletions, mongo.NewDeleteManyModel().SetFilter(filter))
|
||||
}
|
||||
|
||||
log.Msg.Info("Running `Delete` tasks from CIF Update", zap.Int("Required deletions", len(deletions)))
|
||||
for i := 0; i < len(bulkDeletions); i += batchsize {
|
||||
end := i + batchsize
|
||||
if end > len(bulkDeletions) {
|
||||
end = len(bulkDeletions)
|
||||
}
|
||||
_, err := MongoClient.Database(databaseName).Collection(TimetableCollection).BulkWrite(context.Background(), bulkDeletions[i:end])
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
55
dbAccess/client.go
Normal file
55
dbAccess/client.go
Normal file
@@ -0,0 +1,55 @@
|
||||
package dbAccess
|
||||
|
||||
import (
|
||||
"git.fjla.uk/owlboard/timetable-mgr/helpers"
|
||||
"git.fjla.uk/owlboard/timetable-mgr/log"
|
||||
|
||||
"context"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"go.mongodb.org/mongo-driver/mongo"
|
||||
"go.mongodb.org/mongo-driver/mongo/options"
|
||||
)
|
||||
|
||||
// Provide the DB Connection to other functions
|
||||
var MongoClient (*mongo.Client)
|
||||
|
||||
// Builds the DB URI based on the loaded configuration parameters
|
||||
func getDbUri(cfg *helpers.Configuration) string {
|
||||
var uri = "mongodb://" + cfg.DbUser + ":" + cfg.DbPass + "@" + cfg.DbHost + ":" + cfg.DbPort
|
||||
return uri
|
||||
}
|
||||
|
||||
// Configure bsonOpts
|
||||
var bsonOpts = &options.BSONOptions{
|
||||
UseJSONStructTags: true,
|
||||
}
|
||||
|
||||
// Initialise the DB Connection
|
||||
func InitDataAccess(cfg *helpers.Configuration) {
|
||||
uri := getDbUri(cfg)
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
|
||||
defer cancel()
|
||||
client, err := mongo.Connect(ctx, options.Client().ApplyURI(uri).SetBSONOptions(bsonOpts))
|
||||
if err != nil {
|
||||
fmt.Println(err)
|
||||
log.Msg.Fatal("Error connecting to database: " + err.Error())
|
||||
} else {
|
||||
log.Msg.Info("Database connection successful")
|
||||
}
|
||||
MongoClient = client
|
||||
}
|
||||
|
||||
// Closes the connection to the database - used for cleanup functions
|
||||
func CloseMongoClient() {
|
||||
if MongoClient != nil {
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
|
||||
defer cancel()
|
||||
if err := MongoClient.Disconnect(ctx); err != nil {
|
||||
log.Msg.Warn("Error disconnecting MongoDB client: " + err.Error())
|
||||
} else {
|
||||
log.Msg.Info("MongoDB client disconnected.")
|
||||
}
|
||||
}
|
||||
}
|
||||
1
dbAccess/common.go
Normal file
1
dbAccess/common.go
Normal file
@@ -0,0 +1 @@
|
||||
package dbAccess
|
||||
8
dbAccess/contants.go
Normal file
8
dbAccess/contants.go
Normal file
@@ -0,0 +1,8 @@
|
||||
package dbAccess
|
||||
|
||||
const databaseName string = "owlboard"
|
||||
const CorpusCollection string = "corpus"
|
||||
const StationsCollection string = "stations"
|
||||
const metaCollection string = "meta"
|
||||
const TimetableCollection string = "timetable"
|
||||
const batchsize int = 100
|
||||
55
dbAccess/corpus.go
Normal file
55
dbAccess/corpus.go
Normal file
@@ -0,0 +1,55 @@
|
||||
package dbAccess
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"git.fjla.uk/owlboard/go-types/pkg/database"
|
||||
)
|
||||
|
||||
// Puts an array of Corpus Documents into the database
|
||||
func PutManyCorpus(corpusData []database.CorpusEntry) error {
|
||||
collection := MongoClient.Database(databaseName).Collection(CorpusCollection)
|
||||
|
||||
documents := convertCorpusToInterfaceSlice(corpusData)
|
||||
|
||||
_, err := collection.InsertMany(context.Background(), documents)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
SetUpdateTime(CorpusCollection)
|
||||
return nil
|
||||
}
|
||||
|
||||
// Puts an array of Stations documents into the database
|
||||
func PutManyStations(stationsData []database.StationEntry) error {
|
||||
collection := MongoClient.Database(databaseName).Collection(StationsCollection)
|
||||
|
||||
documents := convertStationsToInterfaceSlice(stationsData)
|
||||
|
||||
_, err := collection.InsertMany(context.Background(), documents)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
SetUpdateTime(StationsCollection)
|
||||
return nil
|
||||
}
|
||||
|
||||
// Converts []database.CorpusEntry types into interface slices required to put them into the database
|
||||
func convertCorpusToInterfaceSlice(corpusData []database.CorpusEntry) []interface{} {
|
||||
interfaceSlice := make([]interface{}, len(corpusData))
|
||||
for i, doc := range corpusData {
|
||||
interfaceSlice[i] = doc
|
||||
}
|
||||
return interfaceSlice
|
||||
}
|
||||
|
||||
// Converts []database.StationEntry types into interface slices required to put them into the database
|
||||
func convertStationsToInterfaceSlice(stationsData []database.StationEntry) []interface{} {
|
||||
interfaceSlice := make([]interface{}, len(stationsData))
|
||||
for i, doc := range stationsData {
|
||||
interfaceSlice[i] = doc
|
||||
}
|
||||
return interfaceSlice
|
||||
}
|
||||
Reference in New Issue
Block a user