00001
00002
00003
00004
00005
00006
00007
00008
00014 #include "common.h"
00015
00019 struct _worker_function_st
00020 {
00021 struct {
00022 bool packet_in_use:1;
00023 bool change:1;
00024 bool remove:1;
00025 } options;
00026 struct _worker_function_st *next;
00027 struct _worker_function_st *prev;
00028 char *function_name;
00029 size_t function_length;
00030 gearman_worker_fn *worker_fn;
00031 void *context;
00032 gearman_packet_st packet;
00033 };
00034
00041 static inline struct _worker_function_st *_function_exist(gearman_worker_st *worker, const char *function_name, size_t function_length)
00042 {
00043 struct _worker_function_st *function;
00044
00045 for (function= worker->function_list; function != NULL;
00046 function= function->next)
00047 {
00048 if (function_length == function->function_length)
00049 {
00050 if (! memcmp(function_name, function->function_name, function_length))
00051 break;
00052 }
00053 }
00054
00055 return function;
00056 }
00057
00061 static gearman_worker_st *_worker_allocate(gearman_worker_st *worker, bool is_clone);
00062
00066 static gearman_return_t _worker_packet_init(gearman_worker_st *worker);
00067
00071 static gearman_return_t _worker_add_server(const char *host, in_port_t port,
00072 void *context);
00073
00077 static gearman_return_t _worker_function_create(gearman_worker_st *worker,
00078 const char *function_name,
00079 size_t function_length,
00080 uint32_t timeout,
00081 gearman_worker_fn *worker_fn,
00082 void *context);
00083
00087 static void _worker_function_free(gearman_worker_st *worker,
00088 struct _worker_function_st *function);
00089
00090
00093
00094
00095
00096
00097 gearman_worker_st *gearman_worker_create(gearman_worker_st *worker)
00098 {
00099 worker= _worker_allocate(worker, false);
00100
00101 if (worker == NULL)
00102 return NULL;
00103
00104 if (_worker_packet_init(worker) != GEARMAN_SUCCESS)
00105 {
00106 gearman_worker_free(worker);
00107 return NULL;
00108 }
00109
00110 return worker;
00111 }
00112
00113 gearman_worker_st *gearman_worker_clone(gearman_worker_st *worker,
00114 const gearman_worker_st *from)
00115 {
00116 gearman_universal_st *check;
00117
00118 if (! from)
00119 {
00120 return _worker_allocate(worker, false);
00121 }
00122
00123 worker= _worker_allocate(worker, true);
00124
00125 if (worker == NULL)
00126 {
00127 return worker;
00128 }
00129
00130 worker->options.non_blocking= from->options.non_blocking;
00131 worker->options.grab_job_in_use= from->options.grab_job_in_use;
00132 worker->options.pre_sleep_in_use= from->options.pre_sleep_in_use;
00133 worker->options.work_job_in_use= from->options.work_job_in_use;
00134 worker->options.change= from->options.change;
00135 worker->options.grab_uniq= from->options.grab_uniq;
00136 worker->options.timeout_return= from->options.timeout_return;
00137
00138 check= gearman_universal_clone(&(worker->universal), &from->universal);
00139 if (check == NULL)
00140 {
00141 gearman_worker_free(worker);
00142 return NULL;
00143 }
00144
00145 if (_worker_packet_init(worker) != GEARMAN_SUCCESS)
00146 {
00147 gearman_worker_free(worker);
00148 return NULL;
00149 }
00150
00151 return worker;
00152 }
00153
00154 void gearman_worker_free(gearman_worker_st *worker)
00155 {
00156 if (worker->options.packet_init)
00157 {
00158 gearman_packet_free(&(worker->grab_job));
00159 gearman_packet_free(&(worker->pre_sleep));
00160 }
00161
00162 if (worker->job != NULL)
00163 gearman_job_free(worker->job);
00164
00165 if (worker->options.work_job_in_use)
00166 gearman_job_free(&(worker->work_job));
00167
00168 if (worker->work_result != NULL)
00169 {
00170 if ((&worker->universal)->workload_free_fn == NULL)
00171 free(worker->work_result);
00172 else
00173 {
00174 (&worker->universal)->workload_free_fn(worker->work_result,
00175 (void *)(&worker->universal)->workload_free_context);
00176 }
00177 }
00178
00179 while (worker->function_list != NULL)
00180 _worker_function_free(worker, worker->function_list);
00181
00182 gearman_job_free_all(worker);
00183
00184 if ((&worker->universal) != NULL)
00185 gearman_universal_free((&worker->universal));
00186
00187 if (worker->options.allocated)
00188 free(worker);
00189 }
00190
00191 const char *gearman_worker_error(gearman_worker_st *worker)
00192 {
00193 return gearman_universal_error((&worker->universal));
00194 }
00195
00196 int gearman_worker_errno(gearman_worker_st *worker)
00197 {
00198 return gearman_universal_errno((&worker->universal));
00199 }
00200
00201 gearman_worker_options_t gearman_worker_options(const gearman_worker_st *worker)
00202 {
00203 gearman_worker_options_t options;
00204 memset(&options, 0, sizeof(gearman_worker_options_t));
00205
00206 if (worker->options.allocated)
00207 options|= GEARMAN_WORKER_ALLOCATED;
00208 if (worker->options.non_blocking)
00209 options|= GEARMAN_WORKER_NON_BLOCKING;
00210 if (worker->options.packet_init)
00211 options|= GEARMAN_WORKER_PACKET_INIT;
00212 if (worker->options.grab_job_in_use)
00213 options|= GEARMAN_WORKER_GRAB_JOB_IN_USE;
00214 if (worker->options.pre_sleep_in_use)
00215 options|= GEARMAN_WORKER_PRE_SLEEP_IN_USE;
00216 if (worker->options.work_job_in_use)
00217 options|= GEARMAN_WORKER_WORK_JOB_IN_USE;
00218 if (worker->options.change)
00219 options|= GEARMAN_WORKER_CHANGE;
00220 if (worker->options.grab_uniq)
00221 options|= GEARMAN_WORKER_GRAB_UNIQ;
00222 if (worker->options.timeout_return)
00223 options|= GEARMAN_WORKER_TIMEOUT_RETURN;
00224
00225 return options;
00226 }
00227
00228 void gearman_worker_set_options(gearman_worker_st *worker,
00229 gearman_worker_options_t options)
00230 {
00231 gearman_worker_options_t usable_options[]= {
00232 GEARMAN_WORKER_NON_BLOCKING,
00233 GEARMAN_WORKER_GRAB_UNIQ,
00234 GEARMAN_WORKER_TIMEOUT_RETURN,
00235 GEARMAN_WORKER_MAX
00236 };
00237
00238 gearman_worker_options_t *ptr;
00239
00240
00241 for (ptr= usable_options; *ptr != GEARMAN_WORKER_MAX ; ptr++)
00242 {
00243 if (options & *ptr)
00244 {
00245 gearman_worker_add_options(worker, *ptr);
00246 }
00247 else
00248 {
00249 gearman_worker_remove_options(worker, *ptr);
00250 }
00251 }
00252 }
00253
00254 void gearman_worker_add_options(gearman_worker_st *worker,
00255 gearman_worker_options_t options)
00256 {
00257 if (options & GEARMAN_WORKER_NON_BLOCKING)
00258 {
00259 gearman_universal_add_options((&worker->universal), GEARMAN_NON_BLOCKING);
00260 worker->options.non_blocking= true;
00261 }
00262
00263 if (options & GEARMAN_WORKER_GRAB_UNIQ)
00264 {
00265 worker->grab_job.command= GEARMAN_COMMAND_GRAB_JOB_UNIQ;
00266 (void)gearman_packet_pack_header(&(worker->grab_job));
00267 worker->options.grab_uniq= true;
00268 }
00269
00270 if (options & GEARMAN_WORKER_TIMEOUT_RETURN)
00271 {
00272 worker->options.timeout_return= true;
00273 }
00274 }
00275
00276 void gearman_worker_remove_options(gearman_worker_st *worker,
00277 gearman_worker_options_t options)
00278 {
00279 if (options & GEARMAN_WORKER_NON_BLOCKING)
00280 {
00281 gearman_universal_remove_options((&worker->universal), GEARMAN_NON_BLOCKING);
00282 worker->options.non_blocking= false;
00283 }
00284
00285 if (options & GEARMAN_WORKER_TIMEOUT_RETURN)
00286 {
00287 worker->options.timeout_return= false;
00288 }
00289
00290 if (options & GEARMAN_WORKER_GRAB_UNIQ)
00291 {
00292 worker->grab_job.command= GEARMAN_COMMAND_GRAB_JOB;
00293 (void)gearman_packet_pack_header(&(worker->grab_job));
00294 worker->options.grab_uniq= false;
00295 }
00296 }
00297
00298 int gearman_worker_timeout(gearman_worker_st *worker)
00299 {
00300 return gearman_universal_timeout((&worker->universal));
00301 }
00302
00303 void gearman_worker_set_timeout(gearman_worker_st *worker, int timeout)
00304 {
00305 gearman_worker_add_options(worker, GEARMAN_WORKER_TIMEOUT_RETURN);
00306 gearman_universal_set_timeout((&worker->universal), timeout);
00307 }
00308
00309 void *gearman_worker_context(const gearman_worker_st *worker)
00310 {
00311 return worker->context;
00312 }
00313
00314 void gearman_worker_set_context(gearman_worker_st *worker, void *context)
00315 {
00316 worker->context= context;
00317 }
00318
00319 void gearman_worker_set_log_fn(gearman_worker_st *worker,
00320 gearman_log_fn *function, void *context,
00321 gearman_verbose_t verbose)
00322 {
00323 gearman_set_log_fn((&worker->universal), function, context, verbose);
00324 }
00325
00326 void gearman_worker_set_workload_malloc_fn(gearman_worker_st *worker,
00327 gearman_malloc_fn *function,
00328 void *context)
00329 {
00330 gearman_set_workload_malloc_fn((&worker->universal), function, context);
00331 }
00332
00333 void gearman_worker_set_workload_free_fn(gearman_worker_st *worker,
00334 gearman_free_fn *function,
00335 void *context)
00336 {
00337 gearman_set_workload_free_fn((&worker->universal), function, context);
00338 }
00339
00340 gearman_return_t gearman_worker_add_server(gearman_worker_st *worker,
00341 const char *host, in_port_t port)
00342 {
00343 if (gearman_connection_create_args((&worker->universal), NULL, host, port) == NULL)
00344 return GEARMAN_MEMORY_ALLOCATION_FAILURE;
00345
00346 return GEARMAN_SUCCESS;
00347 }
00348
00349 gearman_return_t gearman_worker_add_servers(gearman_worker_st *worker,
00350 const char *servers)
00351 {
00352 return gearman_parse_servers(servers, _worker_add_server, worker);
00353 }
00354
00355 void gearman_worker_remove_servers(gearman_worker_st *worker)
00356 {
00357 gearman_free_all_cons((&worker->universal));
00358 }
00359
00360 gearman_return_t gearman_worker_wait(gearman_worker_st *worker)
00361 {
00362 return gearman_wait((&worker->universal));
00363 }
00364
00365 gearman_return_t gearman_worker_register(gearman_worker_st *worker,
00366 const char *function_name,
00367 uint32_t timeout)
00368 {
00369 return _worker_function_create(worker, function_name, strlen(function_name), timeout, NULL, NULL);
00370 }
00371
00372 bool gearman_worker_function_exist(gearman_worker_st *worker,
00373 const char *function_name,
00374 size_t function_length)
00375 {
00376 struct _worker_function_st *function;
00377
00378 function= _function_exist(worker, function_name, function_length);
00379
00380 return (function && function->options.remove == false) ? true : false;
00381 }
00382
00383 static inline gearman_return_t _worker_unregister(gearman_worker_st *worker,
00384 const char *function_name, size_t function_length)
00385 {
00386 struct _worker_function_st *function;
00387 gearman_return_t ret;
00388 const void *args[1];
00389 size_t args_size[1];
00390
00391 function= _function_exist(worker, function_name, function_length);
00392
00393 if (function == NULL || function->options.remove)
00394 return GEARMAN_NO_REGISTERED_FUNCTION;
00395
00396 gearman_packet_free(&(function->packet));
00397
00398 args[0]= function->function_name;
00399 args_size[0]= function->function_length;
00400 ret= gearman_packet_create_args((&worker->universal), &(function->packet),
00401 GEARMAN_MAGIC_REQUEST, GEARMAN_COMMAND_CANT_DO,
00402 args, args_size, 1);
00403 if (ret != GEARMAN_SUCCESS)
00404 {
00405 function->options.packet_in_use= false;
00406 return ret;
00407 }
00408
00409 function->options.change= true;
00410 function->options.remove= true;
00411
00412 worker->options.change= true;
00413
00414 return GEARMAN_SUCCESS;
00415 }
00416
00417 gearman_return_t gearman_worker_unregister(gearman_worker_st *worker,
00418 const char *function_name)
00419 {
00420 return _worker_unregister(worker, function_name, strlen(function_name));
00421 }
00422
00423 gearman_return_t gearman_worker_unregister_all(gearman_worker_st *worker)
00424 {
00425 gearman_return_t ret;
00426 struct _worker_function_st *function;
00427 uint32_t count= 0;
00428
00429 if (worker->function_list == NULL)
00430 return GEARMAN_NO_REGISTERED_FUNCTIONS;
00431
00432
00433
00434 for (function= worker->function_list; function != NULL;
00435 function= function->next)
00436 {
00437 if (function->options.remove == false)
00438 count++;
00439 }
00440
00441 if (count == 0)
00442 return GEARMAN_NO_REGISTERED_FUNCTIONS;
00443
00444 gearman_packet_free(&(worker->function_list->packet));
00445
00446 ret= gearman_packet_create_args((&worker->universal),
00447 &(worker->function_list->packet),
00448 GEARMAN_MAGIC_REQUEST,
00449 GEARMAN_COMMAND_RESET_ABILITIES,
00450 NULL, NULL, 0);
00451 if (ret != GEARMAN_SUCCESS)
00452 {
00453 worker->function_list->options.packet_in_use= false;
00454
00455 return ret;
00456 }
00457
00458 while (worker->function_list->next != NULL)
00459 _worker_function_free(worker, worker->function_list->next);
00460
00461 worker->function_list->options.change= true;
00462 worker->function_list->options.remove= true;
00463
00464 worker->options.change= true;
00465
00466 return GEARMAN_SUCCESS;
00467 }
00468
/**
 * Ask the job servers for a job, resuming from worker->state.
 *
 * The case labels are deliberately placed inside the loops: whenever a send
 * or receive reports GEARMAN_IO_WAIT the matching state is stored in
 * worker->state before returning, so the next call's switch jumps straight
 * back to that I/O call. worker->con and worker->function hold the loop
 * positions between calls.
 *
 * worker  - worker whose state, connections and function list are used.
 * job     - passed through to gearman_job_create(); presumably optional
 *           caller-provided storage -- TODO confirm.
 * ret_ptr - receives the status of the last operation; it is dereferenced
 *           without a NULL check.
 *
 * Returns the job when a JOB_ASSIGN or JOB_ASSIGN_UNIQ packet was received,
 * otherwise NULL with *ret_ptr describing why.
 */
