00001
00002
00003
00004
00005
00006
00007
00008
00014 #include "common.h"
00015 #include "gearmand.h"
00016
00017
00018
00019
00020
00027 static void _log(const char *line, gearman_verbose_t verbose, void *context);
00028
00029 static gearman_return_t _listen_init(gearmand_st *gearmand);
00030 static void _listen_close(gearmand_st *gearmand);
00031 static gearman_return_t _listen_watch(gearmand_st *gearmand);
00032 static void _listen_clear(gearmand_st *gearmand);
00033 static void _listen_event(int fd, short events, void *arg);
00034
00035 static gearman_return_t _wakeup_init(gearmand_st *gearmand);
00036 static void _wakeup_close(gearmand_st *gearmand);
00037 static gearman_return_t _wakeup_watch(gearmand_st *gearmand);
00038 static void _wakeup_clear(gearmand_st *gearmand);
00039 static void _wakeup_event(int fd, short events, void *arg);
00040
00041 static gearman_return_t _watch_events(gearmand_st *gearmand);
00042 static void _clear_events(gearmand_st *gearmand);
00043 static void _close_events(gearmand_st *gearmand);
00044
00047
00048
00049
00050
00051 gearmand_st *gearmand_create(const char *host, in_port_t port)
00052 {
00053 gearmand_st *gearmand;
00054
00055 gearmand= malloc(sizeof(gearmand_st));
00056 if (gearmand == NULL)
00057 return NULL;
00058
00059 if (gearman_server_create(&(gearmand->server)) == NULL)
00060 {
00061 free(gearmand);
00062 return NULL;
00063 }
00064
00065 gearmand->options= 0;
00066 gearmand->verbose= 0;
00067 gearmand->ret= 0;
00068 gearmand->backlog= GEARMAN_DEFAULT_BACKLOG;
00069 gearmand->threads= 0;
00070 gearmand->port_count= 0;
00071 gearmand->thread_count= 0;
00072 gearmand->free_dcon_count= 0;
00073 gearmand->max_thread_free_dcon_count= 0;
00074 gearmand->wakeup_fd[0]= -1;
00075 gearmand->wakeup_fd[1]= -1;
00076 gearmand->host= host;
00077 gearmand->log_fn= NULL;
00078 gearmand->log_context= NULL;
00079 gearmand->base= NULL;
00080 gearmand->port_list= NULL;
00081 gearmand->thread_list= NULL;
00082 gearmand->thread_add_next= NULL;
00083 gearmand->free_dcon_list= NULL;
00084
00085 if (port == 0)
00086 port= GEARMAN_DEFAULT_TCP_PORT;
00087
00088 if (gearmand_port_add(gearmand, port, NULL) != GEARMAN_SUCCESS)
00089 {
00090 gearmand_free(gearmand);
00091 return NULL;
00092 }
00093
00094 return gearmand;
00095 }
00096
00097 void gearmand_free(gearmand_st *gearmand)
00098 {
00099 gearmand_con_st *dcon;
00100 uint32_t x;
00101
00102 _close_events(gearmand);
00103
00104 if (gearmand->threads > 0)
00105 gearmand_log_info(gearmand, "Shutting down all threads");
00106
00107 while (gearmand->thread_list != NULL)
00108 gearmand_thread_free(gearmand->thread_list);
00109
00110 while (gearmand->free_dcon_list != NULL)
00111 {
00112 dcon= gearmand->free_dcon_list;
00113 gearmand->free_dcon_list= dcon->next;
00114 free(dcon);
00115 }
00116
00117 if (gearmand->base != NULL)
00118 event_base_free(gearmand->base);
00119
00120 gearman_server_free(&(gearmand->server));
00121
00122 for (x= 0; x < gearmand->port_count; x++)
00123 {
00124 if (gearmand->port_list[x].listen_fd != NULL)
00125 free(gearmand->port_list[x].listen_fd);
00126
00127 if (gearmand->port_list[x].listen_event != NULL)
00128 free(gearmand->port_list[x].listen_event);
00129 }
00130
00131 if (gearmand->port_list != NULL)
00132 free(gearmand->port_list);
00133
00134 gearmand_log_info(gearmand, "Shutdown complete");
00135
00136 free(gearmand);
00137 }
00138
00139 void gearmand_set_backlog(gearmand_st *gearmand, int backlog)
00140 {
00141 gearmand->backlog= backlog;
00142 }
00143
00144 void gearmand_set_job_retries(gearmand_st *gearmand, uint8_t job_retries)
00145 {
00146 gearman_server_set_job_retries(&(gearmand->server), job_retries);
00147 }
00148
00149 void gearmand_set_worker_wakeup(gearmand_st *gearmand, uint8_t worker_wakeup)
00150 {
00151 gearman_server_set_worker_wakeup(&(gearmand->server), worker_wakeup);
00152 }
00153
00154 void gearmand_set_threads(gearmand_st *gearmand, uint32_t threads)
00155 {
00156 gearmand->threads= threads;
00157 }
00158
00159 void gearmand_set_log_fn(gearmand_st *gearmand, gearman_log_fn *function,
00160 void *context, gearman_verbose_t verbose)
00161 {
00162 gearman_server_set_log_fn(&(gearmand->server), _log, gearmand, verbose);
00163 gearmand->log_fn= function;
00164 gearmand->log_context= context;
00165 gearmand->verbose= verbose;
00166 }
00167
00168 gearman_return_t gearmand_port_add(gearmand_st *gearmand, in_port_t port,
00169 gearman_connection_add_fn *function)
00170 {
00171 gearmand_port_st *port_list;
00172
00173 port_list= realloc(gearmand->port_list,
00174 sizeof(gearmand_port_st) * (gearmand->port_count + 1));
00175 if (port_list == NULL)
00176 {
00177 gearmand_log_fatal(gearmand, "gearmand_port_add:realloc:NULL");
00178 return GEARMAN_MEMORY_ALLOCATION_FAILURE;
00179 }
00180
00181 port_list[gearmand->port_count].port= port;
00182 port_list[gearmand->port_count].listen_count= 0;
00183 port_list[gearmand->port_count].gearmand= gearmand;
00184 port_list[gearmand->port_count].add_fn= function;
00185 port_list[gearmand->port_count].listen_fd= NULL;
00186 port_list[gearmand->port_count].listen_event= NULL;
00187
00188 gearmand->port_list= port_list;
00189 gearmand->port_count++;
00190
00191 return GEARMAN_SUCCESS;
00192 }
00193
00194 gearman_return_t gearmand_run(gearmand_st *gearmand)
00195 {
00196 uint32_t x;
00197
00198
00199 if (gearmand->base == NULL)
00200 {
00201 gearmand_log_info(gearmand, "Starting up");
00202
00203 if (gearmand->threads > 0)
00204 {
00205 #ifndef HAVE_EVENT_BASE_NEW
00206 gearmand_log_fatal(gearmand, "Multi-threaded gearmand requires libevent 1.4 or later, libevent 1.3 does not provided a "
00207 "thread-safe interface.");
00208 return GEARMAN_EVENT;
00209 #else
00210
00211
00212
00213 gearmand->max_thread_free_dcon_count= ((GEARMAN_MAX_FREE_SERVER_CON /
00214 gearmand->threads) / 2);
00215 #endif
00216 }
00217
00218 gearmand_log_debug(gearmand, "Initializing libevent for main thread");
00219
00220 gearmand->base= event_base_new();
00221 if (gearmand->base == NULL)
00222 {
00223 gearmand_log_fatal(gearmand, "gearmand_run:event_base_new:NULL");
00224 return GEARMAN_EVENT;
00225 }
00226
00227 gearmand_log_debug(gearmand, "Method for libevent: %s",
00228 event_base_get_method(gearmand->base));
00229
00230 gearmand->ret= _listen_init(gearmand);
00231 if (gearmand->ret != GEARMAN_SUCCESS)
00232 return gearmand->ret;
00233
00234 gearmand->ret= _wakeup_init(gearmand);
00235 if (gearmand->ret != GEARMAN_SUCCESS)
00236 return gearmand->ret;
00237
00238 gearmand_log_debug(gearmand, "Creating %u threads", gearmand->threads);
00239
00240
00241 x= 0;
00242 do
00243 {
00244 gearmand->ret= gearmand_thread_create(gearmand);
00245 if (gearmand->ret != GEARMAN_SUCCESS)
00246 return gearmand->ret;
00247 x++;
00248 }
00249 while (x < gearmand->threads);
00250
00251 gearmand->ret= gearman_server_queue_replay(&(gearmand->server));
00252 if (gearmand->ret != GEARMAN_SUCCESS)
00253 return gearmand->ret;
00254 }
00255
00256 gearmand->ret= _watch_events(gearmand);
00257 if (gearmand->ret != GEARMAN_SUCCESS)
00258 return gearmand->ret;
00259
00260 gearmand_log_info(gearmand, "Entering main event loop");
00261
00262 if (event_base_loop(gearmand->base, 0) == -1)
00263 {
00264 gearmand_log_fatal(gearmand, "gearmand_run:event_base_loop:-1");
00265 return GEARMAN_EVENT;
00266 }
00267
00268 gearmand_log_info(gearmand, "Exited main event loop");
00269
00270 return gearmand->ret;
00271 }
00272
00273 void gearmand_wakeup(gearmand_st *gearmand, gearmand_wakeup_t wakeup)
00274 {
00275 uint8_t buffer= wakeup;
00276
00277
00278
00279 if (write(gearmand->wakeup_fd[1], &buffer, 1) != 1)
00280 gearmand_log_error(gearmand, "gearmand_wakeup:write:%d", errno);
00281 }
00282
00283 void gearmand_set_round_robin(gearmand_st *gearmand, bool round_robin)
00284 {
00285 gearmand->server.flags.round_robin= round_robin;
00286 }
00287
00288
00289
00290
00291
00292
00293 static void _log(const char *line, gearman_verbose_t verbose, void *context)
00294 {
00295 gearmand_st *gearmand= (gearmand_st *)context;
00296 (*gearmand->log_fn)(line, verbose, (void *)gearmand->log_context);
00297 }
00298
00299 static gearman_return_t _listen_init(gearmand_st *gearmand)
00300 {
00301 for (uint32_t x= 0; x < gearmand->port_count; x++)
00302 {
00303 int ret;
00304 struct gearmand_port_st *port;
00305 char port_str[NI_MAXSERV];
00306 struct addrinfo ai;
00307 struct addrinfo *addrinfo;
00308
00309 port= &gearmand->port_list[x];
00310
00311 snprintf(port_str, NI_MAXSERV, "%u", (uint32_t)(port->port));
00312
00313 memset(&ai, 0, sizeof(struct addrinfo));
00314 ai.ai_flags = AI_PASSIVE;
00315 ai.ai_family = AF_UNSPEC;
00316 ai.ai_socktype = SOCK_STREAM;
00317 ai.ai_protocol= IPPROTO_TCP;
00318
00319 ret= getaddrinfo(gearmand->host, port_str, &ai, &addrinfo);
00320 if (ret != 0)
00321 {
00322 gearmand_log_fatal(gearmand, "_listen_init:getaddrinfo:%s", gai_strerror(ret));
00323 return GEARMAN_ERRNO;
00324 }
00325
00326 for (struct addrinfo *addrinfo_next= addrinfo; addrinfo_next != NULL;
00327 addrinfo_next= addrinfo_next->ai_next)
00328 {
00329 int opt;
00330 int fd;
00331 char host[NI_MAXHOST];
00332
00333 ret= getnameinfo(addrinfo_next->ai_addr, addrinfo_next->ai_addrlen, host,
00334 NI_MAXHOST, port_str, NI_MAXSERV,
00335 NI_NUMERICHOST | NI_NUMERICSERV);
00336 if (ret != 0)
00337 {
00338 gearmand_log_error(gearmand, "_listen_init:getnameinfo:%s", gai_strerror(ret));
00339 strcpy(host, "-");
00340 strcpy(port_str, "-");
00341 }
00342
00343 gearmand_log_debug(gearmand, "Trying to listen on %s:%s", host, port_str);
00344
00345
00346 fd= socket(addrinfo_next->ai_family, addrinfo_next->ai_socktype,
00347 addrinfo_next->ai_protocol);
00348 if (fd == -1)
00349 {
00350 gearmand_log_error(gearmand, "Failed to listen on %s:%s", host, port_str);
00351 continue;
00352 }
00353
00354 opt= 1;
00355 ret= setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt));
00356 if (ret == -1)
00357 {
00358 close(fd);
00359 gearmand_log_fatal(gearmand, "_listen_init:setsockopt:%d", errno);
00360 return GEARMAN_ERRNO;
00361 }
00362
00363 ret= bind(fd, addrinfo_next->ai_addr, addrinfo_next->ai_addrlen);
00364 if (ret == -1)
00365 {
00366 close(fd);
00367 if (errno == EADDRINUSE)
00368 {
00369 if (port->listen_fd == NULL)
00370 {
00371 gearmand_log_error(gearmand, "Address already in use %s:%s", host, port_str);
00372 }
00373
00374 continue;
00375 }
00376
00377 gearmand_log_fatal(gearmand, "_listen_init:bind:%d", errno);
00378 return GEARMAN_ERRNO;
00379 }
00380
00381 if (listen(fd, gearmand->backlog) == -1)
00382 {
00383 close(fd);
00384 gearmand_log_fatal(gearmand, "_listen_init:listen:%d", errno);
00385 return GEARMAN_ERRNO;
00386 }
00387
00388
00389 {
00390 int *fd_list;
00391
00392 fd_list= realloc(port->listen_fd, sizeof(int) * (port->listen_count + 1));
00393 if (fd_list == NULL)
00394 {
00395 close(fd);
00396 gearmand_log_fatal(gearmand, "_listen_init:realloc:%d", errno);
00397 return GEARMAN_ERRNO;
00398 }
00399
00400 port->listen_fd= fd_list;
00401 }
00402
00403 port->listen_fd[port->listen_count]= fd;
00404 port->listen_count++;
00405
00406 gearmand_log_info(gearmand, "Listening on %s:%s (%d)", host, port_str, fd);
00407 }
00408
00409 freeaddrinfo(addrinfo);
00410
00411
00412 if (port->listen_fd == NULL)
00413 {
00414 gearmand_log_fatal(gearmand, "_listen_init:Could not bind/listen to any addresses");
00415 return GEARMAN_ERRNO;
00416 }
00417
00418 port->listen_event= malloc(sizeof(struct event) * port->listen_count);
00419 if (port->listen_event == NULL)
00420 {
00421 gearmand_log_fatal(gearmand, "_listen_init:malloc:%d", errno);
00422 return GEARMAN_ERRNO;
00423 }
00424
00425 for (uint32_t y= 0; y < port->listen_count; y++)
00426 {
00427 event_set(&(port->listen_event[y]), port->listen_fd[y],
00428 EV_READ | EV_PERSIST, _listen_event, port);
00429 event_base_set(gearmand->base, &(port->listen_event[y]));
00430 }
00431 }
00432
00433 return GEARMAN_SUCCESS;
00434 }
00435
00436 static void _listen_close(gearmand_st *gearmand)
00437 {
00438 _listen_clear(gearmand);
00439
00440 for (uint32_t x= 0; x < gearmand->port_count; x++)
00441 {
00442 for (uint32_t y= 0; y < gearmand->port_list[x].listen_count; y++)
00443 {
00444 if (gearmand->port_list[x].listen_fd[y] >= 0)
00445 {
00446 gearmand_log_info(gearmand, "Closing listening socket (%d)", gearmand->port_list[x].listen_fd[y]);
00447 close(gearmand->port_list[x].listen_fd[y]);
00448 gearmand->port_list[x].listen_fd[y]= -1;
00449 }
00450 }
00451 }
00452 }
00453
00454 static gearman_return_t _listen_watch(gearmand_st *gearmand)
00455 {
00456 if (gearmand->options & GEARMAND_LISTEN_EVENT)
00457 return GEARMAN_SUCCESS;
00458
00459 for (uint32_t x= 0; x < gearmand->port_count; x++)
00460 {
00461 for (uint32_t y= 0; y < gearmand->port_list[x].listen_count; y++)
00462 {
00463 gearmand_log_info(gearmand, "Adding event for listening socket (%d)",
00464 gearmand->port_list[x].listen_fd[y]);
00465
00466 if (event_add(&(gearmand->port_list[x].listen_event[y]), NULL) == -1)
00467 {
00468 gearmand_log_fatal(gearmand, "_listen_watch:event_add:-1");
00469 return GEARMAN_EVENT;
00470 }
00471 }
00472 }
00473
00474 gearmand->options|= GEARMAND_LISTEN_EVENT;
00475 return GEARMAN_SUCCESS;
00476 }
00477
00478 static void _listen_clear(gearmand_st *gearmand)
00479 {
00480 if (! (gearmand->options & GEARMAND_LISTEN_EVENT))
00481 return;
00482
00483 int del_ret= 0;
00484 for (uint32_t x= 0; x < gearmand->port_count; x++)
00485 {
00486 for (uint32_t y= 0; y < gearmand->port_list[x].listen_count; y++)
00487 {
00488 gearmand_log_info(gearmand, "Clearing event for listening socket (%d)",
00489 gearmand->port_list[x].listen_fd[y]);
00490 del_ret= event_del(&(gearmand->port_list[x].listen_event[y]));
00491 assert(del_ret == 0);
00492 }
00493 }
00494
00495 gearmand->options&= (gearmand_options_t)~GEARMAND_LISTEN_EVENT;
00496 }
00497
00498 static void _listen_event(int fd, short events __attribute__ ((unused)),
00499 void *arg)
00500 {
00501 gearmand_port_st *port= (gearmand_port_st *)arg;
00502 struct sockaddr sa;
00503 socklen_t sa_len;
00504 char host[NI_MAXHOST];
00505 char port_str[NI_MAXSERV];
00506 int ret;
00507
00508 sa_len= sizeof(sa);
00509 fd= accept(fd, &sa, &sa_len);
00510 if (fd == -1)
00511 {
00512 if (errno == EINTR)
00513 return;
00514 else if (errno == EMFILE)
00515 {
00516 gearmand_log_error(port->gearmand, "_listen_event:accept:too many open files");
00517 return;
00518 }
00519
00520 _clear_events(port->gearmand);
00521 gearmand_log_fatal(port->gearmand, "_listen_event:accept:%d", errno);
00522 port->gearmand->ret= GEARMAN_ERRNO;
00523 return;
00524 }
00525
00526
00527
00528 ret= getnameinfo(&sa, sa_len, host, NI_MAXHOST, port_str, NI_MAXSERV,
00529 NI_NUMERICHOST | NI_NUMERICSERV);
00530 if (ret != 0)
00531 {
00532 gearmand_log_error(port->gearmand, "_listen_event:getnameinfo:%s", gai_strerror(ret));
00533 strcpy(host, "-");
00534 strcpy(port_str, "-");
00535 }
00536
00537 gearmand_log_info(port->gearmand, "Accepted connection from %s:%s", host, port_str);
00538
00539 port->gearmand->ret= gearmand_con_create(port->gearmand, fd, host, port_str,
00540 port->add_fn);
00541 if (port->gearmand->ret != GEARMAN_SUCCESS)
00542 _clear_events(port->gearmand);
00543 }
00544
00545 static gearman_return_t _wakeup_init(gearmand_st *gearmand)
00546 {
00547 int ret;
00548
00549 gearmand_log_info(gearmand, "Creating wakeup pipe");
00550
00551 ret= pipe(gearmand->wakeup_fd);
00552 if (ret == -1)
00553 {
00554 gearmand_log_fatal(gearmand, "_wakeup_init:pipe:%d", errno);
00555 return GEARMAN_ERRNO;
00556 }
00557
00558 ret= fcntl(gearmand->wakeup_fd[0], F_GETFL, 0);
00559 if (ret == -1)
00560 {
00561 gearmand_log_fatal(gearmand, "_wakeup_init:fcntl:F_GETFL:%d", errno);
00562 return GEARMAN_ERRNO;
00563 }
00564
00565 ret= fcntl(gearmand->wakeup_fd[0], F_SETFL, ret | O_NONBLOCK);
00566 if (ret == -1)
00567 {
00568 gearmand_log_fatal(gearmand, "_wakeup_init:fcntl:F_SETFL:%d", errno);
00569 return GEARMAN_ERRNO;
00570 }
00571
00572 event_set(&(gearmand->wakeup_event), gearmand->wakeup_fd[0],
00573 EV_READ | EV_PERSIST, _wakeup_event, gearmand);
00574 event_base_set(gearmand->base, &(gearmand->wakeup_event));
00575
00576 return GEARMAN_SUCCESS;
00577 }
00578
00579 static void _wakeup_close(gearmand_st *gearmand)
00580 {
00581 _wakeup_clear(gearmand);
00582
00583 if (gearmand->wakeup_fd[0] >= 0)
00584 {
00585 gearmand_log_info(gearmand, "Closing wakeup pipe");
00586 close(gearmand->wakeup_fd[0]);
00587 gearmand->wakeup_fd[0]= -1;
00588 close(gearmand->wakeup_fd[1]);
00589 gearmand->wakeup_fd[1]= -1;
00590 }
00591 }
00592
00593 static gearman_return_t _wakeup_watch(gearmand_st *gearmand)
00594 {
00595 if (gearmand->options & GEARMAND_WAKEUP_EVENT)
00596 return GEARMAN_SUCCESS;
00597
00598 gearmand_log_info(gearmand, "Adding event for wakeup pipe");
00599
00600 if (event_add(&(gearmand->wakeup_event), NULL) == -1)
00601 {
00602 gearmand_log_fatal(gearmand, "_wakeup_watch:event_add:-1");
00603 return GEARMAN_EVENT;
00604 }
00605
00606 gearmand->options|= GEARMAND_WAKEUP_EVENT;
00607 return GEARMAN_SUCCESS;
00608 }
00609
00610 static void _wakeup_clear(gearmand_st *gearmand)
00611 {
00612 if (gearmand->options & GEARMAND_WAKEUP_EVENT)
00613 {
00614 gearmand_log_info(gearmand, "Clearing event for wakeup pipe");
00615 int del_ret= event_del(&(gearmand->wakeup_event));
00616 assert(del_ret == 0);
00617 gearmand->options&= (gearmand_options_t)~GEARMAND_WAKEUP_EVENT;
00618 }
00619 }
00620
00621 static void _wakeup_event(int fd, short events __attribute__ ((unused)),
00622 void *arg)
00623 {
00624 gearmand_st *gearmand= (gearmand_st *)arg;
00625 uint8_t buffer[GEARMAN_PIPE_BUFFER_SIZE];
00626 ssize_t ret;
00627 gearmand_thread_st *thread;
00628
00629 while (1)
00630 {
00631 ret= read(fd, buffer, GEARMAN_PIPE_BUFFER_SIZE);
00632 if (ret == 0)
00633 {
00634 _clear_events(gearmand);
00635 gearmand_log_fatal(gearmand, "_wakeup_event:read:EOF");
00636 gearmand->ret= GEARMAN_PIPE_EOF;
00637 return;
00638 }
00639 else if (ret == -1)
00640 {
00641 if (errno == EINTR)
00642 continue;
00643
00644 if (errno == EAGAIN)
00645 break;
00646
00647 _clear_events(gearmand);
00648 gearmand_log_fatal(gearmand, "_wakeup_event:read:%d", errno);
00649 gearmand->ret= GEARMAN_ERRNO;
00650 return;
00651 }
00652
00653 for (ssize_t x= 0; x < ret; x++)
00654 {
00655 switch ((gearmand_wakeup_t)buffer[x])
00656 {
00657 case GEARMAND_WAKEUP_PAUSE:
00658 gearmand_log_info(gearmand, "Received PAUSE wakeup event");
00659 _clear_events(gearmand);
00660 gearmand->ret= GEARMAN_PAUSE;
00661 break;
00662
00663 case GEARMAND_WAKEUP_SHUTDOWN_GRACEFUL:
00664 gearmand_log_info(gearmand, "Received SHUTDOWN_GRACEFUL wakeup event");
00665 _listen_close(gearmand);
00666
00667 for (thread= gearmand->thread_list; thread != NULL;
00668 thread= thread->next)
00669 {
00670 gearmand_thread_wakeup(thread, GEARMAND_WAKEUP_SHUTDOWN_GRACEFUL);
00671 }
00672
00673 gearmand->ret= GEARMAN_SHUTDOWN_GRACEFUL;
00674 break;
00675
00676 case GEARMAND_WAKEUP_SHUTDOWN:
00677 gearmand_log_info(gearmand, "Received SHUTDOWN wakeup event");
00678 _clear_events(gearmand);
00679 gearmand->ret= GEARMAN_SHUTDOWN;
00680 break;
00681
00682 case GEARMAND_WAKEUP_CON:
00683 case GEARMAND_WAKEUP_RUN:
00684 default:
00685 gearmand_log_fatal(gearmand, "Received unknown wakeup event (%u)", buffer[x]);
00686 _clear_events(gearmand);
00687 gearmand->ret= GEARMAN_UNKNOWN_STATE;
00688 break;
00689 }
00690 }
00691 }
00692 }
00693
00694 static gearman_return_t _watch_events(gearmand_st *gearmand)
00695 {
00696 gearman_return_t ret;
00697
00698 ret= _listen_watch(gearmand);
00699 if (ret != GEARMAN_SUCCESS)
00700 return ret;
00701
00702 ret= _wakeup_watch(gearmand);
00703 if (ret != GEARMAN_SUCCESS)
00704 return ret;
00705
00706 return GEARMAN_SUCCESS;
00707 }
00708
00709 static void _clear_events(gearmand_st *gearmand)
00710 {
00711 _listen_clear(gearmand);
00712 _wakeup_clear(gearmand);
00713
00714
00715
00716 if (gearmand->threads == 0 && gearmand->thread_list != NULL)
00717 gearmand_thread_wakeup(gearmand->thread_list, GEARMAND_WAKEUP_SHUTDOWN);
00718 }
00719
00720 static void _close_events(gearmand_st *gearmand)
00721 {
00722 _listen_close(gearmand);
00723 _wakeup_close(gearmand);
00724 }