00001
00002
00003
00004
00005
00006
00007
00008
00014 #include "common.h"
00015
00016 #include <libgearman-server/queue_libdrizzle.h>
00017 #include <libdrizzle/drizzle_client.h>
00018
/* Database selected on the connection unless the "db" option overrides it. */
#define GEARMAN_QUEUE_LIBDRIZZLE_DEFAULT_DATABASE "test"

/* Table name used unless the "table" option overrides it. */
#define GEARMAN_QUEUE_LIBDRIZZLE_DEFAULT_TABLE "queue"

/*
 * Bytes reserved in the query buffer for the fixed SQL text; the space for
 * escaped payload is added on top of this when a query is sized.
 */
#define GEARMAN_QUEUE_QUERY_BUFFER 256
00031
00035 typedef struct
00036 {
00037 drizzle_st drizzle;
00038 drizzle_con_st con;
00039 drizzle_result_st result;
00040 char table[DRIZZLE_MAX_TABLE_SIZE];
00041 char *query;
00042 size_t query_size;
00043 } gearman_queue_libdrizzle_st;
00044
00048 static drizzle_return_t _libdrizzle_query(gearman_server_st *server,
00049 gearman_queue_libdrizzle_st *queue,
00050 const char *query, size_t query_size);
00051
00052
00053 static gearman_return_t _libdrizzle_add(gearman_server_st *server,
00054 void *context, const void *unique,
00055 size_t unique_size,
00056 const void *function_name,
00057 size_t function_name_size,
00058 const void *data, size_t data_size,
00059 gearman_job_priority_t priority);
00060 static gearman_return_t _libdrizzle_flush(gearman_server_st *gearman,
00061 void *context);
00062 static gearman_return_t _libdrizzle_done(gearman_server_st *gearman,
00063 void *context, const void *unique,
00064 size_t unique_size,
00065 const void *function_name,
00066 size_t function_name_size);
00067 static gearman_return_t _libdrizzle_replay(gearman_server_st *gearman,
00068 void *context,
00069 gearman_queue_add_fn *add_fn,
00070 void *add_context);
00071
00074
00075
00076
00077
00078 gearman_return_t gearman_server_queue_libdrizzle_conf(gearman_conf_st *conf)
00079 {
00080 gearman_conf_module_st *module;
00081
00082 module= gearman_conf_module_create(conf, NULL, "libdrizzle");
00083 if (module == NULL)
00084 return GEARMAN_MEMORY_ALLOCATION_FAILURE;
00085
00086 #define MCO(__name, __value, __help) \
00087 gearman_conf_module_add_option(module, __name, 0, __value, __help);
00088
00089 MCO("host", "HOST", "Host of server.")
00090 MCO("port", "PORT", "Port of server.")
00091 MCO("uds", "UDS", "Unix domain socket for server.")
00092 MCO("user", "USER", "User name for authentication.")
00093 MCO("password", "PASSWORD", "Password for authentication.")
00094 MCO("db", "DB", "Database to use.")
00095 MCO("table", "TABLE", "Table to use.")
00096 MCO("mysql", NULL, "Use MySQL protocol.")
00097
00098 return gearman_conf_return(conf);
00099 }
00100
00101 gearman_return_t gearman_server_queue_libdrizzle_init(gearman_server_st *server,
00102 gearman_conf_st *conf)
00103 {
00104 gearman_queue_libdrizzle_st *queue;
00105 gearman_conf_module_st *module;
00106 const char *name;
00107 const char *value;
00108 const char *host= NULL;
00109 in_port_t port= 0;
00110 const char *uds= NULL;
00111 const char *user= NULL;
00112 const char *password= NULL;
00113 drizzle_row_t row;
00114 char create[1024];
00115
00116 gearman_log_info(server->gearman, "Initializing libdrizzle module");
00117
00118 queue= malloc(sizeof(gearman_queue_libdrizzle_st));
00119 if (queue == NULL)
00120 {
00121 gearman_log_error(server->gearman, "gearman_queue_libdrizzle_init", "malloc");
00122 return GEARMAN_MEMORY_ALLOCATION_FAILURE;
00123 }
00124
00125 memset(queue, 0, sizeof(gearman_queue_libdrizzle_st));
00126 snprintf(queue->table, DRIZZLE_MAX_TABLE_SIZE,
00127 GEARMAN_QUEUE_LIBDRIZZLE_DEFAULT_TABLE);
00128
00129 if (drizzle_create(&(queue->drizzle)) == NULL)
00130 {
00131 free(queue);
00132 gearman_log_error(server->gearman, "gearman_queue_libdrizzle_init", "drizzle_create");
00133 return GEARMAN_QUEUE_ERROR;
00134 }
00135
00136 if (drizzle_con_create(&(queue->drizzle), &(queue->con)) == NULL)
00137 {
00138 drizzle_free(&(queue->drizzle));
00139 free(queue);
00140 gearman_log_error(server->gearman, "gearman_queue_libdrizzle_init", "drizzle_con_create");
00141 return GEARMAN_QUEUE_ERROR;
00142 }
00143
00144 gearman_server_set_queue_context(server, queue);
00145
00146 drizzle_con_set_db(&(queue->con), GEARMAN_QUEUE_LIBDRIZZLE_DEFAULT_DATABASE);
00147
00148
00149 module= gearman_conf_module_find(conf, "libdrizzle");
00150 if (module == NULL)
00151 {
00152 gearman_log_error(server->gearman, "gearman_queue_libdrizzle_init", "gearman_conf_module_find:NULL");
00153 return GEARMAN_QUEUE_ERROR;
00154 }
00155
00156 while (gearman_conf_module_value(module, &name, &value))
00157 {
00158 if (!strcmp(name, "host"))
00159 host= value;
00160 else if (!strcmp(name, "port"))
00161 port= (in_port_t)atoi(value);
00162 else if (!strcmp(name, "uds"))
00163 uds= value;
00164 else if (!strcmp(name, "user"))
00165 user= value;
00166 else if (!strcmp(name, "password"))
00167 password= value;
00168 else if (!strcmp(name, "db"))
00169 drizzle_con_set_db(&(queue->con), value);
00170 else if (!strcmp(name, "table"))
00171 snprintf(queue->table, DRIZZLE_MAX_TABLE_SIZE, "%s", value);
00172 else if (!strcmp(name, "mysql"))
00173 drizzle_con_set_options(&(queue->con), DRIZZLE_CON_MYSQL);
00174 else
00175 {
00176 gearman_server_queue_libdrizzle_deinit(server);
00177 gearman_log_error(server->gearman, "gearman_queue_libdrizzle_init", "Unknown argument: %s", name);
00178 return GEARMAN_QUEUE_ERROR;
00179 }
00180 }
00181
00182 if (uds == NULL)
00183 drizzle_con_set_tcp(&(queue->con), host, port);
00184 else
00185 drizzle_con_set_uds(&(queue->con), uds);
00186
00187 drizzle_con_set_auth(&(queue->con), user, password);
00188
00189
00190 if (password != NULL)
00191 memset((void *)password, 'x', strlen(password));
00192
00193 if (_libdrizzle_query(server, queue, "SHOW TABLES", 11) != DRIZZLE_RETURN_OK)
00194 {
00195 gearman_server_queue_libdrizzle_deinit(server);
00196 return GEARMAN_QUEUE_ERROR;
00197 }
00198
00199 if (drizzle_result_buffer(&(queue->result)) != DRIZZLE_RETURN_OK)
00200 {
00201 drizzle_result_free(&(queue->result));
00202 gearman_server_queue_libdrizzle_deinit(server);
00203 gearman_log_error(server->gearman, "gearman_queue_libdrizzle_init", "drizzle_result_buffer:%s", drizzle_error(&(queue->drizzle)));
00204 return GEARMAN_QUEUE_ERROR;
00205 }
00206
00207 while ((row= drizzle_row_next(&(queue->result))) != NULL)
00208 {
00209 if (!strcasecmp(queue->table, row[0]))
00210 {
00211 gearman_log_info(server->gearman, "libdrizzle module using table '%s.%s'", drizzle_con_db(&queue->con), row[0]);
00212
00213 break;
00214 }
00215 }
00216
00217 drizzle_result_free(&(queue->result));
00218
00219 if (row == NULL)
00220 {
00221 snprintf(create, 1024,
00222 "CREATE TABLE %s"
00223 "("
00224 "unique_key VARCHAR(%d) PRIMARY KEY,"
00225 "function_name VARCHAR(255),"
00226 "priority INT,"
00227 "data LONGBLOB"
00228 ")",
00229 queue->table, GEARMAN_UNIQUE_SIZE);
00230
00231 gearman_log_info(server->gearman, "libdrizzle module creating table '%s.%s'",
00232 drizzle_con_db(&queue->con), queue->table);
00233
00234 if (_libdrizzle_query(server, queue, create, strlen(create))
00235 != DRIZZLE_RETURN_OK)
00236 {
00237 gearman_server_queue_libdrizzle_deinit(server);
00238 return GEARMAN_QUEUE_ERROR;
00239 }
00240
00241 drizzle_result_free(&(queue->result));
00242 }
00243
00244 gearman_server_set_queue_add_fn(server, _libdrizzle_add);
00245 gearman_server_set_queue_flush_fn(server, _libdrizzle_flush);
00246 gearman_server_set_queue_done_fn(server, _libdrizzle_done);
00247 gearman_server_set_queue_replay_fn(server, _libdrizzle_replay);
00248
00249 return GEARMAN_SUCCESS;
00250 }
00251
00252 gearman_return_t
00253 gearman_server_queue_libdrizzle_deinit(gearman_server_st *server)
00254 {
00255 gearman_queue_libdrizzle_st *queue;
00256
00257 gearman_log_info(server->gearman, "Shutting down libdrizzle queue module");
00258
00259 queue= (gearman_queue_libdrizzle_st *)gearman_server_queue_context(server);
00260 gearman_server_set_queue_context(server, NULL);
00261 drizzle_con_free(&(queue->con));
00262 drizzle_free(&(queue->drizzle));
00263 if (queue->query != NULL)
00264 free(queue->query);
00265 free(queue);
00266
00267 return GEARMAN_SUCCESS;
00268 }
00269
00270 gearman_return_t gearmand_queue_libdrizzle_init(gearmand_st *gearmand,
00271 gearman_conf_st *conf)
00272 {
00273 return gearman_server_queue_libdrizzle_init(&(gearmand->server), conf);
00274 }
00275
00276 gearman_return_t gearmand_queue_libdrizzle_deinit(gearmand_st *gearmand)
00277 {
00278 return gearman_server_queue_libdrizzle_deinit(&(gearmand->server));
00279 }
00280
00281
00282
00283
00284
00285 static drizzle_return_t _libdrizzle_query(gearman_server_st *server,
00286 gearman_queue_libdrizzle_st *queue,
00287 const char *query, size_t query_size)
00288 {
00289 drizzle_return_t ret;
00290
00291 gearman_log_crazy(server->gearman, "libdrizzle query: %.*s", (uint32_t)query_size, query);
00292
00293 (void)drizzle_query(&(queue->con), &(queue->result), query, query_size, &ret);
00294 if (ret != DRIZZLE_RETURN_OK)
00295 {
00296
00297 if (ret == DRIZZLE_RETURN_LOST_CONNECTION)
00298 {
00299 (void)drizzle_query(&(queue->con), &(queue->result), query, query_size,
00300 &ret);
00301 }
00302
00303 if (ret != DRIZZLE_RETURN_OK)
00304 {
00305 gearman_log_error(server->gearman, "_libdrizzle_query", "drizzle_query:%s",
00306 drizzle_error(&(queue->drizzle)));
00307 return ret;
00308 }
00309 }
00310
00311 return DRIZZLE_RETURN_OK;
00312 }
00313
00314 static gearman_return_t _libdrizzle_add(gearman_server_st *server,
00315 void *context, const void *unique,
00316 size_t unique_size,
00317 const void *function_name,
00318 size_t function_name_size,
00319 const void *data, size_t data_size,
00320 gearman_job_priority_t priority)
00321 {
00322 gearman_queue_libdrizzle_st *queue= (gearman_queue_libdrizzle_st *)context;
00323 char *query;
00324 size_t query_size;
00325
00326 gearman_log_debug(server->gearman, "libdrizzle add: %.*s", (uint32_t)unique_size, (char *)unique);
00327
00328
00329
00330 #if 0
00331 if (!not started)
00332 {
00333 if (_query(drizzle, "BEGIN", 5) != DRIZZLE_RETURN_OK)
00334 return REPQ_RETURN_EXTERNAL;
00335
00336 drizzle_result_free(&(drizzle->result));
00337 }
00338 #endif
00339
00340 query_size= ((unique_size + function_name_size + data_size) * 2) +
00341 GEARMAN_QUEUE_QUERY_BUFFER;
00342 if (query_size > queue->query_size)
00343 {
00344 query= realloc(queue->query, query_size);
00345 if (query == NULL)
00346 {
00347 gearman_log_error(server->gearman, "_libdrizzle_add", "realloc");
00348 return GEARMAN_MEMORY_ALLOCATION_FAILURE;
00349 }
00350
00351 queue->query= query;
00352 queue->query_size= query_size;
00353 }
00354 else
00355 query= queue->query;
00356
00357 query_size= (size_t)snprintf(query, query_size,
00358 "INSERT INTO %s SET priority=%u,unique_key='",
00359 queue->table, (uint32_t)priority);
00360
00361 query_size+= (size_t)drizzle_escape_string(query + query_size, unique,
00362 unique_size);
00363 memcpy(query + query_size, "',function_name='", 17);
00364 query_size+= 17;
00365
00366 query_size+= (size_t)drizzle_escape_string(query + query_size, function_name,
00367 function_name_size);
00368 memcpy(query + query_size, "',data='", 8);
00369 query_size+= 8;
00370
00371 query_size+= (size_t)drizzle_escape_string(query + query_size, data,
00372 data_size);
00373 memcpy(query + query_size, "'", 1);
00374 query_size+= 1;
00375
00376 if (_libdrizzle_query(server, queue, query, query_size) != DRIZZLE_RETURN_OK)
00377 return GEARMAN_QUEUE_ERROR;
00378
00379 drizzle_result_free(&(queue->result));
00380
00381 return GEARMAN_SUCCESS;
00382 }
00383
00384 static gearman_return_t _libdrizzle_flush(gearman_server_st *server,
00385 void *context __attribute__((unused)))
00386 {
00387 gearman_log_debug(server->gearman, "libdrizzle flush");
00388
00389 return GEARMAN_SUCCESS;
00390 }
00391
00392 static gearman_return_t _libdrizzle_done(gearman_server_st *server,
00393 void *context, const void *unique,
00394 size_t unique_size,
00395 const void *function_name __attribute__((unused)),
00396 size_t function_name_size __attribute__((unused)))
00397 {
00398 gearman_queue_libdrizzle_st *queue= (gearman_queue_libdrizzle_st *)context;
00399 char *query;
00400 size_t query_size;
00401
00402 gearman_log_debug(server->gearman, "libdrizzle done: %.*s", (uint32_t)unique_size, (char *)unique);
00403
00404 query_size= (unique_size * 2) + GEARMAN_QUEUE_QUERY_BUFFER;
00405 if (query_size > queue->query_size)
00406 {
00407 query= realloc(queue->query, query_size);
00408 if (query == NULL)
00409 {
00410 gearman_log_error(server->gearman, "_libdrizzle_add", "realloc");
00411 return GEARMAN_MEMORY_ALLOCATION_FAILURE;
00412 }
00413
00414 queue->query= query;
00415 queue->query_size= query_size;
00416 }
00417 else
00418 query= queue->query;
00419
00420 query_size= (size_t)snprintf(query, query_size,
00421 "DELETE FROM %s WHERE unique_key='",
00422 queue->table);
00423
00424 query_size+= (size_t)drizzle_escape_string(query + query_size, unique,
00425 unique_size);
00426 memcpy(query + query_size, "'", 1);
00427 query_size+= 1;
00428
00429 if (_libdrizzle_query(server, queue, query, query_size) != DRIZZLE_RETURN_OK)
00430 return GEARMAN_QUEUE_ERROR;
00431
00432 drizzle_result_free(&(queue->result));
00433
00434 return GEARMAN_SUCCESS;
00435 }
00436
00437 static gearman_return_t _libdrizzle_replay(gearman_server_st *server,
00438 void *context,
00439 gearman_queue_add_fn *add_fn,
00440 void *add_context)
00441 {
00442 gearman_queue_libdrizzle_st *queue= (gearman_queue_libdrizzle_st *)context;
00443 char *query;
00444 size_t query_size;
00445 drizzle_return_t ret;
00446 drizzle_row_t row;
00447 size_t *field_sizes;
00448 gearman_return_t gret;
00449
00450 gearman_log_info(server->gearman, "libdrizzle replay start");
00451
00452 if (GEARMAN_QUEUE_QUERY_BUFFER > queue->query_size)
00453 {
00454 query= realloc(queue->query, GEARMAN_QUEUE_QUERY_BUFFER);
00455 if (query == NULL)
00456 {
00457 gearman_log_error(server->gearman, "_libdrizzle_add", "realloc");
00458 return GEARMAN_MEMORY_ALLOCATION_FAILURE;
00459 }
00460
00461 queue->query= query;
00462 queue->query_size= GEARMAN_QUEUE_QUERY_BUFFER;
00463 }
00464 else
00465 query= queue->query;
00466
00467 query_size= (size_t)snprintf(query, GEARMAN_QUEUE_QUERY_BUFFER,
00468 "SELECT unique_key,function_name,priority,data "
00469 "FROM %s",
00470 queue->table);
00471
00472 if (_libdrizzle_query(server, queue, query, query_size) != DRIZZLE_RETURN_OK)
00473 return GEARMAN_QUEUE_ERROR;
00474
00475 if (drizzle_column_skip(&(queue->result)) != DRIZZLE_RETURN_OK)
00476 {
00477 drizzle_result_free(&(queue->result));
00478 gearman_log_error(server->gearman, "_libdrizzle_replay", "drizzle_column_skip:%s", drizzle_error(&(queue->drizzle)));
00479
00480 return GEARMAN_QUEUE_ERROR;
00481 }
00482
00483 while (1)
00484 {
00485 row= drizzle_row_buffer(&(queue->result), &ret);
00486 if (ret != DRIZZLE_RETURN_OK)
00487 {
00488 drizzle_result_free(&(queue->result));
00489 gearman_log_error(server->gearman, "_libdrizzle_replay", "drizzle_row_buffer:%s", drizzle_error(&(queue->drizzle)));
00490
00491 return GEARMAN_QUEUE_ERROR;
00492 }
00493
00494 if (row == NULL)
00495 break;
00496
00497 field_sizes= drizzle_row_field_sizes(&(queue->result));
00498
00499 gearman_log_debug(server->gearman, "libdrizzle replay: %.*s", (uint32_t)field_sizes[0], row[1]);
00500
00501 gret= (*add_fn)(server, add_context, row[0], field_sizes[0], row[1],
00502 field_sizes[1], row[3], field_sizes[3], atoi(row[2]));
00503 if (gret != GEARMAN_SUCCESS)
00504 {
00505 drizzle_row_free(&(queue->result), row);
00506 drizzle_result_free(&(queue->result));
00507 return gret;
00508 }
00509
00510 row[3]= NULL;
00511 drizzle_row_free(&(queue->result), row);
00512 }
00513
00514 drizzle_result_free(&(queue->result));
00515
00516 return GEARMAN_SUCCESS;
00517 }