00001
00002
00003
00004
00005
00006
00007
00008
00014 #include "common.h"
00015
00016
00017
00018
00019
00020 gearman_server_con_st *gearman_server_con_add(gearman_server_thread_st *thread,
00021 int fd, void *data)
00022 {
00023 gearman_server_con_st *con;
00024 gearman_return_t ret;
00025
00026 con= gearman_server_con_create(thread);
00027 if (con == NULL)
00028 return NULL;
00029
00030 if (gearman_connection_set_fd(&(con->con), fd) != GEARMAN_SUCCESS)
00031 {
00032 gearman_server_con_free(con);
00033 return NULL;
00034 }
00035
00036 con->con.context= data;
00037
00038 ret= gearman_connection_set_events(&(con->con), POLLIN);
00039 if (ret != GEARMAN_SUCCESS)
00040 {
00041 gearman_server_con_free(con);
00042 return NULL;
00043 }
00044
00045 return con;
00046 }
00047
00048 gearman_server_con_st *
00049 gearman_server_con_create(gearman_server_thread_st *thread)
00050 {
00051 gearman_server_con_st *con;
00052
00053 if (thread->free_con_count > 0)
00054 {
00055 con= thread->free_con_list;
00056 GEARMAN_LIST_DEL(thread->free_con, con,)
00057 }
00058 else
00059 {
00060 con= malloc(sizeof(gearman_server_con_st));
00061 if (con == NULL)
00062 {
00063 gearman_log_error(thread->gearman, "gearman_server_con_create", "malloc");
00064 return NULL;
00065 }
00066 }
00067
00068 gearman_connection_options_t options[] = { GEARMAN_CON_IGNORE_LOST_CONNECTION, GEARMAN_CON_MAX };
00069 if (gearman_connection_create(thread->gearman, &(con->con), options) == NULL)
00070 {
00071 free(con);
00072 return NULL;
00073 }
00074
00075 con->options= 0;
00076 con->ret= 0;
00077 con->io_list= false;
00078 con->proc_list= false;
00079 con->proc_removed= false;
00080 con->io_packet_count= 0;
00081 con->proc_packet_count= 0;
00082 con->worker_count= 0;
00083 con->client_count= 0;
00084 con->thread= thread;
00085 con->packet= NULL;
00086 con->io_packet_list= NULL;
00087 con->io_packet_end= NULL;
00088 con->proc_packet_list= NULL;
00089 con->proc_packet_end= NULL;
00090 con->io_next= NULL;
00091 con->io_prev= NULL;
00092 con->proc_next= NULL;
00093 con->proc_prev= NULL;
00094 con->worker_list= NULL;
00095 con->client_list= NULL;
00096 con->host= NULL;
00097 con->port= NULL;
00098 strcpy(con->id, "-");
00099
00100 (void) pthread_mutex_lock(&thread->lock);
00101 GEARMAN_LIST_ADD(thread->con, con,)
00102 (void) pthread_mutex_unlock(&thread->lock);
00103
00104 return con;
00105 }
00106
00107 void gearman_server_con_free(gearman_server_con_st *con)
00108 {
00109 gearman_server_thread_st *thread= con->thread;
00110 gearman_server_packet_st *packet;
00111
00112 con->host= NULL;
00113 con->port= NULL;
00114
00115 if (thread->server->flags.threaded &&
00116 !(con->proc_removed) && !(thread->server->proc_shutdown))
00117 {
00118 con->options= GEARMAN_SERVER_CON_DEAD;
00119 gearman_server_con_proc_add(con);
00120 return;
00121 }
00122
00123 gearman_connection_free(&(con->con));
00124
00125 if (con->proc_list)
00126 gearman_server_con_proc_remove(con);
00127
00128 if (con->io_list)
00129 gearman_server_con_io_remove(con);
00130
00131 if (con->packet != NULL)
00132 {
00133 if (&(con->packet->packet) != con->con.recv_packet)
00134 gearman_packet_free(&(con->packet->packet));
00135 gearman_server_packet_free(con->packet, con->thread, true);
00136 }
00137
00138 while (con->io_packet_list != NULL)
00139 gearman_server_io_packet_remove(con);
00140
00141 while (con->proc_packet_list != NULL)
00142 {
00143 packet= gearman_server_proc_packet_remove(con);
00144 gearman_packet_free(&(packet->packet));
00145 gearman_server_packet_free(packet, con->thread, true);
00146 }
00147
00148 gearman_server_con_free_workers(con);
00149
00150 while (con->client_list != NULL)
00151 gearman_server_client_free(con->client_list);
00152
00153 (void) pthread_mutex_lock(&thread->lock);
00154 GEARMAN_LIST_DEL(con->thread->con, con,)
00155 (void) pthread_mutex_unlock(&thread->lock);
00156
00157 if (thread->free_con_count < GEARMAN_MAX_FREE_SERVER_CON)
00158 GEARMAN_LIST_ADD(thread->free_con, con,)
00159 else
00160 free(con);
00161 }
00162
00163 gearman_connection_st *gearman_server_con_con(gearman_server_con_st *con)
00164 {
00165 return &con->con;
00166 }
00167
00168 const void *gearman_server_con_data(const gearman_server_con_st *con)
00169 {
00170 return gearman_connection_context(&(con->con));
00171 }
00172
00173 void gearman_server_con_set_data(gearman_server_con_st *con, void *data)
00174 {
00175 gearman_connection_set_context(&(con->con), data);
00176 }
00177
00178 const char *gearman_server_con_host(gearman_server_con_st *con)
00179 {
00180 return con->host;
00181 }
00182
00183 void gearman_server_con_set_host(gearman_server_con_st *con, const char *host)
00184 {
00185 con->host= host;
00186 }
00187
00188 const char *gearman_server_con_port(gearman_server_con_st *con)
00189 {
00190 return con->port;
00191 }
00192
00193 void gearman_server_con_set_port(gearman_server_con_st *con, const char *port)
00194 {
00195 con->port= port;
00196 }
00197
00198 const char *gearman_server_con_id(gearman_server_con_st *con)
00199 {
00200 return con->id;
00201 }
00202
00203 void gearman_server_con_set_id(gearman_server_con_st *con, char *id,
00204 size_t size)
00205 {
00206 if (size >= GEARMAN_SERVER_CON_ID_SIZE)
00207 size= GEARMAN_SERVER_CON_ID_SIZE - 1;
00208
00209 memcpy(con->id, id, size);
00210 con->id[size]= 0;
00211 }
00212
00213 void gearman_server_con_free_worker(gearman_server_con_st *con,
00214 char *function_name,
00215 size_t function_name_size)
00216 {
00217 gearman_server_worker_st *worker= con->worker_list;
00218 gearman_server_worker_st *prev_worker= NULL;
00219
00220 while (worker != NULL)
00221 {
00222 if (worker->function->function_name_size == function_name_size &&
00223 !memcmp(worker->function->function_name, function_name,
00224 function_name_size))
00225 {
00226 gearman_server_worker_free(worker);
00227
00228
00229 if (prev_worker == NULL)
00230 worker= con->worker_list;
00231 else
00232 worker= prev_worker;
00233 }
00234 else
00235 {
00236
00237 prev_worker= worker;
00238 worker= worker->con_next;
00239 }
00240 }
00241 }
00242
00243 void gearman_server_con_free_workers(gearman_server_con_st *con)
00244 {
00245 while (con->worker_list != NULL)
00246 gearman_server_worker_free(con->worker_list);
00247 }
00248
00249 void gearman_server_con_io_add(gearman_server_con_st *con)
00250 {
00251 if (con->io_list)
00252 return;
00253
00254 (void) pthread_mutex_lock(&con->thread->lock);
00255
00256 GEARMAN_LIST_ADD(con->thread->io, con, io_)
00257 con->io_list= true;
00258
00259
00260 if (con->thread->io_count == 1 && con->thread->run_fn)
00261 {
00262 (void) pthread_mutex_unlock(&con->thread->lock);
00263 (*con->thread->run_fn)(con->thread, con->thread->run_fn_arg);
00264 }
00265 else
00266 {
00267 (void) pthread_mutex_unlock(&con->thread->lock);
00268 }
00269 }
00270
00271 void gearman_server_con_io_remove(gearman_server_con_st *con)
00272 {
00273 (void) pthread_mutex_lock(&con->thread->lock);
00274 if (con->io_list)
00275 {
00276 GEARMAN_LIST_DEL(con->thread->io, con, io_)
00277 con->io_list= false;
00278 }
00279 (void) pthread_mutex_unlock(&con->thread->lock);
00280 }
00281
00282 gearman_server_con_st *
00283 gearman_server_con_io_next(gearman_server_thread_st *thread)
00284 {
00285 gearman_server_con_st *con= thread->io_list;
00286
00287 if (con == NULL)
00288 return NULL;
00289
00290 gearman_server_con_io_remove(con);
00291
00292 return con;
00293 }
00294
00295 void gearman_server_con_proc_add(gearman_server_con_st *con)
00296 {
00297 if (con->proc_list)
00298 return;
00299
00300 (void) pthread_mutex_lock(&con->thread->lock);
00301 GEARMAN_LIST_ADD(con->thread->proc, con, proc_)
00302 con->proc_list= true;
00303 (void) pthread_mutex_unlock(&con->thread->lock);
00304
00305 if (! (con->thread->server->proc_shutdown) &&
00306 !(con->thread->server->proc_wakeup))
00307 {
00308 (void) pthread_mutex_lock(&(con->thread->server->proc_lock));
00309 con->thread->server->proc_wakeup= true;
00310 (void) pthread_cond_signal(&(con->thread->server->proc_cond));
00311 (void) pthread_mutex_unlock(&(con->thread->server->proc_lock));
00312 }
00313 }
00314
00315 void gearman_server_con_proc_remove(gearman_server_con_st *con)
00316 {
00317 (void) pthread_mutex_lock(&con->thread->lock);
00318
00319 if (con->proc_list)
00320 {
00321 GEARMAN_LIST_DEL(con->thread->proc, con, proc_)
00322 con->proc_list= false;
00323 }
00324 (void) pthread_mutex_unlock(&con->thread->lock);
00325 }
00326
00327 gearman_server_con_st *
00328 gearman_server_con_proc_next(gearman_server_thread_st *thread)
00329 {
00330 gearman_server_con_st *con;
00331
00332 if (thread->proc_list == NULL)
00333 return NULL;
00334
00335 (void) pthread_mutex_lock(&thread->lock);
00336
00337 con= thread->proc_list;
00338 while (con != NULL)
00339 {
00340 GEARMAN_LIST_DEL(thread->proc, con, proc_)
00341 con->proc_list= false;
00342 if (!(con->proc_removed))
00343 break;
00344 con= thread->proc_list;
00345 }
00346
00347 (void) pthread_mutex_unlock(&thread->lock);
00348
00349 return con;
00350 }