Apama  10.11.3.3
sag_connectivity_plugins.hpp
Go to the documentation of this file.
1 /*
2  * Title: sag_connectivity_plugins.hpp
3  * Description: C++ API for writing connectivity plugins
4  * $Copyright (c) 2015-2021 Software AG, Darmstadt, Germany and/or Software AG USA Inc., Reston, VA, USA, and/or its subsidiaries and/or its affiliates and/or their licensors.$
5  * Use, reproduction, transfer, publication or disclosure is prohibited except as specifically provided for in your License Agreement with Software AG
6  * @Version: $Id: sag_connectivity_plugins.hpp 389658 2021-04-28 13:20:08Z rros $
7  */
8 
17 #ifndef _SAG_CONNECTIVITY_PLUGINS_HPP_
18 #define _SAG_CONNECTIVITY_PLUGINS_HPP_
19 
20 #include <sag_connectivity_cpp.hpp>
21 #include <sag_plugin_logging.hpp>
22 #include <memory>
23 #include <mutex>
24 
25 // must define __STDC_FORMAT_MACROS before first include of inttypes else printf macros won't be defined
26 #ifndef __STDC_FORMAT_MACROS
27 #define __STDC_FORMAT_MACROS 1
28 #endif
29 #include <inttypes.h>
30 
31 namespace com {
32 namespace softwareag {
33 namespace _DATAT_INTERNAL_CPP_NAMESPACE {
34 
35 namespace
36 {
40  void replace(std::string &input, const std::string &from, const std::string &to)
41  {
42  size_t pos = 0;
43  while((pos = input.find(from, pos)) != std::string::npos)
44  {
45  input.replace(pos, from.length(), to);
46  pos += to.length();
47  }
48  }
49 
53  std::string to_string(const Message &m, bool truncate=true)
54  {
55  std::string payload = to_string(m.getPayload());
56  if (truncate && payload.length() > 200) payload = payload.substr(0, 196) + " ...";
57  // security sanitization to prevent log message faking
58  replace(payload, "\n", "\\n");
59  replace(payload, "\r", "\\r");
60  return "Message<metadata="+to_string(m.getMetadataMap())+", payload="+payload+">";
61  }
62 }
63 
69 enum class Direction {
73  TOWARDS_HOST = 1,
78 
79 };
80 
85 class PluginHost {
86  friend class Plugin;
87 public:
88  typedef std::unique_ptr<PluginHost> ptr_t;
89 
104  void enableReliability(Direction direction) {
105  if (SAG_ERROR_OK != sag_enable_reliability(chain, static_cast<int>(direction))) {
106  throw std::runtime_error("An error occurred while setting chain reliability");
107  }
108  }
109 
111  bool isShuttingDown() {
112  bool isShuttingDown = false;
113  if (SAG_ERROR_OK != sag_is_host_shutting_down(chain, isShuttingDown)) {
114  throw std::runtime_error("An error occurred while checking if host is shutting down");
115  }
116  return isShuttingDown;
117  }
118 private:
122  PluginHost(void* chain = nullptr) :chain(chain) {}
123  void* chain;
124 };
125 
126 // forward decl for parameters friend
127 class Plugin;
128 
139 {
140  // for constructor access to connectivityManager
141  friend class Plugin;
142 
143 public:
145 
147 
152  const map_t &getConfig() const { return config; }
153 
157  const std::string &getChainId() const { return chainId; }
158 
162  const std::string &getPluginName() const { return pluginName; }
163 
164 protected:
168  PluginConstructorParameters(const std::string &pluginName, const std::string &chainId, const map_t &config, void* connectivityManager, void* chain)
169  : chainId(chainId), pluginName(pluginName), config(config.copy()), connectivityManager(connectivityManager), chain(chain)
170  {}
171 
172 private:
173  std::string chainId;
174  std::string pluginName;
175  map_t config;
177  void* connectivityManager;
179  void* chain;
180 };
181 
184 {
185 public:
189  TransportConstructorParameters(const std::string &pluginName, const std::string &chainId, const map_t &config, void* connectivityManager, void* reserved)
190  : PluginConstructorParameters(pluginName, chainId, config, connectivityManager, reserved)
191  {}
192 };
193 
196 {
197 public:
201  CodecConstructorParameters(const std::string &pluginName, const std::string &chainId, const map_t &config, void* connectivityManager, void* reserved)
202  : PluginConstructorParameters(pluginName, chainId, config, connectivityManager, reserved)
203  {}
207  CodecConstructorParameters(const std::string &pluginName)
208  : PluginConstructorParameters(pluginName, "", map_t{}, nullptr, nullptr)
209  {}
210 };
211 
212 
223 {
224 public:
225 
229  static const char* STATUS_ONLINE() { return "ONLINE"; }
233  static const char* STATUS_STARTING() { return "STARTING"; }
237  static const char* STATUS_FAILED() { return "FAILED"; }
238 
249  {
250  friend class StatusReporter;
251  public:
252 
253  ~StatusItem()
254  {
255  sag_delete_user_status_item(connectivityManager, underlying);
256  underlying.item = nullptr;
257  }
258 
266  void setStatus(const std::string &value)
267  {
268  std::unique_lock<std::mutex> ul(status_lock);
269  setStatusLocked(value);
270  }
271 
280  void setStatus(int64_t value)
281  {
282  std::unique_lock<std::mutex> ul(status_lock);
283  intValue = value;
284  setStatusLocked(convert_to_details::integerToString(value));
285  }
286 
294  std::string getStatus() {
295  std::unique_lock<std::mutex> ul(status_lock);
296  return lastValue;
297  }
298 
306  void increment(int64_t incrementValue = 1)
307  {
308  std::unique_lock<std::mutex> ul(status_lock);
309  intValue += incrementValue;
310  setStatusLocked(convert_to_details::integerToString(intValue));
311  }
312 
315  const std::string &key() { return mkey; }
316 
317  private:
318  StatusItem(const StatusItem& other) = delete; // non construction-copyable
319  StatusItem& operator=(const StatusItem&) = delete; // non copyable
320 
321  StatusItem(void* connectivityManager, const std::string &key, const std::string &initialValue, const int64_t intValue)
322  : intValue(intValue),
323  mkey(key),
324  lastValue(initialValue),
325  connectivityManager(connectivityManager),
326  underlying(sag_create_user_status_item(connectivityManager, key.c_str(), initialValue.c_str()))
327  {
328  if (!underlying.item)
329  {
330  std::ostringstream oss;
331  oss << "Failed to create status item '" << key << "' (ensure the key is unique and that this plug-in has not been shutdown already)";
332  throw std::runtime_error(oss.str());
333  }
334  }
335 
336  void setStatusLocked(const std::string &value)
337  {
338  if (value == lastValue) return; // no-op this case
339  lastValue = value;
340 
341  sag_set_user_status_item(underlying, value.c_str());
342  }
343 
344  int64_t intValue;
345  const std::string mkey;
346  std::string lastValue;
347  void* connectivityManager;
348  sag_status_item_t underlying;
349  std::mutex status_lock;
350  };
351 
353  typedef std::unique_ptr<StatusItem> item_ptr;
354 
369  item_ptr createStatusItem(const std::string &key, const std::string &initialValue)
370  {
371  return std::unique_ptr<StatusItem>(new StatusItem(connectivityManager, key, initialValue, 0));
372  }
373 
387  item_ptr createStatusItem(const std::string &key, int64_t initialValue)
388  {
389  std::ostringstream oss;
390  oss << initialValue;
391  return std::unique_ptr<StatusItem>(new StatusItem(connectivityManager, key, oss.str(), initialValue));
392  }
393 
405  void setStatus(const map_t &statusmap)
406  {
407  sag_set_user_status_map(connectivityManager, reinterpret_cast<const sag_underlying_map_t&>(statusmap));
408  // keep track of what keys we've added so we can automatically remove them when we're destroyed
409  for (auto it = statusmap.cbegin(); it != statusmap.cend(); it++)
410  if (it->first.type_tag() == SAG_DATA_STRING) // ignore invalid ones
411  {
412  if (it->second.empty())
413  mapKeysToCleanup.erase(it->first);
414  else
415  mapKeysToCleanup.insert(it->first.copy(), data_t());
416  }
417  }
418 
419 
420 
424  ~StatusReporter()
425  {
426  if (!mapKeysToCleanup.empty())
427  sag_set_user_status_map(connectivityManager, reinterpret_cast<const sag_underlying_map_t&>(mapKeysToCleanup));
428  }
429 
433  explicit StatusReporter(void *connectivityManager) : connectivityManager(connectivityManager), mapKeysToCleanup()
434  {
435  }
436 
437 private:
438  void* connectivityManager;
439  map_t mapKeysToCleanup;
440 
441 
442  StatusReporter() = delete;
443 
444  // non-copyable, due to the cleanup code we don't want people to shoot themselves in the foot by having multiple copies
445  StatusReporter(const StatusReporter& other) = delete;
446  StatusReporter& operator=(const StatusReporter&) = delete;
447 
448 };
449 
457 class Plugin
458 {
459 public:
460  /* Constructor.
461  * @since 9.12.0.1
462  */
463  Plugin(const PluginConstructorParameters &params)
464  : pluginName(params.getPluginName()), chainId(params.getChainId()), config(params.getConfig().copy()),
465  host(new PluginHost(params.chain)), logger("connectivity." + pluginName + "." + chainId),
466  statusReporter(new StatusReporter(params.connectivityManager))
467  {
468  }
469 
475  virtual ~Plugin() {}
476 
487  virtual void start() {}
488 
496  virtual void hostReady() {}
497 
522  virtual void shutdown() {}
524  const std::string &getName() const { return pluginName; }
525 protected:
529  const std::string pluginName;
531  const std::string chainId;
539  const PluginHost::ptr_t host;
548  if (statusReporter) return *statusReporter;
549  throw std::runtime_error("Cannot call getStatusReporter when using the legacy constructor");
550  }
551 
552 public:
557 private:
558  std::unique_ptr<StatusReporter> statusReporter;
559 };
560 
565 class HostSide
566 {
567 public:
569  virtual ~HostSide() {}
571  typedef std::unique_ptr<HostSide> ptr_t;
591  virtual void sendBatchTowardsHost(Message *start, Message *end) = 0;
592 
600  void sendBatchTowardsHost(Message &&message) {
601  sendBatchTowardsHost(&message, &message+1);
602  }
603 
614  template<typename IT>
615  auto sendBatchTowardsHost(const IT &begin, const IT &end) -> typename std::enable_if<
616  !std::is_const<ampl::remove_ref_t<decltype(*begin)>>::value &&
617  ampl::is_same<Message, ampl::remove_const_t<ampl::remove_ref_t<decltype(*begin)>>>::value
618  , void>::type // this ensures we can only pass in non-const iterator pairs to Message
619  {
620  if(begin == end) sendBatchTowardsHost((Message*) nullptr, (Message*) nullptr);
621  else sendBatchTowardsHost(&(*begin), (&(*(end-1)))+1);
622  }
623 };
624 
629 class RemoteHostSide: public HostSide
630 {
631 public:
632  RemoteHostSide(sag_plugin_t other, sag_send_fn_t fn): other(other), fn(fn) {}
633  virtual ~RemoteHostSide() {}
634  virtual void sendBatchTowardsHost(Message *start, Message *end);
635 private:
636  sag_plugin_t other;
637  sag_send_fn_t fn;
638 };
639 
645 {
646 public:
648  virtual ~TransportSide() {}
650  typedef std::unique_ptr<TransportSide> ptr_t;
671  virtual void sendBatchTowardsTransport(Message *start, Message *end) = 0;
672 
681  sendBatchTowardsTransport(&message, &message+1);
682  }
683 
693  template<typename IT>
694  auto sendBatchTowardsTransport(const IT &begin, const IT &end) -> typename std::enable_if<
695  !std::is_const<ampl::remove_ref_t<decltype(*begin)>>::value &&
696  ampl::is_same<Message, ampl::remove_const_t<ampl::remove_ref_t<decltype(*begin)>>>::value
697  , void>::type // this ensures we can only pass in non-const iterator pairs to Message
698  {
699  if(begin == end) sendBatchTowardsTransport((Message*) nullptr, (Message*) nullptr);
700  else sendBatchTowardsTransport(&(*begin), (&(*(end-1)))+1);
701  }
702 };
703 
708 class RemoteTransportSide: public TransportSide
709 {
710 public:
711  RemoteTransportSide(sag_plugin_t other, sag_send_fn_t fn): other(other), fn(fn) {}
712  virtual ~RemoteTransportSide() {}
713  virtual void sendBatchTowardsTransport(Message *start, Message *end);
714 private:
715  sag_plugin_t other;
716  sag_send_fn_t fn;
717 };
718 
719 
741 class AbstractCodec: public Plugin, public HostSide, public TransportSide
742 {
743 public:
745 
755  : Plugin(params)
756  {}
757 
758  // These methods do not need to show up in doxygen
759  /* Called between construction and start() to set the hostSide field */
760  virtual void setNextTowardsHost(HostSide::ptr_t &&host)
761  {
762  hostSide = std::move(host);
763  }
764  /* Called between construction and start() to set the transportSide field */
765  virtual void setNextTowardsTransport(TransportSide::ptr_t &&transport)
766  {
767  transportSide = std::move(transport);
768  }
769 protected:
775 
781 };
782 
812 {
813 public:
815 
825  : Plugin(params)
826  {}
827 
828  // This method does not need to show up in doxygen
829  /* Called between construction and start() to set the hostSide field */
830  virtual void setNextTowardsHost(HostSide::ptr_t &&host)
831  {
832  hostSide = std::move(host);
833  }
834 
835 protected:
841 };
842 
855 {
856 public:
857 
867  : AbstractTransport(params)
868  {}
869 
870 
876  virtual void sendBatchTowardsTransport(Message *start, Message *end)
877  {
878  for (Message *it = start; it != end; ++it) {
879  try {
880  if (it->getPayload().empty()) {
881  deliverNullPayloadTowardsTransport(*it);
882  } else {
883  deliverMessageTowardsTransport(*it);
884  }
885  } catch (...) {
886  handleException(*it);
887  }
888  }
889  }
891  virtual void deliverMessageTowardsTransport(Message &msg) = 0;
894  {
895  // do nothing
896  }
897 
916  virtual void handleException(Message &m)
917  {
918  try {
919  throw;
920  } catch (const std::exception &e) {
921  logger.warn("Error while delivering message: %s; %s will be dropped.", e.what(), to_string(m).c_str());
922  } catch (...) {
923  logger.warn("Unknown error delivering message: %s", to_string(m).c_str());
924  }
925  }
926 };
927 
944 {
945 public:
946 
956  : AbstractCodec(params)
957  {}
958 
965  virtual void sendBatchTowardsHost(Message *start, Message *end)
966  {
967  Message *curr = start;
968  for (Message *it = start; it != end; ++it) {
969  bool rv;
970  try {
971  if (it->getPayload().empty()) {
972  rv = transformNullPayloadTowardsHost(*it);
973  } else {
974  rv = transformMessageTowardsHost(*it);
975  }
976  } catch (...) {
977  rv = handleException(*it, false);
978  }
979  // if we keep it (and it didn't throw) swap it with the accumulator
980  if (rv) {
981  if (it != curr) it->swap(std::move(*curr)); // don't self-swap
982  ++curr;
983  }
984  }
985  if (hostSide.get()) hostSide->sendBatchTowardsHost(start, curr);
986  }
993  virtual void sendBatchTowardsTransport(Message *start, Message *end)
994  {
995  Message *curr = start;
996  for (Message *it = start; it != end; ++it) {
997  bool rv;
998  try {
999  // process the message
1000  if (it->getPayload().empty()) {
1001  rv = transformNullPayloadTowardsTransport(*it);
1002  } else {
1003  rv = transformMessageTowardsTransport(*it);
1004  }
1005  } catch (...) {
1006  rv = handleException(*it, true);
1007  }
1008  // if we keep it (and it didn't throw) swap it with the accumulator
1009  if (rv) {
1010  if (it != curr) it->swap(std::move(*curr)); // don't self-swap
1011  ++curr;
1012  }
1013  }
1014  if (transportSide.get()) transportSide->sendBatchTowardsTransport(start, curr);
1015  }
1016 
1021  virtual bool transformMessageTowardsHost(Message &msg) = 0;
1026  virtual bool transformMessageTowardsTransport(Message &msg) = 0;
1032  {
1033  // do nothing
1034  return true;
1035  }
1041  {
1042  // do nothing
1043  return true;
1044  }
1066  virtual bool handleException(Message &m, bool towardsTransport)
1067  {
1068  try {
1069  throw;
1070  } catch (const std::exception &e) {
1071  logger.warn("Error while transforming message: %s; %s will be dropped.", e.what(), to_string(m).c_str());
1072  } catch (...) {
1073  logger.warn("Unknown error transforming message: %s", to_string(m).c_str());
1074  }
1075  return false;
1076  }
1077 };
1078 
1079 }
1080 
1081 namespace connectivity { using namespace _DATAT_INTERNAL_CPP_NAMESPACE; }
1082 
1083 }} // com.softwareag.connectivity
1084 
1085 // internal implementation included from these files
1086 #include <sag_internal/exception.hpp>
1087 #include <sag_internal/remote_plugins.hpp>
1088 #include <sag_internal/plugin_macros.hpp>
1089 
1099 #define SAG_DECLARE_CONNECTIVITY_TRANSPORT_CLASS(Class) _SAG_DECLARE_CONNECTIVITY_TRANSPORT_CLASS(Class)
1100 
1110 #define SAG_DECLARE_CONNECTIVITY_CODEC_CLASS(Class) _SAG_DECLARE_CONNECTIVITY_CODEC_CLASS(Class)
1111 
1112 #endif // _SAG_CONNECTIVITY_PLUGINS_HPP_
A container for parameters passed to the constructor of a codec plug-in.
Definition: sag_connectivity_plugins.hpp:195
const std::string pluginName
The name used for this plug-in in the configuration file.
Definition: sag_connectivity_plugins.hpp:529
Base class that simplifies implementation of codec plug-ins that deal only with individual messages n...
Definition: sag_connectivity_plugins.hpp:943
AbstractTransport(const PluginConstructorParameters::TransportConstructorParameters &params)
Constructor.
Definition: sag_connectivity_plugins.hpp:824
Base class that simplifies implementation of transport plug-ins that deal only with individual messag...
Definition: sag_connectivity_plugins.hpp:854
void increment(int64_t incrementValue=1)
Set an integer status value by incrementing the previous integer value that was set by this object.
Definition: sag_connectivity_plugins.hpp:306
HostSide::ptr_t hostSide
The next plug-in in the chain towards host.
Definition: sag_connectivity_plugins.hpp:774
An interface to the next component (plugin or host) towards the host.
Definition: sag_connectivity_plugins.hpp:565
const_iterator cbegin() const
Forward const_iterator begin.
Definition: sag_connectivity_cpp.hpp:314
map_t config
The configuration of this plug-in.
Definition: sag_connectivity_plugins.hpp:533
item_ptr createStatusItem(const std::string &key, int64_t initialValue)
Creates a StatusItem instance that can be used to report status for a given key, using an integral in...
Definition: sag_connectivity_plugins.hpp:387
Direction
The enumeration indicating the direction of message flow - towards the transport or towards the host.
Definition: sag_connectivity_plugins.hpp:69
Class for writing to the system logger.
Definition: sag_plugin_logging.hpp:72
const PluginHost::ptr_t host
Interface to support miscellaneous requests from this plug-in to the host system.
Definition: sag_connectivity_plugins.hpp:539
StatusReporter & getStatusReporter()
Allows reporting status information from this plug-in, such as online or failed status and number of ...
Definition: sag_connectivity_plugins.hpp:547
const_iterator cend() const
Forward const_iterator end.
Definition: sag_connectivity_cpp.hpp:316
Contains the C++ implementation of the underlying datatypes used by connectivity plugins and their ac...
void setStatus(const map_t &statusmap)
Set multiple related string status values at the same time (atomically).
Definition: sag_connectivity_plugins.hpp:405
virtual void sendBatchTowardsTransport(Message *start, Message *end)
Implements batch sending, calling deliverMessageTowardsTransport(Message&) for each message individua...
Definition: sag_connectivity_plugins.hpp:876
Base of the inheritance tree for Connectivity plugins.
Definition: sag_connectivity_plugins.hpp:457
TransportSide::ptr_t transportSide
The next plug-in in the chain towards transport.
Definition: sag_connectivity_plugins.hpp:780
std::unique_ptr< StatusItem > item_ptr
Unique pointer to a StatusItem.
Definition: sag_connectivity_plugins.hpp:353
void setStatus(const std::string &value)
Set a string status value.
Definition: sag_connectivity_plugins.hpp:266
AbstractSimpleTransport(const PluginConstructorParameters::TransportConstructorParameters &params)
Constructor.
Definition: sag_connectivity_plugins.hpp:866
Logger logger
Logging for writing to the host log file.
Definition: sag_connectivity_plugins.hpp:556
virtual bool transformNullPayloadTowardsTransport(Message &msg)
Transform a message with a null payload in a transport-wards direction.
Definition: sag_connectivity_plugins.hpp:1040
virtual void start()
Called when an entire chain has been created and the plugin is allowed to start up (after all plugins...
Definition: sag_connectivity_plugins.hpp:487
auto sendBatchTowardsTransport(const IT &begin, const IT &end) -> typename std::enable_if< !std::is_const< ampl::remove_ref_t< decltype(*begin)>>::value &&ampl::is_same< Message, ampl::remove_const_t< ampl::remove_ref_t< decltype(*begin)>>>::value, void >::type
Overload for sending messages using an iterator range.
Definition: sag_connectivity_plugins.hpp:694
std::enable_if< get_underlying< T >::value, std::string >::type to_string(const T &t)
Get a string representation of t.
A class allowing a plug-in to report status values to the host.
Definition: sag_connectivity_plugins.hpp:222
std::unique_ptr< TransportSide > ptr_t
Pointers to TransportSides should always be this ptr_t type, which is a std::unique_ptr.
Definition: sag_connectivity_plugins.hpp:650
virtual ~TransportSide()
Ensure virtual destruction.
Definition: sag_connectivity_plugins.hpp:648
Base class for transport plug-ins.
Definition: sag_connectivity_plugins.hpp:811
static const char * STATUS_ONLINE()
Returns a constant that should be used as the status value when a component is online,...
Definition: sag_connectivity_plugins.hpp:229
item_ptr createStatusItem(const std::string &key, const std::string &initialValue)
Creates a StatusItem instance that can be used to report status for a given key.
Definition: sag_connectivity_plugins.hpp:369
const std::string & getName() const
The name used for this plug-in in the configuration file.
Definition: sag_connectivity_plugins.hpp:524
bool isShuttingDown()
Check if host is shutting down.
Definition: sag_connectivity_plugins.hpp:111
virtual void handleException(Message &m)
Handle an exception thrown while delivering a message towards the transport.
Definition: sag_connectivity_plugins.hpp:916
static const char * STATUS_FAILED()
Returns a constant that should be used as the status value when a component is not currently operatio...
Definition: sag_connectivity_plugins.hpp:237
virtual ~HostSide()
Ensure virtual destruction.
Definition: sag_connectivity_plugins.hpp:569
AbstractSimpleCodec(const PluginConstructorParameters::CodecConstructorParameters &params)
Constructor.
Definition: sag_connectivity_plugins.hpp:955
virtual void hostReady()
Called some time after start(), when the host is ready to start receiving input (sends will be queued...
Definition: sag_connectivity_plugins.hpp:496
Base class for codec plug-ins.
Definition: sag_connectivity_plugins.hpp:741
void sendBatchTowardsHost(Message &&message)
Overload for sending a batch containing a single message.
Definition: sag_connectivity_plugins.hpp:600
const std::string & getPluginName() const
Get the name used in the configuration file for this plug-in.
Definition: sag_connectivity_plugins.hpp:162
static const char * STATUS_STARTING()
Returns a constant that should be used as the status value when a component is still starting,...
Definition: sag_connectivity_plugins.hpp:233
void setStatus(int64_t value)
Set an integer status value.
Definition: sag_connectivity_plugins.hpp:280
A map class which implements many of the functions on std::map.
Definition: sag_connectivity_cpp.hpp:36
const std::string & key()
Get the unique key specified when this status item was created.
Definition: sag_connectivity_plugins.hpp:315
A class that can be used to efficiently update the value associated with a single status key.
Definition: sag_connectivity_plugins.hpp:248
A container for an payload and associated metadata.
Definition: sag_connectivity_cpp.hpp:27
virtual void shutdown()
Stop processing messages and terminate and join any background threads.
Definition: sag_connectivity_plugins.hpp:522
virtual void sendBatchTowardsHost(Message *start, Message *end)
Implements batch sending, calling transformMessageTowardsHost(Message &) for each message individuall...
Definition: sag_connectivity_plugins.hpp:965
An interface to the next component (plugin or host) towards the transport.
Definition: sag_connectivity_plugins.hpp:644
AbstractCodec(const PluginConstructorParameters::CodecConstructorParameters &params)
Constructor.
Definition: sag_connectivity_plugins.hpp:754
auto sendBatchTowardsHost(const IT &begin, const IT &end) -> typename std::enable_if< !std::is_const< ampl::remove_ref_t< decltype(*begin)>>::value &&ampl::is_same< Message, ampl::remove_const_t< ampl::remove_ref_t< decltype(*begin)>>>::value, void >::type
Overload for sending messages using an iterator range.
Definition: sag_connectivity_plugins.hpp:615
map_t copy() const
Return a deep copy of this map.
Definition: sag_connectivity_cpp.hpp:243
virtual void deliverNullPayloadTowardsTransport(Message &msg)
Deliver a message with a null payload.
Definition: sag_connectivity_plugins.hpp:893
void enableReliability(Direction direction)
Enable reliable messaging for the chain that this plug-in belongs to, in a particular direction i....
Definition: sag_connectivity_plugins.hpp:104
Contains the headers needed to implement your own Connectivity Plugins.
const std::string & getChainId() const
Get the identifier used for the chain this plug-in is part of.
Definition: sag_connectivity_plugins.hpp:157
A container for parameters passed to the constructor of a transport plug-in.
Definition: sag_connectivity_plugins.hpp:183
The direction of messages flowing towards the host (from the transport).
Interface to support miscellaneous requests from a particular plug-in to the host system.
Definition: sag_connectivity_plugins.hpp:85
A base interface for parameters passed to the constructor of transport or codec plug-ins.
Definition: sag_connectivity_plugins.hpp:138
virtual bool handleException(Message &m, bool towardsTransport)
Handle an exception thrown while delivering a message.
Definition: sag_connectivity_plugins.hpp:1066
utf8-encoded const char*
Definition: sag_connectivity_c.h:46
const std::string chainId
The identifier used for the chain this plug-in is part of.
Definition: sag_connectivity_plugins.hpp:531
The direction of messages flowing towards the transport (from the host).
void sendBatchTowardsTransport(Message &&message)
Overload for sending a batch containing a single message.
Definition: sag_connectivity_plugins.hpp:680
std::string getStatus()
Return the value this status item was set to most recently by this class.
Definition: sag_connectivity_plugins.hpp:294
A variant type which can be one of the following:
Definition: sag_connectivity_cpp.hpp:42
std::unique_ptr< HostSide > ptr_t
Pointers to HostSides should always be this ptr_t type, which is a std::unique_ptr.
Definition: sag_connectivity_plugins.hpp:571
virtual void sendBatchTowardsTransport(Message *start, Message *end)
Implements batch sending, calling transformMessageTowardsTransport(Message &) for each message indivi...
Definition: sag_connectivity_plugins.hpp:993
const map_t & getConfig() const
Get the configuration for this plug-in.
Definition: sag_connectivity_plugins.hpp:152
virtual ~Plugin()
This destructor must be virtual.
Definition: sag_connectivity_plugins.hpp:475
HostSide::ptr_t hostSide
The next plug-in in the chain towards host.
Definition: sag_connectivity_plugins.hpp:840
virtual bool transformNullPayloadTowardsHost(Message &msg)
Transform a message with a null payload in a host-wards direction.
Definition: sag_connectivity_plugins.hpp:1031