00001
00002
00003
00004
00005
00006
00007
00008
00014 #include "common.h"
00015
00016
00017
00018
00019
/**
 * Hash key_size bytes starting at key into a 32-bit value. Callers in this
 * file reduce the result modulo GEARMAN_JOB_HASH_SIZE to pick a bucket.
 */
static uint32_t
_server_job_hash(const char *key, size_t key_size);
00030
00034 static inline void _server_con_worker_list_append(gearman_server_worker_st *list,
00035 gearman_server_worker_st *worker);
00036
00041 static gearman_server_job_st *
00042 _server_job_get_unique(gearman_server_st *server, uint32_t unique_key,
00043 gearman_server_function_st *server_function,
00044 const char *unique, size_t data_size);
00045
00048
00049
00050
00051
00052 gearman_server_job_st *
00053 gearman_server_job_add(gearman_server_st *server, const char *function_name,
00054 size_t function_name_size, const char *unique,
00055 size_t unique_size, const void *data, size_t data_size,
00056 gearman_job_priority_t priority,
00057 gearman_server_client_st *server_client,
00058 gearman_return_t *ret_ptr)
00059 {
00060 gearman_server_job_st *server_job;
00061 gearman_server_function_st *server_function;
00062 uint32_t key;
00063
00064 server_function= gearman_server_function_get(server, function_name,
00065 function_name_size);
00066 if (server_function == NULL)
00067 {
00068 *ret_ptr= GEARMAN_MEMORY_ALLOCATION_FAILURE;
00069 return NULL;
00070 }
00071
00072 if (unique_size == 0)
00073 {
00074 server_job= NULL;
00075 key= 0;
00076 }
00077 else
00078 {
00079 if (unique_size == 1 && *unique == '-')
00080 {
00081 if (data_size == 0)
00082 {
00083 key= 0;
00084 server_job= NULL;
00085 }
00086 else
00087 {
00088
00089 key= _server_job_hash(data, data_size);
00090 server_job= _server_job_get_unique(server, key, server_function, data,
00091 data_size);
00092 }
00093 }
00094 else
00095 {
00096
00097 key= _server_job_hash(unique, unique_size);
00098 server_job= _server_job_get_unique(server, key, server_function, unique,
00099 0);
00100 }
00101 }
00102
00103 if (server_job == NULL)
00104 {
00105 if (server_function->max_queue_size > 0 &&
00106 server_function->job_total >= server_function->max_queue_size)
00107 {
00108 *ret_ptr= GEARMAN_JOB_QUEUE_FULL;
00109 return NULL;
00110 }
00111
00112 server_job= gearman_server_job_create(server, NULL);
00113 if (server_job == NULL)
00114 {
00115 *ret_ptr= GEARMAN_MEMORY_ALLOCATION_FAILURE;
00116 return NULL;
00117 }
00118
00119 server_job->priority= priority;
00120
00121 server_job->function= server_function;
00122 server_function->job_total++;
00123
00124 snprintf(server_job->job_handle, GEARMAN_JOB_HANDLE_SIZE, "%s:%u",
00125 server->job_handle_prefix, server->job_handle_count);
00126 snprintf(server_job->unique, GEARMAN_UNIQUE_SIZE, "%.*s",
00127 (uint32_t)unique_size, unique);
00128 server->job_handle_count++;
00129 server_job->data= data;
00130 server_job->data_size= data_size;
00131
00132 server_job->unique_key= key;
00133 key= key % GEARMAN_JOB_HASH_SIZE;
00134 GEARMAN_HASH_ADD(server->unique, key, server_job, unique_);
00135
00136 key= _server_job_hash(server_job->job_handle,
00137 strlen(server_job->job_handle));
00138 server_job->job_handle_key= key;
00139 key= key % GEARMAN_JOB_HASH_SIZE;
00140 GEARMAN_HASH_ADD(server->job, key, server_job,);
00141
00142 if (server->state.queue_startup)
00143 {
00144 server_job->state|= GEARMAN_SERVER_JOB_QUEUED;
00145 }
00146 else if (server_client == NULL && server->queue_add_fn != NULL)
00147 {
00148 *ret_ptr= (*(server->queue_add_fn))(server,
00149 (void *)server->queue_context,
00150 server_job->unique,
00151 unique_size,
00152 function_name,
00153 function_name_size,
00154 data, data_size, priority);
00155 if (*ret_ptr != GEARMAN_SUCCESS)
00156 {
00157 server_job->data= NULL;
00158 gearman_server_job_free(server_job);
00159 return NULL;
00160 }
00161
00162 if (server->queue_flush_fn != NULL)
00163 {
00164 *ret_ptr= (*(server->queue_flush_fn))(server,
00165 (void *)server->queue_context);
00166 if (*ret_ptr != GEARMAN_SUCCESS)
00167 {
00168 server_job->data= NULL;
00169 gearman_server_job_free(server_job);
00170 return NULL;
00171 }
00172 }
00173
00174 server_job->state|= GEARMAN_SERVER_JOB_QUEUED;
00175 }
00176
00177 *ret_ptr= gearman_server_job_queue(server_job);
00178 if (*ret_ptr != GEARMAN_SUCCESS)
00179 {
00180 if (server_client == NULL && server->queue_done_fn != NULL)
00181 {
00182
00183 (void)(*(server->queue_done_fn))(server,
00184 (void *)server->queue_context,
00185 server_job->unique, unique_size,
00186 server_job->function->function_name,
00187 server_job->function->function_name_size);
00188 }
00189
00190 gearman_server_job_free(server_job);
00191 return NULL;
00192 }
00193 }
00194 else
00195 *ret_ptr= GEARMAN_JOB_EXISTS;
00196
00197 if (server_client != NULL)
00198 {
00199 server_client->job= server_job;
00200 GEARMAN_LIST_ADD(server_job->client, server_client, job_)
00201 }
00202
00203 return server_job;
00204 }
00205
00206 gearman_server_job_st *
00207 gearman_server_job_create(gearman_server_st *server,
00208 gearman_server_job_st *server_job)
00209 {
00210 if (server_job == NULL)
00211 {
00212 if (server->free_job_count > 0)
00213 {
00214 server_job= server->free_job_list;
00215 GEARMAN_LIST_DEL(server->free_job, server_job,)
00216 }
00217 else
00218 {
00219 server_job= malloc(sizeof(gearman_server_job_st));
00220 if (server_job == NULL)
00221 return NULL;
00222 }
00223
00224 server_job->options.allocated= true;
00225 }
00226 else
00227 server_job->options.allocated= false;
00228
00229 memset(&server_job->state, 0, sizeof(gearman_server_job_state_t));
00230 server_job->retries= 0;
00231 server_job->priority= 0;
00232 server_job->job_handle_key= 0;
00233 server_job->unique_key= 0;
00234 server_job->client_count= 0;
00235 server_job->numerator= 0;
00236 server_job->denominator= 0;
00237 server_job->data_size= 0;
00238 server_job->server= server;
00239 server_job->next= NULL;
00240 server_job->prev= NULL;
00241 server_job->unique_next= NULL;
00242 server_job->unique_prev= NULL;
00243 server_job->worker_next= NULL;
00244 server_job->worker_prev= NULL;
00245 server_job->function= NULL;
00246 server_job->function_next= NULL;
00247 server_job->data= NULL;
00248 server_job->client_list= NULL;
00249 server_job->worker= NULL;
00250 server_job->job_handle[0]= 0;
00251 server_job->unique[0]= 0;
00252
00253 return server_job;
00254 }
00255
00256 void gearman_server_job_free(gearman_server_job_st *server_job)
00257 {
00258 uint32_t key;
00259
00260 if (server_job->worker != NULL)
00261 server_job->function->job_running--;
00262
00263 server_job->function->job_total--;
00264
00265 if (server_job->data != NULL)
00266 free((void *)(server_job->data));
00267
00268 while (server_job->client_list != NULL)
00269 gearman_server_client_free(server_job->client_list);
00270
00271 if (server_job->worker != NULL)
00272 GEARMAN_LIST_DEL(server_job->worker->job, server_job, worker_)
00273
00274 key= server_job->unique_key % GEARMAN_JOB_HASH_SIZE;
00275 GEARMAN_HASH_DEL(server_job->server->unique, key, server_job, unique_);
00276
00277 key= server_job->job_handle_key % GEARMAN_JOB_HASH_SIZE;
00278 GEARMAN_HASH_DEL(server_job->server->job, key, server_job,);
00279
00280 if (server_job->options.allocated)
00281 {
00282 if (server_job->server->free_job_count < GEARMAN_MAX_FREE_SERVER_JOB)
00283 GEARMAN_LIST_ADD(server_job->server->free_job, server_job,)
00284 else
00285 free(server_job);
00286 }
00287 }
00288
00289 gearman_server_job_st *gearman_server_job_get(gearman_server_st *server,
00290 const char *job_handle,
00291 gearman_server_con_st *worker_con)
00292 {
00293 uint32_t key;
00294
00295 key= _server_job_hash(job_handle, strlen(job_handle));
00296
00297 for (gearman_server_job_st *server_job= server->job_hash[key % GEARMAN_JOB_HASH_SIZE];
00298 server_job != NULL; server_job= server_job->next)
00299 {
00300 if (server_job->job_handle_key == key &&
00301 !strcmp(server_job->job_handle, job_handle))
00302 {
00303
00304 if (worker_con != NULL &&
00305 (server_job->worker == NULL || server_job->worker->con != worker_con))
00306 {
00307 return NULL;
00308 }
00309
00310 return server_job;
00311 }
00312 }
00313
00314 return NULL;
00315 }
00316
00317 gearman_server_job_st *
00318 gearman_server_job_peek(gearman_server_con_st *server_con)
00319 {
00320 gearman_server_worker_st *server_worker;
00321 gearman_job_priority_t priority;
00322
00323 for (server_worker= server_con->worker_list; server_worker != NULL;
00324 server_worker= server_worker->con_next)
00325 {
00326 if (server_worker->function->job_count != 0)
00327 {
00328 for (priority= GEARMAN_JOB_PRIORITY_HIGH;
00329 priority != GEARMAN_JOB_PRIORITY_MAX; priority++)
00330 {
00331 if (server_worker->function->job_list[priority] != NULL)
00332 {
00333 if (server_worker->function->job_list[priority]->state & GEARMAN_SERVER_JOB_IGNORE)
00334 {
00335
00336
00337 server_worker->function->job_list[priority]->state&=
00338 (gearman_server_job_state_t)~GEARMAN_SERVER_JOB_IGNORE;
00339 gearman_server_job_free(gearman_server_job_take(server_con));
00340 return gearman_server_job_peek(server_con);
00341 }
00342 return server_worker->function->job_list[priority];
00343 }
00344 }
00345 }
00346 }
00347
00348 return NULL;
00349 }
00350
00351 static inline void _server_con_worker_list_append(gearman_server_worker_st *list,
00352 gearman_server_worker_st *worker)
00353 {
00354 worker->con_prev= NULL;
00355 worker->con_next= list;
00356 while (worker->con_next != NULL)
00357 {
00358 worker->con_prev= worker->con_next;
00359 worker->con_next= worker->con_next->con_next;
00360 }
00361 if (worker->con_prev)
00362 worker->con_prev->con_next= worker;
00363 }
00364
00365 gearman_server_job_st *
00366 gearman_server_job_take(gearman_server_con_st *server_con)
00367 {
00368 gearman_server_worker_st *server_worker;
00369 gearman_server_job_st *server_job;
00370 gearman_job_priority_t priority;
00371
00372 for (server_worker= server_con->worker_list; server_worker != NULL;
00373 server_worker= server_worker->con_next)
00374 {
00375 if (server_worker->function->job_count != 0)
00376 break;
00377 }
00378
00379 if (server_worker == NULL)
00380 return NULL;
00381
00382 if (server_con->thread->server->flags.round_robin)
00383 {
00384 GEARMAN_LIST_DEL(server_con->worker, server_worker, con_)
00385 _server_con_worker_list_append(server_con->worker_list, server_worker);
00386 ++server_con->worker_count;
00387 if (server_con->worker_list == NULL) {
00388 server_con->worker_list= server_worker;
00389 }
00390 }
00391
00392 for (priority= GEARMAN_JOB_PRIORITY_HIGH;
00393 priority != GEARMAN_JOB_PRIORITY_MAX; priority++)
00394 {
00395 if (server_worker->function->job_list[priority] != NULL)
00396 break;
00397 }
00398
00399 server_job= server_worker->function->job_list[priority];
00400 server_job->function->job_list[priority]= server_job->function_next;
00401 if (server_job->function->job_end[priority] == server_job)
00402 server_job->function->job_end[priority]= NULL;
00403 server_job->function->job_count--;
00404
00405 server_job->worker= server_worker;
00406 GEARMAN_LIST_ADD(server_worker->job, server_job, worker_)
00407 server_job->function->job_running++;
00408
00409 if (server_job->state & GEARMAN_SERVER_JOB_IGNORE)
00410 {
00411 gearman_server_job_free(server_job);
00412 return gearman_server_job_take(server_con);
00413 }
00414
00415 return server_job;
00416 }
00417
00418 gearman_return_t gearman_server_job_queue(gearman_server_job_st *job)
00419 {
00420 gearman_server_client_st *client;
00421 gearman_server_worker_st *worker;
00422 uint32_t noop_sent;
00423 gearman_return_t ret;
00424
00425 if (job->worker != NULL)
00426 {
00427 job->retries++;
00428 if (job->server->job_retries == job->retries)
00429 {
00430 gearman_log_error(job->server->gearman,
00431 "Dropped job due to max retry count: %s %s",
00432 job->job_handle, job->unique);
00433 for (client= job->client_list; client != NULL; client= client->job_next)
00434 {
00435 ret= gearman_server_io_packet_add(client->con, false,
00436 GEARMAN_MAGIC_RESPONSE,
00437 GEARMAN_COMMAND_WORK_FAIL,
00438 job->job_handle,
00439 (size_t)strlen(job->job_handle),
00440 NULL);
00441 if (ret != GEARMAN_SUCCESS)
00442 return ret;
00443 }
00444
00445
00446 if (job->state & GEARMAN_SERVER_JOB_QUEUED &&
00447 job->server->queue_done_fn != NULL)
00448 {
00449 ret= (*(job->server->queue_done_fn))(job->server,
00450 (void *)job->server->queue_context,
00451 job->unique,
00452 (size_t)strlen(job->unique),
00453 job->function->function_name,
00454 job->function->function_name_size);
00455 if (ret != GEARMAN_SUCCESS)
00456 return ret;
00457 }
00458
00459 gearman_server_job_free(job);
00460 return GEARMAN_SUCCESS;
00461 }
00462
00463 GEARMAN_LIST_DEL(job->worker->job, job, worker_)
00464 job->worker= NULL;
00465 job->function->job_running--;
00466 job->function_next= NULL;
00467 job->numerator= 0;
00468 job->denominator= 0;
00469 }
00470
00471
00472 if (job->function->worker_list != NULL)
00473 {
00474 worker= job->function->worker_list;
00475 noop_sent= 0;
00476 do
00477 {
00478 if (worker->con->options & GEARMAN_SERVER_CON_SLEEPING &&
00479 !(worker->con->options & GEARMAN_SERVER_CON_NOOP_SENT))
00480 {
00481 ret= gearman_server_io_packet_add(worker->con, false,
00482 GEARMAN_MAGIC_RESPONSE,
00483 GEARMAN_COMMAND_NOOP, NULL);
00484 if (ret != GEARMAN_SUCCESS)
00485 return ret;
00486
00487 worker->con->options|= GEARMAN_SERVER_CON_NOOP_SENT;
00488 noop_sent++;
00489 }
00490
00491 worker= worker->function_next;
00492 }
00493 while (worker != job->function->worker_list &&
00494 (job->server->worker_wakeup == 0 ||
00495 noop_sent < job->server->worker_wakeup));
00496
00497 job->function->worker_list= worker;
00498 }
00499
00500
00501 if (job->function->job_list[job->priority] == NULL)
00502 job->function->job_list[job->priority]= job;
00503 else
00504 job->function->job_end[job->priority]->function_next= job;
00505 job->function->job_end[job->priority]= job;
00506 job->function->job_count++;
00507
00508 return GEARMAN_SUCCESS;
00509 }
00510
00511
00512
00513
00514
/**
 * Hash a byte string into a 32-bit key (a one-at-a-time style mix).
 *
 * The mixing is done in uint32_t so additions and left shifts wrap with
 * defined behaviour; doing them in int32_t, as before, overflows a signed
 * type, which is undefined. The right shifts are still applied to the value
 * reinterpreted as int32_t so that they propagate the sign bit, which keeps
 * the produced keys identical to those the signed version yields on common
 * two's-complement compilers.
 *
 * @param key      Bytes to hash; only the first key_size bytes are read.
 * @param key_size Number of bytes in key.
 * @return The hash value; 0 is never returned (it is mapped to 1).
 */
