A helper library for fast line reading and writing with some degree of type safety.

Declare serializers and deserializers for your objects. Here's an example that reads and writes the jsonl format, though any line-oriented format will work. These classes are the customizable entry points for application data; any validation logic should also happen here.
```python
import json
from dataclasses import dataclass

import lrw


@dataclass
class Person:
    id: int
    name: str


class MySerializer(lrw.LineSerializer[Person]):
    def dumps(self, o: Person) -> str:
        # Serializers should end each record with a newline.
        return json.dumps({"id": o.id, "name": o.name}) + "\n"


class MyDeserializer(lrw.LineDeserializer[Person]):
    def loads(self, o: str | bytes) -> Person:
        data = json.loads(o)  # json.loads accepts both str and bytes
        return Person(id=data["id"], name=data["name"])
```

Note that serializers should always append a newline to their string.
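Since the deserializer is also where validation belongs, a stricter variant might look like the following. This is only a sketch; the field checks and the choice of ValueError are illustrative, not anything lrw prescribes:

```python
class ValidatingDeserializer(lrw.LineDeserializer[Person]):
    def loads(self, o: str | bytes) -> Person:
        data = json.loads(o)
        # Reject records with missing or mistyped fields before they
        # reach the rest of the application.
        if not isinstance(data.get("id"), int):
            raise ValueError(f"bad or missing id: {data!r}")
        if not isinstance(data.get("name"), str):
            raise ValueError(f"bad or missing name: {data!r}")
        return Person(id=data["id"], name=data["name"])
```

An exception raised here is surfaced through the reader's error handling, described next.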
Batch read lines from a file handle. This is usually done on the main thread. This example delivers chunks of 100 Person objects at a time from sys.stdin for processing. If an error occurs while reading, we log it and keep going by setting an OnError callback on the reader. This gives the caller the opportunity to continue processing when any kind of error occurs during deserialization (validation, bad I/O, etc.). If we instead want to crash immediately and handle the error in the caller, we simply don't pass an error handler.
```python
import sys

import lrw

d = MyDeserializer()


def error_handler(exc: Exception) -> None:
    # log a boo and keep going in this case
    print(f"boo boo: {exc}", file=sys.stderr)


reader = lrw.LineReader(sys.stdin, d, error_handler)
```

The read method will read N lines at a time into the application:
```python
for chunk in reader.read(100):
    ...
```
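Without an error handler, the first deserialization failure propagates to the caller, so the usual try/except applies. A minimal sketch, with the handler argument simply omitted:

```python
strict_reader = lrw.LineReader(sys.stdin, d)

try:
    for chunk in strict_reader.read(100):
        ...
except ValueError as exc:
    # One bad record aborts the whole read; decide here whether to
    # recover, retry, or exit.
    sys.exit(f"aborting: {exc}")
```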
If instead you want to deliver one line at a time to your app, the stream method will do:

```python
for person in reader.stream():
    ...
```

The writer is buffered in the application context, and the buffer can be controlled by setting max_size.
writer.write(o: T) will call the object's serialization logic defined in the Serializer[T] and ensure that a newline is appended to the result. When write is called repeatedly it will automatically check and clear the buffer (_maybe_flush). However, the calling context should always call writer.flush() once the data is exhausted, to ensure any remaining buffered items are written before the process exits.
```python
s = MySerializer()
writer = lrw.LineWriter(sys.stdout, s, max_size=42)
...
writer.write(person)
writer.flush()  # write anything still sitting in the buffer
```
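One way to guarantee that final flush even when processing dies partway through is a try/finally. A sketch combining the reader and writer from above:

```python
reader = lrw.LineReader(sys.stdin, MyDeserializer(), error_handler)
writer = lrw.LineWriter(sys.stdout, MySerializer(), max_size=42)

try:
    for person in reader.stream():
        writer.write(person)
finally:
    # Runs on success, error, and early exit alike, so no buffered
    # lines are lost.
    writer.flush()
```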
A simple write thread is provided for convenience, for producer/consumer fan-in style batch processing workflows. The underlying queue.Queue can be configured with a max_size and timeout. Callers may submit or submit_nowait.
```python
write_thread = lrw.LineWriterThread(writer)

...  # somewhere else in the program, maybe another thread, etc.
for p in person_list:
    write_thread.submit(p)  # <--- will not block unless the queue hits max_size

...  # when we're done, clean up
write_thread.join()
```

LineWriterThread does not explicitly use locks and assumes that only one exists per application. If configured appropriately it will not block the main thread in either sync or async contexts.
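For latency-sensitive producers (including coroutines on an asyncio event loop), submit_nowait avoids stalling when the queue is full. A sketch, assuming submit_nowait follows the usual queue.Queue semantics and raises queue.Full when the queue is at max_size:

```python
import queue

for p in person_list:
    try:
        write_thread.submit_nowait(p)  # never blocks
    except queue.Full:
        # Assumed behavior: the queue is at capacity. Drop, retry, or
        # log, whatever the application needs.
        print("writer backlogged, dropping record", file=sys.stderr)

write_thread.join()
```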