00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024 #include "ProxyPushSupplier.h"
00025 #include "Orb.h"
00026 #include "omniEventsLog.h"
00027 #include "PersistNode.h"
00028 #include <assert.h>
00029
00030 namespace OmniEvents {
00031
00035 class omni_mutex_kcol {
00036 omni_mutex& mutex;
00037 public:
00038 omni_mutex_kcol(omni_mutex& m) : mutex(m) { mutex.unlock(); }
00039 ~omni_mutex_kcol(void) { mutex.lock(); }
00040 private:
00041
00042 omni_mutex_kcol(const omni_mutex_kcol&);
00043 omni_mutex_kcol& operator=(const omni_mutex_kcol&);
00044 };
00045
00046
00047
00048
00049
00050
00051 PortableServer::Servant
00052 ProxyPushSupplierManager::incarnate(
00053 const PortableServer::ObjectId& oid,
00054 PortableServer::POA_ptr poa
00055 )
00056 {
00057 ProxyPushSupplier_i* result =new ProxyPushSupplier_i(_managedPoa,_queue);
00058 PauseThenWake p(this);
00059 _servants.insert(result);
00060 return result;
00061 }
00062
00063 void
00064 ProxyPushSupplierManager::etherealize(
00065 const PortableServer::ObjectId& oid,
00066 PortableServer::POA_ptr adapter,
00067 PortableServer::Servant serv,
00068 CORBA::Boolean cleanup_in_progress,
00069 CORBA::Boolean remaining_activations
00070 )
00071 {
00072 omni_mutex_lock pause(_lock);
00073 ProxyManager::etherealize(oid,adapter,serv,
00074 cleanup_in_progress,remaining_activations);
00075 }
00076
00077 ProxyPushSupplierManager::ProxyPushSupplierManager(
00078 PortableServer::POA_ptr parentPoa,
00079 EventQueue& q
00080 )
00081 : ProxyManager(parentPoa,"ProxyPushSupplier"),
00082 omni_thread(NULL,PRIORITY_HIGH),
00083 _queue(q),
00084 _lock(),_condition(&_lock),
00085 _refCount(1)
00086 {
00087 start();
00088 }
00089
00090 ProxyPushSupplierManager::~ProxyPushSupplierManager()
00091 {
00092 DB(20,"~ProxyPushSupplierManager()")
00093 }
00094
00095 CosEventChannelAdmin::ProxyPushSupplier_ptr
00096 ProxyPushSupplierManager::createObject()
00097 {
00098 return createNarrowedReference<CosEventChannelAdmin::ProxyPushSupplier>(
00099 _managedPoa.in(),
00100 CosEventChannelAdmin::_tc_ProxyPushSupplier->id()
00101 );
00102 }
00103
00104 void ProxyPushSupplierManager::disconnect()
00105 {
00106 for(set<Proxy*>::iterator i =_servants.begin(); i!=_servants.end(); ++i)
00107 {
00108 Proxy* p =*i;
00109 ProxyPushSupplier_i* pps =static_cast<ProxyPushSupplier_i*>(p);
00110 pps->disconnect_push_supplier();
00111 }
00112 }
00113
00114 void
00115 ProxyPushSupplierManager::run(void*)
00116 {
00117
00118
00119
00120
00121
00122
00123
00124
00125
00126
00127
00128
00129
00130
00131
00132 const unsigned long sleepTimeNanosec0 =0x8000;
00133 const unsigned long maxSleepNanosec =0x800000;
00134 unsigned long sleepTimeNanosec =sleepTimeNanosec0;
00135
00136 omni_mutex_lock conditionLock(_lock);
00137 while(true)
00138 {
00139 try {
00140 if(_refCount<1)
00141 break;
00142
00143 bool busy=false;
00144 bool waiting=false;
00145
00146
00147 for(set<Proxy*>::iterator i =_servants.begin(); i!=_servants.end(); ++i)
00148 {
00149 Proxy* p =*i;
00150 ProxyPushSupplier_i* pps =static_cast<ProxyPushSupplier_i*>(p);
00151 pps->trigger(busy,waiting);
00152 }
00153
00154 if(busy)
00155 {
00156
00157
00158
00159 omni_mutex_kcol l(_lock);
00160 omni_thread::yield();
00161
00162 sleepTimeNanosec=sleepTimeNanosec0;
00163 }
00164 else if(waiting)
00165 {
00166
00167
00168 if(sleepTimeNanosec<maxSleepNanosec)
00169 sleepTimeNanosec<<=1;
00170 unsigned long sec,nsec;
00171 omni_thread::get_time(&sec,&nsec,0,sleepTimeNanosec);
00172 _condition.timedwait(sec,nsec);
00173 }
00174 else
00175 {
00176
00177 _condition.wait();
00178 }
00179
00180 }
00181 catch (CORBA::SystemException& ex) {
00182 DB(2,"ProxyPushSupplierManager ignoring CORBA system exception"
00183 IF_OMNIORB4(": "<<ex._name()<<" ("<<NP_MINORSTRING(ex)<<")") ".")
00184 }
00185 catch (CORBA::Exception& ex) {
00186 DB(2,"ProxyPushSupplierManager ignoring CORBA exception"
00187 IF_OMNIORB4(": "<<ex._name()<<) ".")
00188 }
00189 catch(...) {
00190 DB(2,"ProxyPushSupplierManager thread killed by unknown exception.")
00191 break;
00192 }
00193 }
00194 }
00195
00196 void ProxyPushSupplierManager::_add_ref()
00197 {
00198 omni_mutex_lock pause(_lock);
00199 ++_refCount;
00200 }
00201
00202 void ProxyPushSupplierManager::_remove_ref()
00203 {
00204 int myref;
00205 {
00206 PauseThenWake p(this);
00207 myref = --_refCount;
00208 }
00209 if(myref<0)
00210 DB(2,"ProxyPushSupplierManager has negative ref count! "<<myref)
00211 else if(myref==0)
00212 DB(15,"ProxyPushSupplierManager has zero ref count -- shutdown.")
00213 }
00214
00215
00216
00217
00218
00219
00220 void ProxyPushSupplier_i::connect_push_consumer(
00221 CosEventComm::PushConsumer_ptr pushConsumer)
00222 {
00223 if(CORBA::is_nil(pushConsumer))
00224 throw CORBA::BAD_PARAM();
00225 if(!CORBA::is_nil(_target) || !CORBA::is_nil(_req))
00226 throw CosEventChannelAdmin::AlreadyConnected();
00227 _target=CosEventComm::PushConsumer::_duplicate(pushConsumer);
00228
00229
00230
00231 CORBA::Request_var req =_target->_request("_is_a");
00232 req->add_in_arg() <<= CosEventChannelAdmin::_tc_ProxyPushConsumer->id();
00233 req->set_return_type(CORBA::_tc_boolean);
00234 req->send_deferred();
00235 Orb::inst().deferredRequest(req._retn(),this);
00236
00237 output(WriteLock().os);
00238 }
00239
00240
00241 void ProxyPushSupplier_i::disconnect_push_supplier()
00242 {
00243 DB(5,"ProxyPushSupplier_i::disconnect_push_supplier()");
00244 eraseKey("ConsumerAdmin/ProxyPushSupplier");
00245 deactivateObject();
00246 if(CORBA::is_nil(_target))
00247 {
00248 throw CORBA::OBJECT_NOT_EXIST(
00249 IFELSE_OMNIORB4(omni::OBJECT_NOT_EXIST_NoMatch,0),
00250 CORBA::COMPLETED_NO
00251 );
00252 }
00253 else
00254 {
00255 CORBA::Request_var req=_target->_request("disconnect_push_consumer");
00256 req->send_deferred();
00257 Orb::inst().deferredRequest(req._retn());
00258 _target=CosEventComm::PushConsumer::_nil();
00259 }
00260 }
00261
00262
00263 ProxyPushSupplier_i::ProxyPushSupplier_i(
00264 PortableServer::POA_ptr poa,
00265 EventQueue& q
00266 )
00267 : Proxy(poa),
00268 EventQueue::Reader(q),
00269 _target(CosEventComm::PushConsumer::_nil()),
00270 _targetIsProxy(false)
00271 {
00272
00273 }
00274
00275 ProxyPushSupplier_i::~ProxyPushSupplier_i()
00276 {
00277 DB(20,"~ProxyPushSupplier_i()")
00278 }
00279
00280 inline void ProxyPushSupplier_i::trigger(bool& busy, bool& waiting)
00281 {
00282 if(!CORBA::is_nil(_req) && _req->poll_response())
00283 {
00284 CORBA::Environment_ptr env=_req->env();
00285 if(!CORBA::is_nil(env) && env->exception())
00286 {
00287
00288 CORBA::Exception* ex =env->exception();
00289 DB(10,"ProxyPushSupplier got exception" IF_OMNIORB4(": "<<ex->_name()) );
00290 Orb::inst().reportObjectFailure(HERE,_target.in(),ex);
00291 _req=CORBA::Request::_nil();
00292
00293
00294 CORBA::Request_var req=_target->_request("disconnect_push_consumer");
00295 req->send_deferred();
00296 Orb::inst().deferredRequest(req._retn());
00297
00298 _target=CosEventComm::PushConsumer::_nil();
00299 eraseKey("ConsumerAdmin/ProxyPushSupplier");
00300 deactivateObject();
00301 return;
00302 }
00303 _req=CORBA::Request::_nil();
00304 busy=true;
00305 }
00306 if(CORBA::is_nil(_req) && !CORBA::is_nil(_target) && moreEvents())
00307 {
00308 _req=_target->_request("push");
00309 _req->add_in_arg() <<= *(nextEvent());
00310 _req->send_deferred();
00311 busy=true;
00312 }
00313 if(!CORBA::is_nil(_req))
00314 waiting=true;
00315 }
00316
00317
00318 void ProxyPushSupplier_i::callback(CORBA::Request_ptr req)
00319 {
00320 if(_targetIsProxy)
00321 {
00322
00323
00324 DB(2,"WARNING: Multiple connections to ProxyPushSupplier.");
00325 }
00326 else if(req->return_value()>>=CORBA::Any::to_boolean(_targetIsProxy))
00327 {
00328 if(_targetIsProxy)
00329 {
00330 output(WriteLock().os);
00331 DB(15,"ProxyPushSupplier is federated.");
00332 }
00333 }
00334 else
00335 {
00336 DB(2,"ProxyPushSupplier got unexpected callback.");
00337 _targetIsProxy=false;
00338 }
00339 }
00340
00341
00342 void ProxyPushSupplier_i::reincarnate(
00343 const string& oid,
00344 const PersistNode& node
00345 )
00346 {
00347 try
00348 {
00349 using namespace CosEventChannelAdmin;
00350
00351 string ior( node.attrString("IOR").c_str() );
00352 CosEventComm::PushConsumer_var pushConsumer =
00353 string_to_<CosEventComm::PushConsumer>(ior.c_str());
00354
00355 activateObjectWithId(oid.c_str());
00356 _target=pushConsumer._retn();
00357 _targetIsProxy=bool(node.attrLong("proxy"));
00358
00359
00360 if(_targetIsProxy)
00361 {
00362 DB(15,"Attempting to reconnect ProxyPushSupplier: "<<oid.c_str())
00363
00364
00365 ProxyPushConsumer_var proxyCons =
00366 string_to_<ProxyPushConsumer>(ior.c_str());
00367 CosEventComm::PushSupplier_var thisSupp =_this();
00368 proxyCons->connect_push_supplier(thisSupp);
00369 DB(7,"Reconnected ProxyPushSupplier: "<<oid.c_str())
00370 }
00371 }
00372 catch(CosEventChannelAdmin::AlreadyConnected&){
00373
00374 DB(7,"Remote ProxyPushConsumer already connected: "<<oid.c_str())
00375 }
00376 catch(CosEventChannelAdmin::TypeError&){
00377
00378 DB(2,"Remote ProxyPushConsumer threw TypeError: "<<oid.c_str())
00379 }
00380 catch(CORBA::OBJECT_NOT_EXIST&) {}
00381 catch(CORBA::TRANSIENT& ) {}
00382 catch(CORBA::COMM_FAILURE& ) {}
00383 }
00384
00385
00386 void ProxyPushSupplier_i::output(ostream &os)
00387 {
00388 basicOutput(
00389 os,"ConsumerAdmin/ProxyPushSupplier",
00390 _target.in(),
00391 _targetIsProxy? " proxy=1": NULL
00392 );
00393 }
00394
00395
00396 };