static uint32_t _server_job_hash(const char *key, size_t key_size)
{
  const char *ptr= key;
  uint32_t value= 0;

  while (key_size--)
  {
    /* Go through int32_t so a negative char contributes the same amount
       (modulo 2^32) as it did with signed arithmetic. */
    value+= (uint32_t)(int32_t)*ptr++;
    value+= (value << 10);
    value^= (uint32_t)((int32_t)value >> 6);
  }
  value+= (value << 3);
  value^= (uint32_t)((int32_t)value >> 11);
  value+= (value << 15);

  return value == 0 ? 1 : value;
}
00532
00533 static gearman_server_job_st *
00534 _server_job_get_unique(gearman_server_st *server, uint32_t unique_key,
00535 gearman_server_function_st *server_function,
00536 const char *unique, size_t data_size)
00537 {
00538 gearman_server_job_st *server_job;
00539
00540 for (server_job= server->unique_hash[unique_key % GEARMAN_JOB_HASH_SIZE];
00541 server_job != NULL; server_job= server_job->unique_next)
00542 {
00543 if (data_size == 0)
00544 {
00545 if (server_job->function == server_function &&
00546 server_job->unique_key == unique_key &&
00547 !strcmp(server_job->unique, unique))
00548 {
00549 return server_job;
00550 }
00551 }
00552 else
00553 {
00554 if (server_job->function == server_function &&
00555 server_job->unique_key == unique_key &&
00556 server_job->data_size == data_size &&
00557 !memcmp(server_job->data, unique, data_size))
00558 {
00559 return server_job;
00560 }
00561 }
00562 }
00563
00564 return NULL;
00565 }