Description of the Correlation Calculator block EPL
After the XML elements that describe the block interface, there is a <code> element, which contains the EPL. The first section in which you can add custom EPL code is the user-defined monitors section. The Correlation Calculator block defines three event types here.
User-defined monitors and/or events
<code><![CDATA[// Apama generated code - ONLY EDIT INDICATED SECTIONS
// Generated code type: CALLBACK
// Generated code version: 1
// BLOCKBUILDER - USER DEFINED MONITORS
 
event CorrelationCalculator_DataPoint {
  float value1;
  float value2;
  float time;
}
 
event CorrelationCalculator_Incr {
  float x1;
  float y1;
  float x2;
  float y2;
  float xy;
  float N;
}
 
event CorrelationCalculator_InputData {
  float value;
  float time;
}
 
// BLOCKBUILDER - END OF USER DEFINED MONITORS
User-defined variables
After the section for user-defined monitors or events, Apama begins the event type definition that implements the block. The placeholder name of the event type is always #block#. When you inject a scenario that uses a block, the correlator replaces #block# with the actual name of the block plus a unique number that distinguishes the instance of the block from other instances.
The first section after the event declaration is for user-defined variables. Each variable is a field in the event type. The Correlation Calculator block defines a number of variables.
event #block# {
// BLOCKBUILDER - USER DEFINED VARIABLES
  sequence<CorrelationCalculator_DataPoint> dataset;
  boolean running;
  boolean infinite;
  CorrelationCalculator_Incr incr;
 
  integer MAX_INT;
  float MAX_FLOAT;
  float NO_CORRELATION;
 
  CorrelationCalculator_InputData inputdata1;
  CorrelationCalculator_InputData inputdata2;
 
  float period;
  integer size;
 
// BLOCKBUILDER - END OF USER DEFINED VARIABLES
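The placeholder substitution described above can be pictured as a plain text replacement. The following Python sketch is an illustration only, not Apama code. The function name instantiate_block and the "_<number>" suffix are assumptions, because the exact naming scheme that the correlator uses is not stated here.

```python
from itertools import count

# Assumed source of unique numbers; the real correlator's scheme may differ.
_instance_numbers = count(1)


def instantiate_block(template: str, block_name: str) -> str:
    """Return the template with every #block# replaced by a unique type name."""
    # Hypothetical naming format: block name, underscore, unique number.
    unique_name = f"{block_name}_{next(_instance_numbers)}"
    return template.replace("#block#", unique_name)


template = "event #block# {\n  boolean running;\n}"
first = instantiate_block(template, "CorrelationCalculator")
second = instantiate_block(template, "CorrelationCalculator")
```

Because each call draws a new number, two uses of the same block in one scenario yield two distinct event type names.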
Actions for updating output feeds
Following the user-defined variables are the variables that Apama automatically generates for every block. These include an integer variable that contains the block instance ID and an action variable for each output feed in the block. For the Correlation Calculator block, these variables are defined as follows:
  integer blockInstanceId$;
  action<float,integer> sendOutput$statistics;
Actions for updating parameters
Next come the actions that update parameters. Apama defines the action and the block writer fills in the code that actually updates the parameter. For the Correlation Calculator block, the following actions update the period and size parameters:
  action update$period(float period) {
// BLOCKBUILDER - USER DEFINED ACTION
    self.period := period;
    updateInfinite();
// BLOCKBUILDER - END OF USER DEFINED ACTION
  }
 
  action update$size(integer size) {
// BLOCKBUILDER - USER DEFINED ACTION
    self.size := size;
    updateInfinite();
// BLOCKBUILDER - END OF USER DEFINED ACTION
  }
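Both update actions call updateInfinite(), which is defined later in the user-defined actions section. Together with removeExpiredSamples(), it determines how the period and size parameters control the sample window. The following Python sketch models that decision. The function name window_mode and the returned labels are illustrative and do not appear in the block.

```python
def window_mode(period: float, size: int) -> str:
    """Classify the retention rule implied by the period and size parameters."""
    if period > 0.0:
        # A positive period takes precedence: keep only samples newer
        # than currentTime - period.
        return "time"
    if size > 0:
        # Otherwise a positive size keeps at most `size` samples.
        return "count"
    # Neither limit is set: the block keeps running sums only and
    # stores no per-sample history.
    return "infinite"
```

Note that when both parameters are positive, the period wins and the size is ignored, which matches the order of the tests in removeExpiredSamples().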
Actions for updating input feeds
Next come the actions that update input feeds. Again, Apama defines the action and the block writer fills in the code that actually does the update. For the Correlation Calculator block, the following actions update the data1 and data2 input feeds:
  action input$data1(float value) {
// BLOCKBUILDER - USER DEFINED ACTION
    if not running {
      return;
    }
    self.inputdata1.value := value;
    self.inputdata1.time := currentTime;
    doStats1();
// BLOCKBUILDER - END OF USER DEFINED ACTION
  }
 
  action input$data2(float value) {
// BLOCKBUILDER - USER DEFINED ACTION
    if not running {
      return;
    }
    self.inputdata2.value := value;
    self.inputdata2.time := currentTime;
    doStats2();
// BLOCKBUILDER - END OF USER DEFINED ACTION
  }
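The input actions store each value and then call doStats1() or doStats2(), which proceed only when the other feed has already delivered a value. The block marks "no value yet" with the MAX_FLOAT sentinel. The following Python sketch models that gating. The class name PairedInputs and the pairs list are illustrative, and timestamps are omitted for brevity.

```python
MAX_FLOAT = 1.0e300  # sentinel the block uses for "no value received yet"


class PairedInputs:
    """Model of the two input feeds; records a pair once both have a value."""

    def __init__(self):
        self.running = False
        self.value1 = MAX_FLOAT
        self.value2 = MAX_FLOAT
        self.pairs = []  # stands in for the calls to doStatsCommon()

    def input_data1(self, value):
        if not self.running:
            return  # inputs are ignored until the start operation runs
        self.value1 = value
        if self.value2 != MAX_FLOAT:
            self.pairs.append((self.value1, self.value2))

    def input_data2(self, value):
        if not self.running:
            return
        self.value2 = value
        if self.value1 != MAX_FLOAT:
            self.pairs.append((self.value1, self.value2))


feeds = PairedInputs()
feeds.input_data1(1.0)  # dropped: the block is not running yet
feeds.running = True
feeds.input_data1(2.0)  # stored, but data2 has no value yet
feeds.input_data2(5.0)  # first complete pair
feeds.input_data1(3.0)  # pairs with the latest data2 value
```

Each new value on either feed is paired with the most recent value on the other feed, so the two feeds do not need to update in lockstep.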
Actions for performing operations
The actions that perform operations come next. For the Correlation Calculator block, these actions are defined as follows:
  action operation$start(action<> acknowledge) {
// BLOCKBUILDER - USER DEFINED ACTION
    running := true;
    acknowledge();
// BLOCKBUILDER - END OF USER DEFINED ACTION
  }
 
  action operation$stop(action<> acknowledge) {
// BLOCKBUILDER - USER DEFINED ACTION
    running := false;
    acknowledge();
// BLOCKBUILDER - END OF USER DEFINED ACTION
  }
 
  action operation$clear(action<> acknowledge) {
// BLOCKBUILDER - USER DEFINED ACTION
    inputdata1.value := MAX_FLOAT;
    inputdata2.value := MAX_FLOAT;
    dataset.setSize(0);
    incr := new CorrelationCalculator_Incr;
    acknowledge();
// BLOCKBUILDER - END OF USER DEFINED ACTION
  }
Standard setup and cleanup actions
After the actions that implement the block's interface, Apama defines the standard setup and cleanup actions that appear in every block. The scenario calls instancePreSpawnInit() on each new scenario instance. In the Correlation Calculator block this action needs nothing beyond what Apama fills in automatically, so its user-defined section holds only the generated placeholder comment and the acknowledge() call. For the Correlation Calculator block, these actions are defined as follows:
  action setup() {
// BLOCKBUILDER - USER DEFINED ACTION
    MAX_INT := 0x7fffffffffffffff;
    MAX_FLOAT := 1.0e300;
    NO_CORRELATION := -2.0;
// BLOCKBUILDER - END OF USER DEFINED ACTION
  }
 
  action instancePreSpawnInit(integer blockInstanceId$,
      string scenarioId$,
      dictionary<string, string> userData$,
      context target,
      action<> acknowledge) {
    self.blockInstanceId$ := blockInstanceId$;
// BLOCKBUILDER - USER DEFINED ACTION
//
// -- insert pre-spawn initialisation code --
//
    acknowledge();
// BLOCKBUILDER - END OF USER DEFINED ACTION
  }
 
  action instancePostSpawnInit(integer blockInstanceId$,
      string ownerId$,
      string scenarioId$,
      dictionary<string, string> userData$,
      action<> acknowledge,
      float period,
      integer size,
      action<float,integer> $$sendOutput$statistics) {
    self.sendOutput$statistics := $$sendOutput$statistics;
// BLOCKBUILDER - USER DEFINED ACTION
    self.period := period;
    self.size := size;
    inputdata1.value := MAX_FLOAT;
    inputdata2.value := MAX_FLOAT;
    updateInfinite();
    acknowledge();
// BLOCKBUILDER - END OF USER DEFINED ACTION
  }
 
  action cleanup() {
// BLOCKBUILDER - USER DEFINED ACTION
// BLOCKBUILDER - END OF USER DEFINED ACTION
  }
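The three constants assigned in setup() serve as sentinels elsewhere in the block. The following Python check is a sketch of why these particular values work, assuming that the EPL integer type is a 64-bit signed integer and that the EPL float type is a 64-bit floating point number.

```python
import math

MAX_INT = 0x7fffffffffffffff  # same literal as in setup()
MAX_FLOAT = 1.0e300           # marks an input that has no value yet
NO_CORRELATION = -2.0         # marks an undefined correlation

# The hex literal is the largest value of a signed 64-bit integer, so a
# sample count can never reach it and the size limit is effectively off.
is_largest_signed_64 = MAX_INT == 2**63 - 1

# 1.0e300 is finite, so it can be compared with != like any other float.
is_finite_sentinel = math.isfinite(MAX_FLOAT)

# A correlation coefficient lies in [-1, 1], so -2.0 cannot be mistaken
# for a real result.
is_out_of_range = not (-1.0 <= NO_CORRELATION <= 1.0)
```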
User-defined actions
Finally, any additional user-defined actions come at the end of the block definition file. For the Correlation Calculator block, these actions contain the unique functional content of this block.
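The central calculation is in doStatsCommon(). Read from the code, it appears to be the Pearson correlation coefficient expressed in terms of running sums. The symbols below are shorthand introduced here for the fields of the incr variable: S_x is incr.x1, S_y is incr.y1, S_xx is incr.x2, S_yy is incr.y2, S_xy is incr.xy, and N is incr.N.

```latex
r = \frac{S_{xy} - \dfrac{S_x S_y}{N}}
         {\sqrt{\left(S_{xx} - \dfrac{S_x^2}{N}\right)
                \left(S_{yy} - \dfrac{S_y^2}{N}\right)}}
```

In the code, the numerator is the sum variable and the expression under the square root is the div variable. Because only the sums are needed, incrAdd() and incrRemove() can update the result in constant time as samples enter and leave the window.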
// BLOCKBUILDER - USER DEFINED ACTIONS
  action doStats1() {
    if inputdata2.value != MAX_FLOAT {
      doStatsCommon(inputdata2.time);
    }
  }
 
  action doStats2() {
    if inputdata1.value != MAX_FLOAT {
      doStatsCommon(inputdata1.time);
    }
  }
 
  action doStatsCommon(float timestamp) {
    float N;
    float Mx;
    float sum, div;
    float correlation;
 
    if not infinite {
      // Remove expired samples
      removeExpiredSamples();
 
      // Add new pair to dataset
      dataset.append(
        CorrelationCalculator_DataPoint(inputdata1.value,
            inputdata2.value, timestamp));
    }
    incrAdd(inputdata1.value, inputdata2.value);
 
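    // Calculate correlation from the running sums:
    //   sum = Sxy - Sx*Sy/N
    //   div = (Sxx - Sx*Sx/N) * (Syy - Sy*Sy/N)
    N := incr.N;
    Mx := incr.x1 / N;
    sum := incr.xy - Mx*incr.y1;
    div := (incr.x2 - Mx*incr.x1) * (incr.y2 - incr.y1*incr.y1/N);
    if sum = 0.0 {
      correlation := 0.0;
    } else if div != 0.0 {
      correlation := sum / div.sqrt();
    } else {
      // Denominator is zero: the correlation is undefined
      correlation := NO_CORRELATION;
    }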
    sendOutput$statistics(correlation, N.floor());
  }
 
  action removeExpiredSamples() {
    float timeLimit := -MAX_FLOAT;
    integer sizeLimit := MAX_INT;
    if self.period > 0.0 {
      timeLimit := currentTime - self.period;
    } else if self.size > 0 {
      sizeLimit := self.size;
    }
    while (dataset.size() > 0 and dataset[0].time <= timeLimit)
      or dataset.size() >= sizeLimit {
      incrRemove(dataset[0].value1, dataset[0].value2);
      dataset.remove(0);
    }
  }
 
  action updateInfinite() {
    boolean wasInfinite := infinite;
    // Set infinite to true if period/size is infinite
    infinite := self.period <= 0.0 and self.size <= 0;
    if infinite {
      dataset.setSize(0);
    } else if wasInfinite {
      // Infinite has gone from true to false,
      // must reset incremental data
      incr := new CorrelationCalculator_Incr;
    }
  }
 
  action incrAdd(float x, float y) {
    incr.x1 := incr.x1 + x;
    incr.y1 := incr.y1 + y;
    incr.x2 := incr.x2 + x*x;
    incr.y2 := incr.y2 + y*y;
    incr.xy := incr.xy + x*y;
    incr.N := incr.N + 1.0;
  }
 
  action incrRemove(float x, float y) {
    incr.x1 := incr.x1 - x;
    incr.y1 := incr.y1 - y;
    incr.x2 := incr.x2 - x*x;
    incr.y2 := incr.y2 - y*y;
    incr.xy := incr.xy - x*y;
    incr.N := incr.N - 1.0;
  }
// BLOCKBUILDER - END OF USER DEFINED ACTIONS
}]]></code>
</block>
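To experiment with the incremental calculation outside the correlator, the following Python sketch mirrors incrAdd(), incrRemove(), and the correlation step of doStatsCommon() for a size-limited window. It is a simplified model, not a translation of the block: the class name RunningCorrelation is illustrative, and the time-based window, the infinite mode, and the two-feed pairing are left out.

```python
import math
from collections import deque

NO_CORRELATION = -2.0  # same out-of-range marker as the block


class RunningCorrelation:
    """Pearson correlation over the last `size` pairs, kept as running sums."""

    def __init__(self, size):
        self.size = size
        self.window = deque()
        self.x1 = self.y1 = self.x2 = self.y2 = self.xy = 0.0
        self.n = 0.0

    def _apply(self, x, y, sign):
        # sign is +1.0 to add a pair (incrAdd) or -1.0 to remove one (incrRemove)
        self.x1 += sign * x
        self.y1 += sign * y
        self.x2 += sign * x * x
        self.y2 += sign * y * y
        self.xy += sign * x * y
        self.n += sign

    def add(self, x, y):
        # Evict the oldest pairs first so the window never exceeds `size`
        while len(self.window) >= self.size:
            self._apply(*self.window.popleft(), -1.0)
        self.window.append((x, y))
        self._apply(x, y, 1.0)
        return self.correlation()

    def correlation(self):
        mx = self.x1 / self.n
        num = self.xy - mx * self.y1
        den = (self.x2 - mx * self.x1) * (self.y2 - self.y1 * self.y1 / self.n)
        if num == 0.0:
            return 0.0
        if den != 0.0:
            return num / math.sqrt(den)
        return NO_CORRELATION


rc = RunningCorrelation(size=3)
results = [rc.add(x, 2.0 * x) for x in (1.0, 2.0, 3.0, 4.0)]
```

With a single pair the numerator is zero, so the first result is 0.0, as in the block. Like the EPL, the sketch tests only whether the denominator differs from zero. It does not guard against a denominator that rounding has pushed slightly below zero, in which case math.sqrt raises ValueError in Python.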

Copyright © 2013-2019 | Software AG, Darmstadt, Germany and/or Software AG USA, Inc., Reston, VA, USA, and/or its subsidiaries and/or its affiliates and/or their licensors.