The Batch Accumulator codec connectivity plug-in
Events being written from the correlator to a connectivity transport are automatically batched for performance reasons. Many transports also read batches of incoming messages and send them into the correlator in a batch as well. However, some transports do not perform this batching and deliver messages one at a time. For such transports, performance can be increased by adding the batching before it is parsed by the correlator. This can be done with the Batch Accumulator codec.
For the transports that are provided out of the box, all the message-bus transports such as MQTT and Kafka already provide batching, The HTTP client, however, does not benefit from it because of the request/response nature. If you are using the HTTP server in submission-only mode and want to achieve a high rate of requests with multiple clients, then the Batch Accumulator codec can be useful. It may also be useful with any custom transports you write.
The Batch Accumulator codec automatically tunes batch sizes from one up, depending on the rate of incoming requests, and requires no manual tuning. It does not wait to accumulate a batch for a certain period of time and so does not introduce unnecessary latency. The batching is performed for messages going from the transport to the host. Messages from the host to the transport are passed through verbatim since they are already in batches.
To load the Batch Accumulator codec, an entry such as the following is required in the
connectivityPlugins section of the configuration file (see also
Configuration file for connectivity plug-ins):
batchAccumulatorCodec:
libraryName: connectivity-batch-accumulator-codec
class: BatchAccumulator
You then need to insert the batchAccumulatorCodec in your connectivity chain just before the transport. For example:
dynamicChains:
http:
- apama.eventMap
- mapperCodec:
...
- classifierCodec:
...
- jsonCodec
- stringCodec
- batchAccumulatorCodec
- httpServer:
automaticResponses: true
The Batch Accumulator codec can be inserted anywhere in the chain, but it is better to insert it close to the transport. It is entirely transparent to the plug-ins either side.
By default, the Batch Accumulator codec has a maximum batch size of 1000. This means if more than 1000 messages are waiting to be processed by the host-bound thread, requests from the transport will block. It also means you can be using up to 1000 times your message size in memory in outstanding events. You can configure a different batch size with the maxBatchSize configuration option (see below).
The Batch Accumulator codec exposes the actual size of the queue via a user-defined status value. This is available through the various monitoring tools for the correlator with the name
user-chain.batchAccumulator.queueSize. This will be the most recent batch size removed from the queue. See also
User-defined status reporting from connectivity plug-ins.
The following configuration option is available for the Batch Accumulator codec:
Configuration option | Description |
maxBatchSize | Optional. The maximum number of messages in a batch. This must be a positive integer, at least 1. Default: 1000. |