00001
00002
00003
00004
00005
00006
00007
00008
00014 #include "common.h"
00015
00026 gearman_command_info_st gearman_command_info_list[GEARMAN_COMMAND_MAX]=
00027 {
00028 { "TEXT", 3, false },
00029 { "CAN_DO", 1, false },
00030 { "CANT_DO", 1, false },
00031 { "RESET_ABILITIES", 0, false },
00032 { "PRE_SLEEP", 0, false },
00033 { "UNUSED", 0, false },
00034 { "NOOP", 0, false },
00035 { "SUBMIT_JOB", 2, true },
00036 { "JOB_CREATED", 1, false },
00037 { "GRAB_JOB", 0, false },
00038 { "NO_JOB", 0, false },
00039 { "JOB_ASSIGN", 2, true },
00040 { "WORK_STATUS", 3, false },
00041 { "WORK_COMPLETE", 1, true },
00042 { "WORK_FAIL", 1, false },
00043 { "GET_STATUS", 1, false },
00044 { "ECHO_REQ", 0, true },
00045 { "ECHO_RES", 0, true },
00046 { "SUBMIT_JOB_BG", 2, true },
00047 { "ERROR", 2, false },
00048 { "STATUS_RES", 5, false },
00049 { "SUBMIT_JOB_HIGH", 2, true },
00050 { "SET_CLIENT_ID", 1, false },
00051 { "CAN_DO_TIMEOUT", 2, false },
00052 { "ALL_YOURS", 0, false },
00053 { "WORK_EXCEPTION", 1, true },
00054 { "OPTION_REQ", 1, false },
00055 { "OPTION_RES", 1, false },
00056 { "WORK_DATA", 1, true },
00057 { "WORK_WARNING", 1, true },
00058 { "GRAB_JOB_UNIQ", 0, false },
00059 { "JOB_ASSIGN_UNIQ", 3, true },
00060 { "SUBMIT_JOB_HIGH_BG", 2, true },
00061 { "SUBMIT_JOB_LOW", 2, true },
00062 { "SUBMIT_JOB_LOW_BG", 2, true },
00063 { "SUBMIT_JOB_SCHED", 7, true },
00064 { "SUBMIT_JOB_EPOCH", 3, true }
00065 };
00066
00067 inline static gearman_return_t packet_create_arg(gearman_packet_st *packet,
00068 const void *arg, size_t arg_size)
00069 {
00070 void *new_args;
00071 size_t offset;
00072 uint8_t x;
00073
00074 if (packet->argc == gearman_command_info_list[packet->command].argc &&
00075 (! (gearman_command_info_list[packet->command].data) ||
00076 packet->data != NULL))
00077 {
00078 gearman_universal_set_error(packet->universal, "gearman_packet_create_arg",
00079 "too many arguments for command");
00080 return GEARMAN_TOO_MANY_ARGS;
00081 }
00082
00083 if (packet->argc == gearman_command_info_list[packet->command].argc)
00084 {
00085 packet->data= arg;
00086 packet->data_size= arg_size;
00087 return GEARMAN_SUCCESS;
00088 }
00089
00090 if (packet->args_size == 0 && packet->magic != GEARMAN_MAGIC_TEXT)
00091 packet->args_size= GEARMAN_PACKET_HEADER_SIZE;
00092
00093 if ((packet->args_size + arg_size) < GEARMAN_ARGS_BUFFER_SIZE)
00094 {
00095 packet->args= packet->args_buffer;
00096 }
00097 else
00098 {
00099 if (packet->args == packet->args_buffer)
00100 packet->args= NULL;
00101
00102 new_args= realloc(packet->args, packet->args_size + arg_size);
00103 if (new_args == NULL)
00104 {
00105 gearman_universal_set_error(packet->universal, "gearman_packet_create_arg", "realloc");
00106 return GEARMAN_MEMORY_ALLOCATION_FAILURE;
00107 }
00108
00109 if (packet->args_size > 0)
00110 memcpy(new_args, packet->args_buffer, packet->args_size);
00111
00112 packet->args= new_args;
00113 }
00114
00115 memcpy(packet->args + packet->args_size, arg, arg_size);
00116 packet->args_size+= arg_size;
00117 packet->arg_size[packet->argc]= arg_size;
00118 packet->argc++;
00119
00120 if (packet->magic == GEARMAN_MAGIC_TEXT)
00121 {
00122 offset= 0;
00123 }
00124 else
00125 {
00126 offset= GEARMAN_PACKET_HEADER_SIZE;
00127 }
00128
00129 for (x= 0; x < packet->argc; x++)
00130 {
00131 packet->arg[x]= packet->args + offset;
00132 offset+= packet->arg_size[x];
00133 }
00134
00135 return GEARMAN_SUCCESS;
00136 }
00137
00140
00141
00142
00143
00144
00145 gearman_packet_st *gearman_packet_create(gearman_universal_st *gearman,
00146 gearman_packet_st *packet)
00147 {
00148 if (packet == NULL)
00149 {
00150 packet= malloc(sizeof(gearman_packet_st));
00151 if (packet == NULL)
00152 {
00153 gearman_universal_set_error(gearman, "gearman_packet_create", "malloc");
00154 return NULL;
00155 }
00156
00157 packet->options.allocated= true;
00158 }
00159 else
00160 {
00161 packet->options.allocated= false;
00162 packet->options.complete= false;
00163 packet->options.free_data= false;
00164 }
00165
00166 packet->magic= 0;
00167 packet->command= 0;
00168 packet->argc= 0;
00169 packet->args_size= 0;
00170 packet->data_size= 0;
00171 packet->universal= gearman;
00172
00173 if (! (gearman->options.dont_track_packets))
00174 {
00175 if (gearman->packet_list != NULL)
00176 gearman->packet_list->prev= packet;
00177 packet->next= gearman->packet_list;
00178 packet->prev= NULL;
00179 gearman->packet_list= packet;
00180 gearman->packet_count++;
00181 }
00182
00183 packet->args= NULL;
00184 packet->data= NULL;
00185
00186 return packet;
00187 }
00188
00189 gearman_return_t gearman_packet_create_arg(gearman_packet_st *packet,
00190 const void *arg, size_t arg_size)
00191 {
00192 return packet_create_arg(packet, arg, arg_size);
00193 }
00194
00195 gearman_return_t gearman_packet_create_args(gearman_universal_st *gearman,
00196 gearman_packet_st *packet,
00197 enum gearman_magic_t magic,
00198 gearman_command_t command,
00199 const void *args[],
00200 const size_t args_size[],
00201 size_t args_count)
00202 {
00203 gearman_return_t ret;
00204 size_t x;
00205
00206 packet= gearman_packet_create(gearman, packet);
00207 if (packet == NULL)
00208 {
00209 return GEARMAN_MEMORY_ALLOCATION_FAILURE;
00210 }
00211
00212 packet->magic= magic;
00213 packet->command= command;
00214
00215 for (x= 0; x < args_count; x++)
00216 {
00217 ret= packet_create_arg(packet, args[x], args_size[x]);
00218 if (ret != GEARMAN_SUCCESS)
00219 {
00220 gearman_packet_free(packet);
00221 return ret;
00222 }
00223 }
00224
00225 ret= gearman_packet_pack_header(packet);
00226
00227 if (ret != GEARMAN_SUCCESS)
00228 gearman_packet_free(packet);
00229
00230 return ret;
00231 }
00232
00233 void gearman_packet_free(gearman_packet_st *packet)
00234 {
00235 if (packet->args != packet->args_buffer && packet->args != NULL)
00236 free(packet->args);
00237
00238 if (packet->options.free_data && packet->data != NULL)
00239 {
00240 if (packet->universal->workload_free_fn == NULL)
00241 {
00242 free((void *)packet->data);
00243 }
00244 else
00245 {
00246 packet->universal->workload_free_fn((void *)(packet->data),
00247 (void *)packet->universal->workload_free_context);
00248 }
00249 }
00250
00251 if (! (packet->universal->options.dont_track_packets))
00252 {
00253 if (packet->universal->packet_list == packet)
00254 packet->universal->packet_list= packet->next;
00255 if (packet->prev != NULL)
00256 packet->prev->next= packet->next;
00257 if (packet->next != NULL)
00258 packet->next->prev= packet->prev;
00259 packet->universal->packet_count--;
00260 }
00261
00262 if (packet->options.allocated)
00263 free(packet);
00264 }
00265
00266 gearman_return_t gearman_packet_pack_header(gearman_packet_st *packet)
00267 {
00268 uint64_t length_64;
00269 uint32_t tmp;
00270
00271 if (packet->magic == GEARMAN_MAGIC_TEXT)
00272 {
00273 packet->options.complete= true;
00274 return GEARMAN_SUCCESS;
00275 }
00276
00277 if (packet->args_size == 0)
00278 {
00279 packet->args= packet->args_buffer;
00280 packet->args_size= GEARMAN_PACKET_HEADER_SIZE;
00281 }
00282
00283 switch (packet->magic)
00284 {
00285 case GEARMAN_MAGIC_TEXT:
00286 break;
00287
00288 case GEARMAN_MAGIC_REQUEST:
00289 memcpy(packet->args, "\0REQ", 4);
00290 break;
00291
00292 case GEARMAN_MAGIC_RESPONSE:
00293 memcpy(packet->args, "\0RES", 4);
00294 break;
00295
00296 default:
00297 gearman_universal_set_error(packet->universal, "gearman_packet_pack_header",
00298 "invalid magic value");
00299 return GEARMAN_INVALID_MAGIC;
00300 }
00301
00302 if (packet->command == GEARMAN_COMMAND_TEXT ||
00303 packet->command >= GEARMAN_COMMAND_MAX)
00304 {
00305 gearman_universal_set_error(packet->universal, "gearman_packet_pack_header",
00306 "invalid command value");
00307 return GEARMAN_INVALID_COMMAND;
00308 }
00309
00310 tmp= packet->command;
00311 tmp= htonl(tmp);
00312 memcpy(packet->args + 4, &tmp, 4);
00313
00314 length_64= packet->args_size + packet->data_size - GEARMAN_PACKET_HEADER_SIZE;
00315
00316
00317 if (length_64 >= UINT32_MAX || length_64 < packet->data_size)
00318 {
00319 gearman_universal_set_error(packet->universal, "gearman_packet_pack_header",
00320 "data size too too long");
00321 return GEARMAN_ARGUMENT_TOO_LARGE;
00322 }
00323
00324 tmp= (uint32_t)length_64;
00325 tmp= htonl(tmp);
00326 memcpy(packet->args + 8, &tmp, 4);
00327
00328 packet->options.complete= true;
00329
00330 return GEARMAN_SUCCESS;
00331 }
00332
00333 gearman_return_t gearman_packet_unpack_header(gearman_packet_st *packet)
00334 {
00335 uint32_t tmp;
00336
00337 if (!memcmp(packet->args, "\0REQ", 4))
00338 packet->magic= GEARMAN_MAGIC_REQUEST;
00339 else if (!memcmp(packet->args, "\0RES", 4))
00340 packet->magic= GEARMAN_MAGIC_RESPONSE;
00341 else
00342 {
00343 gearman_universal_set_error(packet->universal, "gearman_packet_unpack_header",
00344 "invalid magic value");
00345 return GEARMAN_INVALID_MAGIC;
00346 }
00347
00348 memcpy(&tmp, packet->args + 4, 4);
00349 packet->command= ntohl(tmp);
00350
00351 if (packet->command == GEARMAN_COMMAND_TEXT ||
00352 packet->command >= GEARMAN_COMMAND_MAX)
00353 {
00354 gearman_universal_set_error(packet->universal, "gearman_packet_unpack_header",
00355 "invalid command value");
00356 return GEARMAN_INVALID_COMMAND;
00357 }
00358
00359 memcpy(&tmp, packet->args + 8, 4);
00360 packet->data_size= ntohl(tmp);
00361
00362 return GEARMAN_SUCCESS;
00363 }
00364
00365 size_t gearman_packet_pack(const gearman_packet_st *packet,
00366 gearman_connection_st *con __attribute__ ((unused)),
00367 void *data, size_t data_size,
00368 gearman_return_t *ret_ptr)
00369 {
00370 if (packet->args_size == 0)
00371 {
00372 *ret_ptr= GEARMAN_SUCCESS;
00373 return 0;
00374 }
00375
00376 if (packet->args_size > data_size)
00377 {
00378 *ret_ptr= GEARMAN_FLUSH_DATA;
00379 return 0;
00380 }
00381
00382 memcpy(data, packet->args, packet->args_size);
00383 *ret_ptr= GEARMAN_SUCCESS;
00384 return packet->args_size;
00385 }
00386
00387 size_t gearman_packet_unpack(gearman_packet_st *packet,
00388 gearman_connection_st *con __attribute__ ((unused)),
00389 const void *data, size_t data_size,
00390 gearman_return_t *ret_ptr)
00391 {
00392 uint8_t *ptr;
00393 size_t used_size;
00394 size_t arg_size;
00395
00396 if (packet->args_size == 0)
00397 {
00398 if (data_size > 0 && ((uint8_t *)data)[0] != 0)
00399 {
00400
00401 ptr= memchr(data, '\n', data_size);
00402 if (ptr == NULL)
00403 {
00404 *ret_ptr= GEARMAN_IO_WAIT;
00405 return 0;
00406 }
00407
00408 packet->magic= GEARMAN_MAGIC_TEXT;
00409 packet->command= GEARMAN_COMMAND_TEXT;
00410
00411 used_size= (size_t)(ptr - ((uint8_t *)data)) + 1;
00412 *ptr= 0;
00413 if (used_size > 1 && *(ptr - 1) == '\r')
00414 *(ptr - 1)= 0;
00415
00416 for (arg_size= used_size, ptr= (uint8_t *)data; ptr != NULL; data= ptr)
00417 {
00418 ptr= memchr(data, ' ', arg_size);
00419 if (ptr != NULL)
00420 {
00421 *ptr= 0;
00422 ptr++;
00423 while (*ptr == ' ')
00424 ptr++;
00425
00426 arg_size-= (size_t)(ptr - ((uint8_t *)data));
00427 }
00428
00429 *ret_ptr= packet_create_arg(packet, data, ptr == NULL ? arg_size :
00430 (size_t)(ptr - ((uint8_t *)data)));
00431 if (*ret_ptr != GEARMAN_SUCCESS)
00432 return used_size;
00433 }
00434
00435 return used_size;
00436 }
00437 else if (data_size < GEARMAN_PACKET_HEADER_SIZE)
00438 {
00439 *ret_ptr= GEARMAN_IO_WAIT;
00440 return 0;
00441 }
00442
00443 packet->args= packet->args_buffer;
00444 packet->args_size= GEARMAN_PACKET_HEADER_SIZE;
00445 memcpy(packet->args, data, GEARMAN_PACKET_HEADER_SIZE);
00446
00447 *ret_ptr= gearman_packet_unpack_header(packet);
00448 if (*ret_ptr != GEARMAN_SUCCESS)
00449 return 0;
00450
00451 used_size= GEARMAN_PACKET_HEADER_SIZE;
00452 }
00453 else
00454 {
00455 used_size= 0;
00456 }
00457
00458 while (packet->argc != gearman_command_info_list[packet->command].argc)
00459 {
00460 if (packet->argc != (gearman_command_info_list[packet->command].argc - 1) ||
00461 gearman_command_info_list[packet->command].data)
00462 {
00463 ptr= memchr(((uint8_t *)data) + used_size, 0, data_size - used_size);
00464 if (ptr == NULL)
00465 {
00466 *ret_ptr= GEARMAN_IO_WAIT;
00467 return used_size;
00468 }
00469
00470 arg_size= (size_t)(ptr - (((uint8_t *)data) + used_size)) + 1;
00471 *ret_ptr= packet_create_arg(packet, ((uint8_t *)data) + used_size, arg_size);
00472
00473 if (*ret_ptr != GEARMAN_SUCCESS)
00474 return used_size;
00475
00476 packet->data_size-= arg_size;
00477 used_size+= arg_size;
00478 }
00479 else
00480 {
00481 if ((data_size - used_size) < packet->data_size)
00482 {
00483 *ret_ptr= GEARMAN_IO_WAIT;
00484 return used_size;
00485 }
00486
00487 *ret_ptr= packet_create_arg(packet, ((uint8_t *)data) + used_size,
00488 packet->data_size);
00489 if (*ret_ptr != GEARMAN_SUCCESS)
00490 return used_size;
00491
00492 used_size+= packet->data_size;
00493 packet->data_size= 0;
00494 }
00495 }
00496
00497 *ret_ptr= GEARMAN_SUCCESS;
00498 return used_size;
00499 }
00500
00501 void gearman_packet_give_data(gearman_packet_st *packet, const void *data,
00502 size_t data_size)
00503 {
00504 packet->data= data;
00505 packet->data_size= data_size;
00506 packet->options.free_data= true;
00507 }
00508
00509 void *gearman_packet_take_data(gearman_packet_st *packet, size_t *data_size)
00510 {
00511 void *data= (void *)(packet->data);
00512
00513 *data_size= packet->data_size;
00514
00515 packet->data= NULL;
00516 packet->data_size= 0;
00517 packet->options.free_data= false;
00518
00519 return data;
00520 }