Streaming data into the CIF Parse function reduces memory down to 4.1GB - also looking to stream the parsed data directly into the database if feasable.
This commit is contained in:
		
							parent
							
								
									7acae49812
								
							
						
					
					
						commit
						f903219276
					
				
							
								
								
									
										54
									
								
								cif/parse.go
									
									
									
									
									
								
							
							
						
						
									
										54
									
								
								cif/parse.go
									
									
									
									
									
								
							| @ -4,6 +4,7 @@ import ( | |||||||
| 	"bytes" | 	"bytes" | ||||||
| 	"encoding/json" | 	"encoding/json" | ||||||
| 	"errors" | 	"errors" | ||||||
|  | 	"io" | ||||||
| 
 | 
 | ||||||
| 	"git.fjla.uk/owlboard/go-types/pkg/upstreamApi" | 	"git.fjla.uk/owlboard/go-types/pkg/upstreamApi" | ||||||
| 	"git.fjla.uk/owlboard/timetable-mgr/log" | 	"git.fjla.uk/owlboard/timetable-mgr/log" | ||||||
| @ -63,3 +64,56 @@ func parseCifData(data *[]byte) (*parsedData, error) { | |||||||
| 	log.Msg.Debug("CIF Parsing completed") | 	log.Msg.Debug("CIF Parsing completed") | ||||||
| 	return &parsed, nil | 	return &parsed, nil | ||||||
| } | } | ||||||
|  | 
 | ||||||
|  | func parseCifDataStream(dataStream io.ReadCloser) (*parsedData, error) { | ||||||
|  | 	log.Msg.Debug("STREAM-Starting CIF Datastream parsing") | ||||||
|  | 	if dataStream == nil { | ||||||
|  | 		return nil, errors.New("unable to parse nil pointer") | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	// Initialise data structures | ||||||
|  | 	var parsed parsedData | ||||||
|  | 	parsed.assoc = make([]upstreamApi.JsonAssociationV1, 0) | ||||||
|  | 	parsed.sched = make([]upstreamApi.JsonScheduleV1, 0) | ||||||
|  | 
 | ||||||
|  | 	// Create JSON Decoder | ||||||
|  | 	decoder := json.NewDecoder(dataStream) | ||||||
|  | 
 | ||||||
|  | 	// Iterate over JSON Objects using stream decoder | ||||||
|  | 	for decoder.More() { | ||||||
|  | 		var obj map[string]json.RawMessage | ||||||
|  | 		if err := decoder.Decode(&obj); err != nil { | ||||||
|  | 			log.Msg.Error("Error decoding JSON String") | ||||||
|  | 			return nil, err | ||||||
|  | 		} | ||||||
|  | 
 | ||||||
|  | 		// Handle parsed data | ||||||
|  | 		for key, value := range obj { | ||||||
|  | 			switch key { | ||||||
|  | 			case "JsonTimetableV1": | ||||||
|  | 				var timetable upstreamApi.JsonTimetableV1 | ||||||
|  | 				if err := json.Unmarshal(value, &timetable); err != nil { | ||||||
|  | 					log.Msg.Error("Error decoding JSONTimetableV1 object", zap.Error(err)) | ||||||
|  | 					continue | ||||||
|  | 				} | ||||||
|  | 				parsed.header = timetable | ||||||
|  | 			case "JsonAssociationV1": | ||||||
|  | 				var association upstreamApi.JsonAssociationV1 | ||||||
|  | 				if err := json.Unmarshal(value, &association); err != nil { | ||||||
|  | 					log.Msg.Error("Error decoding JSONAssociationV1 object", zap.Error(err)) | ||||||
|  | 					continue | ||||||
|  | 				} | ||||||
|  | 				parsed.assoc = append(parsed.assoc, association) | ||||||
|  | 			case "JsonScheduleV1": | ||||||
|  | 				var schedule upstreamApi.JsonScheduleV1 | ||||||
|  | 				if err := json.Unmarshal(value, &schedule); err != nil { | ||||||
|  | 					log.Msg.Error("Error decoding JSONScheduleV1 object", zap.Error(err)) | ||||||
|  | 					continue | ||||||
|  | 				} | ||||||
|  | 				parsed.sched = append(parsed.sched, schedule) | ||||||
|  | 			} | ||||||
|  | 		} | ||||||
|  | 	} | ||||||
|  | 	log.Msg.Debug("CIF Parsing completed") | ||||||
|  | 	return &parsed, nil | ||||||
|  | } | ||||||
|  | |||||||
| @ -20,25 +20,25 @@ func runCifFullDownload(cfg *helpers.Configuration) error { | |||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		log.Msg.Error("Error getting download URL", zap.Error(err)) | 		log.Msg.Error("Error getting download URL", zap.Error(err)) | ||||||
| 	} | 	} | ||||||
| 	data, err := nrod.NrodDownload(url, cfg) | 	dataStream, err := nrod.NrodStream(url, cfg) | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		log.Msg.Error("Error downloading CIF data", zap.Error(err)) | 		log.Msg.Error("Error downloading CIF data", zap.Error(err)) | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	// If debug mode is on, call debugWriteDownload | 	// If debug mode is on, call debugWriteDownload | ||||||
| 	if helpers.Runtime == "debug" { | 	//	if helpers.Runtime == "debug" { | ||||||
| 		debugWriteDownload(data) | 	//		debugWriteDownload(dataStream) | ||||||
| 	} | 	//	} | ||||||
| 
 | 
 | ||||||
