Apama  10.15.1.2
sag_connectivity_plugins.hpp
Go to the documentation of this file.
1 /*
2  * Title: sag_connectivity_plugins.hpp
3  * Description: C++ API for writing connectivity plugins
4  * $Copyright (c) 2015-2022 Software AG, Darmstadt, Germany and/or Software AG USA Inc., Reston, VA, USA, and/or its subsidiaries and/or its affiliates and/or their licensors.$
5  * Use, reproduction, transfer, publication or disclosure is prohibited except as specifically provided for in your License Agreement with Software AG
6  */
7 
16 #ifndef _SAG_CONNECTIVITY_PLUGINS_HPP_
17 #define _SAG_CONNECTIVITY_PLUGINS_HPP_
18 
19 #include <sag_connectivity_cpp.hpp>
20 #include <sag_plugin_logging.hpp>
21 #include <memory>
22 #include <mutex>
23 
24 // must define __STDC_FORMAT_MACROS before first include of inttypes else printf macros won't be defined
25 #ifndef __STDC_FORMAT_MACROS
26 #define __STDC_FORMAT_MACROS 1
27 #endif
28 #include <inttypes.h>
29 
30 namespace com {
31 namespace softwareag {
32 namespace _DATAT_INTERNAL_CPP_NAMESPACE {
33 
34 namespace
35 {
/**
 * Replace every occurrence of @a from inside @a input with @a to, in place.
 *
 * Scanning resumes immediately after each inserted replacement, so text
 * introduced by @a to is never re-examined (e.g. replacing "a" with "aa"
 * terminates).
 *
 * @param input The string to modify.
 * @param from  The text to search for. An empty pattern is treated as
 *              "nothing to replace" and leaves @a input untouched.
 * @param to    The text substituted for each occurrence of @a from.
 */
void replace(std::string &input, const std::string &from, const std::string &to)
{
    // An empty pattern matches at every position, so the scan below would
    // never terminate (and would grow the string without bound whenever
    // `to` is non-empty).
    if (from.empty()) return;

    for (size_t pos = input.find(from); pos != std::string::npos; pos = input.find(from, pos))
    {
        input.replace(pos, from.length(), to);
        // Step over what was just inserted so it is not matched again.
        pos += to.length();
    }
}
48 
52  std::string to_string(const Message &m, bool truncate=true)
53  {
54  std::string payload = to_string(m.getPayload());
55  if (truncate && payload.length() > 200) payload = payload.substr(0, 196) + " ...";
56  // security sanitization to prevent log message faking
57  replace(payload, "\n", "\\n");
58  replace(payload, "\r", "\\r");
59  return "Message<metadata="+to_string(m.getMetadataMap())+", payload="+payload+">";
60  }
61 }
62 
// Direction of message flow through a chain: towards the transport or towards the host.
// NOTE(review): only TOWARDS_HOST is visible in this listing; the tooltip text further down also
// describes a towards-transport direction, so an enumerator line appears to be missing here --
// confirm against the real header before relying on this listing.
68 enum class Direction {
// Messages flowing towards the host (from the transport).
72  TOWARDS_HOST = 1,
77 
78 };
79 
84 class PluginHost {
85  friend class Plugin;
86 public:
87  typedef std::unique_ptr<PluginHost> ptr_t;
88 
103  void enableReliability(Direction direction) {
104  if (SAG_ERROR_OK != sag_enable_reliability(chain, static_cast<int>(direction))) {
105  throw std::runtime_error("An error occurred while setting chain reliability");
106  }
107  }
108 
110  bool isShuttingDown() {
111  bool isShuttingDown = false;
112  if (SAG_ERROR_OK != sag_is_host_shutting_down(chain, isShuttingDown)) {
113  throw std::runtime_error("An error occurred while checking if host is shutting down");
114  }
115  return isShuttingDown;
116  }
117 private:
121  PluginHost(void* chain = nullptr) :chain(chain) {}
122  void* chain;
123 };
124 
125 // forward decl for parameters friend
126 class Plugin;
127 
138 {
139  // for constructor access to connectivityManager
140  friend class Plugin;
141 
142 public:
144 
146 
// Get the configuration for this plug-in (a read-only reference to the stored map).
151  const map_t &getConfig() const { return config; }
152 
// Get the identifier used for the chain this plug-in is part of.
156  const std::string &getChainId() const { return chainId; }
157 
// Get the name used in the configuration file for this plug-in.
161  const std::string &getPluginName() const { return pluginName; }
162 
163 protected:
// Protected constructor. Stores the plug-in name and chain id, keeps its own copy of the
// configuration (config.copy()), and records the opaque connectivityManager and chain
// handles exactly as passed in.
167  PluginConstructorParameters(const std::string &pluginName, const std::string &chainId, const map_t &config, void* connectivityManager, void* chain)
168  : chainId(chainId), pluginName(pluginName), config(config.copy()), connectivityManager(connectivityManager), chain(chain)
169  {}
170 
171 private:
172  std::string chainId;
173  std::string pluginName;
174  map_t config;
176  void* connectivityManager;
178  void* chain;
179 };
180 
183 {
184 public:
// Constructor for transport plug-in parameters. Forwards every argument unchanged to the
// PluginConstructorParameters base; `reserved` is passed in the base's `chain` position.
188  TransportConstructorParameters(const std::string &pluginName, const std::string &chainId, const map_t &config, void* connectivityManager, void* reserved)
189  : PluginConstructorParameters(pluginName, chainId, config, connectivityManager, reserved)
190  {}
191 };
192 
195 {
196 public:
// Constructor for codec plug-in parameters. Forwards every argument unchanged to the
// PluginConstructorParameters base; `reserved` is passed in the base's `chain` position.
200  CodecConstructorParameters(const std::string &pluginName, const std::string &chainId, const map_t &config, void* connectivityManager, void* reserved)
201  : PluginConstructorParameters(pluginName, chainId, config, connectivityManager, reserved)
202  {}
// Name-only constructor: uses an empty chain id, an empty configuration map and null
// connectivityManager/chain handles.
// NOTE(review): the Plugin constructor passes the stored connectivityManager to StatusReporter,
// so status reporting would run against a null handle for parameters built this way -- confirm
// the intended use (possibly stand-alone or test construction).
206  CodecConstructorParameters(const std::string &pluginName)
207  : PluginConstructorParameters(pluginName, "", map_t{}, nullptr, nullptr)
208  {}
209 };
210 
211 
222 {
223 public:
224 
// Constant to use as the status value when a component is online.
228  static const char* STATUS_ONLINE() { return "ONLINE"; }
// Constant to use as the status value when a component is still starting.
232  static const char* STATUS_STARTING() { return "STARTING"; }
// Constant to use as the status value when a component is not currently operational.
236  static const char* STATUS_FAILED() { return "FAILED"; }
237 
248  {
249  friend class StatusReporter;
250  public:
251 
// Destructor: asks the host to delete the underlying status item, then nulls the local
// handle so it cannot be used after deletion.
252  ~StatusItem()
253  {
254  sag_delete_user_status_item(connectivityManager, underlying);
255  underlying.item = nullptr;
256  }
257 
265  void setStatus(const std::string &value)
266  {
267  std::unique_lock<std::mutex> ul(status_lock);
268  setStatusLocked(value);
269  }
270 
279  void setStatus(int64_t value)
280  {
281  std::unique_lock<std::mutex> ul(status_lock);
282  intValue = value;
283  setStatusLocked(convert_to_details::integerToString(value));
284  }
285 
293  std::string getStatus() {
294  std::unique_lock<std::mutex> ul(status_lock);
295  return lastValue;
296  }
297 
305  void increment(int64_t incrementValue = 1)
306  {
307  std::unique_lock<std::mutex> ul(status_lock);
308  intValue += incrementValue;
309  setStatusLocked(convert_to_details::integerToString(intValue));
310  }
311 
// Get the unique key specified when this status item was created (mkey is const, so no lock is taken).
314  const std::string &key() { return mkey; }
315 
316  private:
317  StatusItem(const StatusItem& other) = delete; // non construction-copyable
318  StatusItem& operator=(const StatusItem&) = delete; // non copyable
319 
320  StatusItem(void* connectivityManager, const std::string &key, const std::string &initialValue, const int64_t intValue)
321  : intValue(intValue),
322  mkey(key),
323  lastValue(initialValue),
324  connectivityManager(connectivityManager),
325  underlying(sag_create_user_status_item(connectivityManager, key.c_str(), initialValue.c_str()))
326  {
327  if (!underlying.item)
328  {
329  std::ostringstream oss;
330  oss << "Failed to create status item '" << key << "' (ensure the key is unique and that this plug-in has not been shutdown already)";
331  throw std::runtime_error(oss.str());
332  }
333  }
334 
335  void setStatusLocked(const std::string &value)
336  {
337  if (value == lastValue) return; // no-op this case
338  lastValue = value;
339 
340  sag_set_user_status_item(underlying, value.c_str());
341  }
342 
343  int64_t intValue;
344  const std::string mkey;
345  std::string lastValue;
346  void* connectivityManager;
347  sag_status_item_t underlying;
348  std::mutex status_lock;
349  };
350 
352  typedef std::unique_ptr<StatusItem> item_ptr;
353 
368  item_ptr createStatusItem(const std::string &key, const std::string &initialValue)
369  {
370  return std::unique_ptr<StatusItem>(new StatusItem(connectivityManager, key, initialValue, 0));
371  }
372 
386  item_ptr createStatusItem(const std::string &key, int64_t initialValue)
387  {
388  std::ostringstream oss;
389  oss << initialValue;
390  return std::unique_ptr<StatusItem>(new StatusItem(connectivityManager, key, oss.str(), initialValue));
391  }
392 
404  void setStatus(const map_t &statusmap)
405  {
406  sag_set_user_status_map(connectivityManager, reinterpret_cast<const sag_underlying_map_t&>(statusmap));
407  // keep track of what keys we've added so we can automatically remove them when we're destroyed
408  for (auto it = statusmap.cbegin(); it != statusmap.cend(); it++)
409  if (it->first.type_tag() == SAG_DATA_STRING) // ignore invalid ones
410  {
411  if (it->second.empty())
412  mapKeysToCleanup.erase(it->first);
413  else
414  mapKeysToCleanup.insert(it->first.copy(), data_t());
415  }
416  }
417 
418 
419 
423  ~StatusReporter()
424  {
425  if (!mapKeysToCleanup.empty())
426  sag_set_user_status_map(connectivityManager, reinterpret_cast<const sag_underlying_map_t&>(mapKeysToCleanup));
427  }
428 
// Construct a reporter bound to the given opaque connectivityManager handle; the map of
// keys to clean up on destruction starts out empty.
432  explicit StatusReporter(void *connectivityManager) : connectivityManager(connectivityManager), mapKeysToCleanup()
433  {
434  }
435 
436 private:
437  void* connectivityManager;
438  map_t mapKeysToCleanup;
439 
440 
441  StatusReporter() = delete;
442 
443  // non-copyable, due to the cleanup code we don't want people to shoot themselves in the foot by having multiple copies
444  StatusReporter(const StatusReporter& other) = delete;
445  StatusReporter& operator=(const StatusReporter&) = delete;
446 
447 };
448 
456 class Plugin
457 {
458 public:
// Initialises the plug-in from its constructor parameters: copies the plug-in name and chain
// id, takes its own copy of the configuration map, creates a PluginHost around params.chain,
// names the logger "connectivity.<pluginName>.<chainId>", and creates a StatusReporter bound
// to params.connectivityManager.
// NOTE(review): the logger name is built from the pluginName/chainId members, so it relies on
// those members being declared (and therefore initialised) before `logger`; the `logger` and
// `config` declarations are not visible in this listing -- confirm.
459  /* Constructor.
460  * @since 9.12.0.1
461  */
462  Plugin(const PluginConstructorParameters &params)
463  : pluginName(params.getPluginName()), chainId(params.getChainId()), config(params.getConfig().copy()),
464  host(new PluginHost(params.chain)), logger("connectivity." + pluginName + "." + chainId),
465  statusReporter(new StatusReporter(params.connectivityManager))
466  {
467  }
468 
// Virtual destructor, so plug-ins can be destroyed through a Plugin pointer.
474  virtual ~Plugin() {}
475 
// Called when an entire chain has been created and the plug-in is allowed to start up.
// The default implementation does nothing.
486  virtual void start() {}
487 
// Called some time after start(), when the host is ready to start receiving input.
// The default implementation does nothing.
495  virtual void hostReady() {}
496 
// Stop processing messages and terminate and join any background threads.
// The default implementation does nothing.
521  virtual void shutdown() {}
// The name used for this plug-in in the configuration file.
523  const std::string &getName() const { return pluginName; }
524 protected:
528  const std::string pluginName;
530  const std::string chainId;
538  const PluginHost::ptr_t host;
547  if (statusReporter) return *statusReporter;
548  throw std::runtime_error("Cannot call getStatusReporter when using the legacy constructor");
549  }
550 
551 public:
556 private:
557  std::unique_ptr<StatusReporter> statusReporter;
558 };
559 
564 class HostSide
565 {
566 public:
568  virtual ~HostSide() {}
570  typedef std::unique_ptr<HostSide> ptr_t;
590  virtual void sendBatchTowardsHost(Message *start, Message *end) = 0;
591 
599  void sendBatchTowardsHost(Message &&message) {
600  sendBatchTowardsHost(&message, &message+1);
601  }
602 
613  template<typename IT>
614  auto sendBatchTowardsHost(const IT &begin, const IT &end) -> typename std::enable_if<
615  !std::is_const<ampl::remove_ref_t<decltype(*begin)>>::value &&
616  ampl::is_same<Message, ampl::remove_const_t<ampl::remove_ref_t<decltype(*begin)>>>::value
617  , void>::type // this ensures we can only pass in non-const iterator pairs to Message
618  {
619  if(begin == end) sendBatchTowardsHost((Message*) nullptr, (Message*) nullptr);
620  else sendBatchTowardsHost(&(*begin), (&(*(end-1)))+1);
621  }
622 };
623 
628 class RemoteHostSide: public HostSide
629 {
630 public:
631  RemoteHostSide(sag_plugin_t other, sag_send_fn_t fn): other(other), fn(fn) {}
632  virtual ~RemoteHostSide() {}
633  virtual void sendBatchTowardsHost(Message *start, Message *end);
634 private:
635  sag_plugin_t other;
636  sag_send_fn_t fn;
637 };
638 
644 {
645 public:
647  virtual ~TransportSide() {}
649  typedef std::unique_ptr<TransportSide> ptr_t;
670  virtual void sendBatchTowardsTransport(Message *start, Message *end) = 0;
671 
680  sendBatchTowardsTransport(&message, &message+1);
681  }
682 
692  template<typename IT>
693  auto sendBatchTowardsTransport(const IT &begin, const IT &end) -> typename std::enable_if<
694  !std::is_const<ampl::remove_ref_t<decltype(*begin)>>::value &&
695  ampl::is_same<Message, ampl::remove_const_t<ampl::remove_ref_t<decltype(*begin)>>>::value
696  , void>::type // this ensures we can only pass in non-const iterator pairs to Message
697  {
698  if(begin == end) sendBatchTowardsTransport((Message*) nullptr, (Message*) nullptr);
699  else sendBatchTowardsTransport(&(*begin), (&(*(end-1)))+1);
700  }
701 };
702 
707 class RemoteTransportSide: public TransportSide
708 {
709 public:
710  RemoteTransportSide(sag_plugin_t other, sag_send_fn_t fn): other(other), fn(fn) {}
711  virtual ~RemoteTransportSide() {}
712  virtual void sendBatchTowardsTransport(Message *start, Message *end);
713 private:
714  sag_plugin_t other;
715  sag_send_fn_t fn;
716 };
717 
718 
740 class AbstractCodec: public Plugin, public HostSide, public TransportSide
741 {
742 public:
744 
754  : Plugin(params)
755  {}
756 
757  // These methods do not need to show up in doxygen
758  /* Called between construction and start() to set the hostSide field */
// Takes ownership of the next component towards the host by moving it into the hostSide member.
759  virtual void setNextTowardsHost(HostSide::ptr_t &&host)
760  {
761  hostSide = std::move(host);
762  }
763  /* Called between construction and start() to set the transportSide field */
// Takes ownership of the next component towards the transport by moving it into the transportSide member.
764  virtual void setNextTowardsTransport(TransportSide::ptr_t &&transport)
765  {
766  transportSide = std::move(transport);
767  }
768 protected:
774 
780 };
781 
811 {
812 public:
814 
824  : Plugin(params)
825  {}
826 
827  // This method does not need to show up in doxygen
828  /* Called between construction and start() to set the hostSide field */
// Takes ownership of the next component towards the host by moving it into the hostSide member.
829  virtual void setNextTowardsHost(HostSide::ptr_t &&host)
830  {
831  hostSide = std::move(host);
832  }
833 
834 protected:
840 };
841 
854 {
855 public:
856 
866  : AbstractTransport(params)
867  {}
868 
869 
875  virtual void sendBatchTowardsTransport(Message *start, Message *end)
876  {
877  for (Message *it = start; it != end; ++it) {
878  try {
879  if (it->getPayload().empty()) {
880  deliverNullPayloadTowardsTransport(*it);
881  } else {
882  deliverMessageTowardsTransport(*it);
883  }
884  } catch (...) {
885  handleException(*it);
886  }
887  }
888  }
890  virtual void deliverMessageTowardsTransport(Message &msg) = 0;
893  {
894  // do nothing
895  }
896 
// Handle an exception thrown while delivering a message towards the transport.
// Invoked from the catch(...) block of sendBatchTowardsTransport, so an exception is being
// handled when this runs: the bare `throw;` rethrows it so it can be classified here.
// Both handlers log a warning that includes the (truncated, sanitised) message text and then
// return normally, so the batch loop carries on with the next message.
// m: the message that was being delivered when the exception was thrown.
915  virtual void handleException(Message &m)
916  {
917  try {
// Rethrow the in-flight exception; this function must only be called from within a catch block.
918  throw;
919  } catch (const std::exception &e) {
920  logger.warn("Error while delivering message: %s; %s will be dropped.", e.what(), to_string(m).c_str());
921  } catch (...) {
922  logger.warn("Unknown error delivering message: %s", to_string(m).c_str());
923  }
924  }
925 };
926 
943 {
944 public:
945 
955  : AbstractCodec(params)
956  {}
957 
964  virtual void sendBatchTowardsHost(Message *start, Message *end)
965  {
966  Message *curr = start;
967  for (Message *it = start; it != end; ++it) {
968  bool rv;
969  try {
970  if (it->getPayload().empty()) {
971  rv = transformNullPayloadTowardsHost(*it);
972  } else {
973  rv = transformMessageTowardsHost(*it);
974  }
975  } catch (...) {
976  rv = handleException(*it, false);
977  }
978  // if we keep it (and it didn't throw) swap it with the accumulator
979  if (rv) {
980  if (it != curr) it->swap(std::move(*curr)); // don't self-swap
981  ++curr;
982  }
983  }
984  if (hostSide.get()) hostSide->sendBatchTowardsHost(start, curr);
985  }
992  virtual void sendBatchTowardsTransport(Message *start, Message *end)
993  {
994  Message *curr = start;
995  for (Message *it = start; it != end; ++it) {
996  bool rv;
997  try {
998  // process the message
999  if (it->getPayload().empty()) {
1000  rv = transformNullPayloadTowardsTransport(*it);
1001  } else {
1002  rv = transformMessageTowardsTransport(*it);
1003  }
1004  } catch (...) {
1005  rv = handleException(*it, true);
1006  }
1007  // if we keep it (and it didn't throw) swap it with the accumulator
1008  if (rv) {
1009  if (it != curr) it->swap(std::move(*curr)); // don't self-swap
1010  ++curr;
1011  }
1012  }
1013  if (transportSide.get()) transportSide->sendBatchTowardsTransport(start, curr);
1014  }
1015 
1020  virtual bool transformMessageTowardsHost(Message &msg) = 0;
1025  virtual bool transformMessageTowardsTransport(Message &msg) = 0;
1031  {
1032  // do nothing
1033  return true;
1034  }
1040  {
1041  // do nothing
1042  return true;
1043  }
// Handle an exception thrown while transforming a message.
// Invoked from the catch(...) blocks of the batch senders, so an exception is being handled
// when this runs: the bare `throw;` rethrows it so it can be classified here.
// m: the message being transformed when the exception was thrown.
// towardsTransport: passed as true by sendBatchTowardsTransport and false by
//   sendBatchTowardsHost; this default implementation does not use it.
// Returns false in every case, which makes the batch senders drop the message.
1065  virtual bool handleException(Message &m, bool towardsTransport)
1066  {
1067  try {
// Rethrow the in-flight exception; this function must only be called from within a catch block.
1068  throw;
1069  } catch (const std::exception &e) {
1070  logger.warn("Error while transforming message: %s; %s will be dropped.", e.what(), to_string(m).c_str());
1071  } catch (...) {
1072  logger.warn("Unknown error transforming message: %s", to_string(m).c_str());
1073  }
1074  return false;
1075  }
1076 };
1077 
1078 }
1079 
1080 namespace connectivity { using namespace _DATAT_INTERNAL_CPP_NAMESPACE; }
1081 
1082 }} // com.softwareag.connectivity
1083 
1084 // internal implementation included from these files
1085 #include <sag_internal/exception.hpp>
1086 #include <sag_internal/remote_plugins.hpp>
1087 #include <sag_internal/plugin_macros.hpp>
1088 
1098 #define SAG_DECLARE_CONNECTIVITY_TRANSPORT_CLASS(Class) _SAG_DECLARE_CONNECTIVITY_TRANSPORT_CLASS(Class)
1099 
1109 #define SAG_DECLARE_CONNECTIVITY_CODEC_CLASS(Class) _SAG_DECLARE_CONNECTIVITY_CODEC_CLASS(Class)
1110 
1111 #endif // _SAG_CONNECTIVITY_PLUGINS_HPP_
A container for parameters passed to the constructor of a codec plug-in.
Definition: sag_connectivity_plugins.hpp:194
const std::string pluginName
The name used for this plug-in in the configuration file.
Definition: sag_connectivity_plugins.hpp:528
Base class that simplifies implementation of codec plug-ins that deal only with individual messages n...
Definition: sag_connectivity_plugins.hpp:942
AbstractTransport(const PluginConstructorParameters::TransportConstructorParameters &params)
Constructor.
Definition: sag_connectivity_plugins.hpp:823
Base class that simplifies implementation of transport plug-ins that deal only with individual messag...
Definition: sag_connectivity_plugins.hpp:853
void increment(int64_t incrementValue=1)
Set an integer status value by incrementing the previous integer value that was set by this object.
Definition: sag_connectivity_plugins.hpp:305
HostSide::ptr_t hostSide
The next plug-in in the chain towards host.
Definition: sag_connectivity_plugins.hpp:773
An interface to the next component (plugin or host) towards the host.
Definition: sag_connectivity_plugins.hpp:564
const_iterator cbegin() const
Forward const_iterator begin.
Definition: sag_connectivity_cpp.hpp:313
map_t config
The configuration of this plug-in.
Definition: sag_connectivity_plugins.hpp:532
item_ptr createStatusItem(const std::string &key, int64_t initialValue)
Creates a StatusItem instance that can be used to report status for a given key, using an integral in...
Definition: sag_connectivity_plugins.hpp:386
Direction
The enumeration indicating the direction of message flow - towards the transport or towards the host.
Definition: sag_connectivity_plugins.hpp:68
Class for writing to the system logger.
Definition: sag_plugin_logging.hpp:71
const PluginHost::ptr_t host
Interface to support miscellaneous requests from this plug-in to the host system.
Definition: sag_connectivity_plugins.hpp:538
StatusReporter & getStatusReporter()
Allows reporting status information from this plug-in, such as online or failed status and number of ...
Definition: sag_connectivity_plugins.hpp:546
const_iterator cend() const
Forward const_iterator end.
Definition: sag_connectivity_cpp.hpp:315
Contains the C++ implementation of the underlying datatypes used by connectivity plugins and their ac...
void setStatus(const map_t &statusmap)
Set multiple related string status values at the same time (atomically).
Definition: sag_connectivity_plugins.hpp:404
virtual void sendBatchTowardsTransport(Message *start, Message *end)
Implements batch sending, calling deliverMessageTowardsTransport(Message&) for each message individua...
Definition: sag_connectivity_plugins.hpp:875
Base of the inheritance tree for Connectivity plugins.
Definition: sag_connectivity_plugins.hpp:456
TransportSide::ptr_t transportSide
The next plug-in in the chain towards transport.
Definition: sag_connectivity_plugins.hpp:779
std::unique_ptr< StatusItem > item_ptr
Unique pointer to a StatusItem.
Definition: sag_connectivity_plugins.hpp:352
void setStatus(const std::string &value)
Set a string status value.
Definition: sag_connectivity_plugins.hpp:265
AbstractSimpleTransport(const PluginConstructorParameters::TransportConstructorParameters &params)
Constructor.
Definition: sag_connectivity_plugins.hpp:865
Logger logger
Logging for writing to the host log file.
Definition: sag_connectivity_plugins.hpp:555
virtual bool transformNullPayloadTowardsTransport(Message &msg)
Transform a message with a null payload in a transport-wards direction.
Definition: sag_connectivity_plugins.hpp:1039
virtual void start()
Called when an entire chain has been created and the plugin is allowed to start up (after all plugins...
Definition: sag_connectivity_plugins.hpp:486
auto sendBatchTowardsTransport(const IT &begin, const IT &end) -> typename std::enable_if< !std::is_const< ampl::remove_ref_t< decltype(*begin)>>::value &&ampl::is_same< Message, ampl::remove_const_t< ampl::remove_ref_t< decltype(*begin)>>>::value, void >::type
Overload for sending messages using an iterator range.
Definition: sag_connectivity_plugins.hpp:693
std::enable_if< get_underlying< T >::value, std::string >::type to_string(const T &t)
Get a string representation of t.
A class allowing a plug-in to report status values to the host.
Definition: sag_connectivity_plugins.hpp:221
std::unique_ptr< TransportSide > ptr_t
Pointers to TransportSides should always be this ptr_t type, which is a std::unique_ptr.
Definition: sag_connectivity_plugins.hpp:649
virtual ~TransportSide()
Ensure virtual destruction.
Definition: sag_connectivity_plugins.hpp:647
Base class for transport plug-ins.
Definition: sag_connectivity_plugins.hpp:810
static const char * STATUS_ONLINE()
Returns a constant that should be used as the status value when a component is online,...
Definition: sag_connectivity_plugins.hpp:228
item_ptr createStatusItem(const std::string &key, const std::string &initialValue)
Creates a StatusItem instance that can be used to report status for a given key.
Definition: sag_connectivity_plugins.hpp:368
const std::string & getName() const
The name used for this plug-in in the configuration file.
Definition: sag_connectivity_plugins.hpp:523
bool isShuttingDown()
Check if host is shutting down.
Definition: sag_connectivity_plugins.hpp:110
virtual void handleException(Message &m)
Handle an exception thrown while delivering a message towards the transport.
Definition: sag_connectivity_plugins.hpp:915
static const char * STATUS_FAILED()
Returns a constant that should be used as the status value when a component is not currently operatio...
Definition: sag_connectivity_plugins.hpp:236
virtual ~HostSide()
Ensure virtual destruction.
Definition: sag_connectivity_plugins.hpp:568
AbstractSimpleCodec(const PluginConstructorParameters::CodecConstructorParameters &params)
Constructor.
Definition: sag_connectivity_plugins.hpp:954
virtual void hostReady()
Called some time after start(), when the host is ready to start receiving input (sends will be queued...
Definition: sag_connectivity_plugins.hpp:495
Base class for codec plug-ins.
Definition: sag_connectivity_plugins.hpp:740
void sendBatchTowardsHost(Message &&message)
Overload for sending a batch containing a single message.
Definition: sag_connectivity_plugins.hpp:599
const std::string & getPluginName() const
Get the name used in the configuration file for this plug-in.
Definition: sag_connectivity_plugins.hpp:161
static const char * STATUS_STARTING()
Returns a constant that should be used as the status value when a component is still starting,...
Definition: sag_connectivity_plugins.hpp:232
void setStatus(int64_t value)
Set an integer status value.
Definition: sag_connectivity_plugins.hpp:279
A map class which implements many of the functions on std::map.
Definition: sag_connectivity_cpp.hpp:35
const std::string & key()
Get the unique key specified when this status item was created.
Definition: sag_connectivity_plugins.hpp:314
A class that can be used to efficiently update the value associated with a single status key.
Definition: sag_connectivity_plugins.hpp:247
A container for a payload and associated metadata.
Definition: sag_connectivity_cpp.hpp:26
virtual void shutdown()
Stop processing messages and terminate and join any background threads.
Definition: sag_connectivity_plugins.hpp:521
virtual void sendBatchTowardsHost(Message *start, Message *end)
Implements batch sending, calling transformMessageTowardsHost(Message &) for each message individuall...
Definition: sag_connectivity_plugins.hpp:964
An interface to the next component (plugin or host) towards the transport.
Definition: sag_connectivity_plugins.hpp:643
AbstractCodec(const PluginConstructorParameters::CodecConstructorParameters &params)
Constructor.
Definition: sag_connectivity_plugins.hpp:753
auto sendBatchTowardsHost(const IT &begin, const IT &end) -> typename std::enable_if< !std::is_const< ampl::remove_ref_t< decltype(*begin)>>::value &&ampl::is_same< Message, ampl::remove_const_t< ampl::remove_ref_t< decltype(*begin)>>>::value, void >::type
Overload for sending messages using an iterator range.
Definition: sag_connectivity_plugins.hpp:614
map_t copy() const
Return a deep copy of this map.
Definition: sag_connectivity_cpp.hpp:242
virtual void deliverNullPayloadTowardsTransport(Message &msg)
Deliver a message with a null payload.
Definition: sag_connectivity_plugins.hpp:892
void enableReliability(Direction direction)
Enable reliable messaging for the chain that this plug-in belongs to, in a particular direction i....
Definition: sag_connectivity_plugins.hpp:103
Contains the headers needed to implement your own Connectivity Plugins.
const std::string & getChainId() const
Get the identifier used for the chain this plug-in is part of.
Definition: sag_connectivity_plugins.hpp:156
A container for parameters passed to the constructor of a transport plug-in.
Definition: sag_connectivity_plugins.hpp:182
The direction of messages flowing towards the host (from the transport).
Interface to support miscellaneous requests from a particular plug-in to the host system.
Definition: sag_connectivity_plugins.hpp:84
A base interface for parameters passed to the constructor of transport or codec plug-ins.
Definition: sag_connectivity_plugins.hpp:137
virtual bool handleException(Message &m, bool towardsTransport)
Handle an exception thrown while delivering a message.
Definition: sag_connectivity_plugins.hpp:1065
utf8-encoded const char*
Definition: sag_connectivity_c.h:45
const std::string chainId
The identifier used for the chain this plug-in is part of.
Definition: sag_connectivity_plugins.hpp:530
The direction of messages flowing towards the transport (from the host).
void sendBatchTowardsTransport(Message &&message)
Overload for sending a batch containing a single message.
Definition: sag_connectivity_plugins.hpp:679
std::string getStatus()
Return the value this status item was set to most recently by this class.
Definition: sag_connectivity_plugins.hpp:293
A variant type which can be one of the following:
Definition: sag_connectivity_cpp.hpp:41
std::unique_ptr< HostSide > ptr_t
Pointers to HostSides should always be this ptr_t type, which is a std::unique_ptr.
Definition: sag_connectivity_plugins.hpp:570
virtual void sendBatchTowardsTransport(Message *start, Message *end)
Implements batch sending, calling transformMessageTowardsTransport(Message &) for each message indivi...
Definition: sag_connectivity_plugins.hpp:992
const map_t & getConfig() const
Get the configuration for this plug-in.
Definition: sag_connectivity_plugins.hpp:151
virtual ~Plugin()
This destructor must be virtual.
Definition: sag_connectivity_plugins.hpp:474
HostSide::ptr_t hostSide
The next plug-in in the chain towards host.
Definition: sag_connectivity_plugins.hpp:839
virtual bool transformNullPayloadTowardsHost(Message &msg)
Transform a message with a null payload in a host-wards direction.
Definition: sag_connectivity_plugins.hpp:1030