API Usage
For convenience, the software kit contains several demo programs that demonstrate how to use the TCStore API.
The following sections discuss aspects of the TCStore API using code from these demo programs.
Please note:
The TCStore API in general and in particular the following code snippets are based on Java 8 concepts and constructs. Strong familiarity with Java 8 is required to fully understand these examples.
To make good use of the TCStore API, you will also need knowledge of Java 8 Lambda Expressions, Java 8 Streams and Collectors and topics such as patterns, parallelization and performance.
Lifecycle
To use a Dataset, you must either create one or use one that was created previously. One client node in the cluster is responsible for creating the Dataset.
Clustered Dataset
A Terracotta Server can be used to host a Dataset that can be shared among multiple clients. To do this, you must create a clustered DatasetManager instance identifying the server to host the dataset.
DatasetManager datasetManager =
DatasetManager.clustered(connectionURI).build(); // <1>
DatasetConfiguration configuration =
datasetManager.datasetConfiguration()
.offheap(offHeapResource).build(); // <2>
Dataset<String> counterSet = datasetManager.createDataset(
"counters", Type.STRING, configuration); // <3>
1. First, a DatasetManager instance is built. For a clustered dataset, the URI identifying the Terracotta Server must be specified.
2. The dataset configuration for a clustered Dataset must identify the name of an off-heap storage resource to be used on the server. The name specified here must match a name provided in a service/offheap-resources/resource element in the server’s XML configuration.
3. The Dataset is created by using the DatasetManager.createDataset(String, Type<K>, DatasetConfiguration) method.
Note:
Creation of DatasetManager and Dataset instances is often done using a Java 7 try-with-resources statement. Both DatasetManager and Dataset extend java.lang.AutoCloseable. Each should be closed when no longer needed to permit resources to be reclaimed.
For purposes of clarity, this detail is omitted from the samples in this document.
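The try-with-resources pattern mentioned in the note can be sketched with plain Java. The following is a minimal sketch, not TCStore code: FakeManager and FakeDataset are hypothetical stand-ins that only implement java.lang.AutoCloseable. It illustrates that resources declared in a single try-with-resources statement are closed in reverse order of declaration, so the dataset stand-in is closed before the manager that produced it.

```java
import java.util.ArrayList;
import java.util.List;

class TryWithResourcesSketch {
    // Records the order in which the stand-in resources are closed.
    static final List<String> closed = new ArrayList<>();

    // Hypothetical stand-in for a DatasetManager; not a TCStore class.
    static class FakeManager implements AutoCloseable {
        FakeDataset openDataset() { return new FakeDataset(); }
        @Override public void close() { closed.add("manager"); }
    }

    // Hypothetical stand-in for a Dataset; not a TCStore class.
    static class FakeDataset implements AutoCloseable {
        @Override public void close() { closed.add("dataset"); }
    }

    static List<String> run() {
        closed.clear();
        try (FakeManager manager = new FakeManager();
             FakeDataset dataset = manager.openDataset()) {
            // Work with the dataset here.
        }
        // Both resources are closed at this point, last-declared first.
        return new ArrayList<>(closed);
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [dataset, manager]
    }
}
```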
Data Access and Data Model
Once you’ve obtained a reference to a Dataset, you can read, add, remove and mutate data that it holds. That data is held in the form of Record instances. The following section demonstrates the basic create/read/update/delete (CRUD) operations on a dataset.
Basic CRUD Operations
DatasetWriterReader<String> counterAccess =
counterSet.writerReader(); // <1>
String someCounterKey = "someCounter";
boolean added = counterAccess.add( // <2>
someCounterKey, counterCell.newCell(0L),
stoppedCell.newCell(false));
if (added) {
System.out.println("No record with the key: " + someCounterKey
+ " existed. The new one was added");
}
Optional<Record<String>> someCounterRec =
counterAccess.get(someCounterKey); // <3>
Long longCounterVal = someCounterRec.flatMap(r ->
r.get(counterCell)).orElse(0L); // <4>
System.out.println("someCounter is now: " + longCounterVal);
counterAccess.update(someCounterKey, write(counterCell, 10L)); // <5>
someCounterRec = counterAccess.get(someCounterKey);
System.out.println("someCounter is now: "
+ someCounterRec.flatMap(r -> r.get(counterCell)).orElse(0L));
Optional<Record<String>> deletedRecord =
counterAccess.on(someCounterKey).delete(); // <6>
System.out.println("Deleted record with key: "
+ deletedRecord.map(Record::getKey).orElse("<none>"));
1. Define a Dataset access object, in this case a DatasetWriterReader instance, over the dataset.
2. DatasetWriterReader.add lets you add a new Record for a given key in the Dataset but only if no Record already exists for the key provided. Should a Record already exist, false is returned and no changes are made to the Dataset.
3. DatasetWriterReader.get lets you retrieve a Record, wrapped in an Optional, from the Dataset using the key that was used to add the record in the dataset. If a record with the specified key does not exist in the dataset then Optional.empty() is returned.
4. Since an Optional is returned from get, Optional.flatMap may be used to extract information from the Record. In this case, the value of the counterCell is extracted. If the record is absent or does not contain the counterCell, Optional.orElse returns zero.
5. Mutates the Record. In this example, the counterCell value is updated. The UpdateOperation.write method is a helper method that updates an individual cell in a record. Note that this form of the update method does not return anything.
6. Deletes the Record. If a record with the given key is available in the dataset, it is removed and returned, wrapped in an Optional. If there is no record with the given key, then an empty Optional is returned.
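The return-value conventions described in the list above (a boolean from add, an Optional from get and delete) mirror familiar JDK idioms. The following is a minimal sketch under stated assumptions: the dataset is replaced by a hypothetical in-memory map, a record is modeled as a map from cell name to value, and none of the names below are TCStore API.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

class CrudSemanticsSketch {
    // Hypothetical stand-in for a dataset: key -> record (cell name -> value).
    static final Map<String, Map<String, Object>> store = new ConcurrentHashMap<>();

    // Adds the record only if the key is absent; returns true if it was added.
    static boolean add(String key, Map<String, Object> cells) {
        return store.putIfAbsent(key, cells) == null;
    }

    // Returns the record wrapped in an Optional, or Optional.empty() for an unknown key.
    static Optional<Map<String, Object>> get(String key) {
        return Optional.ofNullable(store.get(key));
    }

    // Extracts the "counter" cell; yields 0 if the record or the cell is missing.
    static long counterOf(String key) {
        return get(key)
            .flatMap(r -> Optional.ofNullable((Long) r.get("counter")))
            .orElse(0L);
    }

    // Removes the record and returns it, or Optional.empty() if there was none.
    static Optional<Map<String, Object>> delete(String key) {
        return Optional.ofNullable(store.remove(key));
    }

    public static void main(String[] args) {
        Map<String, Object> cells = new HashMap<>();
        cells.put("counter", 0L);
        System.out.println(add("someCounter", cells));           // true: key was absent
        System.out.println(add("someCounter", new HashMap<>())); // false: key already present
        System.out.println(counterOf("someCounter"));            // 0
        System.out.println(delete("someCounter").isPresent());   // true
        System.out.println(delete("someCounter").isPresent());   // false
    }
}
```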
Complex Mutative Operations
The example that follows shows a complex mutative operation and a conditional delete operation. This example operates on a dataset containing ten records having keys counter0 through counter9 each of which has a counterCell and a stoppedCell. Another cell, stoppedByCell, is defined but is not present in any of the records.
Single Record Update/Delete
String advancedCounterKey = "counter9";
Optional<String> token = counterAccess
.on(advancedCounterKey) // <1>
.update(UpdateOperation.custom( // <2>
record -> { // <3>
if (!record.get(stoppedCell).orElse(false) // <4>
&& record.get(counterCell).orElse(0L) > 5) {
CellSet newCells = new CellSet(record); // <5>
newCells.set(stoppedCell.newCell(true));
newCells.set(stoppedByCell.newCell("Albin"));
newCells.remove(counterCell);
return newCells;
} else {
return record; // <6>
}
}))
.map(Tuple::second) // <7>
.map(r -> r.get(stoppedCell).orElse(false)
? r.get(stoppedByCell).orElse("<unknown>") : "<not_stopped>");
deletedRecord = counterAccess.on("counter0")
.iff(stoppedCell.isFalse()).delete(); // <8>
1. Selecting the counter9 record ...
2. Update the record ...
3. Using a custom, i.e. non-DSL lambda. DSL is discussed in the following section "Query and Compute Capabilities".
4. Gates the update so the record is only mutated if stoppedCell is false and counterCell is greater than five.
5. The custom update creates a CellSet copied from the existing record and then modifies it by setting the stoppedCell and stoppedByCell, and removing the counterCell. Note that the stoppedByCell gets added to the record by this update.
6. If the record does not meet the selection criterion (stoppedCell == false && counterCell > 5), the original record is returned.
7. Maps the output of the update operation (a Tuple containing the old and new records) to select only the new record (the second element of the tuple), and then maps the new record to obtain the "stopped by" value if the record is actually flagged as stopped.
8. Delete the record with the key counter0 if, and only if, stoppedCell is false.
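The shape of the custom update described above (copy the record, change the copy, return either the copy or the untouched original, then read the new record out of an old/new pair) can be sketched without TCStore. This is a minimal sketch under stated assumptions: the map-based store, the update method and the use of Map.Entry as the old/new pair are hypothetical stand-ins, and unlike a real dataset update this stand-in makes no atomicity guarantee.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.function.UnaryOperator;

class UpdateTupleSketch {
    // Hypothetical stand-in for a dataset: key -> record (cell name -> value).
    static final Map<String, Map<String, Object>> store = new HashMap<>();

    // Applies the transform and returns (old record, new record), or empty if the key is absent.
    static Optional<Map.Entry<Map<String, Object>, Map<String, Object>>> update(
            String key, UnaryOperator<Map<String, Object>> transform) {
        Map<String, Object> oldRecord = store.get(key);
        if (oldRecord == null) {
            return Optional.empty();
        }
        Map<String, Object> newRecord = transform.apply(oldRecord);
        store.put(key, newRecord);
        return Optional.of(new SimpleImmutableEntry<>(oldRecord, newRecord));
    }

    // Mirrors the gated update: only a running counter above five is stopped.
    static Map<String, Object> stopIfAboveFive(Map<String, Object> record) {
        boolean stopped = (Boolean) record.getOrDefault("stopped", false);
        long counter = (Long) record.getOrDefault("counter", 0L);
        if (!stopped && counter > 5) {
            Map<String, Object> copy = new HashMap<>(record); // copy, then modify the copy
            copy.put("stopped", true);
            copy.put("stoppedBy", "Albin");
            copy.remove("counter");
            return copy;
        }
        return record; // criterion not met: return the record unchanged
    }

    // Selects the new record from the pair and reports who stopped it.
    static String stoppedBy(String key) {
        return update(key, UpdateTupleSketch::stopIfAboveFive)
            .map(pair -> pair.getValue())
            .map(r -> (Boolean) r.getOrDefault("stopped", false)
                ? (String) r.getOrDefault("stoppedBy", "<unknown>") : "<not_stopped>")
            .orElse("<no_record>");
    }

    public static void main(String[] args) {
        Map<String, Object> record = new HashMap<>();
        record.put("counter", 9L);
        record.put("stopped", false);
        store.put("counter9", record);
        System.out.println(stoppedBy("counter9")); // prints Albin
    }
}
```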
Query and Compute Capabilities
The following example shows a simple computation over the Record instances in a Dataset: an average of the counterCell values. This example uses a Java stream on the Dataset and a pipeline using Java lambda expressions.
Simple Stream<Record<K>> Using Lambdas
OptionalDouble avg;
try (final Stream<Record<String>> recordStream =
counterAccess.records()) { // <1>
avg = recordStream
.filter(record -> !record.get(stoppedCell).orElse(false)) // <2>
.mapToLong(record -> record.get(counterCell).orElse(0L)) // <3>
.average(); // <4>
}
1. Retrieves a Stream<Record<K>> to operate on. Note the use of the try-with-resources statement. Streams obtained from a Dataset should be closed when no longer needed.
2. Filters the Stream to keep only the Record<K> instances whose stoppedCell value is false (or absent), that is, the counters that are not stopped.
3. Maps the Stream<Record<K>> to a LongStream of the values of the counterCell.
4. Calculates the average of all of these values using the Java 8 LongStream.average method.
A key aspect of using a Java stream is that no elements (in this case, Record instances) get processed until a terminal operation is invoked on the Stream, in this example, the .average() operation. As part of the terminal operation processing, TCStore tries to resolve the best possible way to execute the query.
Although the example above works with TCStore, it isn’t optimal because TCStore cannot introspect plain Java lambda expressions.
In a distributed environment, TCStore must move data (at least the Cell<Boolean> for all stoppedCell instances to filter on and then all matching Cell<Long> for counterCell instances) over the network to the client node to evaluate each and every lambda in the pipeline.
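The deferred execution described above is standard java.util.stream behavior and can be observed with a plain JDK stream. The following minimal sketch uses no TCStore classes; it records when the filter lambda actually runs relative to the point where the pipeline is built.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.OptionalDouble;
import java.util.stream.Stream;

class LazyStreamSketch {
    static List<String> trace() {
        List<String> events = new ArrayList<>();
        // Building the pipeline does not run the filter.
        Stream<Long> pipeline = Stream.of(2L, 4L, 6L)
            .filter(v -> { events.add("filter " + v); return v > 2; });
        events.add("pipeline built");
        // The terminal operation pulls the elements through the filter.
        OptionalDouble avg = pipeline.mapToLong(Long::longValue).average();
        events.add("average " + avg.getAsDouble());
        return events;
    }

    public static void main(String[] args) {
        // prints [pipeline built, filter 2, filter 4, filter 6, average 5.0]
        System.out.println(trace());
    }
}
```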
Note:
As with other Java Stream instances, a Stream instance obtained from a Dataset can be consumed exactly one time.
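The single-use rule can be demonstrated with a plain JDK stream. In this minimal sketch (no TCStore classes involved), a second terminal operation on the same Stream instance fails with an IllegalStateException.

```java
import java.util.stream.Stream;

class SingleUseStreamSketch {
    // Returns true if reusing an already consumed stream is rejected.
    static boolean secondUseFails() {
        Stream<String> stream = Stream.of("counter0", "counter1");
        stream.forEach(key -> { }); // the first terminal operation consumes the stream
        try {
            stream.forEach(key -> { }); // the second use is not allowed
            return false;
        } catch (IllegalStateException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(secondUseFails()); // prints true
    }
}
```

To run a second query, obtain a fresh stream rather than keeping a reference to a consumed one.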
In the next example, we re-express the query from the previous example "Simple Stream<Record<K>> Using Lambdas", this time using TCStore's fluent Domain Specific Language (DSL) for querying and computing. Using the DSL, TCStore can understand the actual query or computation being requested and optimize it for execution in a distributed environment. (For information on how to make use of existing cell indexes, see the following section "Cell Indexes".)
Simple Stream<Record<K>> Using TCStore API DSL
try (final Stream<Record<String>> recordStream =
counterAccess.records()) { // <1>
avg = recordStream
.filter(stoppedCell.isFalse()) // <2>
.mapToLong(counterCell.longValueOr(0L)) // <3>
.average(); // <4>
}
1. As above, retrieving a Stream<Record<K>>;
2. Filters on the stoppedCell being false;
3. Retrieves the values of counterCell as a long;
4. And finally, as above, averages them.
The DSL also makes the query more readable: the example above is more self-describing than the lambda-based version that precedes it.
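Why a DSL expression can be analyzed while a lambda cannot may be easier to see in a small model. The following is a conceptual sketch only, and it does not describe how TCStore is implemented: CellIsFalse and plan are hypothetical names. A DSL-style predicate is an ordinary object that exposes what it tests, so a query planner can inspect it, whereas a lambda offers nothing beyond its test method.

```java
import java.util.Map;
import java.util.function.Predicate;

class IntrospectablePredicateSketch {
    // Hypothetical DSL-style predicate: usable as a Predicate, but it also exposes what it tests.
    static final class CellIsFalse implements Predicate<Map<String, Object>> {
        final String cellName;

        CellIsFalse(String cellName) {
            this.cellName = cellName;
        }

        @Override
        public boolean test(Map<String, Object> record) {
            return Boolean.FALSE.equals(record.get(cellName));
        }
    }

    // Hypothetical planner: it can only reason about predicates whose structure it can see.
    static String plan(Predicate<Map<String, Object>> filter) {
        if (filter instanceof CellIsFalse) {
            return "evaluate near the data: " + ((CellIsFalse) filter).cellName + " == false";
        }
        return "opaque lambda: ship records to the client and call test()";
    }

    public static void main(String[] args) {
        System.out.println(plan(new CellIsFalse("stopped")));
        System.out.println(plan(record -> Boolean.FALSE.equals(record.get("stopped"))));
    }
}
```

Both predicates accept the same records, but only the first one tells the planner which cell and which value are involved.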
Refactoring the "Single Record Update/Delete" example above to use the DSL, which avoids moving the data to the client node, gives the following:
DSL-based Counter Update
// <1>
import static com.terracottatech.store.UpdateOperation.allOf;
import static com.terracottatech.store.UpdateOperation.remove;
import static com.terracottatech.store.UpdateOperation.write;
...
String dslCounterKey = "counter8";
token = counterAccess
.on(dslCounterKey)
.iff(stoppedCell.isFalse()
.and(counterCell.valueOr(0L).isGreaterThan(5L))) // <2>
.update(
allOf(write(stoppedCell, true), // <3>
write(stoppedByCell, "Albin"),
remove(counterCell)))
.map(Tuple::second)
.map(r -> r.get(stoppedCell).orElse(false)
? r.get(stoppedByCell).orElse("<unknown>") :
"<not stopped>"); // <4>
1. The use of static imports for the DSL helper methods is recommended to make the DSL-based code more readable.
2. The iff operation (if-and-only-if) is used to enable the following update operation only if the specified condition is true. In this case, stoppedCell is false and counterCell is greater than five.
3. This update operation specifies a collection of mutations to perform: (1) adds the stoppedCell or updates its value to true, (2) adds the stoppedByCell or updates its value to Albin, and (3) removes counterCell. As with the previous example, the Record is updated by all of these mutations in one atomic operation.
4. Maps the output of the update operation (a Tuple containing the old and new records) to select only the new record (the second element of the tuple), and then maps the new record to obtain the stoppedByCell value if the record is actually flagged as stopped.
The delete shown in the example "Single Record Update/Delete" is already expressed in proper DSL form and is not repeated in the example above.
Similar to the example "DSL-based Counter Update" above, a bulk update can be performed by applying an update operation to a Java Stream over the records in the dataset, obtained through the com.terracottatech.store.DatasetReader.records method:
Bulk Update
import static com.terracottatech.store.UpdateOperation.allOf;
import static com.terracottatech.store.UpdateOperation.remove;
import static com.terracottatech.store.UpdateOperation.write;
...
try (final Stream<Record<String>> recordStream =
counterAccess.records()) { // <1>
recordStream
.filter(stoppedCell.isFalse()) // <2>
.forEach(counterAccess.functions() // <3>
.update(
allOf(write(stoppedCell, true), // <4>
write(stoppedByCell, "Albin"),
remove(counterCell))));
}
1. Retrieve a Stream<Record<String>> through which updates will be performed. As with the previous Stream-based examples, note the use of the try-with-resources statement to enforce closure of the stream when operations are complete.
2. Filters the Record instances in the stream dropping those for which stoppedCell is true. Only the non-stopped records get past.
3. forEach is the terminal operation for the stream. forEach takes a java.util.function.Consumer.
In this example, a special consumer is used: one obtained from the com.terracottatech.store.DatasetWriterReader.functions method.
A consumer formed from DatasetWriterReader.functions can be used to update or delete records in the dataset from which the consumer was obtained.
4. As with some previous "Update" examples, an update operation with two write mutations and one remove mutation is specified. The update is applied to each selected record in its own atomic update.
Cell Indexes
To improve the performance of operations using streams obtained from DatasetReader.records(), cell indexes may be created for a Dataset. A cell index is defined against a CellDefinition instance. Entries are created in the index for all distinct Cell values for cells in the Dataset typed by the indexed CellDefinition. Each index entry associates that Cell value with the keys of all the Record instances containing that Cell value. When a stream pipeline refers to an indexed CellDefinition, particularly in a filter operation Predicate expressed using the TCStore DSL, iteration over the records in the dataset may be driven using the associated cell index.
A cell index may only be defined on a CellDefinition if its data type is BOOL, CHAR, DOUBLE, INT, LONG, or STRING.
An index may NOT be defined for a CellDefinition of type BYTES.
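The index structure described above (each distinct cell value associated with the keys of the records that contain it) can be pictured as a sorted map from value to key set. The following is a conceptual sketch only, with hypothetical names and a map-based record model; it is not the TCStore index implementation.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

class CellIndexSketch {
    // Builds an index on the "stopped" cell: distinct cell value -> keys of records holding it.
    static TreeMap<Boolean, Set<String>> buildStoppedIndex(
            Map<String, Map<String, Object>> records) {
        TreeMap<Boolean, Set<String>> index = new TreeMap<>();
        for (Map.Entry<String, Map<String, Object>> entry : records.entrySet()) {
            Object value = entry.getValue().get("stopped");
            if (value instanceof Boolean) { // records without the cell get no index entry
                index.computeIfAbsent((Boolean) value, v -> new TreeSet<>())
                     .add(entry.getKey());
            }
        }
        return index;
    }

    // Hypothetical sample data: three records with a "stopped" cell, one without.
    static Map<String, Map<String, Object>> sampleRecords() {
        Map<String, Map<String, Object>> records = new HashMap<>();
        records.put("counter0", Collections.<String, Object>singletonMap("stopped", false));
        records.put("counter1", Collections.<String, Object>singletonMap("stopped", true));
        records.put("counter2", Collections.<String, Object>singletonMap("stopped", false));
        records.put("counter3", Collections.<String, Object>singletonMap("counter", 1L));
        return records;
    }

    public static void main(String[] args) {
        TreeMap<Boolean, Set<String>> index = buildStoppedIndex(sampleRecords());
        // A filter on stopped == false can read the matching keys directly from the index.
        System.out.println(index.getOrDefault(false, Collections.emptySet())); // prints [counter0, counter2]
    }
}
```

With such a structure, a filter on the indexed cell visits only the matching keys instead of scanning every record.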
There are two ways to define an index:
(1) through the DatasetConfigurationBuilder.index method during dataset creation or
(2) through the Indexing instance obtained from the dataset.
Defining an Index During Dataset Creation
StringCellDefinition LAST_NAME = CellDefinition.defineString("lastName");
DoubleCellDefinition NICENESS = CellDefinition.defineDouble("niceness");
...
DatasetManager datasetManager =
DatasetManager.clustered(connectionURI).build();
DatasetConfiguration configuration =
datasetManager.datasetConfiguration() // <1>
.offheap("offheap")
.index(Person.LAST_NAME, IndexSettings.btree()) // <2>
.index(Person.NICENESS, IndexSettings.btree()) // <3>
.build();
Dataset<String> persons =
datasetManager.createDataset(
"people", Type.STRING, configuration); // <4>
1. Create a DatasetConfiguration ...
2. ... specifying indexes for the LAST_NAME ...
3. ... and NICENESS cell definitions.
4. Create a Dataset using the configuration with the indexes.
Adding an Index to a Dataset After Creation
Indexing indexing = counterSet.getIndexing(); // <1>
Operation<Index<Boolean>> indexOp =
indexing.createIndex(stoppedCell, IndexSettings.btree()); // <2>
try {
indexOp.get(); // <3>
} catch (InterruptedException | ExecutionException e) {
throw new AssertionError(e);
}
1. Get the Indexing instance for the Dataset. The Indexing instance for a Dataset may be used to add or delete a dataset’s indexes.
2. Define an index specifying a CellDefinition identifying the cells whose values make up the index keys.
3. Runtime index creation is asynchronous; call the get method to await completion of the indexing operation.
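The wait-for-completion pattern in step 3 follows the usual java.util.concurrent.Future conventions. The following minimal sketch assumes that the returned operation behaves like a Future, which the get call and the caught exception types in the example suggest; CompletableFuture and the startIndexBuild method are hypothetical stand-ins for the real indexing call.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

class AsyncIndexBuildSketch {
    // Hypothetical stand-in for starting an index build; returns immediately.
    static CompletableFuture<String> startIndexBuild(String cellName) {
        return CompletableFuture.supplyAsync(() -> "index on " + cellName + " is live");
    }

    // Starts the build and blocks until it has completed.
    static String awaitIndex(String cellName) {
        CompletableFuture<String> operation = startIndexBuild(cellName);
        try {
            return operation.get(); // blocks until the asynchronous work is done
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // keep the interrupt visible to callers
            throw new IllegalStateException("interrupted while waiting for the index", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("index creation failed", e.getCause());
        }
    }

    public static void main(String[] args) {
        System.out.println(awaitIndex("stopped")); // prints index on stopped is live
    }
}
```

Restoring the interrupt flag and unwrapping the cause of an ExecutionException is one common way to handle these two exceptions; the AssertionError used in the example above is another.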