00001
00006 #include "common.h"
00007
00008 #include <libgearman-server/queue_libtokyocabinet.h>
00009 #include <tcutil.h>
00010 #include <tcadb.h>
00011
00018
00019
00020
00021
00025 typedef struct
00026 {
00027 TCADB *db;
00028 } gearman_queue_libtokyocabinet_st;
00029
00030
00031 static gearman_return_t _libtokyocabinet_add(gearman_server_st *server, void *context,
00032 const void *unique,
00033 size_t unique_size,
00034 const void *function_name,
00035 size_t function_name_size,
00036 const void *data, size_t data_size,
00037 gearman_job_priority_t priority);
00038 static gearman_return_t _libtokyocabinet_flush(gearman_server_st *server, void *context);
00039 static gearman_return_t _libtokyocabinet_done(gearman_server_st *server, void *context,
00040 const void *unique,
00041 size_t unique_size,
00042 const void *function_name,
00043 size_t function_name_size);
00044 static gearman_return_t _libtokyocabinet_replay(gearman_server_st *server, void *context,
00045 gearman_queue_add_fn *add_fn,
00046 void *add_context);
00047
00051 static const char * _libtokyocabinet_tcaerrmsg(TCADB *db)
00052 {
00053 switch (tcadbomode(db))
00054 {
00055 case ADBOHDB:
00056 return tcerrmsg(tchdbecode((TCHDB *)tcadbreveal(db)));
00057 case ADBOBDB:
00058 return tcerrmsg(tcbdbecode((TCBDB *)tcadbreveal(db)));
00059 default:
00060 return tcerrmsg(TCEMISC);
00061 }
00062 }
00063
00064
00065
00066
00067
00068 gearman_return_t gearman_server_queue_libtokyocabinet_conf(gearman_conf_st *conf)
00069 {
00070 gearman_conf_module_st *module;
00071
00072 module= gearman_conf_module_create(conf, NULL, "libtokyocabinet");
00073 if (module == NULL)
00074 return GEARMAN_MEMORY_ALLOCATION_FAILURE;
00075
00076 gearman_conf_module_add_option(module, "file", 0, "FILE_NAME",
00077 "File name of the database. [see: man tcadb, tcadbopen() for name guidelines]");
00078 gearman_conf_module_add_option(module, "optimize", 0, "yes/no",
00079 "Optimize database on open. [default=yes]");
00080 return gearman_conf_return(conf);
00081 }
00082
00083 gearman_return_t gearman_queue_libtokyocabinet_init(gearman_server_st *server,
00084 gearman_conf_st *conf)
00085 {
00086 gearman_queue_libtokyocabinet_st *queue;
00087 gearman_conf_module_st *module;
00088 const char *name;
00089 const char *value;
00090 const char *opt_file= NULL;
00091 const char *opt_optimize= NULL;
00092
00093 gearman_log_info(server->gearman, "Initializing libtokyocabinet module");
00094
00095 queue= calloc(1, sizeof(gearman_queue_libtokyocabinet_st));
00096 if (queue == NULL)
00097 {
00098 gearman_log_error(server->gearman, "gearman_queue_libtokyocabinet_init", "malloc");
00099 return GEARMAN_MEMORY_ALLOCATION_FAILURE;
00100 }
00101
00102 if ((queue->db= tcadbnew()) == NULL)
00103 {
00104 free(queue);
00105 gearman_log_error(server->gearman, "gearman_queue_libtokyocabinet_init",
00106 "tcadbnew");
00107 return GEARMAN_QUEUE_ERROR;
00108 }
00109
00110
00111 module= gearman_conf_module_find(conf, "libtokyocabinet");
00112 if (module == NULL)
00113 {
00114 gearman_log_error(server->gearman, "gearman_queue_libtokyocabinet_init",
00115 "modconf_module_find:NULL");
00116 return GEARMAN_QUEUE_ERROR;
00117 }
00118
00119 while (gearman_conf_module_value(module, &name, &value))
00120 {
00121 if (!strcmp(name, "file"))
00122 opt_file= value;
00123 else if (!strcmp(name, "optimize"))
00124 opt_optimize= value;
00125 else
00126 {
00127 tcadbdel(queue->db);
00128 free(queue);
00129 gearman_log_error(server->gearman, "gearman_queue_libtokyocabinet_init",
00130 "Unknown argument: %s", name);
00131 return GEARMAN_QUEUE_ERROR;
00132 }
00133 }
00134
00135 if (opt_file == NULL)
00136 {
00137 gearman_log_error(server->gearman, "gearman_queue_libtokyocabinet_init",
00138 "No --file given");
00139 return GEARMAN_QUEUE_ERROR;
00140 }
00141
00142 if (!tcadbopen(queue->db, opt_file))
00143 {
00144 tcadbdel(queue->db);
00145 free(queue);
00146
00147 gearman_log_error(server->gearman, "gearman_queue_libtokyocabinet_init",
00148 "tcadbopen(%s): %s", opt_file, _libtokyocabinet_tcaerrmsg(queue->db));
00149
00150 return GEARMAN_QUEUE_ERROR;
00151 }
00152
00153 if (opt_optimize == NULL || !strcasecmp(opt_optimize, "yes"))
00154 {
00155 gearman_log_info(server->gearman, "libtokyocabinet optimizing database file");
00156 if (!tcadboptimize(queue->db, NULL))
00157 {
00158 tcadbdel(queue->db);
00159 free(queue);
00160 gearman_log_error(server->gearman, "gearman_queue_libtokyocabinet_init",
00161 "tcadboptimize");
00162
00163 return GEARMAN_QUEUE_ERROR;
00164 }
00165 }
00166
00167 gearman_server_set_queue_context(server, queue);
00168
00169 gearman_server_set_queue_add_fn(server, _libtokyocabinet_add);
00170 gearman_server_set_queue_flush_fn(server, _libtokyocabinet_flush);
00171 gearman_server_set_queue_done_fn(server, _libtokyocabinet_done);
00172 gearman_server_set_queue_replay_fn(server, _libtokyocabinet_replay);
00173
00174 return GEARMAN_SUCCESS;
00175 }
00176
00177 gearman_return_t gearman_queue_libtokyocabinet_deinit(gearman_server_st *server)
00178 {
00179 gearman_queue_libtokyocabinet_st *queue;
00180
00181 gearman_log_info(server->gearman, "Shutting down libtokyocabinet queue module");
00182
00183 queue= (gearman_queue_libtokyocabinet_st *)gearman_server_queue_context(server);
00184 gearman_server_set_queue_context(server, NULL);
00185 tcadbdel(queue->db);
00186
00187 free(queue);
00188
00189 return GEARMAN_SUCCESS;
00190 }
00191
00192 gearman_return_t gearmand_queue_libtokyocabinet_init(gearmand_st *gearmand,
00193 gearman_conf_st *conf)
00194 {
00195 return gearman_queue_libtokyocabinet_init(&(gearmand->server), conf);
00196 }
00197
00198 gearman_return_t gearmand_queue_libtokyocabinet_deinit(gearmand_st *gearmand)
00199 {
00200 return gearman_queue_libtokyocabinet_deinit(&(gearmand->server));
00201 }
00202
00203
00204
00205
00206
00207 static gearman_return_t _libtokyocabinet_add(gearman_server_st *server, void *context,
00208 const void *unique,
00209 size_t unique_size,
00210 const void *function_name,
00211 size_t function_name_size,
00212 const void *data, size_t data_size,
00213 gearman_job_priority_t priority)
00214 {
00215 gearman_queue_libtokyocabinet_st *queue= (gearman_queue_libtokyocabinet_st *)context;
00216 bool rc;
00217 TCXSTR *key;
00218 TCXSTR *job_data;
00219
00220 gearman_log_debug(server->gearman, "libtokyocabinet add: %.*s", (uint32_t)unique_size, (char *)unique);
00221
00222 char key_str[GEARMAN_QUEUE_TOKYOCABINET_MAX_KEY_LEN];
00223 size_t key_length= (size_t)snprintf(key_str, GEARMAN_QUEUE_TOKYOCABINET_MAX_KEY_LEN, "%.*s-%.*s",
00224 (int)function_name_size,
00225 (const char *)function_name, (int)unique_size,
00226 (const char *)unique);
00227
00228 key= tcxstrnew();
00229 tcxstrcat(key, key_str, (int)key_length);
00230
00231 gearman_log_debug(server->gearman, "libtokyocabinet key: %.*s", (int)key_length, key_str);
00232
00233 job_data= tcxstrnew();
00234
00235 tcxstrcat(job_data, (const char *)function_name, (int)function_name_size);
00236 tcxstrcat(job_data, "\0", 1);
00237 tcxstrcat(job_data, (const char *)unique, (int)unique_size);
00238 tcxstrcat(job_data, "\0", 1);
00239
00240 switch (priority)
00241 {
00242 case GEARMAN_JOB_PRIORITY_HIGH:
00243 case GEARMAN_JOB_PRIORITY_MAX:
00244 tcxstrcat2(job_data,"0");
00245 break;
00246 case GEARMAN_JOB_PRIORITY_LOW:
00247 tcxstrcat2(job_data,"2");
00248 break;
00249 case GEARMAN_JOB_PRIORITY_NORMAL:
00250 default:
00251 tcxstrcat2(job_data,"1");
00252 }
00253
00254 tcxstrcat(job_data, (const char *)data, (int)data_size);
00255
00256 rc= tcadbput(queue->db, tcxstrptr(key), tcxstrsize(key),
00257 tcxstrptr(job_data), tcxstrsize(job_data));
00258
00259 tcxstrdel(key);
00260 tcxstrdel(job_data);
00261
00262 if (!rc)
00263 return GEARMAN_QUEUE_ERROR;
00264
00265 return GEARMAN_SUCCESS;
00266 }
00267
00268 static gearman_return_t _libtokyocabinet_flush(gearman_server_st *server,
00269 void *context __attribute__((unused)))
00270 {
00271 gearman_queue_libtokyocabinet_st *queue= (gearman_queue_libtokyocabinet_st *)context;
00272
00273 gearman_log_debug(server->gearman, "libtokyocabinet flush");
00274
00275 if (!tcadbsync(queue->db))
00276 return GEARMAN_QUEUE_ERROR;
00277
00278 return GEARMAN_SUCCESS;
00279 }
00280
00281 static gearman_return_t _libtokyocabinet_done(gearman_server_st *server, void *context,
00282 const void *unique,
00283 size_t unique_size,
00284 const void *function_name,
00285 size_t function_name_size)
00286 {
00287 gearman_queue_libtokyocabinet_st *queue= (gearman_queue_libtokyocabinet_st *)context;
00288 bool rc;
00289 TCXSTR *key;
00290
00291 (void) function_name;
00292 (void) function_name_size;
00293 gearman_log_debug(server->gearman, "libtokyocabinet add: %.*s", (uint32_t)unique_size, (char *)unique);
00294
00295 char key_str[GEARMAN_QUEUE_TOKYOCABINET_MAX_KEY_LEN];
00296 size_t key_length= (size_t)snprintf(key_str, GEARMAN_QUEUE_TOKYOCABINET_MAX_KEY_LEN, "%.*s-%.*s",
00297 (int)function_name_size,
00298 (const char *)function_name, (int)unique_size,
00299 (const char *)unique);
00300
00301 key= tcxstrnew();
00302 tcxstrcat(key, key_str, (int)key_length);
00303 rc= tcadbout(queue->db, tcxstrptr(key), tcxstrsize(key));
00304 tcxstrdel(key);
00305
00306 if (!rc)
00307 return GEARMAN_QUEUE_ERROR;
00308
00309 return GEARMAN_SUCCESS;
00310 }
00311
00312 static gearman_return_t _callback_for_record(gearman_server_st *server,
00313 TCXSTR *key, TCXSTR *data,
00314 gearman_queue_add_fn *add_fn,
00315 void *add_context)
00316 {
00317 char *data_cstr;
00318 size_t data_cstr_size;
00319 const char *function;
00320 size_t function_len;
00321 char *unique;
00322 size_t unique_len;
00323 gearman_job_priority_t priority;
00324 gearman_return_t gret;
00325
00326 gearman_log_debug(server->gearman, "replaying: %s", (char *) tcxstrptr(key));
00327
00328 data_cstr= (char *)tcxstrptr(data);
00329 data_cstr_size= (size_t)tcxstrsize(data);
00330
00331 function= data_cstr;
00332 function_len= strlen(function);
00333
00334 unique= data_cstr+function_len+1;
00335 unique_len= strlen(unique);
00336
00337
00338 data_cstr += unique_len+function_len+2;
00339 data_cstr_size -= unique_len+function_len+2;
00340
00341 assert(unique);
00342 assert(unique_len);
00343 assert(function);
00344 assert(function_len);
00345
00346
00347 if (*data_cstr == '2')
00348 priority = GEARMAN_JOB_PRIORITY_LOW;
00349 else if (*data_cstr == '0')
00350 priority = GEARMAN_JOB_PRIORITY_HIGH;
00351 else
00352 priority = GEARMAN_JOB_PRIORITY_NORMAL;
00353
00354 ++data_cstr;
00355 --data_cstr_size;
00356
00357
00358 void *data_ptr= malloc(data_cstr_size);
00359 if (data_ptr == NULL)
00360 {
00361 return GEARMAN_QUEUE_ERROR;
00362 }
00363 memcpy(data_ptr, data_cstr, data_cstr_size);
00364
00365 gret = (*add_fn)(server, add_context, unique, unique_len,
00366 function, function_len,
00367 data_ptr, data_cstr_size,
00368 priority);
00369
00370 if (gret != GEARMAN_SUCCESS)
00371 {
00372 return gret;
00373 }
00374 return GEARMAN_SUCCESS;
00375 }
00376
00377
00378 static gearman_return_t _libtokyocabinet_replay(gearman_server_st *server, void *context,
00379 gearman_queue_add_fn *add_fn,
00380 void *add_context)
00381 {
00382 gearman_queue_libtokyocabinet_st *queue= (gearman_queue_libtokyocabinet_st *)context;
00383 TCXSTR *key;
00384 TCXSTR *data;
00385 void *iter= NULL;
00386 int iter_size= 0;
00387 gearman_return_t gret;
00388 gearman_return_t tmp_gret;
00389
00390 gearman_log_info(server->gearman, "libtokyocabinet replay start");
00391
00392 if (!tcadbiterinit(queue->db))
00393 {
00394 return GEARMAN_QUEUE_ERROR;
00395 }
00396 key= tcxstrnew();
00397 data= tcxstrnew();
00398 gret= GEARMAN_SUCCESS;
00399 uint64_t x= 0;
00400 while ((iter= tcadbiternext(queue->db, &iter_size)))
00401 {
00402 tcxstrclear(key);
00403 tcxstrclear(data);
00404 tcxstrcat(key, iter, iter_size);
00405 free(iter);
00406 iter= tcadbget(queue->db, tcxstrptr(key), tcxstrsize(key), &iter_size);
00407 if (! iter) {
00408 gearman_log_info(server->gearman, "libtokyocabinet replay key disappeared: %s", (char *)tcxstrptr(key));
00409 continue;
00410 }
00411 tcxstrcat(data, iter, iter_size);
00412 free(iter);
00413 tmp_gret= _callback_for_record(server, key, data, add_fn, add_context);
00414 if (tmp_gret != GEARMAN_SUCCESS)
00415 {
00416 gret= GEARMAN_QUEUE_ERROR;
00417 break;
00418 }
00419 ++x;
00420 }
00421 tcxstrdel(key);
00422 tcxstrdel(data);
00423
00424 gearman_log_info(server->gearman, "libtokyocabinet replayed %ld records", x);
00425
00426 return gret;
00427 }