00001
00002
00003
00004
00005
00006
00007
00008
00009
00010 #include "XrdFfs/XrdFfsQueue.hh"
00011
00012
00013
00014 #ifdef __cplusplus
00015 extern "C" {
00016 #endif
00017
/* Lock and condition variable guarding the shared task queue below. */
pthread_mutex_t XrdFfsQueueTaskque_mutex = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t  XrdFfsQueueTaskque_cond  = PTHREAD_COND_INITIALIZER;

/* First and last element of the task queue; both start out as NULL. */
struct XrdFfsQueueTasks *XrdFfsQueueTaskque_head = NULL;
struct XrdFfsQueueTasks *XrdFfsQueueTaskque_tail = NULL;

/* Counter from which task ids are derived; starts at zero. */
unsigned int XrdFfsQueueNext_task_id = 0U;
00023
00024 void XrdFfsQueue_enqueue(struct XrdFfsQueueTasks *task)
00025 {
00026 pthread_mutex_lock(&XrdFfsQueueTaskque_mutex);
00027
00028 task->id = XrdFfsQueueNext_task_id + 1;
00029 XrdFfsQueueNext_task_id = task->id;
00030 if (XrdFfsQueueTaskque_tail == NULL)
00031 {
00032 XrdFfsQueueTaskque_head = task;
00033 XrdFfsQueueTaskque_tail = task;
00034 task->next = NULL;
00035 pthread_cond_broadcast(&XrdFfsQueueTaskque_cond);
00036 }
00037 else
00038 {
00039 task->prev = XrdFfsQueueTaskque_tail;
00040 task->next = NULL;
00041 XrdFfsQueueTaskque_tail->next = task;
00042 XrdFfsQueueTaskque_tail = task;
00043 }
00044
00045 pthread_mutex_unlock(&XrdFfsQueueTaskque_mutex);
00046 return;
00047 }
00048
00049 struct XrdFfsQueueTasks *XrdFfsQueue_dequeue()
00050 {
00051 struct XrdFfsQueueTasks *head;
00052 while (pthread_mutex_lock(&XrdFfsQueueTaskque_mutex) == 0)
00053 if (XrdFfsQueueTaskque_head == NULL)
00054 {
00055 pthread_cond_wait(&XrdFfsQueueTaskque_cond, &XrdFfsQueueTaskque_mutex);
00056 pthread_mutex_unlock(&XrdFfsQueueTaskque_mutex);
00057 }
00058 else
00059 break;
00060
00061 head = XrdFfsQueueTaskque_head;
00062 XrdFfsQueueTaskque_head = XrdFfsQueueTaskque_head->next;
00063
00064 head->next = NULL;
00065 head->prev = NULL;
00066
00067 if (XrdFfsQueueTaskque_head == NULL)
00068 XrdFfsQueueTaskque_tail = NULL;
00069
00070 pthread_mutex_unlock(&XrdFfsQueueTaskque_mutex);
00071 return head;
00072 }
00073
00074
00075
00076 struct XrdFfsQueueTasks* XrdFfsQueue_create_task(void* (*func)(void*), void **args, short initstat)
00077 {
00078 struct XrdFfsQueueTasks *task = (struct XrdFfsQueueTasks*) malloc(sizeof(struct XrdFfsQueueTasks));
00079 task->func = func;
00080 task->args = args;
00081 task->done = ( (initstat == -1)? -1 : 0);
00082
00083 pthread_mutex_init(&task->mutex, NULL);
00084 pthread_cond_init(&task->cond, NULL);
00085
00086 XrdFfsQueue_enqueue(task);
00087 return task;
00088 }
00089
00090 void XrdFfsQueue_free_task(struct XrdFfsQueueTasks *task)
00091 {
00092 pthread_mutex_destroy(&task->mutex);
00093 pthread_cond_destroy(&task->cond);
00094 task->func = NULL;
00095 task->args = NULL;
00096 task->next = NULL;
00097 task->prev = NULL;
00098 free(task);
00099 task = NULL;
00100 }
00101
00102 void XrdFfsQueue_wait_task(struct XrdFfsQueueTasks *task)
00103 {
00104 pthread_mutex_lock(&task->mutex);
00105 if (task->done != 1)
00106 pthread_cond_wait(&task->cond, &task->mutex);
00107 pthread_mutex_unlock(&task->mutex);
00108 }
00109
00110 unsigned int XrdFfsQueue_count_tasks()
00111 {
00112 unsigned int que_len = 0;
00113 pthread_mutex_lock(&XrdFfsQueueTaskque_mutex);
00114 if (XrdFfsQueueTaskque_head != NULL && XrdFfsQueueTaskque_tail != NULL) {
00115 if (XrdFfsQueueTaskque_tail->id > XrdFfsQueueTaskque_head->id)
00116 que_len = XrdFfsQueueTaskque_tail->id - XrdFfsQueueTaskque_head->id;
00117 else
00118 que_len = (unsigned int)2147483647 - (XrdFfsQueueTaskque_head->id - XrdFfsQueueTaskque_tail->id) + 1;
00119 }
00120 pthread_mutex_unlock(&XrdFfsQueueTaskque_mutex);
00121 return que_len;
00122 }
00123
00124
00125
00126 void *XrdFfsQueue_worker(void* x)
00127 {
00128 struct XrdFfsQueueTasks *task;
00129 short quit = 0;
00130
00131 loop:
00132 task = XrdFfsQueue_dequeue();
00133
00134 if (task->done == -1)
00135 quit = 1;
00136
00137 pthread_mutex_lock(&task->mutex);
00138 #ifdef QUEDEBUG
00139 printf("worker %d on task %d\n", wid, task->id);
00140 #endif
00141 if (!quit)
00142 (task->func)(task->args);
00143
00144 task->done = 1;
00145 pthread_cond_signal(&task->cond);
00146 pthread_mutex_unlock(&task->mutex);
00147 if (quit)
00148 {
00149 #ifdef QUEDEBUG
00150 printf("worker %d is leaving\n", wid);
00151 #endif
00152 free(x);
00153
00154 return(NULL);
00155 }
00156 else
00157 goto loop;
00158 }
00159
/* Lock guarding the worker bookkeeping below. It is statically initialised:
   POSIX requires a mutex to be initialised before it is first locked. */
pthread_mutex_t XrdFfsQueueWorker_mutex = PTHREAD_MUTEX_INITIALIZER;

/* Number of workers currently accounted for. */
unsigned short XrdFfsQueueNworkers = 0;

/* Source of worker ids; starts at zero. */
unsigned int XrdFfsQueueWorker_id = 0;
00163
00164 int XrdFfsQueue_create_workers(int n)
00165 {
00166 int i, rc, *id;
00167 pthread_t *thread;
00168 pthread_attr_t attr;
00169 size_t stacksize = 2*1024*1024;
00170
00171 pthread_attr_init(&attr);
00172 pthread_attr_setstacksize(&attr, stacksize);
00173 pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
00174
00175 pthread_mutex_lock(&XrdFfsQueueWorker_mutex);
00176 for (i = 0; i < n; i++)
00177 {
00178 id = (int*) malloc(sizeof(int));
00179 *id = XrdFfsQueueWorker_id++;
00180 thread = (pthread_t*) malloc(sizeof(pthread_t));
00181 if (thread == NULL)
00182 {
00183 XrdFfsQueueWorker_id--;
00184 break;
00185 }
00186 rc = pthread_create(thread, &attr, XrdFfsQueue_worker, id);
00187 if (rc != 0)
00188 {
00189 XrdFfsQueueWorker_id--;
00190 break;
00191 }
00192 pthread_detach(*thread);
00193 free(thread);
00194 }
00195 pthread_attr_destroy(&attr);
00196 XrdFfsQueueNworkers += i;
00197 pthread_mutex_unlock(&XrdFfsQueueWorker_mutex);
00198 return i;
00199 }
00200
00201 int XrdFfsQueue_remove_workers(int n)
00202 {
00203 int i;
00204 struct XrdFfsQueueTasks *task;
00205
00206 pthread_mutex_lock(&XrdFfsQueueWorker_mutex);
00207 if (XrdFfsQueueNworkers == 0)
00208 n = 0;
00209 else if (n > XrdFfsQueueNworkers)
00210 {
00211 n = XrdFfsQueueNworkers;
00212 XrdFfsQueueNworkers = 0;
00213 }
00214 else
00215 XrdFfsQueueNworkers -= n;
00216 for (i = 0; i < n; i++)
00217 {
00218 task = XrdFfsQueue_create_task(NULL, NULL, -1);
00219 XrdFfsQueue_wait_task(task);
00220 XrdFfsQueue_free_task(task);
00221 }
00222 pthread_mutex_unlock(&XrdFfsQueueWorker_mutex);
00223 return n;
00224 }
00225
00226 int XrdFfsQueue_count_workers()
00227 {
00228 int i;
00229 pthread_mutex_lock(&XrdFfsQueueWorker_mutex);
00230 i = XrdFfsQueueNworkers;
00231 pthread_mutex_unlock(&XrdFfsQueueWorker_mutex);
00232 return i;
00233 }
00234
00235
00236
00237
00238
00239
00240
00241
00242
00243
00244
00245
00246
00247
00248
00249
00250
00251
00252
00253
00254
00255
00256
00257
00258
00259
00260
00261
00262
00263
00264
00265
00266
00267
00268
00269
00270
00271
00272
00273
00274
00275
00276
00277
00278
00279
00280
00281
00282
00283
00284
00285
00286
00287
00288
00289
00290
00291
00292
00293
00294
00295
00296
00297
00298
00299
00300
00301
00302 #ifdef __cplusplus
00303 }
00304 #endif