Skip to content

Commit ccf42f2

Browse files
committed
Updated - Introduced chunked processing. Saves approx 30 sec.
1 parent 199a9ef commit ccf42f2

1 file changed

Lines changed: 176 additions & 22 deletions

File tree

entries/ikelaiah/src/weatherstation.pas

Lines changed: 176 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -5,37 +5,34 @@
55
interface
66

77
uses
8-
Classes,
9-
SysUtils,
10-
Math,
11-
streamex,
12-
lgHashMap
8+
Classes
9+
, SysUtils
10+
, Math
11+
, streamex
12+
, bufstream
13+
, lgHashMap
1314
{$IFDEF DEBUG}
1415
, Stopwatch
1516
{$ENDIF}
16-
, Baseline.Common
17-
;
17+
, Baseline.Common;
1818

1919
type
20-
TParsedData = record
21-
wsName: string;
22-
wsTemp: int64;
23-
end;
20+
{ Create a record of temperature stats.
2421
25-
type
26-
// Create a record of temperature stats.
22+
Borrowed the concept from go's approach to improve performance, save floats as int64.
23+
This saved ~2 mins processing time for processing 1 billion rows.}
2724
TStat = record
2825
var
29-
min: int64; // Borrowed the concept from go's approach to improve
30-
// performance, save floats as int64.
31-
max: int64; // This saved ~2 mins processing time.
26+
min: int64;
27+
max: int64;
3228
sum: int64;
3329
cnt: int64;
3430
public
3531
constructor Create(const newMin: int64; const newMax: int64;
3632
const newSum: int64; const newCount: int64);
3733
function ToString: string;
3834
end;
35+
PStat = ^TStat;
3936

