Streams
Record Stream
A RecordStream is a Stream<Record> - a stream of Record instances. All operations defined in the Java 8 Stream interface are supported for RecordStream. Obtained using the DatasetReader.records() method, a RecordStream is the primary means of performing a query against a TCStore Dataset.
As with a java.util.stream.Stream, a RecordStream may be used only once. Unlike a Java Stream, a RecordStream closes itself when the stream is fully consumed through a terminal operation other than iterator or spliterator. (Even so, it is good practice to close a RecordStream using a try-with-resources block or RecordStream.close.) There are no provisions for concatenating two RecordStream instances while retaining RecordStream capabilities.
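For example, a RecordStream consumed through iterator can be closed deterministically with a try-with-resources block (a minimal sketch, assuming the employeeReader used in the examples below):
try (RecordStream<Integer> stream = employeeReader.records()) { // closed even if iteration stops early
    Iterator<Record<Integer>> iterator = stream.iterator(); // iterator does not auto-close the stream
    while (iterator.hasNext()) {
        Record<Integer> record = iterator.next();
        // process record ...
    }
}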
Most RecordStream intermediate operations return a RecordStream. However, operations which perform a transformation on a stream element may return a Stream<Record> which is not a RecordStream. For example, map(identity()) returns a Stream<Record> which is not a RecordStream.
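The following sketch illustrates the type difference (it assumes the employeeReader and GENDER definitions used in the examples below):
RecordStream<Integer> records = employeeReader.records()
    .filter(GENDER.value().is('M')); // filter preserves the RecordStream type
Stream<Record<Integer>> plain = employeeReader.records()
    .map(Function.identity()); // map, even with an identity function, yields a plain Stream<Record>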
Note: In a clustered configuration, a stream pipeline formed against a RecordStream, in addition to being composed of intermediate and terminal operations (as described in the Java 8 package java.util.stream), also comprises a server-side and a client-side pipeline segment. Every RecordStream originates in the server. As each operation is added during pipeline construction, an evaluation is made as to whether the operation and its arguments can be run in the server (extending the server-side pipeline); many pipeline operations can be. The first operation which cannot be run in the server begins the client-side pipeline. A stream pipeline may have both server-side and client-side pipeline segments, only a server-side segment, or only a client-side segment (other than the stream source). Each Record or element passing through the stream pipeline is processed first by the server-side pipeline segment (if any) and is then passed to the client-side pipeline segment (if one exists) to complete processing.
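As an illustrative sketch of how a pipeline splits (assuming, per the portability discussion at the end of this section, that cell-based expressions such as GENDER.value().is('M') can be expressed to the server while an arbitrary lambda cannot):
long count = employeeReader.records()
    .filter(GENDER.value().is('M')) // expressible to the server: joins the server-side segment
    .filter(r -> r.get(NAME).isPresent()) // opaque lambda: begins the client-side segment
    .count(); // elements past the lambda are processed in the client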
The following code creates a RecordStream and performs a few operations on the records of the stream:
long numMaleEmployees = employeeReader.records() // <1>
.filter(GENDER.value().is('M')) // <2>
.count(); // <3>
1 | The DatasetReader.records() method returns a RecordStream delivering Record instances from the Dataset referred to by the DatasetReader. |
2 | Stream intermediate operations on a RecordStream return a stream whose type is determined by the operation and its parameters. In this example, filter provides a RecordStream. |
3 | A Stream terminal operation on a RecordStream produces a value or a side-effect. In this case, count returns the number of Record instances passing the filter above. |
Additional operations supported on RecordStream
Optional<Record<Integer>> record = employeeReader.records()
.explain(System.out::println) // <1>
.batch(2) // <2>
.peek(RecordStream.log("{} from {}", NAME.valueOr(""),
COUNTRY.valueOr(""))) // <3>
.filter(COUNTRY.value().is("USA"))
.findAny();
long count = employeeReader.records()
.inline() // <4>
.count();
1 | The RecordStream.explain operation observes the stream pipeline and provides the pre-execution analysis information for this stream pipeline. It takes, as a parameter, a Consumer which is passed an explanation of the stream execution plan. RecordStream.explain returns a RecordStream for further pipeline construction. For explain to be effective, the pipeline must be terminated - the plan is not determined until the pipeline begins execution. The explain Consumer is called once the pipeline is closed. For a RecordStream against a clustered TCStore configuration, the explanation identifies the operations in each of the server-side and client-side pipeline segments. |
2 | In a clustered configuration, a RecordStream batches elements transferred from the server to the client, when possible, to reduce latencies involved in network transfer. The number of records or elements returned to the client at one time can be influenced using the RecordStream.batch operation. The batch operation takes a batch size as parameter and uses it as a hint for the batch size to use when transferring elements. RecordStream.batch returns a RecordStream for further pipeline construction. |
3 | The RecordStream.log method produces a Consumer for use in Stream.peek to log a message according to the specified format and arguments. The first argument provides a message format like that used in the SLF4J MessageFormatter.arrayFormat method. Each subsequent argument supplies a value to be substituted into the message text and is a mapping function that maps the stream element to the value to be substituted. The formatted message is logged using the logging implementation discovered by SLF4J (the logging abstraction used in TCStore). If the peek(log(…)) operation is in the server-side pipeline segment, the formatted message is logged on the TCStore server. If the peek(log(…)) operation is in the client-side segment, the formatted message is logged in the client. |
4 | The RecordStream.inline operation disables the element batching discussed above. When inline is used, each stream element is processed by both the server-side and client-side pipeline segments before the next element is processed. RecordStream.inline returns a RecordStream for further pipeline construction. |
Mutable Record Stream
Obtained from the DatasetWriterReader.records() method, a MutableRecordStream extends RecordStream to provide operations through which Record instances in the RecordStream may be changed. No more than one of the mutating operations may be used in a pipeline. The changes in a Record made by a MutableRecordStream mutation operation affect only the Dataset from which the MutableRecordStream was obtained (and to which the Record belongs).
The following are the operations added in MutableRecordStream:
mutateThen
The MutableRecordStream.mutateThen operation is an intermediate operation that accepts an UpdateOperation instance describing a mutation to perform on every Record passing through the mutateThen operation. The output of mutateThen is a Stream<Tuple<Record, Record>> where each Tuple holds the before (Tuple.getFirst()) and after (Tuple.getSecond()) versions of the Record.
double sum = employeeWriterReader.records() // <1>
    .mutateThen(UpdateOperation.write(SALARY).doubleResultOf(
        SALARY.doubleValueOrFail().increment())) // <2>
    .map(Tuple::getSecond) // <3>
.mapToDouble(SALARY.doubleValueOrFail())
.sum();
1 | The DatasetWriterReader.records() method, not DatasetReader.records(), returns a MutableRecordStream, which is a Stream of the Record instances of the Dataset referred to by the DatasetWriterReader. |
2 | MutableRecordStream.mutateThen() is an intermediate operation that takes an UpdateOperation as a parameter and applies the update transformation to the Record instances in the stream. |
3 | MutableRecordStream.mutateThen() returns a Stream of new Tuple instances holding before and after Record instances. Note that it does not return a RecordStream or a MutableRecordStream. |
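Because both versions of each Record are available in the Tuple, the before image can be used as well. For example (a sketch along the lines of the example above):
// Sketch: print each employee's salary change using the before (getFirst) and after (getSecond) images
employeeWriterReader.records()
    .mutateThen(UpdateOperation.write(SALARY).doubleResultOf(
        SALARY.doubleValueOrFail().increment()))
    .forEach(tuple -> System.out.println(
        tuple.getSecond().getKey() + ": "
            + SALARY.doubleValueOrFail().applyAsDouble(tuple.getFirst()) + " -> "
            + SALARY.doubleValueOrFail().applyAsDouble(tuple.getSecond())));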
deleteThen
The MutableRecordStream.deleteThen operation is an intermediate operation that deletes all Record instances passing through the deleteThen operation. The output of deleteThen is a Stream<Record> where each element is a deleted Record. (Note the output is neither a RecordStream nor a MutableRecordStream.)
employeeWriterReader.records()
.filter(BIRTH_YEAR.value().isGreaterThan(1985))
.deleteThen() // <1>
.map(NAME.valueOrFail()) // <2>
.forEach(name -> System.out.println("Deleted record of " + name));
1 | MutableRecordStream.deleteThen() is an intermediate operation that deletes every Record in the stream. |
2 | MutableRecordStream.deleteThen() returns a Stream of the deleted Record instances. Note that it does not return a RecordStream or a MutableRecordStream. |
mutate
The MutableRecordStream.mutate operation is a terminal operation that accepts an UpdateOperation instance describing a mutation to perform on every Record reaching the mutate operation. The return type of the mutate operation is void.
employeeWriterReader.records()
.filter(GENDER.value().is('M'))
.mutate(UpdateOperation.write(SALARY)
    .doubleResultOf(SALARY.doubleValueOrFail().decrement())); // <1>
1 | MutableRecordStream.mutate() takes an UpdateOperation as a parameter and applies the update transformation to the Record instances in the stream. This is a terminal operation and returns nothing. |
delete
The MutableRecordStream.delete operation is a terminal operation that deletes every Record reaching the delete operation. The return type of the delete operation is void.
employeeWriterReader.records()
.filter(BIRTH_YEAR.value().isLessThan(1945))
.delete(); // <1>
1 | MutableRecordStream.delete() deletes every Record in the stream. This is a terminal operation and returns nothing. |
Stream pipeline execution and concurrent record mutations
During stream pipeline execution on a Dataset, concurrent mutation of its records is allowed. Pipeline execution does not iterate over a point-in-time snapshot of the Dataset - changes made by concurrent mutations may or may not be visible to a pipeline execution, depending on the position of its underlying Record iterator.
Stream pipeline portability
In a clustered configuration, the Record instances accessed through a RecordStream are sourced from one or more Terracotta servers. For large datasets, this can involve an enormous amount of data transfer. RecordStream capabilities and optimization strategies can be applied to significantly reduce that transfer; one such capability is enabled through the use of portable pipeline operations. This capability and others are described in the section Streams.