51 PortableServer::Servant
53 const PortableServer::ObjectId& oid,
54 PortableServer::POA_ptr poa
65 const PortableServer::ObjectId& oid,
66 PortableServer::POA_ptr adapter,
67 PortableServer::Servant serv,
68 CORBA::Boolean cleanup_in_progress,
69 CORBA::Boolean remaining_activations
76 omni_mutex_lock pause(
_lock);
78 assert(narrowed!=NULL);
79 set<Proxy*>::iterator pos =
_servants.find(narrowed);
83 narrowed->_remove_ref();
87 DB(1,
"\t\teh? - POA attempted to etherealize unknown servant.");
92 PortableServer::POA_ptr parentPoa,
96 omni_thread(NULL,PRIORITY_HIGH),
98 _lock(),_condition(&_lock),
107 DB(20,
"~ProxyPushSupplierManager()")
110 CosEventChannelAdmin::ProxyPushSupplier_ptr
113 return createNarrowedReference<CosEventChannelAdmin::ProxyPushSupplier>(
115 CosEventChannelAdmin::_tc_ProxyPushSupplier->id()
127 CosEventChannelAdmin::ProxyPushSupplier_var ppsv =pps->_this();
150 const unsigned long sleepTimeNanosec0 =0x8000;
151 const unsigned long maxSleepNanosec =0x800000;
152 unsigned long sleepTimeNanosec =sleepTimeNanosec0;
154 omni_mutex_lock conditionLock(
_lock);
179 sleepTimeNanosec=sleepTimeNanosec0;
185 if(sleepTimeNanosec<maxSleepNanosec)
186 sleepTimeNanosec<<=1;
187 unsigned long sec,nsec;
188 omni_thread::get_time(&sec,&nsec,0,sleepTimeNanosec);
198 catch (CORBA::SystemException& ex) {
199 DB(2,
"ProxyPushSupplierManager ignoring CORBA system exception"
202 catch (CORBA::Exception& ex) {
203 DB(2,
"ProxyPushSupplierManager ignoring CORBA exception"
207 DB(2,
"ProxyPushSupplierManager thread killed by unknown exception.")
216 #if OMNIEVENTS__DEBUG_REF_COUNTS
217 DB(20,
"ProxyPushSupplierManager::_add_ref()")
219 omni_mutex_lock pause(
_lock);
225 #if OMNIEVENTS__DEBUG_REF_COUNTS
226 DB(20,
"ProxyPushSupplierManager::_remove_ref()")
235 DB(2,
"ProxyPushSupplierManager has negative ref count! "<<myref)
239 DB(15,
"ProxyPushSupplierManager has zero ref count -- shutdown.")
250 CosEventComm::PushConsumer_ptr pushConsumer)
252 if(CORBA::is_nil(pushConsumer))
253 throw CORBA::BAD_PARAM();
254 if(!CORBA::is_nil(
_target) || !CORBA::is_nil(
_req))
255 throw CosEventChannelAdmin::AlreadyConnected();
256 _target=CosEventComm::PushConsumer::_duplicate(pushConsumer);
260 CORBA::Request_var req =
_target->_request(
"_is_a");
261 req->add_in_arg() <<= CosEventChannelAdmin::_tc_ProxyPushConsumer->id();
262 req->set_return_type(CORBA::_tc_boolean);
263 req->send_deferred();
276 DB(5,
"ProxyPushSupplier_i::disconnect_push_supplier()");
277 eraseKey(
"ConsumerAdmin/ProxyPushSupplier");
281 throw CORBA::OBJECT_NOT_EXIST(
288 CORBA::Request_var req=
_target->_request(
"disconnect_push_consumer");
289 _target=CosEventComm::PushConsumer::_nil();
290 req->send_deferred();
297 PortableServer::POA_ptr poa,
302 _target(CosEventComm::PushConsumer::_nil()),
303 _targetIsProxy(false)
310 DB(20,
"~ProxyPushSupplier_i()")
317 if(!CORBA::is_nil(_req) && _req->poll_response())
319 CORBA::Environment_ptr env=_req->env();
320 if(!CORBA::is_nil(env) && env->exception())
323 CORBA::Exception* ex =env->exception();
324 DB(10,
"ProxyPushSupplier got exception" IF_OMNIORB4(
": "<<ex->_name()) );
326 _req=CORBA::Request::_nil();
329 CORBA::Request_var req=_target->_request(
"disconnect_push_consumer");
330 req->send_deferred();
333 _target=CosEventComm::PushConsumer::_nil();
334 eraseKey(
"ConsumerAdmin/ProxyPushSupplier");
338 _req=CORBA::Request::_nil();
341 if(CORBA::is_nil(_req) && !CORBA::is_nil(_target) && moreEvents())
343 _req=_target->_request(
"push");
344 _req->add_in_arg() <<= *(nextEvent());
345 _req->send_deferred();
348 if(!CORBA::is_nil(_req))
359 DB(2,
"WARNING: Multiple connections to ProxyPushSupplier.");
361 else if(req->return_value()>>=CORBA::Any::to_boolean(
_targetIsProxy))
367 DB(15,
"ProxyPushSupplier is federated.");
372 DB(2,
"ProxyPushSupplier got unexpected callback.");
385 using namespace CosEventChannelAdmin;
388 CosEventComm::PushConsumer_var pushConsumer =
389 string_to_<CosEventComm::PushConsumer>(ior.c_str());
399 DB(15,
"Attempting to reconnect ProxyPushSupplier: "<<oid.c_str())
402 ProxyPushConsumer_var proxyCons =
403 string_to_<ProxyPushConsumer>(ior.c_str());
404 CosEventComm::PushSupplier_var thisSupp =_this();
405 proxyCons->connect_push_supplier(thisSupp);
406 DB(7,
"Reconnected ProxyPushSupplier: "<<oid.c_str())
409 catch(CosEventChannelAdmin::AlreadyConnected&){
411 DB(7,
"Remote ProxyPushConsumer already connected: "<<oid.c_str())
413 catch(CosEventChannelAdmin::TypeError&){
415 DB(2,
"Remote ProxyPushConsumer threw TypeError: "<<oid.c_str())
417 catch(CORBA::OBJECT_NOT_EXIST&) {}
418 catch(CORBA::TRANSIENT& ) {}
419 catch(CORBA::COMM_FAILURE& ) {}
426 os,
"ConsumerAdmin/ProxyPushSupplier",
void disconnect_push_supplier()
void etherealize(const PortableServer::ObjectId &oid, PortableServer::POA_ptr adapter, PortableServer::Servant serv, CORBA::Boolean cleanup_in_progress, CORBA::Boolean remaining_activations)
Pauses the thread, and then calls the parent's implementation.
Base class for three of the four Proxy servants.
PortableServer::POA_var _managedPoa
The POA owned & managed by this object.
The EventQueue is a circular buffer that contains _size-1 events.
#define NP_MINORSTRING(systemException)
void disconnect()
Send disconnect_push_consumer() to all connected PushConsumers.
PortableServer::Servant incarnate(const PortableServer::ObjectId &oid, PortableServer::POA_ptr poa)
void activate(const char *name)
Creates the Proxy-type's POA, and registers this object as its ServantManager.
Base class for ServantActivator classes that manage Proxy servants.
void * run_undetached(void *)
#define IF_OMNIORB4(omniORB4_code)
CosEventComm::PushConsumer_var _target
string attrString(const string &key, const string &fallback="") const
void deferredRequest(CORBA::Request_ptr req, Callback *callback=NULL)
Adopts the request and then stores it in _deferredRequests.
void deactivateObject()
Calls deactivate_object() to deactivate this servant in its POA.
void basicOutput(ostream &os, const char *name, CORBA::Object_ptr target=CORBA::Object::_nil(), const char *extraAttributes=NULL)
Helper method for constructing persistency output.
void callback(CORBA::Request_ptr req)
Sets _targetIsProxy, if _target is a ProxyPushConsumer.
void reincarnate(const string &oid, const PersistNode &node)
Re-create a servant from information saved in the log file.
static bool exists()
Library code may create Event Service objects without the need for persistency.
CosEventChannelAdmin::ProxyPushSupplier_ptr createObject()
~ProxyPushSupplierManager()
void eraseKey(const char *name)
Helper method for constructing persistency output.
omni_mutex_kcol(omni_mutex &m)
ProxyPushSupplierManager(PortableServer::POA_ptr parentPoa, EventQueue &q)
void activateObjectWithId(const char *oidStr)
Calls activate_object_with_id() to activate this servant in its POA.
Helper class that locks ProxyPushSupplier upon construction, and wakes it up on destruction.
set< Proxy * > _servants
The set of all active Proxies in this object's _managedPoa.
long attrLong(const string &key, long fallback=0) const
#define OMNIEVENTS__DEBUG_REF_COUNTS__DEFN(C)
Defines debug versions of _add/remove_ref() for class C.
Obtains an output stream to the active persistency logfile, and locks it for exclusive access...
OMNIEVENTS__DEBUG_REF_COUNTS__DECL void trigger(bool &busy, bool &waiting)
Sets 'busy' if some work was done.
#define IFELSE_OMNIORB4(omniORB4_code, default_code)
void _remove_ref()
Shut down the thread when refCount reaches zero.
#define HERE
Generates a string literal that describes the filename and line number.
void connect_push_consumer(CosEventComm::PushConsumer_ptr pushConsumer)
bool _targetIsProxy
TRUE if _target is a ProxyPushConsumer.
The opposite of omni_mutex_lock: unlocks the mutex upon construction and re-locks it upon destruction...
void output(ostream &os)
Save this object's state to a stream.
void reportObjectFailure(const char *here, CORBA::Object_ptr obj, CORBA::Exception *ex)
Called by omniEvents when an object has failed (fatal exception).
omni_mutex_kcol & operator=(const omni_mutex_kcol &)
omni_condition _condition
ProxyPushSupplier_i(PortableServer::POA_ptr poa, EventQueue &q)