00001
00002
00003
00004
00005
00006
00007
00008
00014 #include <errno.h>
00015 #include <signal.h>
00016 #include <stdio.h>
00017 #include <stdlib.h>
00018 #include <unistd.h>
00019
00020 #include <libgearman/gearman.h>
00021
00022 typedef enum
00023 {
00024 REVERSE_WORKER_OPTIONS_NONE= 0,
00025 REVERSE_WORKER_OPTIONS_DATA= (1 << 0),
00026 REVERSE_WORKER_OPTIONS_STATUS= (1 << 1),
00027 REVERSE_WORKER_OPTIONS_UNIQUE= (1 << 2)
00028 } reverse_worker_options_t;
00029
00030 static void *reverse(gearman_job_st *job, void *context,
00031 size_t *result_size, gearman_return_t *ret_ptr);
00032
00033 static void usage(char *name);
00034
00035 int main(int argc, char *argv[])
00036 {
00037 int c;
00038 uint32_t count= 0;
00039 char *host= NULL;
00040 in_port_t port= 0;
00041 reverse_worker_options_t options= REVERSE_WORKER_OPTIONS_NONE;
00042 int timeout= -1;
00043 gearman_return_t ret;
00044 gearman_worker_st worker;
00045
00046 while ((c = getopt(argc, argv, "c:dh:p:st:u")) != -1)
00047 {
00048 switch(c)
00049 {
00050 case 'c':
00051 count= (uint32_t)atoi(optarg);
00052 break;
00053
00054 case 'd':
00055 options|= REVERSE_WORKER_OPTIONS_DATA;
00056 break;
00057
00058 case 'h':
00059 host= optarg;
00060 break;
00061
00062 case 'p':
00063 port= (in_port_t)atoi(optarg);
00064 break;
00065
00066 case 's':
00067 options|= REVERSE_WORKER_OPTIONS_STATUS;
00068 break;
00069
00070 case 't':
00071 timeout= atoi(optarg);
00072 break;
00073
00074 case 'u':
00075 options|= REVERSE_WORKER_OPTIONS_UNIQUE;
00076 break;
00077
00078 default:
00079 usage(argv[0]);
00080 exit(1);
00081 }
00082 }
00083
00084 if (signal(SIGPIPE, SIG_IGN) == SIG_ERR)
00085 {
00086 fprintf(stderr, "signal:%d\n", errno);
00087 exit(1);
00088 }
00089
00090 if (gearman_worker_create(&worker) == NULL)
00091 {
00092 fprintf(stderr, "Memory allocation failure on worker creation\n");
00093 exit(1);
00094 }
00095
00096 if (options & REVERSE_WORKER_OPTIONS_UNIQUE)
00097 gearman_worker_add_options(&worker, GEARMAN_WORKER_GRAB_UNIQ);
00098
00099 if (timeout >= 0)
00100 gearman_worker_set_timeout(&worker, timeout);
00101
00102 ret= gearman_worker_add_server(&worker, host, port);
00103 if (ret != GEARMAN_SUCCESS)
00104 {
00105 fprintf(stderr, "%s\n", gearman_worker_error(&worker));
00106 exit(1);
00107 }
00108
00109 ret= gearman_worker_add_function(&worker, "reverse", 0, reverse,
00110 &options);
00111 if (ret != GEARMAN_SUCCESS)
00112 {
00113 fprintf(stderr, "%s\n", gearman_worker_error(&worker));
00114 exit(1);
00115 }
00116
00117 while (1)
00118 {
00119 ret= gearman_worker_work(&worker);
00120 if (ret != GEARMAN_SUCCESS)
00121 {
00122 fprintf(stderr, "%s\n", gearman_worker_error(&worker));
00123 break;
00124 }
00125
00126 if (count > 0)
00127 {
00128 count--;
00129 if (count == 0)
00130 break;
00131 }
00132 }
00133
00134 gearman_worker_free(&worker);
00135
00136 return 0;
00137 }
00138
00139 static void *reverse(gearman_job_st *job, void *context,
00140 size_t *result_size, gearman_return_t *ret_ptr)
00141 {
00142 reverse_worker_options_t options= *((reverse_worker_options_t *)context);
00143 const uint8_t *workload;
00144 uint8_t *result;
00145 size_t x;
00146 size_t y;
00147
00148 workload= gearman_job_workload(job);
00149 *result_size= gearman_job_workload_size(job);
00150
00151 result= malloc(*result_size);
00152 if (result == NULL)
00153 {
00154 fprintf(stderr, "malloc:%d\n", errno);
00155 *ret_ptr= GEARMAN_WORK_FAIL;
00156 return NULL;
00157 }
00158
00159 for (y= 0, x= *result_size; x; x--, y++)
00160 {
00161 result[y]= ((uint8_t *)workload)[x - 1];
00162
00163 if (options & REVERSE_WORKER_OPTIONS_DATA)
00164 {
00165 *ret_ptr= gearman_job_send_data(job, &(result[y]), 1);
00166 if (*ret_ptr != GEARMAN_SUCCESS)
00167 {
00168 free(result);
00169 return NULL;
00170 }
00171 }
00172
00173 if (options & REVERSE_WORKER_OPTIONS_STATUS)
00174 {
00175 *ret_ptr= gearman_job_send_status(job, (uint32_t)y,
00176 (uint32_t)*result_size);
00177 if (*ret_ptr != GEARMAN_SUCCESS)
00178 {
00179 free(result);
00180 return NULL;
00181 }
00182
00183 sleep(1);
00184 }
00185 }
00186
00187 printf("Job=%s%s%s Workload=%.*s Result=%.*s\n", gearman_job_handle(job),
00188 options & REVERSE_WORKER_OPTIONS_UNIQUE ? " Unique=" : "",
00189 options & REVERSE_WORKER_OPTIONS_UNIQUE ? gearman_job_unique(job) : "",
00190 (int)*result_size, workload, (int)*result_size, result);
00191
00192 *ret_ptr= GEARMAN_SUCCESS;
00193
00194 if (options & REVERSE_WORKER_OPTIONS_DATA)
00195 {
00196 *result_size= 0;
00197 return NULL;
00198 }
00199
00200 return result;
00201 }
00202
00203 static void usage(char *name)
00204 {
00205 printf("\nusage: %s [-h <host>] [-p <port>]\n", name);
00206 printf("\t-c <count> - number of jobs to run before exiting\n");
00207 printf("\t-d - send result back in data chunks\n");
00208 printf("\t-h <host> - job server host\n");
00209 printf("\t-p <port> - job server port\n");
00210 printf("\t-s - send status updates and sleep while running job\n");
00211 printf("\t-t <timeout> - timeout in milliseconds\n");
00212 printf("\t-u - when grabbing jobs, grab the uniqie id\n");
00213 }