Stream Optimizations
When performing queries or mutations using a stream pipeline on a
RecordStream or
MutableRecordStream (referred to collectively in this documentation as
RecordStream), there are several ways a user can influence the performance of the pipeline. The primary methods, pipeline portability and cell indexes, are described in the sections
Pipeline Portability and
Index Use. There is also a tool, the
stream plan, that provides visibility into the effectiveness of these methods; it is described in the following section,
Stream Plan.
Stream Plan
There is a tool available to help a user understand how a pipeline based on a RecordStream will be executed - the stream plan. It is observed through the object presented to the Consumer supplied to the RecordStream.explain(Consumer<Object>) pipeline operation. This object represents the system's understanding of the pipeline and includes information about how the pipeline will be executed by TCStore.
The plan object is not a programmatic API. The object is intended to be converted to String using the toString() method and reviewed by a human. The content and format of the String are subject to change without notice.
Looking at the plan, a user can determine:
1. what portions of the pipeline are portable and may be executed on the server;
2. what portions of the pipeline are non-portable and must be executed on the client;
3. what index, if any, is used for data retrieval.
Sample plans are included in the discussions below.
In a striped TCStore configuration, the output includes multiple plans - one (1) for each stripe in the configuration. Each server in a stripe calculates a stream plan based on the state extant in that server, so plans may differ from stripe to stripe.
The stream plan for a pipeline is provided to the explain Consumer only after the pipeline completes execution and the stream is closed. (This is, in part, due to the fact that the stream plan is not computed until the pipeline begins execution - that is, once the terminal operation is appended to the pipeline.)
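Because the plan arrives only after the stream closes, a caller that wants to examine it after execution can capture it from the Consumer. The following is a minimal sketch, not part of the numbered examples below; it assumes the dataset and TAXONOMIC_CLASS definitions used in the Examples section later in this document and an import of java.util.concurrent.atomic.AtomicReference:
AtomicReference<Object> planHolder = new AtomicReference<>();
try (RecordStream stream = dataset.records()) {
    long mammalCount = stream
        .explain(planHolder::set)                       // capture the plan for later inspection
        .filter(TAXONOMIC_CLASS.value().is("mammal"))
        .count();                                       // terminal operation triggers planning
}
System.out.println(planHolder.get());                   // the plan is available once the stream is closed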
Pipeline Portability
As discussed in the section
Record Stream,
RecordStream pipelines in a TCStore clustered configuration are split into server-side and client-side segments. The best-performing TCStore stream pipelines are those that limit the amount of data transferred between the server and the client. In general, the more processing that can be performed in the server - close to the data - the better.
For an operation to be run in the server, the operation and its arguments must be
portable. A portable operation is one for which the operation, its context, and its arguments can be understood through introspection. This introspection is enabled by the use of the TCStore Functional DSL (see the section
Functional DSL). Most, but not all, function instances produced from the DSL are portable.
Operations using lambda expressions (the "arrow" operator) or method reference expressions (the double-colon separator) are not portable and must be executed in the client.
Every
RecordStream pipeline begins as a portable pipeline - the stream's data source is the server. As each operation is added to the pipeline, that operation and its arguments are evaluated for portability - in general, if the arguments (if any) provided to the operation are produced using the TCStore DSL, the operation will be portable. (Exceptions are noted in
DSL Support for Portable Operations below.) The portable, server-side pipeline segment is extended with each portable operation appended to the pipeline. The non-portable, client-side pipeline segment begins with the first non-portable operation and continues through to the pipeline's terminal operation.
Even if an otherwise portable operation is appended to the pipeline after a non-portable operation, that otherwise portable operation is executed on the client - the stream elements are already being transferred from the server to the client.
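For example, in the following sketch (using the TAXONOMIC_CLASS definition from the Examples section below), the non-portable peek shifts execution to the client, so the filter that follows it runs in the client even though its argument is a portable DSL expression:
long count = recordStream
    .peek(r -> System.out.println(r.getKey()))          // non-portable lambda; execution shifts to the client
    .filter(TAXONOMIC_CLASS.value().is("mammal"))       // portable in form, but now executes in the client
    .count();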
To determine how much of a pipeline is portable, use the RecordStream.explain(Consumer<Object>) operation. This makes available a stream plan from which the portable and non-portable portions of the pipeline can be determined. Stream plans are introduced in the Stream Plan section above.
The explain operation does not affect pipeline portability - explain is a meta-operation and sets an observer for the stream plan but does not actually add an operation to the pipeline.
The peek operation can affect pipeline portability. If the Consumer provided to the peek operation is non-portable, the pipeline segment beginning with that peek operation will be rendered non-portable and forced to run on the client. A warning is logged if a non-portable peek is appended to a pipeline that, to that point, is portable. The RecordStream.log method can be used to produce a portable Consumer for peek.
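For example, a peek early in an otherwise portable pipeline can remain portable by using RecordStream.log in place of a lambda. The sketch below assumes the log(String, Function...) form, in which the message placeholders are filled from the values produced by the supplied (portable) DSL functions:
long count = recordStream
    .peek(RecordStream.log("class = {}", TAXONOMIC_CLASS.valueOrFail()))  // portable Consumer
    .filter(TAXONOMIC_CLASS.value().is("mammal"))
    .count();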
Examples
In the examples that follow, the following definitions are presumed:
import static com.terracottatech.store.definition.CellDefinition.defineString;
import static java.util.stream.Collectors.toList;

public static final StringCellDefinition TAXONOMIC_CLASS =
    defineString("class");
RecordStream recordStream = dataset.records();
Non-Portable Operations
This example shows a pipeline using an operation with a non-portable argument - a lambda expression - making the operation non-portable. In this example, all records in the dataset are shipped to the client for processing by the filter operation.
Non-Portable Pipeline:
List<String> result = recordStream
.explain(System.out::println)
.filter(r -> r.get(TAXONOMIC_CLASS).orElse("").equals("mammal")) // 1
.map(TAXONOMIC_CLASS.valueOrFail()) // 2
.collect(toList());
1 | Java lambda expressions (expressions using the "arrow" operator) always produce non-portable operations. |
2 | The map operation in this example could be portable but is not because it follows a non-portable operation - once a non-portable operation is used and pipeline execution shifts to the client, subsequent operations are made non-portable. |
Stream Plan - No Portable Operations:
Stream Plan
Structure:
Portable:
None // 1
Non-Portable:
PipelineOperation{FILTER(com.terracottatech.store.server.
RemoteStreamTest$$Lambda$504/1753714541@51bf5add)} // 2
PipelineOperation{MAP(class.valueOrFail())}
PipelineOperation{COLLECT_1(
java.util.stream.Collectors$CollectorImpl@7905a0b8)}
Server Plan: 0970e486-484c-4e04-bb8e-5fe477d47c0d
Stream Planning Time (Nanoseconds): 2611339
Sorted Index Used In Filter: false
Filter Expression: true
Unknown Filter Count: 0
Unused Filter Count And Filters (If Any): 0
Selected Plan: Full Dataset Scan // 3
1 | No portable operations are identified. |
2 | Several non-portable operations are identified. These operations are all executed in the client. |
3 | Pipelines having no portable operations require a full dataset scan for data retrieval. |
Portable Operations
This example shows a pipeline expressing the same sequence of operations as the previous example, but using portable operation arguments, making the majority of the pipeline portable. Unlike the previous example, both filtering and mapping are performed on the server, limiting what is transferred to the client to the data that actually needs to be collected.
Portable Pipeline:
List<String> result = recordStream
.explain(System.out::println)
.filter(TAXONOMIC_CLASS.value().is("mammal")) // 1
.map(TAXONOMIC_CLASS.valueOrFail()) // 2
.collect(toList());
1 | This filter operation expresses the same selection criterion as the first example but does so using a portable DSL expression. |
2 | Unlike the first example, the map operation in this pipeline is portable - all preceding operations in the pipeline are portable so the map operation can be portable. |
Stream Plan - Portable Operations:
Stream Plan
Structure:
Portable:
PipelineOperation{FILTER((class==mammal))} // 1
PipelineOperation{MAP(class.valueOrFail())}
Non-Portable:
PipelineOperation{COLLECT_1(
java.util.stream.Collectors$CollectorImpl@1e13529a)} // 2
Server Plan: ecc2db4d-1da7-4822-ad8a-b2f469fce4d5
Stream Planning Time (Nanoseconds): 99065863
Sorted Index Used In Filter: false
Filter Expression: (class==mammal)
Unknown Filter Count: 0
Unused Filter Count And Filters (If Any): 0
Selected Plan: Full Dataset Scan // 3
1 | Two (2) portable operations are identified. |
2 | One (1) non-portable operation is identified. This operation, the toList collector, must be run in the client. |
3 | Pipelines using portable operations may use index-based data retrieval if an index is available. In this example, no index for the class (TAXONOMIC_CLASS) cell was defined. See the section Index Use below. |
DSL Support for Portable Operations
As discussed in the section
Functional DSL, the DSL methods permit expression of pipeline operation arguments in a manner which can be portable between client and server. However, as a growth point in TCStore, some DSL methods may produce non-portable expressions as well.
A method in the DSL produces an instance of one of the interfaces found in java.util.function - Predicate, Function, Consumer, BiFunction, ToDoubleFunction, ToIntFunction, ToLongFunction, etc. - or in java.util.stream, such as Collector. For an instance to be portable, it must come from a TCStore implementation that is designed and implemented to be portable. There are currently no provisions for a user to extend the collection of portable operations by implementing their own portable DSL extensions.
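For example, a filter argument produced by the DSL is an ordinary java.util.function.Predicate; its portability comes from the TCStore type that implements it. A sketch using the TAXONOMIC_CLASS definition from the earlier examples:
Predicate<Record<?>> isMammal =
    TAXONOMIC_CLASS.value().is("mammal");   // a java.util.function.Predicate backed by a portable TCStore implementation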
The following is a list of the DSL methods that produce non-portable expressions:
UpdateOperation.custom
The UpdateOperation.custom method is intended to provide a means of performing updates too complex to be expressed using the other UpdateOperation methods. custom is not intended to be used for portable operations, so it will not produce a portable function instance.
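In contrast, updates expressible with the purpose-built UpdateOperation methods remain portable. The following sketch is illustrative only: it assumes the write(definition).value(...) builder form of UpdateOperation, a MutableRecordStream named mutableRecordStream obtained from a writer-reader view of the dataset, and hypothetical records whose class cell is misspelled:
mutableRecordStream
    .filter(TAXONOMIC_CLASS.value().is("mamal"))                      // select the (hypothetical) misspelled records
    .mutate(UpdateOperation.write(TAXONOMIC_CLASS).value("mammal"));  // portable update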
Collectors Methods
The following com.terracottatech.store.function.Collectors methods return non-portable Collector implementations: averagingDouble, averagingInt, averagingLong, composite, counting, filtering, groupingBy, groupingByConcurrent, mapping, maxBy, minBy, partitioningBy, summingDouble, summingInt, summingLong, and varianceOf.
A collect operation, even when using a portable Collector, will partially execute in the client to perform result aggregation over the stripes in a multi-stripe configuration. A collect operation involving a Collector that does not perform a data reduction or aggregation operation will always involve data transfer to, and execution in, the client.
Comparator Methods
The asComparator methods from the value accessors (value(), doubleValueOrFail(), etc.) on each of the CellDefinition subtypes and from Record.keyFunction() produce Comparator implementations that do not provide a portable implementation of the thenComparing, thenComparingDouble, thenComparingInt, or thenComparingLong methods.
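For example (a sketch using the TAXONOMIC_CLASS definition from the earlier examples, with generics loosened for brevity), the comparator produced by asComparator is portable on its own, but chaining it renders the result non-portable:
Comparator<Record<?>> byClass =
    TAXONOMIC_CLASS.valueOrFail().asComparator();                 // portable Comparator
Comparator<Record<?>> byClassThenKey =
    byClass.thenComparing(Record.keyFunction().asComparator());   // thenComparing: non-portable result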
Function.andThen / Consumer.andThen
Several of the DSL functions produce a specialized type of the Function or Consumer interfaces. Most of these specialized types do not implement the andThen method - the andThen method does not produce a portable instance. For example, definition.value().andThen(Function), where definition is a CellDefinition (or one of its subtypes), produces a non-portable instance even if the argument to andThen is portable.
Function.compose
Several of the DSL functions produce a specialized type of the Function interface. Most of these specialized types do not implement the compose method - the compose method does not produce a portable function instance. For example, definition.value().compose(Function), where definition is a CellDefinition (or one of its subtypes), produces a non-portable instance even if the argument to compose is portable.
multiply / divide
The type-specific value accessors on the numeric CellDefinition subtypes, for example DoubleCellDefinition.doubleValueOrFail(), each provide multiply and divide methods that produce a non-portable function instance.
length / startsWith
The value accessors of StringCellDefinition - value() and valueOrFail() - provide length and startsWith methods that produce a non-portable function instance.
Both the set of DSL methods and the subset of those methods producing portable expressions will be extended over time.
Index Use
In combination with pipeline portability, the Predicates supplied to RecordStream.filter operations in the portable, server-side segment of the pipeline are analyzed for expressions referring to CellDefinitions on which an index is defined. Based on this analysis, the server chooses one index through which the dataset is accessed to provide the Record instances for the stream. Because a TCStore index tracks only Record instances having the indexed Cell, Record instances without a value for the indexed Cell are not presented to the stream when an index is used.
Because an index provides only Record instances having the indexed cell, the Predicate analysis looks for uses of the CellDefinition.value() method. The other forms of value reference (valueOr, valueOrFail, longValueOr, longValueOrFail, etc.) are not supported in determining index use. So, while TAXONOMIC_CLASS.value() is considered for index use, TAXONOMIC_CLASS.valueOrFail() is not.
The analysis also includes a determination of whether or not a range query can be performed. The use of range comparisons (value().isGreaterThan(), value().isLessThanOrEqualTo()) permits selection of a subset of the indexed Record instances using the index.
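For example, given an index over a hypothetical numeric cell (the MASS definition below is illustrative and assumes defineDouble is statically imported from CellDefinition), a range comparison allows the server to select an index range scan rather than a full dataset scan:
DoubleCellDefinition MASS = defineDouble("mass");       // hypothetical indexed cell

long heavyCount = recordStream
    .filter(MASS.value().isGreaterThan(20.0))           // value() form: eligible for index use
    .count();
// A filter using MASS.doubleValueOrFail() would not be considered for index use.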
Example
For example, using the portable example from the section Portable Operations above, if an index is defined over the TAXONOMIC_CLASS CellDefinition, an index will be used when supplying Record instances to the pipeline.
Portable Pipeline:
List<String> result = recordStream
.explain(System.out::println)
.filter(TAXONOMIC_CLASS.value().is("mammal")) // 1
.map(TAXONOMIC_CLASS.valueOrFail())
.collect(toList());
1 | TAXONOMIC_CLASS is a reference to a StringCellDefinition over which an index is defined. |
Stream Plan - Portable Operations & Using an Index
Stream Plan
Structure:
Portable:
PipelineOperation{FILTER((class==mammal))} // 1
PipelineOperation{MAP(class.valueOrFail())}
Non-Portable:
PipelineOperation{COLLECT_1(
java.util.stream.Collectors$CollectorImpl@1b410b60)}
Server Plan: a9c4a05c-7303-440c-90b5-d56bf518b66f
Stream Planning Time (Nanoseconds): 138369229
Sorted Index Used In Filter: true
Filter Expression: (class==mammal)
Unknown Filter Count: 0
Unused Filter Count And Filters (If Any): 0
Selected Plan: Sorted Index Scan // 2
Cell Definition Name: class
Cell Definition Type: String
Index Ranges: (Number of Ranges = 1)
Index Range 1: Range = mammal ::: Operation = EQ
1 | As with the previous example, the same two (2) operations are portable. The filter Predicate refers to a CellDefinition over which an index is defined. |
2 | A "Sorted Index Plan" was chosen. The attributes of the access (CellDefinition information and the type of index query) are described. |