00469 gearman_job_st *gearman_worker_grab_job(gearman_worker_st *worker,
00470 gearman_job_st *job,
00471 gearman_return_t *ret_ptr)
00472 {
00473 struct _worker_function_st *function;
00474 uint32_t active;
00475
00476 while (1)
00477 {
00478 switch (worker->state)
00479 {
00480 case GEARMAN_WORKER_STATE_START:
00481
/* Step 1: send the packet of every function entry flagged `change`. */
00482 if (worker->options.change)
00483 {
00484 worker->function= worker->function_list;
00485 while (worker->function != NULL)
00486 {
00487 if (! (worker->function->options.change))
00488 {
00489 worker->function= worker->function->next;
00490 continue;
00491 }
00492
00493 for (worker->con= (&worker->universal)->con_list; worker->con != NULL;
00494 worker->con= worker->con->next)
00495 {
/* Connections with fd == -1 are skipped here (presumably not connected yet);
   they receive the full function list in the CONNECT step below. */
00496 if (worker->con->fd == -1)
00497 continue;
00498
00499 case GEARMAN_WORKER_STATE_FUNCTION_SEND:
00500 *ret_ptr= gearman_connection_send(worker->con, &(worker->function->packet),
00501 true);
00502 if (*ret_ptr != GEARMAN_SUCCESS)
00503 {
/* IO_WAIT: remember where to resume and return. LOST_CONNECTION: move on to
   the next connection. Any other error is returned to the caller. */
00504 if (*ret_ptr == GEARMAN_IO_WAIT)
00505 worker->state= GEARMAN_WORKER_STATE_FUNCTION_SEND;
00506 else if (*ret_ptr == GEARMAN_LOST_CONNECTION)
00507 continue;
00508
00509 return NULL;
00510 }
00511 }
00512
/* The packet went to every connection: drop entries marked `remove`,
   otherwise just clear the `change` flag and advance. */
00513 if (worker->function->options.remove)
00514 {
00515 function= worker->function->prev;
00516 _worker_function_free(worker, worker->function);
00517 if (function == NULL)
00518 worker->function= worker->function_list;
00519 else
00520 worker->function= function;
00521 }
00522 else
00523 {
00524 worker->function->options.change= false;
00525 worker->function= worker->function->next;
00526 }
00527 }
00528
00529 worker->options.change= false;
00530 }
00531
00532 if (worker->function_list == NULL)
00533 {
00534 gearman_universal_set_error((&worker->universal), "gearman_worker_grab_job",
00535 "no functions have been registered");
00536 *ret_ptr= GEARMAN_NO_REGISTERED_FUNCTIONS;
00537 return NULL;
00538 }
00539
/* Step 2: walk all connections, announcing functions on those with
   fd == -1 and then sending the grab_job packet. */
00540 for (worker->con= (&worker->universal)->con_list; worker->con != NULL;
00541 worker->con= worker->con->next)
00542 {
00543
00544 if (worker->con->fd == -1)
00545 {
00546 for (worker->function= worker->function_list;
00547 worker->function != NULL;
00548 worker->function= worker->function->next)
00549 {
00550 case GEARMAN_WORKER_STATE_CONNECT:
00551 *ret_ptr= gearman_connection_send(worker->con, &(worker->function->packet),
00552 true);
00553 if (*ret_ptr != GEARMAN_SUCCESS)
00554 {
00555 if (*ret_ptr == GEARMAN_IO_WAIT)
00556 worker->state= GEARMAN_WORKER_STATE_CONNECT;
00557 else if (*ret_ptr == GEARMAN_COULD_NOT_CONNECT ||
00558 *ret_ptr == GEARMAN_LOST_CONNECTION)
00559 {
/* Stop sending functions to this connection. */
00560 break;
00561 }
00562
00563 return NULL;
00564 }
00565 }
00566
00567 if (*ret_ptr == GEARMAN_COULD_NOT_CONNECT)
00568 continue;
00569 }
00570
00571 case GEARMAN_WORKER_STATE_GRAB_JOB_SEND:
00572 if (worker->con->fd == -1)
00573 continue;
00574
00575 *ret_ptr= gearman_connection_send(worker->con, &(worker->grab_job), true);
00576 if (*ret_ptr != GEARMAN_SUCCESS)
00577 {
00578 if (*ret_ptr == GEARMAN_IO_WAIT)
00579 worker->state= GEARMAN_WORKER_STATE_GRAB_JOB_SEND;
00580 else if (*ret_ptr == GEARMAN_LOST_CONNECTION)
00581 continue;
00582
00583 return NULL;
00584 }
00585
/* worker->job is kept across calls so a resumed receive reuses it. */
00586 if (worker->job == NULL)
00587 {
00588 worker->job= gearman_job_create(worker, job);
00589 if (worker->job == NULL)
00590 {
00591 *ret_ptr= GEARMAN_MEMORY_ALLOCATION_FAILURE;
00592 return NULL;
00593 }
00594 }
00595
/* Receive until the server answers the grab request: NOOP packets are
   discarded and the receive is repeated. */
00596 while (1)
00597 {
00598 case GEARMAN_WORKER_STATE_GRAB_JOB_RECV:
00599 (void)gearman_connection_recv(worker->con, &(worker->job->assigned), ret_ptr,
00600 true);
00601 if (*ret_ptr != GEARMAN_SUCCESS)
00602 {
00603 if (*ret_ptr == GEARMAN_IO_WAIT)
00604 worker->state= GEARMAN_WORKER_STATE_GRAB_JOB_RECV;
00605 else
00606 {
00607 gearman_job_free(worker->job);
00608 worker->job= NULL;
00609
/* On a lost connection, leave the receive loop and try the next connection. */
00610 if (*ret_ptr == GEARMAN_LOST_CONNECTION)
00611 break;
00612 }
00613
00614 return NULL;
00615 }
00616
00617 if (worker->job->assigned.command == GEARMAN_COMMAND_JOB_ASSIGN ||
00618 worker->job->assigned.command == GEARMAN_COMMAND_JOB_ASSIGN_UNIQ)
00619 {
/* Got a job: hand ownership to the caller. The stored state makes the next
   call resume by sending grab_job on this same connection. */
00620 worker->job->options.assigned_in_use= true;
00621 worker->job->con= worker->con;
00622 worker->state= GEARMAN_WORKER_STATE_GRAB_JOB_SEND;
00623 job= worker->job;
00624 worker->job= NULL;
00625 return job;
00626 }
00627
00628 if (worker->job->assigned.command == GEARMAN_COMMAND_NO_JOB)
00629 {
/* This server has nothing for us; continue with the next connection. */
00630 gearman_packet_free(&(worker->job->assigned));
00631 break;
00632 }
00633
00634 if (worker->job->assigned.command != GEARMAN_COMMAND_NOOP)
00635 {
00636 gearman_universal_set_error((&worker->universal), "gearman_worker_grab_job",
00637 "unexpected packet:%s",
00638 gearman_command_info_list[worker->job->assigned.command].name);
00639 gearman_packet_free(&(worker->job->assigned));
00640 gearman_job_free(worker->job);
00641 worker->job= NULL;
00642 *ret_ptr= GEARMAN_UNEXPECTED_PACKET;
00643 return NULL;
00644 }
00645
00646 gearman_packet_free(&(worker->job->assigned));
00647 }
00648 }
00649
/* Step 3: no connection produced a job. Send pre_sleep to every connection
   with a valid fd, then wait for activity. */
00650 case GEARMAN_WORKER_STATE_PRE_SLEEP:
00651 for (worker->con= (&worker->universal)->con_list; worker->con != NULL;
00652 worker->con= worker->con->next)
00653 {
00654 if (worker->con->fd == -1)
00655 continue;
00656
00657 *ret_ptr= gearman_connection_send(worker->con, &(worker->pre_sleep), true);
00658 if (*ret_ptr != GEARMAN_SUCCESS)
00659 {
00660 if (*ret_ptr == GEARMAN_IO_WAIT)
00661 worker->state= GEARMAN_WORKER_STATE_PRE_SLEEP;
00662 else if (*ret_ptr == GEARMAN_LOST_CONNECTION)
00663 continue;
00664
00665 return NULL;
00666 }
00667 }
00668
00669 worker->state= GEARMAN_WORKER_STATE_START;
00670
00671
/* Request POLLIN events on every connection with a valid fd and count them. */
00672 active= 0;
00673 for (worker->con= (&worker->universal)->con_list; worker->con != NULL;
00674 worker->con= worker->con->next)
00675 {
00676 if (worker->con->fd == -1)
00677 continue;
00678
00679 *ret_ptr= gearman_connection_set_events(worker->con, POLLIN);
00680 if (*ret_ptr != GEARMAN_SUCCESS)
00681 return NULL;
00682
00683 active++;
00684 }
00685
/* Non-blocking callers get GEARMAN_NO_JOBS instead of waiting. */
00686 if ((&worker->universal)->options.non_blocking)
00687 {
00688 *ret_ptr= GEARMAN_NO_JOBS;
00689 return NULL;
00690 }
00691
00692 if (active == 0)
00693 {
/* Nothing to poll on: sleep instead (the values are multiplied by 1000 for
   usleep), then either report a timeout or start over. */
00694 if ((&worker->universal)->timeout < 0)
00695 usleep(GEARMAN_WORKER_WAIT_TIMEOUT * 1000);
00696 else
00697 {
00698 if ((&worker->universal)->timeout > 0)
00699 usleep((unsigned int)(&worker->universal)->timeout * 1000);
00700
00701 if (worker->options.timeout_return)
00702 {
00703 gearman_universal_set_error((&worker->universal), "gearman_worker_grab_job",
00704 "timeout reached");
00705 *ret_ptr= GEARMAN_TIMEOUT;
00706 return NULL;
00707 }
00708 }
00709 }
00710 else
00711 {
/* A GEARMAN_TIMEOUT from gearman_wait() is only returned to the caller when
   the timeout_return option is set; otherwise the loop starts over. */
00712 *ret_ptr= gearman_wait((&worker->universal));
00713 if (*ret_ptr != GEARMAN_SUCCESS && (*ret_ptr != GEARMAN_TIMEOUT ||
00714 worker->options.timeout_return))
00715 {
00716 return NULL;
00717 }
00718 }
00719
/* Leave the switch; the outer while(1) re-enters it at STATE_START. */
00720 break;
00721
00722 default:
00723 gearman_universal_set_error((&worker->universal), "gearman_worker_grab_job",
00724 "unknown state: %u", worker->state);
00725 *ret_ptr= GEARMAN_UNKNOWN_STATE;
00726 return NULL;
00727 }
00728 }
00729 }
00730
00731 void gearman_job_free(gearman_job_st *job)
00732 {
00733 if (job->options.assigned_in_use)
00734 gearman_packet_free(&(job->assigned));
00735
00736 if (job->options.work_in_use)
00737 gearman_packet_free(&(job->work));
00738
00739 if (job->worker->job_list == job)
00740 job->worker->job_list= job->next;
00741 if (job->prev != NULL)
00742 job->prev->next= job->next;
00743 if (job->next != NULL)
00744 job->next->prev= job->prev;
00745 job->worker->job_count--;
00746
00747 if (job->options.allocated)
00748 free(job);
00749 }
00750
00751 void gearman_job_free_all(gearman_worker_st *worker)
00752 {
00753 while (worker->job_list != NULL)
00754 gearman_job_free(worker->job_list);
00755 }
00756
00757 gearman_return_t gearman_worker_add_function(gearman_worker_st *worker,
00758 const char *function_name,
00759 uint32_t timeout,
00760 gearman_worker_fn *worker_fn,
00761 void *context)
00762 {
00763 if (function_name == NULL)
00764 {
00765 gearman_universal_set_error((&worker->universal), "gearman_worker_add_function",
00766 "function name not given");
00767
00768 return GEARMAN_INVALID_FUNCTION_NAME;
00769 }
00770
00771 if (worker_fn == NULL)
00772 {
00773 gearman_universal_set_error((&worker->universal), "gearman_worker_add_function",
00774 "function not given");
00775
00776 return GEARMAN_INVALID_WORKER_FUNCTION;
00777 }
00778
00779 return _worker_function_create(worker, function_name, strlen(function_name),
00780 timeout, worker_fn,
00781 context);
00782 }
00783
/**
 * Grab one job, run the matching registered callback and send the result,
 * resuming from worker->work_state.
 *
 * The cases fall through in order (grab -> run callback -> send complete).
 * A step that has to be retried stores its state in worker->work_state and
 * returns, so the next call re-enters the switch at that step. Every `break`
 * leaves the switch for the common tail, which frees the work job, resets
 * the state and returns GEARMAN_SUCCESS -- this includes the
 * GEARMAN_LOST_CONNECTION paths.
 */