4037
type
4138
// Create a dictionary
@@ -49,6 +46,10 @@ TWeatherStation = class
4946
weatherDictionary: TWeatherDictionaryLG;
5047
weatherStationList: TStringList;
5148
procedure ReadMeasurements;
49+
procedure ReadMeasurementsClassic;
50+
procedure ReadMeasurementsInChunks(const filename: string);
51+
procedure ParseStationAndTempFromChunk(const chunkData: pansichar;
52+
const dataSize: int64; const chunkIndex: int64);
5253
procedure ParseStationAndTemp(const line: string);
5354
procedure AddCityTemperatureLG(const cityName: string; const newTemp: int64);
5455
procedure SortWeatherStationAndStats;
@@ -172,6 +173,7 @@ procedure TWeatherStation.SortWeatherStationAndStats;
172173
WriteLn('Nothing to Sort.');
173174
Exit;
174175
end;
176+
175177
for wsKey in weatherDictionary.Keys do
176178
begin
177179
self.weatherStationList.Add(wsKey + '=' + weatherDictionary[wsKey].ToString + ', ');
@@ -209,17 +211,19 @@ procedure TWeatherStation.AddCityTemperatureLG(const cityName: string;
209211
self.weatherDictionary.AddOrSetValue(cityName, stat);
210212
{$IFDEF DEBUG}
211213
// Display the line.
212-
// WriteLn('Updated: ', cityName);
214+
WriteLn('Updated: ', cityName);
213215
{$ENDIF DEBUG}
214216
end;
215217

216218
// If city name doesn't exist add a new entry
217219
if not self.weatherDictionary.Contains(cityName) then
218220
begin
219221
self.weatherDictionary.Add(cityName, TStat.Create(newTemp, newTemp, newTemp, 1));
222+
220223
{$IFDEF DEBUG}
221224
// Display the line.
222-
// WriteLn('Added: ', cityName);
225+
WriteLn('weatherDictionary count: ', inttostr(self.weatherDictionary.Count));
226+
WriteLn('Added: ', cityName);
223227
{$ENDIF DEBUG}
224228
end;
225229
end;
@@ -269,13 +273,13 @@ procedure TWeatherStation.ReadMeasurements;
269273
// Open the file for reading
270274
fileStream := TFileStream.Create(self.fname, fmOpenRead or fmShareDenyNone);
271275
try
272-
streamReader := TStreamReader.Create(fileStream);
276+
streamReader := TStreamReader.Create(fileStream, 655360, False);
273277
try
274278
// Read and parse chunks of data until EOF -------------------------------
275279
while not streamReader.EOF do
276280
begin
277-
line := streamReader.ReadLine;
278-
self.ParseStationAndTemp(line);
281+
//line := streamReader.ReadLine;
282+
self.ParseStationAndTemp(streamReader.ReadLine);
279283
end;// End of read and parse chunks of data ------------------------------
280284
finally
281285
streamReader.Free;
@@ -286,10 +290,160 @@ procedure TWeatherStation.ReadMeasurements;
286290
end;
287291
end;
288292

293+
procedure TWeatherStation.ReadMeasurementsClassic;
294+
var
295+
inputFile: System.TextFile;
296+
textBuffer: array[1..131072] of byte;
297+
line: string;
298+
begin
299+
300+
// Open the file for reading
301+
AssignFile(inputFile, self.fname);
302+
SetTextBuf(inputFile, textBuffer);
303+
try
304+
Reset(inputFile);
305+
306+
// Read and parse chunks of data until EOF -------------------------------
307+
while not EOF(inputFile) do
308+
begin
309+
ReadLn(inputFile, line);
310+
self.ParseStationAndTemp(line);
311+
end;// End of read and parse chunks of data ------------------------------
312+
313+
finally
314+
// Close the file
315+
CloseFile(inputFile);
316+
end;
317+
end;
318+
319+
{procedure TWeatherStation.ParseStationAndTempFromChunk(const chunkData: pansichar;
320+
const dataSize: int64; const chunkIndex: int64);
321+
var
322+
mStream: TMemoryStream;
323+
streamReader: TStreamReader;
324+
currentString: string;
325+
begin
326+
mStream:=TMemoryStream.Create;
327+
try
328+
mStream.WriteBuffer(chunkData^, dataSize);
329+
mStream.Position:=0;
330+
331+
streamReader:=TStreamReader.Create(mStream, 1048576, False);
332+
try
333+
while not streamReader.Eof do
334+
begin
335+
currentString:=streamReader.ReadLine;
336+
self.ParseStationAndTemp(currentString);
337+
end;
338+
finally
339+
streamReader.Free;
340+
end;
341+
finally
342+
mStream.Free;
343+
end;
344+
end;}
345+
346+
procedure TWeatherStation.ParseStationAndTempFromChunk(const chunkData: pansichar;
347+
const dataSize: int64; const chunkIndex: int64);
348+
var
349+
index, lineStart, lineLength: int64;
350+
begin
351+
lineStart := 0;
352+
353+
// Check for Line Feed (LF)
354+
for index := 0 to dataSize - 1 do
355+
begin
356+
if chunkData[index] = #10 then
357+
begin
358+
359+
lineLength := index - lineStart;
360+
361+
// Remove potential CR before LF (for Windows)
362+
if (chunkData[index-1] = #13) and (index < dataSize - 1) then
363+
Dec(LineLength);
364+
365+
// The current line is now: Buffer[LineStart..LineStart+LineLength-1]
366+
// WriteLn(chunkData[lineStart..lineStart + lineLength - 1], '.');
367+
self.ParseStationAndTemp(chunkData[lineStart..lineStart+lineLength - 1]);
368+
// Skip to the next 'line' in the buffer
369+
lineStart := index + 1;
370+
end;
371+
end;
372+
end;
373+
374+
procedure TWeatherStation.ReadMeasurementsInChunks(const filename: string);
375+
const
376+
defaultChunkSize: integer = 536870912; // 512MB in bytes
377+
var
378+
fileStream: TFileStream;
379+
buffer: pansichar;
380+
bytesRead, totalBytesRead, chunkSize, lineBreakPos, chunkIndex: int64;
381+
begin
382+
chunkSize := defaultChunkSize * 1;
383+
384+
// Open the file for reading
385+
fileStream := TFileStream.Create(filename, fmOpenRead);
386+
try
387+
// Allocate memory buffer for reading chunks
388+
// Ref: https://www.freepascal.org/docs-html/rtl/system/getmem.html
389+
GetMem(buffer, chunkSize);
390+
try
391+
totalBytesRead := 0;
392+
chunkIndex := 0;
393+
394+
// Read and parse chunks of data until EOF
395+
while totalBytesRead < fileStream.Size do
396+
begin
397+
{$IFDEF DEBUG}
398+
WriteLn('Processing chunk index: ', IntToStr(chunkIndex));
399+
{$ENDIF DEBUG}
400+
401+
bytesRead := fileStream.Read(buffer^, chunkSize);
402+
403+
// Update total bytes read
404+
Inc(totalBytesRead, bytesRead);
405+
406+
// Find the position of the last newline character in the chunk
407+
lineBreakPos := BytesRead;
408+
while (lineBreakPos > 0) and (Buffer[lineBreakPos - 1] <> #10) do
409+
Dec(lineBreakPos);
410+
411+
{ Now, must ensure that if the last byte read in the current chunk
412+
is not a newline character, the file pointer is moved back to include
413+
that byte and any preceding bytes of the partial line in the next
414+
chunk's read operation.
415+
416+
Also, no need to update the BytesRead variable in this context because
417+
it represents the actual number of bytes read from the file, including
418+
any partial line that may have been included due to moving the file
419+
pointer back.
420+
Ref: https://www.freepascal.org/docs-html/rtl/classes/tstream.seek.html}
421+
if lineBreakPos < bytesRead then
422+
fileStream.Seek(-(bytesRead - lineBreakPos), soCurrent);
423+
424+
// Parse the buffer line by line here
425+
// This is to slow!
426+
self.ParseStationAndTempFromChunk(buffer, lineBreakPos, chunkIndex);
427+
428+
// Increase chunk index - a counter
429+
Inc(chunkIndex);
430+
end;
431+
finally
432+
// Free the memory buffer
433+
FreeMem(buffer);
434+
end;
435+
finally
436+
// Close the file
437+
fileStream.Free;
438+
end;
439+
end;
440+
289441
// The main algorithm
290442
procedure TWeatherStation.ProcessMeasurements;
291443
begin
292-
self.ReadMeasurements;
444+
// self.ReadMeasurements;
445+
// self.ReadMeasurementsClassic;
446+
self.ReadMeasurementsInChunks(self.fname); // This method cuts approx 30 seconds of processing time
293447
self.SortWeatherStationAndStats;
294448
self.PrintSortedWeatherStationAndStats;
295449
end;

0 commit comments

Comments
 (0)