| 	// Parse CIF file | 	// Parse CIF file | ||||||
| 	parsed, err := parseCifData(data) | 	parsed, err := parseCifDataStream(dataStream) | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		log.Msg.Error("Error parsing CIF data", zap.Error(err)) | 		log.Msg.Error("Error parsing CIF data", zap.Error(err)) | ||||||
| 		return err | 		return err | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	// Make `data` a nil pointer as it is no longer required | 	// Make `data` a nil pointer as it is no longer required | ||||||
| 	data = nil | 	dataStream = nil | ||||||
| 
 | 
 | ||||||
| 	// Drop timetable collection | 	// Drop timetable collection | ||||||
| 	dbAccess.DropCollection(dbAccess.TimetableCollection) // I should edit this to prevent removal of VSTP entries in the database. | 	dbAccess.DropCollection(dbAccess.TimetableCollection) // I should edit this to prevent removal of VSTP entries in the database. | ||||||
|  | |||||||
| @ -39,7 +39,7 @@ func NrodDownload(url string, cfg *helpers.Configuration) (*[]byte, error) { | |||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	// Yes, I know `readedData` is not proper English.  But readData reads more like a verb action. | 	// Yes, I know `readedData` is not proper English.  But readData reads more like a verb action. | ||||||
| 	readedData, err := nrodExtract(*resp) | 	readedData, err := nrodExtract(resp) | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		log.Msg.Error("Unable to read response data") | 		log.Msg.Error("Unable to read response data") | ||||||
| 		return nil, err | 		return nil, err | ||||||
| @ -49,7 +49,7 @@ func NrodDownload(url string, cfg *helpers.Configuration) (*[]byte, error) { | |||||||
| } | } | ||||||
| 
 | 
 | ||||||
| // Extracts GZIP Data from an HTTP Response and returns the decompresses data as a byte array | // Extracts GZIP Data from an HTTP Response and returns the decompresses data as a byte array | ||||||
| func nrodExtract(resp http.Response) (*[]byte, error) { | func nrodExtract(resp *http.Response) (*[]byte, error) { | ||||||
| 	log.Msg.Debug("Extracting HTTP Response Data") | 	log.Msg.Debug("Extracting HTTP Response Data") | ||||||
| 	gzReader, err := gzip.NewReader(resp.Body) | 	gzReader, err := gzip.NewReader(resp.Body) | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
|  | |||||||
							
								
								
									
										60
									
								
								nrod/streams.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										60
									
								
								nrod/streams.go
									
									
									
									
									
										Normal file
									
								
							| @ -0,0 +1,60 @@ | |||||||
|  | package nrod | ||||||
|  | 
 | ||||||
|  | import ( | ||||||
|  | 	"compress/gzip" | ||||||
|  | 	"fmt" | ||||||
|  | 	"io" | ||||||
|  | 	"net/http" | ||||||
|  | 	"time" | ||||||
|  | 
 | ||||||
|  | 	"git.fjla.uk/owlboard/timetable-mgr/helpers" | ||||||
|  | 	"git.fjla.uk/owlboard/timetable-mgr/log" | ||||||
|  | 	"go.uber.org/zap" | ||||||
|  | ) | ||||||
|  | 
 | ||||||
|  | // Downloads NROD Data and extracts if GZIP, returns a io.Reader | ||||||
|  | func NrodStream(url string, cfg *helpers.Configuration) (io.ReadCloser, error) { | ||||||
|  | 	log.Msg.Debug("Fetching NROD data stream", zap.String("Request URL", url)) | ||||||
|  | 
 | ||||||
|  | 	client := http.Client{ | ||||||
|  | 		Timeout: time.Second * 300, | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	req, err := http.NewRequest("GET", url, nil) | ||||||
|  | 	if err != nil { | ||||||
|  | 		log.Msg.Error("Error creating HTTP Request", zap.Error(err)) | ||||||
|  | 		return nil, err | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	req.Header.Add("Authorization", "Basic "+helpers.BasicAuth(cfg.NrodUser, cfg.NrodPass)) | ||||||
|  | 
 | ||||||
|  | 	resp, err := client.Do(req) | ||||||
|  | 	if err != nil { | ||||||
|  | 		log.Msg.Error("Error carrying out HTTP Request", zap.Error(err), zap.Int("STATUS", resp.StatusCode)) | ||||||
|  | 		return nil, err | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	if resp.StatusCode != http.StatusOK { | ||||||
|  | 		err := fmt.Errorf("unexpected status code: %d", resp.StatusCode) | ||||||
|  | 		log.Msg.Error("Non-successful status code", zap.Error(err)) | ||||||
|  | 		return nil, err | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	// Run the data through the extractor function and return io.ReadCloser, error from | ||||||
|  | 	// directly | ||||||
|  | 	return NrodStreamExtract(resp) | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func NrodStreamExtract(resp *http.Response) (io.ReadCloser, error) { | ||||||
|  | 	log.Msg.Debug("Extracting NROD Download") | ||||||
|  | 
 | ||||||
|  | 	log.Msg.Debug("Content Type", zap.String("Content-Encoding", resp.Header.Get("Content-Encoding"))) | ||||||
|  | 
 | ||||||
|  | 	gzReader, err := gzip.NewReader(resp.Body) | ||||||
|  | 	if err != nil { | ||||||
|  | 		log.Msg.Warn("Unable to create GZIP Reader, data probably not gzipped") | ||||||
|  | 		return resp.Body, err | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	return gzReader, nil | ||||||
|  | } | ||||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user