OmniEvents
EventChannel.cc
Go to the documentation of this file.
1 // Package : omniEvents
2 // EventChannel.cc Created : 2003/12/04
3 // Author : Alex Tingle
4 //
5 // Copyright (C) 2003-2005 Alex Tingle.
6 //
7 // This file is part of the omniEvents application.
8 //
9 // omniEvents is free software; you can redistribute it and/or
10 // modify it under the terms of the GNU Lesser General Public
11 // License as published by the Free Software Foundation; either
12 // version 2.1 of the License, or (at your option) any later version.
13 //
14 // omniEvents is distributed in the hope that it will be useful,
15 // but WITHOUT ANY WARRANTY; without even the implied warranty of
16 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
17 // Lesser General Public License for more details.
18 //
19 // You should have received a copy of the GNU Lesser General Public
20 // License along with this library; if not, write to the Free Software
21 // Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
22 //
23 
24 #include "EventChannel.h"
25 #include "ConsumerAdmin.h"
26 #include "SupplierAdmin.h"
27 #include "omniEventsLog.h"
28 #include "Orb.h"
29 
30 #include <list>
31 
32 namespace OmniEvents {
33 
34 // CORBA interface methods
35 CosEventChannelAdmin::ConsumerAdmin_ptr EventChannel_i::for_consumers()
36 {
38  throw CORBA::OBJECT_NOT_EXIST();
39  return _consumerAdmin->_this();
40 }
41 
42 
43 CosEventChannelAdmin::SupplierAdmin_ptr EventChannel_i::for_suppliers()
44 {
46  throw CORBA::OBJECT_NOT_EXIST();
47  return _supplierAdmin->_this();
48 }
49 
50 
52 {
54  throw CORBA::OBJECT_NOT_EXIST();
55 
56  // Prevent further incoming connections.
57  _shutdownRequested=true;
58 
59  DB(5,"EventChannel_i::destroy()")
60 
61  // Send disconnect messages to connected clients.
62  if(_consumerAdmin)
64  if(_supplierAdmin)
66 }
67 
68 
70 : Servant(PortableServer::POA::_nil()),
71  _eventChannelStore(store),
72  _consumerAdmin(NULL),
73  _supplierAdmin(NULL),
74  _poaManager(),
75  _shutdownRequested(false),
76  _properties(),
77  _mapper(NULL),
78  _lock(),
79  _refCount(1)
80 {}
81 
82 
84  const char* channelName,
85  const PersistNode* node
86 )
87 {
88  // The order of these various initialization methods is very important.
89  // I've documented dependencies as 'REQUIRES' comments.
90 
91  createPoa(channelName);
92 
93  if(node)
94  _properties._attr=node->_attr;
95 
96  // REQUIRES: _properties
98 
99  // REQUIRES: _consumerAdmin, _properties
101 
102  if(node)
103  {
104  PersistNode* saNode =node->child("SupplierAdmin");
105  if(saNode)
106  _supplierAdmin->reincarnate(*saNode);
107 
108  PersistNode* caNode =node->child("ConsumerAdmin");
109  if(caNode)
110  _consumerAdmin->reincarnate(*caNode);
111  }
112 
113  activateObjectWithId("EventChannel");
114 
115  // Remove the constructor's reference. This object will now be destroyed when
116  // the POA releases it.
117  _remove_ref();
118 
119  // REQUIRES: activate() ...since it uses _this().
120  setInsName(_properties.attrString("InsName"));
121 
122  // Start the channel's thread running.
123  start_undetached();
124 }
125 
126 
128 {
129  DB(20,"~EventChannel_i()")
130  // Destroy the mapper object, even when the EventChannel is being shut down
131  // without a call to destroy(). This can happen if the channel is
132  // implemented through libomniEvents - the channel could be shut down and
133  // later reincarnated in the same process. The Mapper's lifecycle should
134  // match that of the EventChannel.
135  if(_mapper)
136  {
137  _mapper->destroy();
138  _mapper=NULL;
139  }
140  if(_consumerAdmin)
141  {
142  _consumerAdmin->_remove_ref();
143  _consumerAdmin=NULL;
144  }
145  if(_supplierAdmin)
146  {
147  _supplierAdmin->_remove_ref();
148  _supplierAdmin=NULL;
149  }
150 }
151 
152 
154 {
155  // Ensure that activate() is called before start()/run().
156  assert(!CORBA::is_nil(_poa));
157 
158  const char* action="";
159  try
160  {
162  {
163  action="add this object to the store";
164  _eventChannelStore->insert(this);
165  }
166 
168  {
169  action="create this object in the persistency database";
170  WriteLock log;
171  output(log.os);
172  }
173 
174  // Process events until the channel is destroyed.
175  action="run main loop";
176  mainLoop();
177 
179  {
180  action="remove this object from the store";
181  _eventChannelStore->erase(this);
182  }
183 
185  {
187  {
188  action="remove record from persistency database";
189  CORBA::String_var poaName =_poa->the_name();
190  WriteLock log;
191  log.os<<"-ecf/"<<poaName.in()<<'\n';
192  }
193  action="destroy POA";
194  _poa->destroy(
195  CORBA::Boolean(1) /* etherealize_objects */,
196  CORBA::Boolean(0) /* wait_for_completion */
197  );
198  _poa=PortableServer::POA::_nil();
199 
200  } // end if(_shutdownRequested)
201 
202  }
203  catch(PortableServer::POAManager::AdapterInactive& ex) {
204  DB(0,"EventChannel_i::run_undetached() - failed to "<<action<<
205  ", POA deactivated from the outside.")
206  }
207  catch (CORBA::SystemException& ex) {
208  DB(0,"EventChannel_i::run_undetached() - failed to "<<action<<
209  ", System exception: "<<ex._name()<<" ("<<NP_MINORSTRING(ex)<<")")
210  }
211  catch (CORBA::Exception& ex) {
212  DB(0,"EventChannel_i::run_undetached() - failed to "<<action<<
213  ", CORBA exception: "<<ex._name())
214  }
215 
216  // Thread now exits, and this object is deleted.
217  return NULL;
218 }
219 
220 
222 {
223  _poaManager->activate();
224  unsigned long localCyclePeriod_ns=cyclePeriod_ns();
225  while(_refCount>0 && !_shutdownRequested)
226  {
227  //
228  // TRANSFER PHASE - transfer events from SupplierAdmin to ConsumerAdmin.
229  _poaManager->hold_requests(CORBA::Boolean(1) /* wait_for_completion */);
230 
231  if(_shutdownRequested) break;
232 
233  list<CORBA::Any*> events;
234  _supplierAdmin->collect(events);
235  _consumerAdmin->send(events);
236  assert(events.empty());
237 
238  _poaManager->activate();
239 
240  //
241  // COMMUNICATION PHASE - talk with clients' suppliers & consumers.
242  // Note: On Linux the resolution of nanosleep is a huge 10ms.
243  omni_thread::sleep(0,localCyclePeriod_ns);
244  }
245 }
246 
247 
249 {
250 #if OMNIEVENTS__DEBUG_REF_COUNTS
251  DB(20,"EventChannel_i::_add_ref()")
252 #endif
253  omni_mutex_lock pause(_lock);
254  ++_refCount;
255 }
256 
257 
259 {
260 #if OMNIEVENTS__DEBUG_REF_COUNTS
261  DB(20,"EventChannel_i::_remove_ref()")
262 #endif
263  int myref;
264  {
265  omni_mutex_lock pause(_lock);
266  myref = --_refCount;
267  }
268 
269  if(myref<0)
270  {
271  DB(2,"EventChannel has negative ref count! "<<myref)
272  }
273  else if(myref==0)
274  {
275  DB(15,"EventChannel has zero ref count -- shutdown.")
276  join(NULL);
277  }
278 }
279 
280 
281 void EventChannel_i::output(ostream& os)
282 {
283  CORBA::String_var poaName =_poa->the_name();
284  string name =string("ecf/")+poaName.in();
285  _properties.output(os,name);
286  if(_supplierAdmin)
287  _supplierAdmin->output(os);
288  if(_consumerAdmin)
289  _consumerAdmin->output(os);
290 }
291 
292 
293 void EventChannel_i::setInsName(const string v)
294 {
295  Mapper* newMapper =NULL;
296  try
297  {
298 
299  // If _insName is set, then create a mapper object to allow clients to
300  // find this object with a `corbaloc' string.
301  if(!v.empty())
302  {
303  // !! Throws when there is already an object named 'v' in the INSPOA.
304  CORBA::Object_var obj( _this() );
305  newMapper=new Mapper(v.c_str(),obj.in());
306  }
307  // Deactivate the old _mapper object.
308  if(_mapper)
309  _mapper->destroy();
310  _mapper=newMapper;
311 
312  }
313  catch(...)
314  {
315  // Can't use an auto_ptr, because MS VC++ 6 has no auto_ptr::reset()
316  delete newMapper;
317  throw;
318  }
319 }
320 
321 
322 void EventChannel_i::createPoa(const char* channelName)
323 {
324  using namespace PortableServer;
325  POA_ptr p=Orb::inst()._RootPOA.in();
326 
327  // POLICIES:
328  // Lifespan =PERSISTENT // we can persist
329  // Assignment =USER_ID // write our own oid
330  // Uniqueness =[default] UNIQUE_ID // one servant per object
331  // ImplicitActivation=[default] IMPLICIT_ACTIVATION // auto activation
332  // RequestProcessing =[default] USE_ACTIVE_OBJECT_MAP_ONLY
333  // ServantRetention =[default] RETAIN // stateless POA
334  // Thread =SINGLE_THREAD_MODEL // keep it simple
335 
336  CORBA::PolicyList policies;
337  policies.length(3);
338  policies[0]=p->create_lifespan_policy(PERSISTENT);
339  policies[1]=p->create_id_assignment_policy(USER_ID);
340  policies[2]=p->create_thread_policy(SINGLE_THREAD_MODEL);
341 
342  try // finally
343  {
344  try
345  {
346  // Create a new POA (and new POAManager) for this channel.
347  // The POAManager will be used for all of this channel's POAs.
348  _poa=p->create_POA(channelName,POAManager::_nil(),policies);
349  _poaManager=_poa->the_POAManager();
350  }
351  catch(POA::AdapterAlreadyExists& ex) // create_POA
352  {
353  DB(0,"EventChannel_i::createPoa() - POA::AdapterAlreadyExists")
354  throw;
355  }
356  catch(POA::InvalidPolicy& ex) // create_POA
357  {
358  DB(0,"EventChannel_i::createPoa() - POA::InvalidPolicy: "<<ex.index)
359  throw;
360  }
361  }
362  catch(...) // finally
363  {
364  // Destroy the policy objects (Not strictly necessary in omniORB)
365  for(CORBA::ULong i=0; i<policies.length(); ++i)
366  policies[i]->destroy();
367  throw;
368  }
369 
370  // Destroy the policy objects (Not strictly necessary in omniORB)
371  for(CORBA::ULong i=0; i<policies.length(); ++i)
372  policies[i]->destroy();
373 }
374 
375 
376 //
377 // class EventChannelStore
378 //
379 
380 
382 :_channels(),_lock()
383 {}
384 
386 {
387  // ?? IMPLEMENT ME
388 }
389 
391 {
392  omni_mutex_lock l(_lock);
393  bool insertOK =_channels.insert(channel).second;
394  if(!insertOK)
395  DB(2,"Attempted to store an EventChannel, when it is already stored.");
396 }
397 
399 {
400  omni_mutex_lock l(_lock);
401  set<EventChannel_i*>::iterator pos =_channels.find(channel);
402  if(pos==_channels.end())
403  DB(2,"Failed to erase unknown EventChannel.")
404  else
405  _channels.erase(pos);
406 }
407 
408 void EventChannelStore::output(ostream &os)
409 {
410  omni_mutex_lock l(_lock);
411  for(set<EventChannel_i*>::iterator i=_channels.begin();
412  i!=_channels.end();
413  ++i)
414  {
415  (*i)->output(os);
416  }
417 }
418 
419 
420 }; // end namespace OmniEvents
421 
PersistNode * child(const string &key) const
Definition: PersistNode.cc:171
void _remove_ref()
Shutdown the thread when refCount reaches zero.
#define NP_MINORSTRING(systemException)
Definition: Orb.h:52
OMNIEVENTS__DEBUG_REF_COUNTS__DECL void send(CORBA::Any *event)
Queues a single event for sending to consumers.
set< EventChannel_i * > _channels
Definition: EventChannel.h:218
A dummy servant that installs itself into the INSPOA and redirects all calls to the real destination...
Definition: Mapper.h:33
void mainLoop()
The main loop for a channel.
void erase(EventChannel_i *channel)
void disconnect()
Send disconnect_XXX_supplier() to all connected consumers.
void createPoa(const char *channelName)
Constructs the main POA for this channel.
EventChannel_i(EventChannelStore *store=NULL)
Definition: EventChannel.cc:69
void activate(const char *channelName, const PersistNode *node=NULL)
Creates the channel's POA, and any child objects.
Definition: EventChannel.cc:83
void output(ostream &os)
Save this object's state to a stream.
#define DB(l, x)
Definition: Orb.h:49
string attrString(const string &key, const string &fallback="") const
Definition: PersistNode.cc:155
void output(ostream &os)
void reincarnate(const PersistNode &node)
Populate this servant from log information.
map< string, string > _attr
Definition: PersistNode.h:72
PortableServer::POA_var _RootPOA
Definition: Orb.h:89
static bool exists()
Library code may create Event Service objects without the need for persistency.
Container for Event Channels.
Definition: EventChannel.h:209
ConsumerAdmin_i * _consumerAdmin
Definition: EventChannel.h:198
PortableServer::POAManager_var _poaManager
Definition: EventChannel.h:199
void output(ostream &os, string name) const
Definition: PersistNode.cc:44
SupplierAdmin_i * _supplierAdmin
Definition: EventChannel.h:197
EventChannelStore * _eventChannelStore
Definition: EventChannel.h:196
unsigned long cyclePeriod_ns() const
Definition: EventChannel.h:181
PortableServer::POA_var _poa
Definition: Servant.h:131
~EventChannel_i()
Cleans up the _poa, if this object is deleted before its thread starts.
void destroy()
Definition: Mapper.h:49
void activateObjectWithId(const char *oidStr)
Calls activate_object_with_id() to activate this servant in its POA.
Definition: Servant.cc:125
CosEventChannelAdmin::ConsumerAdmin_ptr for_consumers()
Definition: EventChannel.cc:35
static Orb & inst()
Definition: Orb.h:81
void disconnect()
Send disconnect_XXX_consumer() to all connected consumers.
Obtains an output stream to the active persistancy logfile, and locks it for exclusive access...
void insert(EventChannel_i *channel)
void * run_undetached(void *)
Entry point for the channel's thread.
CosEventChannelAdmin::SupplierAdmin_ptr for_suppliers()
Definition: EventChannel.cc:43
Base class for servants.
Definition: Servant.h:113
void setInsName(const string v)
Construct a new Mapper object, and registers it in the INSPOA.
void output(ostream &os)
Save this object's state to a stream.
OMNIEVENTS__DEBUG_REF_COUNTS__DECL void collect(list< CORBA::Any * > &events)
Collects all events that have arrived since the last call.
Servant for CosEventChannelAdmin::EventChannel objects, also inherits from omni_thread.
Definition: EventChannel.h:111
void reincarnate(const PersistNode &node)
Populate this servant from log information.