00001
00002
00003
00004
00005
00006
00007
00008
00014 #include "common.h"
00015 #include "gearmand.h"
00016
00017
00018
00019
00020
00027 static void _con_ready(int fd, short events, void *arg);
00028
00029 static gearman_return_t _con_add(gearmand_thread_st *thread,
00030 gearmand_con_st *con);
00031
00034
00035
00036
00037
00038 gearman_return_t gearmand_con_create(gearmand_st *gearmand, int fd,
00039 const char *host, const char *port,
00040 gearman_connection_add_fn *add_fn)
00041 {
00042 gearmand_con_st *dcon;
00043 gearmand_con_st *free_dcon_list;
00044 uint32_t free_dcon_count;
00045
00046 if (gearmand->free_dcon_count > 0)
00047 {
00048 dcon= gearmand->free_dcon_list;
00049 GEARMAN_LIST_DEL(gearmand->free_dcon, dcon,)
00050 }
00051 else
00052 {
00053 dcon= malloc(sizeof(gearmand_con_st));
00054 if (dcon == NULL)
00055 {
00056 close(fd);
00057 gearmand_log_fatal(gearmand, "gearmand_con_create:malloc");
00058 return GEARMAN_MEMORY_ALLOCATION_FAILURE;
00059 }
00060 }
00061
00062 dcon->last_events= 0;
00063 dcon->fd= fd;
00064 dcon->next= NULL;
00065 dcon->prev= NULL;
00066 dcon->server_con= NULL;
00067 dcon->con= NULL;
00068 dcon->add_fn= NULL;
00069 strncpy(dcon->host, host, NI_MAXHOST - 1);
00070 strncpy(dcon->port, port, NI_MAXSERV - 1);
00071 dcon->add_fn= add_fn;
00072
00073
00074 if (gearmand->threads == 0)
00075 {
00076 dcon->thread= gearmand->thread_list;
00077 return _con_add(gearmand->thread_list, dcon);
00078 }
00079
00080
00081 if (gearmand->thread_add_next == NULL)
00082 gearmand->thread_add_next= gearmand->thread_list;
00083
00084 dcon->thread= gearmand->thread_add_next;
00085
00086
00087 if (dcon->thread->dcon_add_count == 0 &&
00088 dcon->thread->free_dcon_count < gearmand->max_thread_free_dcon_count)
00089 {
00090 GEARMAN_LIST_ADD(dcon->thread->dcon_add, dcon,)
00091 gearmand_thread_wakeup(dcon->thread, GEARMAND_WAKEUP_CON);
00092 }
00093 else
00094 {
00095 (void ) pthread_mutex_lock(&(dcon->thread->lock));
00096
00097 GEARMAN_LIST_ADD(dcon->thread->dcon_add, dcon,)
00098
00099
00100 free_dcon_list= dcon->thread->free_dcon_list;
00101 free_dcon_count= dcon->thread->free_dcon_count;
00102 dcon->thread->free_dcon_list= NULL;
00103 dcon->thread->free_dcon_count= 0;
00104
00105 (void ) pthread_mutex_unlock(&(dcon->thread->lock));
00106
00107
00108
00109
00110 if (dcon->thread->dcon_add_count == 1)
00111 gearmand_thread_wakeup(dcon->thread, GEARMAND_WAKEUP_CON);
00112
00113
00114 while (free_dcon_list != NULL)
00115 {
00116 dcon= free_dcon_list;
00117 GEARMAN_LIST_DEL(free_dcon, dcon,)
00118 GEARMAN_LIST_ADD(gearmand->free_dcon, dcon,)
00119 }
00120 }
00121
00122 gearmand->thread_add_next= gearmand->thread_add_next->next;
00123
00124 return GEARMAN_SUCCESS;
00125 }
00126
00127 void gearmand_con_free(gearmand_con_st *dcon)
00128 {
00129 int del_ret= event_del(&(dcon->event));
00130 assert(del_ret == 0);
00131
00132
00133 event_set(&(dcon->event), dcon->fd, EV_READ, _con_ready, dcon);
00134 event_base_set(dcon->thread->base, &(dcon->event));
00135 event_add(&(dcon->event), NULL);
00136 del_ret= event_del(&(dcon->event));
00137 assert(del_ret == 0);
00138
00139 gearman_server_con_free(dcon->server_con);
00140 GEARMAN_LIST_DEL(dcon->thread->dcon, dcon,)
00141
00142 close(dcon->fd);
00143
00144 if (dcon->thread->gearmand->free_dcon_count < GEARMAN_MAX_FREE_SERVER_CON)
00145 {
00146 if (dcon->thread->gearmand->threads == 0)
00147 GEARMAN_LIST_ADD(dcon->thread->gearmand->free_dcon, dcon,)
00148 else
00149 {
00150
00151 (void ) pthread_mutex_lock(&(dcon->thread->lock));
00152 GEARMAN_LIST_ADD(dcon->thread->free_dcon, dcon,)
00153 (void ) pthread_mutex_unlock(&(dcon->thread->lock));
00154 }
00155 }
00156 else
00157 free(dcon);
00158 }
00159
00160 void gearmand_con_check_queue(gearmand_thread_st *thread)
00161 {
00162 gearmand_con_st *dcon;
00163
00164
00165 if (thread->dcon_add_count == 0)
00166 return;
00167
00168
00169
00170 while (thread->dcon_add_list != NULL)
00171 {
00172 (void ) pthread_mutex_lock(&(thread->lock));
00173 dcon= thread->dcon_add_list;
00174 GEARMAN_LIST_DEL(thread->dcon_add, dcon,)
00175 (void ) pthread_mutex_unlock(&(thread->lock));
00176
00177 if (_con_add(thread, dcon) != GEARMAN_SUCCESS)
00178 gearmand_wakeup(thread->gearmand, GEARMAND_WAKEUP_SHUTDOWN);
00179 }
00180 }
00181
00182 gearman_return_t gearmand_connection_watch(gearman_connection_st *con, short events,
00183 void *context __attribute__ ((unused)))
00184 {
00185 (void) context;
00186 gearmand_con_st *dcon;
00187 short set_events= 0;
00188
00189 dcon= (gearmand_con_st *)gearman_connection_context(con);
00190 dcon->con= con;
00191
00192 if (events & POLLIN)
00193 set_events|= EV_READ;
00194 if (events & POLLOUT)
00195 set_events|= EV_WRITE;
00196
00197 if (dcon->last_events != set_events)
00198 {
00199 if (dcon->last_events != 0)
00200 {
00201 int del_ret= event_del(&(dcon->event));
00202 assert(del_ret == 0);
00203 }
00204 event_set(&(dcon->event), dcon->fd, set_events | EV_PERSIST, _con_ready,
00205 dcon);
00206 event_base_set(dcon->thread->base, &(dcon->event));
00207
00208 if (event_add(&(dcon->event), NULL) == -1)
00209 {
00210 gearmand_log_fatal(dcon->thread->gearmand, "_con_watch:event_add:-1");
00211 return GEARMAN_EVENT;
00212 }
00213
00214 dcon->last_events= set_events;
00215 }
00216
00217 gearmand_log_crazy(dcon->thread->gearmand, "[%4u] %15s:%5s Watching %6s %s",
00218 dcon->thread->count, dcon->host, dcon->port,
00219 events & POLLIN ? "POLLIN" : "",
00220 events & POLLOUT ? "POLLOUT" : "");
00221
00222 return GEARMAN_SUCCESS;
00223 }
00224
00225
00226
00227
00228
00229 static void _con_ready(int fd __attribute__ ((unused)), short events,
00230 void *arg)
00231 {
00232 gearmand_con_st *dcon= (gearmand_con_st *)arg;
00233 short revents= 0;
00234 gearman_return_t ret;
00235
00236 if (events & EV_READ)
00237 revents|= POLLIN;
00238 if (events & EV_WRITE)
00239 revents|= POLLOUT;
00240
00241 ret= gearman_connection_set_revents(dcon->con, revents);
00242 if (ret != GEARMAN_SUCCESS)
00243 {
00244 gearmand_con_free(dcon);
00245 return;
00246 }
00247
00248 gearmand_log_crazy(dcon->thread->gearmand, "[%4u] %15s:%5s Ready %6s %s",
00249 dcon->thread->count, dcon->host, dcon->port,
00250 revents & POLLIN ? "POLLIN" : "",
00251 revents & POLLOUT ? "POLLOUT" : "");
00252
00253 gearmand_thread_run(dcon->thread);
00254 }
00255
00256 static gearman_return_t _con_add(gearmand_thread_st *thread,
00257 gearmand_con_st *dcon)
00258 {
00259 gearman_return_t ret;
00260
00261 dcon->server_con= gearman_server_con_add(&(thread->server_thread), dcon->fd,
00262 dcon);
00263 if (dcon->server_con == NULL)
00264 {
00265 close(dcon->fd);
00266 free(dcon);
00267 return GEARMAN_MEMORY_ALLOCATION_FAILURE;
00268 }
00269
00270 gearman_server_con_set_host(dcon->server_con, dcon->host);
00271 gearman_server_con_set_port(dcon->server_con, dcon->port);
00272
00273 if (dcon->add_fn != NULL)
00274 {
00275 ret= (*dcon->add_fn)(gearman_server_con_con(dcon->server_con));
00276 if (ret != GEARMAN_SUCCESS)
00277 {
00278 gearman_server_con_free(dcon->server_con);
00279 close(dcon->fd);
00280 free(dcon);
00281 return ret;
00282 }
00283 }
00284
00285 gearmand_log_info(thread->gearmand, "[%4u] %15s:%5s Connected", thread->count, dcon->host, dcon->port);
00286
00287 GEARMAN_LIST_ADD(thread->dcon, dcon,)
00288
00289 return GEARMAN_SUCCESS;
00290 }