Main Page   Namespace List   Class Hierarchy   Compound List   File List   Namespace Members   Compound Members   File Members  

ProxyPushConsumer.cc

Go to the documentation of this file.
00001 //                            Package   : omniEvents
00002 // ProxyPushConsumer.cc       Created   : 2003/12/04
00003 //                            Author    : Alex Tingle
00004 //
00005 //    Copyright (C) 2003 Alex Tingle.
00006 //
00007 //    This file is part of the omniEvents application.
00008 //
00009 //    omniEvents is free software; you can redistribute it and/or
00010 //    modify it under the terms of the GNU Lesser General Public
00011 //    License as published by the Free Software Foundation; either
00012 //    version 2.1 of the License, or (at your option) any later version.
00013 //
00014 //    omniEvents is distributed in the hope that it will be useful,
00015 //    but WITHOUT ANY WARRANTY; without even the implied warranty of
00016 //    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
00017 //    Lesser General Public License for more details.
00018 //
00019 //    You should have received a copy of the GNU Lesser General Public
00020 //    License along with this library; if not, write to the Free Software
00021 //    Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
00022 //
00023 
00024 #include "ProxyPushConsumer.h"
00025 #include "ConsumerAdmin.h"
00026 #include "Orb.h"
00027 #include "omniEventsLog.h"
00028 #include "PersistNode.h"
00029 
00030 #include <assert.h>
00031 
00032 namespace OmniEvents {
00033 
00034 void ProxyPushConsumer_i::connect_push_supplier(
00035   CosEventComm::PushSupplier_ptr pushSupplier)
00036 {
00037   // pushSupplier is permitted to be nil.
00038   if(CORBA::is_nil(pushSupplier))
00039       return;
00040 
00041   string oidstr =currentObjectId();
00042   Connections_t::iterator pos =_connections.find(oidstr);
00043 
00044   if(pos!=_connections.end())
00045       throw CosEventChannelAdmin::AlreadyConnected();
00046 
00047   Connection& newConnection =
00048     _connections.insert(Connections_t::value_type(
00049       oidstr,
00050       Connection(
00051         _channelName.in(),
00052         oidstr,
00053         CosEventComm::PushSupplier::_duplicate(pushSupplier)
00054       )
00055     )).first->second; // insert() returns pair<iterator,bool>
00056 
00057   // Test to see whether pushSupplier is a ProxyPushSupplier.
00058   // If so, then we will aggressively try to reconnect, when we are reincarnated
00059   CORBA::Request_var req =pushSupplier->_request("_is_a");
00060   req->add_in_arg() <<= CosEventChannelAdmin::_tc_ProxyPushSupplier->id();
00061   req->set_return_type(CORBA::_tc_boolean);
00062   req->send_deferred();
00063   Orb::inst().deferredRequest(req._retn(),&newConnection); // Register callback
00064 
00065   newConnection.output(WriteLock().os);
00066 }
00067 
00068 
00069 void ProxyPushConsumer_i::disconnect_push_consumer()
00070 {
00071 #ifdef HAVE_OMNIORB4
00072   DB(5,"ProxyPushConsumer_i::disconnect_push_consumer()")
00073   string oidstr =currentObjectId();
00074   Connections_t::iterator pos =_connections.find(oidstr);
00075 
00076   if(pos!=_connections.end())
00077   {
00078     CORBA::Request_var req =
00079       pos->second._target->_request("disconnect_push_supplier");
00080     req->send_deferred();
00081     Orb::inst().deferredRequest(req._retn());
00082     // Erase this connection from the log file.
00083     {
00084       WriteLock log;
00085       log.os<<"-ecf/"<<_channelName.in();
00086       log.os<<"/SupplierAdmin/ProxyPushConsumer/"<<oidstr<<'\n';
00087     }
00088     _connections.erase(pos);
00089   }
00090 #else /* Silently ignore disconnects with omniORB3 */
00091   DB(5,"Ignoring disconnect_push_consumer(). Upgrade to omniORB4!")
00092 #endif
00093 }
00094 
00095 
00096 void ProxyPushConsumer_i::push(const CORBA::Any& event)
00097 {
00098 #ifdef OMNIEVENTS_REAL_TIME_PUSH
00099   if(!_useLocalQueue)
00100   {
00101     _consumerAdmin.send(new CORBA::Any(event));
00102     _useLocalQueue=true;
00103   }
00104   else
00105 #endif
00106     _queue.push_back(new CORBA::Any(event));
00107 }
00108 
00109 
00110 ProxyPushConsumer_i::ProxyPushConsumer_i(
00111   PortableServer::POA_ptr p,
00112   list<CORBA::Any*>&      q,
00113   ConsumerAdmin_i&        consumerAdmin
00114 )
00115 : Servant(PortableServer::POA::_nil()),
00116   _connections(),
00117   _channelName(p->the_name()),
00118   _consumerAdmin(consumerAdmin),
00119   _queue(q),
00120   _useLocalQueue(false)
00121 {
00122   using namespace PortableServer;
00123   try
00124   {  
00125     // POLICIES:
00126     //  Lifespan          =PERSISTENT             // we can persist
00127     //  Assignment        =USER_ID                // write our own oid
00128     //  Uniqueness        =MULTIPLE_ID            // only one servant
00129     //  ImplicitActivation=NO_IMPLICIT_ACTIVATION // disable auto activation
00130     //  RequestProcessing =USE_DEFAULT_SERVANT    // only one servant
00131     //  ServantRetention  =NON_RETAIN             // stateless POA
00132     //  Thread            =SINGLE_THREAD_MODEL    // keep it simple
00133 
00134     CORBA::PolicyList policies;
00135     policies.length(7);
00136     policies[0]=p->create_lifespan_policy(PERSISTENT);
00137     policies[1]=p->create_id_assignment_policy(USER_ID);
00138     policies[2]=p->create_id_uniqueness_policy(MULTIPLE_ID);
00139     policies[3]=p->create_implicit_activation_policy(NO_IMPLICIT_ACTIVATION);
00140     policies[4]=p->create_request_processing_policy(USE_DEFAULT_SERVANT);
00141     policies[5]=p->create_servant_retention_policy(NON_RETAIN);
00142     policies[6]=p->create_thread_policy(SINGLE_THREAD_MODEL);
00143 
00144     // Create a POA for this proxy type in this channel.
00145     string          poaName =string(_channelName.in())+".ProxyPushConsumer";
00146     POAManager_var  parentManager =p->the_POAManager();
00147     _poa=p->create_POA(poaName.c_str(),parentManager.in(),policies);
00148   }
00149   catch(POA::AdapterAlreadyExists&) // create_POA
00150   {
00151     DB(0,"ProxyPushConsumer_i::ProxyPushConsumer_i() - "
00152           "POA::AdapterAlreadyExists")
00153   }
00154   catch(POA::InvalidPolicy& ex) // create_POA
00155   {
00156     DB(0,"ProxyPushConsumer_i::ProxyPushConsumer_i() - "
00157           "POA::InvalidPolicy: "<<ex.index)
00158   }
00159   // This object is the POA's default servant.
00160   _poa->set_servant(this);
00161 }
00162 
00163 
00164 ProxyPushConsumer_i::~ProxyPushConsumer_i()
00165 {
00166   DB(20,"~ProxyPushConsumer_i()")
00167 }
00168 
00169 
00170 CosEventChannelAdmin::ProxyPushConsumer_ptr
00171 ProxyPushConsumer_i::createObject()
00172 {
00173   return createNarrowedReference<CosEventChannelAdmin::ProxyPushConsumer>(
00174            _poa.in(),
00175            CosEventChannelAdmin::_tc_ProxyPushConsumer->id()
00176          );
00177 }
00178 
00179 
00180 void ProxyPushConsumer_i::disconnect()
00181 {
00182   Connections_t::iterator curr,next=_connections.begin();
00183   while(next!=_connections.end())
00184   {
00185     curr=next++;
00186     CORBA::Request_var req =
00187       curr->second._target->_request("disconnect_push_supplier");
00188     req->send_deferred();
00189     Orb::inst().deferredRequest(req._retn());
00190     _connections.erase(curr);
00191   }
00192 }
00193 
00194 
00195 void ProxyPushConsumer_i::reincarnate(const PersistNode& node)
00196 {
00197   // Reincarnate all connections from node's children.
00198   for(map<string,PersistNode*>::const_iterator i=node._child.begin();
00199       i!=node._child.end();
00200       ++i)
00201   {
00202     const char* oidstr =i->first.c_str();
00203     string      ior( i->second->attrString("IOR") );
00204     bool        isProxy( i->second->attrLong("proxy") );
00205     assert(_connections.find(oidstr)==_connections.end());
00206     try
00207     {
00208       using namespace CosEventComm;
00209       using namespace CosEventChannelAdmin;
00210 
00211       PushSupplier_var supp =string_to_<PushSupplier>(ior.c_str());
00212       _connections.insert(Connections_t::value_type(
00213         oidstr,
00214         Connection(_channelName.in(),oidstr,supp._retn(),isProxy)
00215       ));
00216       DB(5,"Reincarnated ProxyPushConsumer: "<<oidstr)
00217 
00218       // If supp is a ProxyPushSupplier, then try to reconnect.
00219       if(isProxy)
00220       {
00221         DB(15,"Attempting to reconnect ProxyPushConsumer: "<<oidstr)
00222         // This will only work if the proxy is implemented in the same way as
00223         // omniEvents, so connect_() automatically creates a proxy.
00224         ProxyPushSupplier_var proxySupp =
00225           string_to_<ProxyPushSupplier>(ior.c_str());
00226         PortableServer::ObjectId_var objectId =
00227           PortableServer::string_to_ObjectId(oidstr);
00228         CORBA::Object_var obj =
00229           _poa->create_reference_with_id(
00230             objectId.in(),
00231             CosEventChannelAdmin::_tc_ProxyPushConsumer->id()
00232           );
00233         PushConsumer_var thisCons =CosEventComm::PushConsumer::_narrow(obj);
00234         proxySupp->connect_push_consumer(thisCons.in());
00235         DB(7,"Reconnected ProxyPushConsumer: "<<oidstr)
00236       }
00237     }
00238     catch(CORBA::BAD_PARAM&) {
00239       // This will happen when IOR fails to narrow.
00240       DB(5,"Failed to reincarnate ProxyPushConsumer: "<<oidstr)
00241     }
00242     catch(CosEventChannelAdmin::AlreadyConnected&){ //connect_push_consumer()
00243       // The supplier doesn't need to be reconnected.
00244       DB(7,"Remote ProxyPushSupplier already connected: "<<oidstr)
00245     }
00246     catch(CosEventChannelAdmin::TypeError&){ // connect_push_consumer()
00247       // Don't know what to make of this...
00248       DB(2,"Remote ProxyPushSupplier threw TypeError: "<<oidstr)
00249     }
00250     catch(CORBA::OBJECT_NOT_EXIST&) {} // object 'supp' not responding.
00251     catch(CORBA::TRANSIENT&       ) {} // object 'supp' not responding.
00252     catch(CORBA::COMM_FAILURE&    ) {} // object 'supp' not responding.
00253   } // end loop for(i)
00254 }
00255 
00256 
00257 void ProxyPushConsumer_i::output(ostream& os) const
00258 {
00259   for(Connections_t::const_iterator i=_connections.begin();
00260       i!=_connections.end();
00261       ++i)
00262   {
00263     i->second.output(os);
00264   }
00265 }
00266 
00267 
00268 string ProxyPushConsumer_i::currentObjectId() const
00269 {
00270 #ifdef HAVE_OMNIORB4
00271   try
00272   {
00273     using namespace PortableServer;
00274     ObjectId_var oid =Orb::inst()._POACurrent->get_object_id();
00275     CORBA::String_var oidStr =ObjectId_to_string(oid.in());
00276     return string(oidStr.in());
00277   }
00278   catch(PortableServer::Current::NoContext&) // get_object_id()
00279   {
00280     DB(0,"No context!!")
00281   }
00282   catch(CORBA::BAD_PARAM&) // ObjectId_to_string()
00283   {
00284     // Should never get here in omniORB, because ObjectID is a char*.
00285     assert(0);
00286   }
00287   return "ERROR";
00288 #else
00289   throw CORBA::NO_IMPLEMENT();
00290 #endif
00291 }
00292 
00293 
00294 //
00295 //  ProxyPushConsumer_i::Connection
00296 //
00297 
00298 ProxyPushConsumer_i::Connection::Connection(
00299   const char*                    channelName,
00300   const string&                  oidstr,
00301   CosEventComm::PushSupplier_ptr pushSupplier,
00302   bool                           isProxy
00303 ):_channelName(channelName),
00304   _oidstr(oidstr),
00305   _target(pushSupplier),
00306   _targetIsProxy(isProxy)
00307 {
00308   // pass
00309 }
00310 
00311 void ProxyPushConsumer_i::Connection::callback(CORBA::Request_ptr req)
00312 {
00313   bool save =_targetIsProxy;
00314   if(req->return_value()>>=CORBA::Any::to_boolean(_targetIsProxy))
00315   {
00316     if(_targetIsProxy)
00317     {
00318       output(WriteLock().os);
00319       DB(15,"ProxyPushConsumer is federated.");
00320     }
00321   }
00322   else
00323   {
00324     DB(2,"ProxyPushConsumer got unexpected callback.");
00325     _targetIsProxy=save; // Reset it just to be sure.
00326   }
00327 }
00328 
00329 void ProxyPushConsumer_i::Connection::output(ostream& os) const
00330 {
00331   os<<"ecf/"<<_channelName;
00332   os<<"/SupplierAdmin/ProxyPushConsumer/"<<_oidstr;
00333 
00334   if(!CORBA::is_nil(_target.in()))
00335   {
00336     CORBA::String_var iorstr;
00337     iorstr = Orb::inst()._orb->object_to_string(_target.in());
00338     os<<" IOR="<<iorstr.in();
00339     if(_targetIsProxy)
00340         os<<" proxy=1";
00341   }
00342   os<<" ;;\n";
00343 }
00344 
00345 
00346 }; // end namespace OmniEvents

Generated on Fri Nov 19 17:42:20 2004 for OmniEvents by doxygen1.2.15