XrdFfsQueue.cc

Go to the documentation of this file.
00001 /******************************************************************************/
00002 /* XrdFfsQueue.cc  functions to run independent tasks in queue                */
00003 /*                                                                            */
00004 /* (c) 2010 by the Board of Trustees of the Leland Stanford, Jr., University  */
00005 /*                            All Rights Reserved                             */
00006 /* Author: Wei Yang (SLAC National Accelerator Laboratory, 2009)              */
00007 /*         Contract DE-AC02-76-SFO0515 with the Department of Energy          */
00008 /******************************************************************************/
00009 
00010 #include "XrdFfs/XrdFfsQueue.hh"
00011 
00012 /* queue operation */
00013  
00014 #ifdef __cplusplus
00015   extern "C" {
00016 #endif
00017 
00018 struct XrdFfsQueueTasks *XrdFfsQueueTaskque_head = NULL;
00019 struct XrdFfsQueueTasks *XrdFfsQueueTaskque_tail = NULL;
00020 unsigned int XrdFfsQueueNext_task_id = 0;
00021 pthread_mutex_t XrdFfsQueueTaskque_mutex = PTHREAD_MUTEX_INITIALIZER;
00022 pthread_cond_t XrdFfsQueueTaskque_cond = PTHREAD_COND_INITIALIZER;
00023 
00024 void XrdFfsQueue_enqueue(struct XrdFfsQueueTasks *task)
00025 {
00026     pthread_mutex_lock(&XrdFfsQueueTaskque_mutex);
00027 
00028     task->id = XrdFfsQueueNext_task_id + 1;
00029     XrdFfsQueueNext_task_id = task->id;
00030     if (XrdFfsQueueTaskque_tail == NULL) 
00031     {
00032         XrdFfsQueueTaskque_head = task;
00033         XrdFfsQueueTaskque_tail = task;
00034         task->next = NULL;
00035         pthread_cond_broadcast(&XrdFfsQueueTaskque_cond);
00036     }
00037     else
00038     {
00039         task->prev = XrdFfsQueueTaskque_tail;
00040         task->next = NULL;
00041         XrdFfsQueueTaskque_tail->next = task;
00042         XrdFfsQueueTaskque_tail = task;
00043     }
00044 
00045     pthread_mutex_unlock(&XrdFfsQueueTaskque_mutex);
00046     return;
00047 }
00048 
00049 struct XrdFfsQueueTasks *XrdFfsQueue_dequeue()
00050 {
00051     struct XrdFfsQueueTasks *head;
00052     while (pthread_mutex_lock(&XrdFfsQueueTaskque_mutex) == 0)
00053         if (XrdFfsQueueTaskque_head == NULL)
00054         {
00055             pthread_cond_wait(&XrdFfsQueueTaskque_cond, &XrdFfsQueueTaskque_mutex);
00056             pthread_mutex_unlock(&XrdFfsQueueTaskque_mutex);
00057         }
00058         else
00059             break;
00060 
00061     head = XrdFfsQueueTaskque_head;
00062     XrdFfsQueueTaskque_head = XrdFfsQueueTaskque_head->next;
00063 
00064     head->next = NULL;
00065     head->prev = NULL;        
00066 
00067     if (XrdFfsQueueTaskque_head == NULL)
00068         XrdFfsQueueTaskque_tail = NULL;
00069 
00070     pthread_mutex_unlock(&XrdFfsQueueTaskque_mutex);
00071     return head;
00072 }
00073 
00074 /* create, wait and free(delete) a task */
00075 
00076 struct XrdFfsQueueTasks* XrdFfsQueue_create_task(void* (*func)(void*), void **args, short initstat)
00077 {
00078     struct XrdFfsQueueTasks *task = (struct XrdFfsQueueTasks*) malloc(sizeof(struct XrdFfsQueueTasks));
00079     task->func = func;
00080     task->args = args;
00081     task->done = ( (initstat == -1)? -1 : 0); /* -1 means this task is meant to kill a worker thread */
00082 
00083     pthread_mutex_init(&task->mutex, NULL);
00084     pthread_cond_init(&task->cond, NULL);
00085 
00086     XrdFfsQueue_enqueue(task);
00087     return task;
00088 }
00089 
00090 void XrdFfsQueue_free_task(struct XrdFfsQueueTasks *task) 
00091 {
00092     pthread_mutex_destroy(&task->mutex);
00093     pthread_cond_destroy(&task->cond);
00094     task->func = NULL;
00095     task->args = NULL;
00096     task->next = NULL;
00097     task->prev = NULL;
00098     free(task);
00099     task = NULL;
00100 }
00101 
00102 void XrdFfsQueue_wait_task(struct XrdFfsQueueTasks *task)
00103 {
00104     pthread_mutex_lock(&task->mutex);
00105     if (task->done != 1)
00106         pthread_cond_wait(&task->cond, &task->mutex);
00107     pthread_mutex_unlock(&task->mutex);
00108 }
00109 
00110 unsigned int XrdFfsQueue_count_tasks()
00111 {
00112     unsigned int que_len = 0;
00113     pthread_mutex_lock(&XrdFfsQueueTaskque_mutex);
00114     if (XrdFfsQueueTaskque_head != NULL && XrdFfsQueueTaskque_tail != NULL) {
00115         if (XrdFfsQueueTaskque_tail->id > XrdFfsQueueTaskque_head->id)
00116             que_len = XrdFfsQueueTaskque_tail->id - XrdFfsQueueTaskque_head->id;
00117         else
00118             que_len = (unsigned int)2147483647 - (XrdFfsQueueTaskque_head->id - XrdFfsQueueTaskque_tail->id) + 1;
00119     }
00120     pthread_mutex_unlock(&XrdFfsQueueTaskque_mutex);
00121     return que_len;
00122 }
00123 
00124 /* workers */
00125 
00126 void *XrdFfsQueue_worker(void* x)
00127 {
00128     struct XrdFfsQueueTasks *task;
00129     short quit = 0;
00130 
00131     loop:
00132     task = XrdFfsQueue_dequeue();
00133 
00134     if (task->done == -1) // terminate this worker thread
00135         quit = 1;
00136 
00137     pthread_mutex_lock(&task->mutex);
00138 #ifdef QUEDEBUG
00139     printf("worker %d on task %d\n", wid, task->id);
00140 #endif
00141     if (!quit)
00142         (task->func)(task->args);
00143  
00144     task->done = 1;
00145     pthread_cond_signal(&task->cond);
00146     pthread_mutex_unlock(&task->mutex);
00147     if (quit)
00148     {
00149 #ifdef QUEDEBUG
00150         printf("worker %d is leaving\n", wid);
00151 #endif
00152         free(x);
00153 //        pthread_exit(NULL);
00154         return(NULL);
00155     }
00156     else
00157         goto loop;
00158 }
00159 
00160 pthread_mutex_t XrdFfsQueueWorker_mutex;
00161 unsigned short XrdFfsQueueNworkers = 0;
00162 unsigned int XrdFfsQueueWorker_id = 0;
00163 
00164 int XrdFfsQueue_create_workers(int n)
00165 {
00166     int i, rc, *id;
00167     pthread_t *thread;
00168     pthread_attr_t attr;
00169     size_t stacksize = 2*1024*1024;
00170 
00171     pthread_attr_init(&attr);
00172     pthread_attr_setstacksize(&attr, stacksize);
00173     pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
00174     
00175     pthread_mutex_lock(&XrdFfsQueueWorker_mutex);
00176     for (i = 0; i < n; i++)
00177     {
00178         id = (int*) malloc(sizeof(int));
00179         *id = XrdFfsQueueWorker_id++;
00180         thread = (pthread_t*) malloc(sizeof(pthread_t));
00181         if (thread == NULL) 
00182         {
00183             XrdFfsQueueWorker_id--;
00184             break;
00185         }
00186         rc = pthread_create(thread, &attr, XrdFfsQueue_worker, id);
00187         if (rc != 0) 
00188         {
00189             XrdFfsQueueWorker_id--;
00190             break;
00191         }
00192         pthread_detach(*thread);
00193         free(thread);
00194     }
00195     pthread_attr_destroy(&attr);
00196     XrdFfsQueueNworkers += i;
00197     pthread_mutex_unlock(&XrdFfsQueueWorker_mutex);
00198     return i;
00199 }
00200 
00201 int XrdFfsQueue_remove_workers(int n)
00202 {
00203     int i;
00204     struct XrdFfsQueueTasks *task;
00205 
00206     pthread_mutex_lock(&XrdFfsQueueWorker_mutex);
00207     if (XrdFfsQueueNworkers == 0)
00208         n = 0;
00209     else if (n > XrdFfsQueueNworkers)
00210     {
00211         n = XrdFfsQueueNworkers;
00212         XrdFfsQueueNworkers = 0;
00213     }
00214     else
00215         XrdFfsQueueNworkers -= n;
00216     for (i = 0; i < n; i++)
00217     {
00218         task = XrdFfsQueue_create_task(NULL, NULL, -1);
00219         XrdFfsQueue_wait_task(task);
00220         XrdFfsQueue_free_task(task);
00221     }
00222     pthread_mutex_unlock(&XrdFfsQueueWorker_mutex);
00223     return n;
00224 }
00225 
00226 int XrdFfsQueue_count_workers()
00227 {
00228     int i;
00229     pthread_mutex_lock(&XrdFfsQueueWorker_mutex);
00230     i = XrdFfsQueueNworkers;
00231     pthread_mutex_unlock(&XrdFfsQueueWorker_mutex);
00232     return i;
00233 }
00234 
00235 
00236 /* Test program below
00237    ==================
00238 
00239 struct jobargs {
00240     int i;
00241     int XrdFfsQueueWorker_id;
00242 };
00243 
00244 void* job1(void *arg)
00245 {
00246      int i = ((struct jobargs*)arg)->i;
00247 //     int wid = ((struct jobargs*)arg)->XrdFfsQueueWorker_id;
00248 
00249 //     if (i == 10 || i == 20 || i == 30 || i == 40)
00250 //        sleep(2);
00251      printf("hello from job1 ( %d )\n", i);
00252 }
00253 
00254 int main()
00255 {
00256     int i;
00257 
00258     XrdFfsQueue_create_workers(20);
00259 #define N 500
00260     struct XrdFfsQueueTasks *myjob1[N];
00261     struct jobargs myarg1[N];
00262 
00263     sleep(1);
00264     printf("1st round ...\n");
00265     for (i = 0; i < N; i++)
00266     {
00267         myarg1[i].i = i;
00268         myjob1[i] = XrdFfsQueue_create_task((void*) &job1, (void*) &myarg1[i], 0);
00269     }
00270     for (i = 0; i < N; i++)
00271     {
00272         XrdFfsQueue_wait_task(myjob1[i]);
00273         XrdFfsQueue_free_task(myjob1[i]);
00274     }
00275 
00276     printf("there are %d workers after 1st round\n", XrdFfsQueue_count_workers());
00277     printf("remove %d workers\n", XrdFfsQueue_remove_workers(8));
00278     printf("add 1 worker\n");
00279     XrdFfsQueue_create_workers(10);
00280 
00281     sleep(2);
00282     printf("2nd round ...\n");
00283 
00284     for (i = 0; i < N; i++)
00285     {
00286         myarg1[i].i = i;
00287         myjob1[i] = XrdFfsQueue_create_task((void*) &job1, (void*) &myarg1[i], 0);
00288     }
00289     for (i = 0; i < N; i++)
00290     {
00291         XrdFfsQueue_wait_task(myjob1[i]);
00292         XrdFfsQueue_free_task(myjob1[i]);
00293     }
00294 
00295     XrdFfsQueue_remove_workers(XrdFfsQueue_count_workers());
00296     printf("bye ...\n");
00297     return 0; 
00298 }
00299 
00300 */ 
00301 
00302 #ifdef __cplusplus
00303   }
00304 #endif

Generated on Tue Jul 5 14:46:35 2011 for ROOT_528-00b_version by  doxygen 1.5.1