00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024 #include "ProxyPullSupplier.h"
00025 #include "EventChannel.h"
00026 #include "Orb.h"
00027 #include "omniEventsLog.h"
00028 #include "PersistNode.h"
00029 #include <assert.h>
00030
00031 namespace OmniEvents {
00032
00033
00034
00035
00036
00037 PortableServer::Servant ProxyPullSupplierManager::incarnate(
00038 const PortableServer::ObjectId& oid,
00039 PortableServer::POA_ptr poa
00040 )
00041 {
00042
00043 if(_servants.size()>=_channel.maxNumProxies())
00044 {
00045 ProxyPullSupplier_i* oldest =NULL;
00046 unsigned long age =0;
00047 for(set<Proxy*>::iterator i=_servants.begin(); i!=_servants.end(); ++i)
00048 if(!oldest || dynamic_cast<ProxyPullSupplier_i*>(*i)->timestamp()<age)
00049 {
00050 oldest=dynamic_cast<ProxyPullSupplier_i*>(*i);
00051 age=oldest->timestamp();
00052 }
00053 DB(5,"Evicting oldest ProxyPullSupplier to make space for a new one")
00054 try{ oldest->disconnect_pull_supplier(); }catch(CORBA::OBJECT_NOT_EXIST&){}
00055 }
00056
00057 ProxyPullSupplier_i* result =new ProxyPullSupplier_i(_managedPoa,_queue);
00058 _servants.insert(result);
00059 return result;
00060 }
00061
00062 ProxyPullSupplierManager::ProxyPullSupplierManager(
00063 const EventChannel_i& channel,
00064 PortableServer::POA_ptr parentPoa,
00065 EventQueue& q
00066 )
00067 : ProxyManager(parentPoa,"ProxyPullSupplier"),
00068 _queue(q),
00069 _channel(channel)
00070 {
00071
00072 }
00073
00074 ProxyPullSupplierManager::~ProxyPullSupplierManager()
00075 {
00076 DB(20,"~ProxyPullSupplierManager()")
00077 }
00078
00079 CosEventChannelAdmin::ProxyPullSupplier_ptr
00080 ProxyPullSupplierManager::createObject()
00081 {
00082 return createNarrowedReference<CosEventChannelAdmin::ProxyPullSupplier>(
00083 _managedPoa.in(),
00084 CosEventChannelAdmin::_tc_ProxyPullSupplier->id()
00085 );
00086 }
00087
00088 void ProxyPullSupplierManager::disconnect()
00089 {
00090 for(set<Proxy*>::iterator i =_servants.begin(); i!=_servants.end(); ++i)
00091 {
00092 ProxyPullSupplier_i* narrowed =dynamic_cast<ProxyPullSupplier_i*>(*i);
00093 narrowed->disconnect_pull_supplier();
00094 }
00095 }
00096
00097
00098
00099
00100
00101
00102
00103
00104 void ProxyPullSupplier_i::connect_pull_consumer(
00105 CosEventComm::PullConsumer_ptr pullConsumer
00106 )
00107 {
00108 if(_connected || !CORBA::is_nil(_target) || !CORBA::is_nil(_req))
00109 throw CosEventChannelAdmin::AlreadyConnected();
00110 touch();
00111 _connected=true;
00112 if(!CORBA::is_nil(pullConsumer))
00113 _target=CosEventComm::PullConsumer::_duplicate(pullConsumer);
00114
00115 output(WriteLock().os);
00116 }
00117
00118 void ProxyPullSupplier_i::disconnect_pull_supplier()
00119 {
00120 DB(5,"ProxyPullSupplier_i::disconnect_pull_supplier()");
00121 touch();
00122 eraseKey("ConsumerAdmin/ProxyPullSupplier");
00123 deactivateObject();
00124 if(!_connected)
00125 {
00126 throw CORBA::OBJECT_NOT_EXIST(
00127 IFELSE_OMNIORB4(omni::OBJECT_NOT_EXIST_NoMatch,0),
00128 CORBA::COMPLETED_NO
00129 );
00130 }
00131 else if(!CORBA::is_nil(_target))
00132 {
00133 CORBA::Request_var req=_target->_request("disconnect_pull_consumer");
00134 req->send_deferred();
00135 Orb::inst().deferredRequest(req._retn());
00136 }
00137 _target=CosEventComm::PullConsumer::_nil();
00138 }
00139
00140 CORBA::Any* ProxyPullSupplier_i::pull()
00141 {
00142 if(!_connected)
00143 throw CosEventComm::Disconnected();
00144 touch();
00145 if(moreEvents())
00146 return new CORBA::Any(*nextEvent());
00147 else
00148 throw CORBA::TRANSIENT(
00149 IFELSE_OMNIORB4(omni::TRANSIENT_CallTimedout,0),
00150 CORBA::COMPLETED_NO
00151 );
00152 }
00153
00154 CORBA::Any* ProxyPullSupplier_i::try_pull(CORBA::Boolean& has_event)
00155 {
00156 if(!_connected)
00157 throw CosEventComm::Disconnected();
00158 touch();
00159 if(moreEvents())
00160 {
00161 has_event=1;
00162 return new CORBA::Any(*nextEvent());
00163 }
00164 else
00165 {
00166 has_event=0;
00167 return new CORBA::Any();
00168 }
00169 }
00170
00171
00172
00173 ProxyPullSupplier_i::ProxyPullSupplier_i(
00174 PortableServer::POA_ptr poa,
00175 EventQueue& q
00176 )
00177 : Proxy(poa),
00178 EventQueue::Reader(q),
00179 _target(CosEventComm::PullConsumer::_nil()),
00180 _connected(false),
00181 _timestamp(0)
00182 {
00183 touch();
00184 }
00185
00186 ProxyPullSupplier_i::~ProxyPullSupplier_i()
00187 {
00188 DB(20,"~ProxyPullSupplier_i()")
00189 }
00190
00191 void ProxyPullSupplier_i::reincarnate(
00192 const string& oid,
00193 const PersistNode& node
00194 )
00195 {
00196 CosEventComm::PullConsumer_var pullConsumer =
00197 string_to_<CosEventComm::PullConsumer>(node.attrString("IOR").c_str());
00198
00199 activateObjectWithId(oid.c_str());
00200 connect_pull_consumer(pullConsumer.in());
00201 }
00202
00203 void ProxyPullSupplier_i::output(ostream& os)
00204 {
00205 basicOutput(os,"ConsumerAdmin/ProxyPullSupplier",_target.in());
00206 }
00207
00208 inline void ProxyPullSupplier_i::touch()
00209 {
00210 unsigned long nsec;
00211 omni_thread::get_time(&_timestamp,&nsec);
00212 }
00213
00214 };