00784 gearman_return_t gearman_worker_work(gearman_worker_st *worker)
00785 {
00786 gearman_job_st *check;
00787 gearman_return_t ret;
00788
00789 switch (worker->work_state)
00790 {
00791 case GEARMAN_WORKER_WORK_UNIVERSAL_GRAB_JOB:
/* The job is placed in worker->work_job; only the status in `ret` is used. */
00792 check= gearman_worker_grab_job(worker, &(worker->work_job), &ret);
00793 (void)check;
00794
00795 if (ret != GEARMAN_SUCCESS)
00796 return ret;
00797
/* Find the registered entry whose name equals the job's function name. */
00798 for (worker->work_function= worker->function_list;
00799 worker->work_function != NULL;
00800 worker->work_function= worker->work_function->next)
00801 {
00802 if (!strcmp(gearman_job_function_name(&(worker->work_job)),
00803 worker->work_function->function_name))
00804 {
00805 break;
00806 }
00807 }
00808
00809 if (worker->work_function == NULL)
00810 {
00811 gearman_job_free(&(worker->work_job));
00812 gearman_universal_set_error((&worker->universal), "gearman_worker_work",
00813 "function not found");
00814 return GEARMAN_INVALID_FUNCTION_NAME;
00815 }
00816
/* Entries created through gearman_worker_register() carry no callback. */
00817 if (worker->work_function->worker_fn == NULL)
00818 {
00819 gearman_job_free(&(worker->work_job));
00820 gearman_universal_set_error((&worker->universal), "gearman_worker_work",
00821 "no callback function supplied");
00822 return GEARMAN_INVALID_FUNCTION_NAME;
00823 }
00824
00825 worker->options.work_job_in_use= true;
00826 worker->work_result_size= 0;
00827
/* Falls through from the grab step. */
00828 case GEARMAN_WORKER_WORK_UNIVERSAL_FUNCTION:
00829 worker->work_result= worker->work_function->worker_fn(&(worker->work_job),
00830 (void *)worker->work_function->context,
00831 &(worker->work_result_size), &ret);
00832 if (ret == GEARMAN_WORK_FAIL)
00833 {
/* The callback reported failure: notify the server. If that send has to be
   retried, the FAIL state is stored so the next call repeats only the send. */
00834 ret= gearman_job_send_fail(&(worker->work_job));
00835 if (ret != GEARMAN_SUCCESS)
00836 {
00837 if (ret == GEARMAN_LOST_CONNECTION)
00838 break;
00839
00840 worker->work_state= GEARMAN_WORKER_WORK_UNIVERSAL_FAIL;
00841 return ret;
00842 }
00843
00844 break;
00845 }
00846
00847 if (ret != GEARMAN_SUCCESS)
00848 {
00849 if (ret == GEARMAN_LOST_CONNECTION)
00850 break;
00851
/* Any other status: the next call re-enters at this case and invokes the
   callback again. */
00852 worker->work_state= GEARMAN_WORKER_WORK_UNIVERSAL_FUNCTION;
00853 return ret;
00854 }
00855
/* Falls through from a successful callback. */
00856 case GEARMAN_WORKER_WORK_UNIVERSAL_COMPLETE:
00857 ret= gearman_job_send_complete(&(worker->work_job), worker->work_result,
00858 worker->work_result_size);
/* On IO_WAIT the result buffer is kept for the retry. */
00859 if (ret == GEARMAN_IO_WAIT)
00860 {
00861 worker->work_state= GEARMAN_WORKER_WORK_UNIVERSAL_COMPLETE;
00862 return ret;
00863 }
00864
/* Release the result through the custom workload free callback when one is
   installed, otherwise with free(). */
00865 if (worker->work_result != NULL)
00866 {
00867 if ((&worker->universal)->workload_free_fn == NULL)
00868 free(worker->work_result);
00869 else
00870 {
00871 (&worker->universal)->workload_free_fn(worker->work_result,
00872 (&worker->universal)->workload_free_context);
00873 }
00874 worker->work_result= NULL;
00875 }
00876
00877 if (ret != GEARMAN_SUCCESS)
00878 {
00879 if (ret == GEARMAN_LOST_CONNECTION)
00880 break;
00881
00882 return ret;
00883 }
00884
00885 break;
00886
/* Entered only on resume, after a send_fail that could not complete. */
00887 case GEARMAN_WORKER_WORK_UNIVERSAL_FAIL:
00888 ret= gearman_job_send_fail(&(worker->work_job));
00889 if (ret != GEARMAN_SUCCESS)
00890 {
00891 if (ret == GEARMAN_LOST_CONNECTION)
00892 break;
00893
00894 return ret;
00895 }
00896
00897 break;
00898
00899 default:
00900 gearman_universal_set_error((&worker->universal), "gearman_worker_work",
00901 "unknown state: %u", worker->work_state);
00902 return GEARMAN_UNKNOWN_STATE;
00903 }
00904
/* Common tail: the job is finished (or its connection was lost). */
00905 gearman_job_free(&(worker->work_job));
00906 worker->options.work_job_in_use= false;
00907 worker->work_state= GEARMAN_WORKER_WORK_UNIVERSAL_GRAB_JOB;
00908
00909 return GEARMAN_SUCCESS;
00910 }
00911
00912 gearman_return_t gearman_worker_echo(gearman_worker_st *worker,
00913 const void *workload,
00914 size_t workload_size)
00915 {
00916 return gearman_echo((&worker->universal), workload, workload_size);
00917 }
00918
00919
00920
00921
00922
00923 static gearman_worker_st *_worker_allocate(gearman_worker_st *worker, bool is_clone)
00924 {
00925 if (worker == NULL)
00926 {
00927 worker= malloc(sizeof(gearman_worker_st));
00928 if (worker == NULL)
00929 return NULL;
00930
00931 worker->options.allocated= true;
00932 }
00933 else
00934 {
00935 worker->options.allocated= false;
00936 }
00937
00938 worker->options.non_blocking= false;
00939 worker->options.packet_init= false;
00940 worker->options.grab_job_in_use= false;
00941 worker->options.pre_sleep_in_use= false;
00942 worker->options.work_job_in_use= false;
00943 worker->options.change= false;
00944 worker->options.grab_uniq= false;
00945 worker->options.timeout_return= false;
00946
00947 worker->state= 0;
00948 worker->work_state= 0;
00949 worker->function_count= 0;
00950 worker->job_count= 0;
00951 worker->work_result_size= 0;
00952 worker->con= NULL;
00953 worker->job= NULL;
00954 worker->job_list= NULL;
00955 worker->function= NULL;
00956 worker->function_list= NULL;
00957 worker->work_function= NULL;
00958 worker->work_result= NULL;
00959
00960 if (! is_clone)
00961 {
00962 gearman_universal_st *check;
00963
00964 check= gearman_universal_create(&worker->universal, NULL);
00965 if (check == NULL)
00966 {
00967 gearman_worker_free(worker);
00968 return NULL;
00969 }
00970
00971 gearman_universal_set_timeout((&worker->universal), GEARMAN_WORKER_WAIT_TIMEOUT);
00972 }
00973
00974 return worker;
00975 }
00976
00977 static gearman_return_t _worker_packet_init(gearman_worker_st *worker)
00978 {
00979 gearman_return_t ret;
00980
00981 ret= gearman_packet_create_args((&worker->universal), &(worker->grab_job),
00982 GEARMAN_MAGIC_REQUEST, GEARMAN_COMMAND_GRAB_JOB,
00983 NULL, NULL, 0);
00984 if (ret != GEARMAN_SUCCESS)
00985 return ret;
00986
00987 ret= gearman_packet_create_args((&worker->universal), &(worker->pre_sleep),
00988 GEARMAN_MAGIC_REQUEST, GEARMAN_COMMAND_PRE_SLEEP,
00989 NULL, NULL, 0);
00990 if (ret != GEARMAN_SUCCESS)
00991 {
00992 gearman_packet_free(&(worker->grab_job));
00993 return ret;
00994 }
00995
00996 worker->options.packet_init= true;
00997
00998 return GEARMAN_SUCCESS;
00999 }
01000
01001 static gearman_return_t _worker_add_server(const char *host, in_port_t port,
01002 void *context)
01003 {
01004 return gearman_worker_add_server((gearman_worker_st *)context, host, port);
01005 }
01006
01007 static gearman_return_t _worker_function_create(gearman_worker_st *worker,
01008 const char *function_name,
01009 size_t function_length,
01010 uint32_t timeout,
01011 gearman_worker_fn *worker_fn,
01012 void *context)
01013 {
01014 struct _worker_function_st *function;
01015 gearman_return_t ret;
01016 char timeout_buffer[11];
01017 const void *args[2];
01018 size_t args_size[2];
01019
01020 function= malloc(sizeof(struct _worker_function_st));
01021 if (function == NULL)
01022 {
01023 gearman_universal_set_error((&worker->universal), "_worker_function_create", "malloc");
01024 return GEARMAN_MEMORY_ALLOCATION_FAILURE;
01025 }
01026
01027 function->options.packet_in_use= true;
01028 function->options.change= true;
01029 function->options.remove= false;
01030
01031 function->function_name= strdup(function_name);
01032 function->function_length= function_length;
01033 if (function->function_name == NULL)
01034 {
01035 free(function);
01036 gearman_universal_set_error((&worker->universal), "gearman_worker_add_function", "strdup");
01037 return GEARMAN_MEMORY_ALLOCATION_FAILURE;
01038 }
01039
01040 function->worker_fn= worker_fn;
01041 function->context= context;
01042
01043 if (timeout > 0)
01044 {
01045 snprintf(timeout_buffer, 11, "%u", timeout);
01046 args[0]= function_name;
01047 args_size[0]= strlen(function_name) + 1;
01048 args[1]= timeout_buffer;
01049 args_size[1]= strlen(timeout_buffer);
01050 ret= gearman_packet_create_args((&worker->universal), &(function->packet),
01051 GEARMAN_MAGIC_REQUEST,
01052 GEARMAN_COMMAND_CAN_DO_TIMEOUT,
01053 args, args_size, 2);
01054 }
01055 else
01056 {
01057 args[0]= function->function_name;
01058 args_size[0]= function->function_length= function_length;
01059 ret= gearman_packet_create_args((&worker->universal), &(function->packet),
01060 GEARMAN_MAGIC_REQUEST, GEARMAN_COMMAND_CAN_DO,
01061 args, args_size, 1);
01062 }
01063 if (ret != GEARMAN_SUCCESS)
01064 {
01065 free(function->function_name);
01066 free(function);
01067 return ret;
01068 }
01069
01070 if (worker->function_list != NULL)
01071 worker->function_list->prev= function;
01072 function->next= worker->function_list;
01073 function->prev= NULL;
01074 worker->function_list= function;
01075 worker->function_count++;
01076
01077 worker->options.change= true;
01078
01079 return GEARMAN_SUCCESS;
01080 }
01081
01082 static void _worker_function_free(gearman_worker_st *worker,
01083 struct _worker_function_st *function)
01084 {
01085 if (worker->function_list == function)
01086 worker->function_list= function->next;
01087 if (function->prev != NULL)
01088 function->prev->next= function->next;
01089 if (function->next != NULL)
01090 function->next->prev= function->prev;
01091 worker->function_count--;
01092
01093 if (function->options.packet_in_use)
01094 gearman_packet_free(&(function->packet));
01095
01096 free(function->function_name);
01097 free(function);
01098 }