Node.hh
Go to the documentation of this file.
1 /*
2  * Copyright (C) 2012 Open Source Robotics Foundation
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  *
16 */
17 
18 #ifndef GAZEBO_TRANSPORT_NODE_HH_
19 #define GAZEBO_TRANSPORT_NODE_HH_
20 
21 #undef emit
22 #include <tbb/task.h>
23 #define emit
24 #ifndef TBB_VERSION_MAJOR
25 #include <tbb/version.h>
26 #endif
27 
28 // This fixes compiler warnings, see #3147 and #3160
29 #ifndef BOOST_BIND_GLOBAL_PLACEHOLDERS
30 #define BOOST_BIND_GLOBAL_PLACEHOLDERS
31 #endif
32 
33 #include <boost/bind.hpp>
34 #include <boost/enable_shared_from_this.hpp>
35 #include <map>
36 #include <list>
37 #include <string>
38 #include <vector>
39 #if TBB_VERSION_MAJOR >= 2021
41 #endif
44 #include "gazebo/util/system.hh"
45 
46 namespace gazebo
47 {
48  namespace transport
49  {
52 #if TBB_VERSION_MAJOR < 2021
53  class GZ_TRANSPORT_VISIBLE PublishTask : public tbb::task
54 #else
55  class GZ_TRANSPORT_VISIBLE PublishTask
56 #endif
57  {
61  public: PublishTask(transport::PublisherPtr _pub,
62  const google::protobuf::Message &_message)
63  : pub(_pub)
64  {
65  this->msg = _message.New();
66  this->msg->CopyFrom(_message);
67  }
68 
69 #if TBB_VERSION_MAJOR < 2021
72  public: tbb::task *execute()
73 #else
75  public: void operator()() const
76 #endif
77  {
78  this->pub->WaitForConnection();
79  this->pub->Publish(*this->msg, true);
80  this->pub->SendMessage();
81  delete this->msg;
82 #if TBB_VERSION_MAJOR < 2021
83  this->pub.reset();
84  return NULL;
85 #endif
86  }
87 
89  private: transport::PublisherPtr pub;
90 
92  private: google::protobuf::Message *msg;
93  };
95 
98 
102  class GZ_TRANSPORT_VISIBLE Node :
103  public boost::enable_shared_from_this<Node>
104  {
106  public: Node();
107 
109  public: virtual ~Node();
110 
119  public: void Init(const std::string &_space ="");
120 
132  public: bool TryInit(
133  const common::Time &_maxWait = common::Time(1, 0));
134 
139  public: bool IsInitialized() const;
140 
142  public: void Fini();
143 
146  public: std::string GetTopicNamespace() const;
147 
151  public: std::string DecodeTopicName(const std::string &_topic);
152 
156  public: std::string EncodeTopicName(const std::string &_topic);
157 
160  public: unsigned int GetId() const;
161 
164  public: void ProcessPublishers();
165 
167  public: void ProcessIncoming();
168 
172  public: bool HasLatchedSubscriber(const std::string &_topic) const;
173 
174 
181  public: template<typename M>
182  void Publish(const std::string &_topic,
183  const google::protobuf::Message &_message)
184  {
185  transport::PublisherPtr pub = this->Advertise<M>(_topic);
186 #if TBB_VERSION_MAJOR < 2021
187  PublishTask *task = new(tbb::task::allocate_root())
188  PublishTask(pub, _message);
189 
190  tbb::task::enqueue(*task);
191  return;
192 #else
193  this->taskGroup.run<PublishTask>(pub, _message);
194 #endif
195  }
196 
204  public: template<typename M>
205  transport::PublisherPtr Advertise(const std::string &_topic,
206  unsigned int _queueLimit = 1000,
207  double _hzRate = 0)
208  {
209  std::string decodedTopic = this->DecodeTopicName(_topic);
210  PublisherPtr publisher =
211  transport::TopicManager::Instance()->Advertise<M>(
212  decodedTopic, _queueLimit, _hzRate);
213 
214  boost::mutex::scoped_lock lock(this->publisherMutex);
215  publisher->SetNode(shared_from_this());
216  this->publishers.push_back(publisher);
217 
218  return publisher;
219  }
220 
227  public: void Publish(const std::string &_topic,
228  const google::protobuf::Message &_message)
229  {
230  transport::PublisherPtr pub = this->Advertise(_topic,
231  _message.GetTypeName());
232  pub->WaitForConnection();
233 
234  pub->Publish(_message, true);
235  }
236 
244  public: transport::PublisherPtr Advertise(const std::string &_topic,
245  const std::string &_msgTypeName,
246  unsigned int _queueLimit = 1000,
247  double _hzRate = 0)
248  {
249  std::string decodedTopic = this->DecodeTopicName(_topic);
250  PublisherPtr publisher =
252  decodedTopic, _msgTypeName, _queueLimit, _hzRate);
253 
254  boost::mutex::scoped_lock lock(this->publisherMutex);
255  publisher->SetNode(shared_from_this());
256  this->publishers.push_back(publisher);
257 
258  return publisher;
259  }
260 
268  public: template<typename M, typename T>
269  SubscriberPtr Subscribe(const std::string &_topic,
270  void(T::*_fp)(const boost::shared_ptr<M const> &), T *_obj,
271  bool _latching = false)
272  {
273  SubscribeOptions ops;
274  std::string decodedTopic = this->DecodeTopicName(_topic);
275  ops.template Init<M>(decodedTopic, shared_from_this(), _latching);
276 
277  {
278  using namespace boost::placeholders;
279  boost::recursive_mutex::scoped_lock lock(this->incomingMutex);
280  this->callbacks[decodedTopic].push_back(CallbackHelperPtr(
281  new CallbackHelperT<M>(boost::bind(_fp, _obj, _1), _latching)));
282  }
283 
284  SubscriberPtr result =
285  transport::TopicManager::Instance()->Subscribe(ops);
286 
287  result->SetCallbackId(this->callbacks[decodedTopic].back()->GetId());
288 
289  return result;
290  }
291 
298  public: template<typename M>
299  SubscriberPtr Subscribe(const std::string &_topic,
300  void(*_fp)(const boost::shared_ptr<M const> &),
301  bool _latching = false)
302  {
303  SubscribeOptions ops;
304  std::string decodedTopic = this->DecodeTopicName(_topic);
305  ops.template Init<M>(decodedTopic, shared_from_this(), _latching);
306 
307  {
308  boost::recursive_mutex::scoped_lock lock(this->incomingMutex);
309  this->callbacks[decodedTopic].push_back(
310  CallbackHelperPtr(new CallbackHelperT<M>(_fp, _latching)));
311  }
312 
313  SubscriberPtr result =
314  transport::TopicManager::Instance()->Subscribe(ops);
315 
316  result->SetCallbackId(this->callbacks[decodedTopic].back()->GetId());
317 
318  return result;
319  }
320 
328  template<typename T>
329  SubscriberPtr Subscribe(const std::string &_topic,
330  void(T::*_fp)(const std::string &), T *_obj,
331  bool _latching = false)
332  {
333  SubscribeOptions ops;
334  std::string decodedTopic = this->DecodeTopicName(_topic);
335  ops.Init(decodedTopic, shared_from_this(), _latching);
336 
337  {
338  using namespace boost::placeholders;
339  boost::recursive_mutex::scoped_lock lock(this->incomingMutex);
340  this->callbacks[decodedTopic].push_back(CallbackHelperPtr(
341  new RawCallbackHelper(boost::bind(_fp, _obj, _1))));
342  }
343 
344  SubscriberPtr result =
345  transport::TopicManager::Instance()->Subscribe(ops);
346 
347  result->SetCallbackId(this->callbacks[decodedTopic].back()->GetId());
348 
349  return result;
350  }
351 
352 
359  SubscriberPtr Subscribe(const std::string &_topic,
360  void(*_fp)(const std::string &), bool _latching = false)
361  {
362  SubscribeOptions ops;
363  std::string decodedTopic = this->DecodeTopicName(_topic);
364  ops.Init(decodedTopic, shared_from_this(), _latching);
365 
366  {
367  boost::recursive_mutex::scoped_lock lock(this->incomingMutex);
368  this->callbacks[decodedTopic].push_back(
370  }
371 
372  SubscriberPtr result =
373  transport::TopicManager::Instance()->Subscribe(ops);
374 
375  result->SetCallbackId(this->callbacks[decodedTopic].back()->GetId());
376 
377  return result;
378  }
379 
384  public: bool HandleData(const std::string &_topic,
385  const std::string &_msg);
386 
391  public: bool HandleMessage(const std::string &_topic, MessagePtr _msg);
392 
399  public: void InsertLatchedMsg(const std::string &_topic,
400  const std::string &_msg);
401 
408  public: void InsertLatchedMsg(const std::string &_topic,
409  MessagePtr _msg);
410 
414  public: std::string GetMsgType(const std::string &_topic) const;
415 
421  public: void RemoveCallback(const std::string &_topic, unsigned int _id);
422 
434  private: bool PrivateInit(const std::string &_space,
435  const common::Time &_maxWait,
436  const bool _fallbackToDefault);
437 
438  private: std::string topicNamespace;
439  private: std::vector<PublisherPtr> publishers;
440  private: std::vector<PublisherPtr>::iterator publishersIter;
441  private: static unsigned int idCounter;
442  private: unsigned int id;
443 
444  private: typedef std::list<CallbackHelperPtr> Callback_L;
445  private: typedef std::map<std::string, Callback_L> Callback_M;
446  private: Callback_M callbacks;
447  private: std::map<std::string, std::list<std::string> > incomingMsgs;
448 
450  private: std::map<std::string, std::list<MessagePtr> > incomingMsgsLocal;
451 
452 #if TBB_VERSION_MAJOR >= 2021
454  private: TaskGroup taskGroup;
455 #endif
456 
457  private: boost::mutex publisherMutex;
458  private: boost::mutex publisherDeleteMutex;
459  private: boost::recursive_mutex incomingMutex;
460 
463  private: boost::recursive_mutex processIncomingMutex;
464 
465  private: bool initialized;
466  };
468  }
469 }
470 #endif
#define NULL
Definition: CommonTypes.hh:31
transport
Definition: ConnectionManager.hh:38
Forward declarations for transport.
static TopicManager * Instance()
Get an instance of the singleton.
Definition: SingletonT.hh:36
A Time class, can be used to hold wall- or sim-time.
Definition: Time.hh:48
Callback helper Template.
Definition: CallbackHelper.hh:112
A node can advertise and subscribe topics, publish on advertised topics and listen to subscribed topi...
Definition: Node.hh:104
void Fini()
Finalize the node.
SubscriberPtr Subscribe(const std::string &_topic, void(*_fp)(const boost::shared_ptr< M const > &), bool _latching=false)
Subscribe to a topic using a bare function as the callback.
Definition: Node.hh:299
std::string GetTopicNamespace() const
Get the topic namespace for this node.
void InsertLatchedMsg(const std::string &_topic, MessagePtr _msg)
Add a latched message to the node for publication.
SubscriberPtr Subscribe(const std::string &_topic, void(T::*_fp)(const std::string &), T *_obj, bool _latching=false)
Subscribe to a topic using a class method as the callback.
Definition: Node.hh:329
transport::PublisherPtr Advertise(const std::string &_topic, const std::string &_msgTypeName, unsigned int _queueLimit=1000, double _hzRate=0)
Advertise a topic.
Definition: Node.hh:244
bool IsInitialized() const
Check if this Node has been initialized.
unsigned int GetId() const
Get the unique ID of the node.
bool HandleMessage(const std::string &_topic, MessagePtr _msg)
Handle incoming msg.
transport::PublisherPtr Advertise(const std::string &_topic, unsigned int _queueLimit=1000, double _hzRate=0)
Advertise a topic.
Definition: Node.hh:205
void ProcessIncoming()
Process incoming messages.
void Publish(const std::string &_topic, const google::protobuf::Message &_message)
A convenience function for a one-time publication of a message.
Definition: Node.hh:227
std::string DecodeTopicName(const std::string &_topic)
Decode a topic name.
bool HasLatchedSubscriber(const std::string &_topic) const
Return true if a subscriber on a specific topic is latched.
void Init(const std::string &_space="")
Init the node.
void InsertLatchedMsg(const std::string &_topic, const std::string &_msg)
Add a latched message to the node for publication.
std::string GetMsgType(const std::string &_topic) const
Get the message type for a topic.
SubscriberPtr Subscribe(const std::string &_topic, void(T::*_fp)(const boost::shared_ptr< M const > &), T *_obj, bool _latching=false)
Subscribe to a topic using a class method as the callback.
Definition: Node.hh:269
virtual ~Node()
Destructor.
bool HandleData(const std::string &_topic, const std::string &_msg)
Handle incoming data.
void RemoveCallback(const std::string &_topic, unsigned int _id)
void ProcessPublishers()
Process all publishers, which has each publisher send it's most recent message over the wire.
void Publish(const std::string &_topic, const google::protobuf::Message &_message)
A convenience function for a one-time publication of a message.
Definition: Node.hh:182
bool TryInit(const common::Time &_maxWait=common::Time(1, 0))
Try to initialize the node to use the global namespace, and specify the maximum wait time.
std::string EncodeTopicName(const std::string &_topic)
Encode a topic name.
SubscriberPtr Subscribe(const std::string &_topic, void(*_fp)(const std::string &), bool _latching=false)
Subscribe to a topic using a bare function as the callback.
Definition: Node.hh:359
Used to connect publishers to subscribers, where the subscriber wants the raw data from the publisher...
Definition: CallbackHelper.hh:178
Options for a subscription.
Definition: SubscribeOptions.hh:36
void Init(const std::string &_topic, NodePtr _node, bool _latching)
Initialize the options.
Definition: SubscribeOptions.hh:48
Definition: TaskGroup.hh:30
boost::shared_ptr< CallbackHelper > CallbackHelperPtr
boost shared pointer to transport::CallbackHelper
Definition: CallbackHelper.hh:105
boost::shared_ptr< Subscriber > SubscriberPtr
Definition: TransportTypes.hh:53
boost::shared_ptr< google::protobuf::Message > MessagePtr
Definition: TransportTypes.hh:45
boost::shared_ptr< Publisher > PublisherPtr
Definition: TransportTypes.hh:49
Forward declarations for the common classes.
Definition: Animation.hh:27