Reactive Input Output objects for Java:
- Fine-tuned: fast or memory efficient
- RS-TCK compatible (see tests for publishers and subscribers)
- Just one dependency: the only compile dependency is the JCTools library with concurrent queues
Add Maven dependency to pom.xml:
<dependency>
<groupId>org.cqfn</groupId>
<artifactId>rio</artifactId>
<version><!-- see latest release --></version>
</dependency>

Or use a snapshot from central.artipie.com:
<repositories>
<repository>
<name>Artipie central</name>
<id>central.artipie.com</id>
<url>https://central.artipie.com/cqfn/maven</url>
<snapshots>
<enabled>true</enabled>
</snapshots>
</repository>
</repositories>
<dependency>
<groupId>org.cqfn</groupId>
<artifactId>rio</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>

The primary protocol for all objects is Publisher<ByteBuffer>: it is either accepted or provided
by various methods in the main entry points.
To create a new reactive file instance, use the constructor:
var file = new File(Paths.get("/tmp/my/file.txt"));

To read the file, use the Publisher<ByteBuffer> content() method of the File object:
Publisher<ByteBuffer> content = new File(Paths.get("/tmp/my/file.txt")).content();

The file is read on demand, respecting backpressure. The default implementation uses an 8-kilobyte buffer to read the file.
To tune the buffer allocation strategy, use the overloaded Publisher<ByteBuffer> read(Buffers buffers) method.
The Buffers interface is responsible for providing new buffers for reading. Some standard implementations are
available in the Buffers.Standard enum.
All read operations are performed in a ThreadPoolExecutor by default. To specify the executor service explicitly, use
the overloaded content(ExecutorService) or content(Buffers, ExecutorService) methods.
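The on-demand reading described above means a consumer decides how fast buffers arrive. The following is a minimal sketch of such a consumer, not code from this library: it uses the JDK's java.util.concurrent.Flow interfaces as a stand-in for the Reactive Streams Publisher, and the ByteCounter class and its count helper are hypothetical names chosen for illustration. The subscriber asks for one buffer at a time, so the publisher never has to hand over more than was requested.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;

/** Hypothetical subscriber that counts bytes, requesting one buffer at a time. */
final class ByteCounter implements Flow.Subscriber<ByteBuffer> {
    private final CompletableFuture<Long> total = new CompletableFuture<>();
    private Flow.Subscription subscription;
    private long bytes;

    @Override
    public void onSubscribe(Flow.Subscription sub) {
        this.subscription = sub;
        sub.request(1); // initial demand: exactly one buffer
    }

    @Override
    public void onNext(ByteBuffer buf) {
        this.bytes += buf.remaining();
        this.subscription.request(1); // ask for the next buffer only after this one is handled
    }

    @Override
    public void onError(Throwable err) {
        this.total.completeExceptionally(err);
    }

    @Override
    public void onComplete() {
        this.total.complete(this.bytes);
    }

    CompletableFuture<Long> total() {
        return this.total;
    }

    /** Publishes the chunks through a JDK SubmissionPublisher and returns the byte total. */
    static long count(String... chunks) {
        ByteCounter counter = new ByteCounter();
        try (SubmissionPublisher<ByteBuffer> pub = new SubmissionPublisher<>()) {
            pub.subscribe(counter);
            for (String chunk : chunks) {
                pub.submit(ByteBuffer.wrap(chunk.getBytes(StandardCharsets.UTF_8)));
            }
        } // close() signals onComplete after the submitted buffers are delivered
        return counter.total().join();
    }
}
```

Calling ByteCounter.count("abc", "de") sums the remaining bytes of both buffers. A subscriber written against the Reactive Streams interfaces would follow the same request-then-consume pattern.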
To write a reactive stream of ByteBuffers to a file, use the CompletionStage<Void> write(Publisher<ByteBuffer> data)
method:
CompletionStage<Void> result = new File(Paths.get("/tmp/my/file.txt")).write(data);

It returns a CompletionStage that signals errors or completion. It also supports cancellation via the
cancel(boolean) method of CompletableFuture:
// will stop writing after subscribe
CompletionStage<Void> result = new File(Paths.get("/tmp/my/file.txt")).write(data);
Thread.sleep(1000);
result.toCompletableFuture().cancel(true); // cancel writing after one second

The write method accepts OpenOption values from java.nio.file as a second varargs parameter:
file.write(data, StandardOpenOption.WRITE, StandardOpenOption.CREATE_NEW). By default, it uses the
WRITE and CREATE options if none are provided.
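The cancellation mechanism mentioned above relies on standard CompletableFuture behavior. The sketch below uses only JDK classes and shows one way a writer could notice that its result stage was cancelled; the CancelSketch class and the upstreamCancelled flag are hypothetical and do not describe how this library implements it.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical illustration of reacting to cancellation of a result stage. */
final class CancelSketch {
    static boolean cancelStopsUpstream() {
        // Stands in for "the subscription was cancelled" in a real writer.
        AtomicBoolean upstreamCancelled = new AtomicBoolean();
        CompletableFuture<Void> future = new CompletableFuture<>();
        // Assumption: the writer registers a hook on the future it returns.
        future.whenComplete((none, err) -> {
            if (future.isCancelled()) {
                upstreamCancelled.set(true);
            }
        });
        // The caller only sees a CompletionStage, as with write(...).
        CompletionStage<Void> result = future;
        // cancel takes a boolean argument; CompletableFuture ignores its value.
        result.toCompletableFuture().cancel(true);
        return upstreamCancelled.get();
    }
}
```

Because the hook is registered without an executor, it runs in the thread that calls cancel, so the flag is already set when cancel returns.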
To fine-tune the speed or memory usage of write, the client can configure the WriteGreed level.
It configures how many buffers to request and when. By default, it requests 3 buffers at the beginning and
again when writing the last buffer (one before the end):
[subscribe] | [write] [write] [write] | [write] [write] [write] |
 request(3) |              request(3) |              request(3)
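The request pattern can be modeled with a small simulation. This is a sketch under an assumption, not the library's WriteGreed code: it assumes a greed level defined by an amount and a shift issues request(amount) on subscribe and then again whenever (received + shift) is a multiple of amount. The GreedSketch class and its methods are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical model of a constant greed level; not the library's WriteGreed class. */
final class GreedSketch {
    private final long amount;
    private final long shift;

    GreedSketch(long amount, long shift) {
        if (amount <= 0 || shift < 0 || shift >= amount) {
            throw new IllegalArgumentException("expected 0 <= shift < amount");
        }
        this.amount = amount;
        this.shift = shift;
    }

    /** Received-buffer counts at which request(amount) is issued; 0 is the subscribe call. */
    List<Long> requestPoints(long items) {
        List<Long> points = new ArrayList<>();
        points.add(0L);
        for (long received = 1; received <= items; received++) {
            if ((received + this.shift) % this.amount == 0) {
                points.add(received);
            }
        }
        return points;
    }

    /** Largest number of requested but not yet received buffers. */
    long peakDemand(long items) {
        long demand = this.amount;
        long peak = demand;
        for (long received = 1; received <= items; received++) {
            demand--;
            if ((received + this.shift) % this.amount == 0) {
                demand += this.amount;
                peak = Math.max(peak, demand);
            }
        }
        return peak;
    }
}
```

Under this model, an amount of 10 with a shift of 2 issues requests after 8 and 18 received buffers, and outstanding demand peaks at 12. An amount of 1 with a shift of 0 never has more than one buffer outstanding, which is the memory-saving end of the trade-off.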
To consume less memory, use WriteGreed.SINGLE, which requests buffers one at a time: one at the beginning and one after
each write operation. These values can be configured via the second parameter of the overloaded write method:
file.write(data, new WriteGreed.Constant(10, 2)) // request 10 buffers when read 8

or via system properties for the default implementation:
# request 10 buffers when read 8 (only for default write method)
java -Drio.file.write.greed.amount=10 -Drio.file.write.greed.shift=2

All write operations are performed in a ThreadPoolExecutor by default. To specify the executor service explicitly, use
the overloaded write(Publisher<ByteBuffer> data, ExecutorService exec) method.
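To illustrate what running writes on a caller-supplied executor looks like, here is a plain JDK analogue. It is a sketch, not this library's implementation: the ExecutorWriteSketch class, its write and demo methods, and the use of a List of buffers in place of a Publisher are all assumptions made to keep the example self-contained. It opens the file with the WRITE and CREATE options and reports the outcome through a CompletionStage.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** Hypothetical JDK-only analogue of writing buffers on a given executor. */
final class ExecutorWriteSketch {
    static CompletionStage<Void> write(Path path, List<ByteBuffer> data, ExecutorService exec) {
        // All blocking file IO happens on the supplied executor.
        return CompletableFuture.runAsync(() -> {
            try (FileChannel chan = FileChannel.open(
                path, StandardOpenOption.WRITE, StandardOpenOption.CREATE)) {
                for (ByteBuffer buf : data) {
                    while (buf.hasRemaining()) {
                        chan.write(buf); // a single call may write only part of the buffer
                    }
                }
            } catch (IOException err) {
                throw new UncheckedIOException(err); // surfaces as a failed stage
            }
        }, exec);
    }

    /** Writes five bytes to a temporary file and returns the resulting file size. */
    static long demo() {
        ExecutorService exec = Executors.newSingleThreadExecutor();
        try {
            Path tmp = Files.createTempFile("rio-sketch", ".bin");
            try {
                List<ByteBuffer> data = List.of(
                    ByteBuffer.wrap(new byte[] {1, 2, 3}),
                    ByteBuffer.wrap(new byte[] {4, 5})
                );
                write(tmp, data, exec).toCompletableFuture().join();
                return Files.size(tmp);
            } finally {
                Files.deleteIfExists(tmp);
            }
        } catch (IOException err) {
            throw new UncheckedIOException(err);
        } finally {
            exec.shutdown();
        }
    }
}
```

Passing the executor explicitly lets the caller bound the number of IO threads and shut them down with the rest of the application.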