00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
// Version-control identification string for this translation unit.
const char *XrdXrootdJobCVSID =
    "$Id: XrdXrootdJob.cc 35287 2010-09-14 21:19:35Z ganis $";
00014
00015 #include <stdio.h>
00016 #include <string.h>
00017 #include <netinet/in.h>
00018 #include <sys/uio.h>
00019
00020 #include "Xrd/XrdLink.hh"
00021 #include "Xrd/XrdScheduler.hh"
00022 #include "XrdOuc/XrdOucProg.hh"
00023 #include "XrdOuc/XrdOucStream.hh"
00024 #include "XrdSys/XrdSysPlatform.hh"
00025 #include "XrdXrootd/XrdXrootdJob.hh"
00026 #include "XrdXrootd/XrdXrootdResponse.hh"
00027 #include "XrdXrootd/XrdXrootdTrace.hh"
00028 #include "XProtocol/XProtocol.hh"
00029 #include "XProtocol/XPtypes.hh"
00030
00031
00032
00033
00034
00035 class XrdXrootdJob2Do : public XrdJob
00036 {
00037 public:
00038 friend class XrdXrootdJob;
00039
00040 void DoIt();
00041
00042 enum JobStatus {Job_Active, Job_Cancel, Job_Done, Job_Waiting};
00043
00044 JobStatus Status;
00045
00046 XrdXrootdJob2Do(XrdXrootdJob *job,
00047 int jnum,
00048 const char **args,
00049 XrdXrootdResponse *Resp,
00050 int opts);
00051 ~XrdXrootdJob2Do();
00052
00053 private:
00054 int addClient(XrdXrootdResponse *rp, int opts);
00055 void delClient(XrdXrootdResponse *rp);
00056 XrdOucTList *lstClient(void);
00057 int verClient(int dodel=0);
00058 void Redrive(void);
00059 void sendResult(char *lp, int caned=0);
00060
00061 static const int maxClients = 8;
00062 struct {XrdLink *Link;
00063 unsigned int Inst;
00064 kXR_char streamid[2];
00065 char isSync;
00066 } Client[maxClients];
00067
00068 int numClients;
00069
00070 XrdOucStream jobStream;
00071 XrdXrootdJob *theJob;
00072 char *theArgs[5];
00073 char *theResult;
00074 int JobNum;
00075 char JobMark;
00076 char doRedrive;
00077 };
00078
00079
00080
00081
00082
00083 extern XrdOucTrace *XrdXrootdTrace;
00084
00085 int XrdXrootdJobWaiting(XrdXrootdJob2Do *item, void *arg)
00086 {
00087 return (item->Status == XrdXrootdJob2Do::Job_Waiting);
00088 }
00089
00090
00091
00092
00093
00094
00095
00096
00097 XrdXrootdJob2Do::XrdXrootdJob2Do(XrdXrootdJob *job,
00098 int jnum,
00099 const char **args,
00100 XrdXrootdResponse *resp,
00101 int opts)
00102 : XrdJob(job->JobName)
00103 {
00104 int i;
00105 for (i = 0; i < 5 && args[i]; i++) theArgs[i] = strdup(args[i]);
00106 for (i = i; i < 5; i++) theArgs[i] = (char *)0;
00107 theJob = job;
00108 JobNum = jnum;
00109 JobMark = 0;
00110 numClients = 0;
00111 theResult = 0;
00112 doRedrive = 0;
00113 Status = Job_Waiting;
00114 addClient(resp, opts);
00115 }
00116
00117
00118
00119
00120
00121 XrdXrootdJob2Do::~XrdXrootdJob2Do()
00122 {
00123 int i;
00124
00125 for (i = 0; i < numClients; i++)
00126 if (!Client[i].isSync) {sendResult(0, 1); break;}
00127
00128 for (i = 0; i < 5; i++)
00129 if (theArgs[i]) free(theArgs[i]);
00130 }
00131
00132
00133
00134
00135
00136 void XrdXrootdJob2Do::DoIt()
00137 {
00138 XrdXrootdJob2Do *jp = 0;
00139 char *lp = 0;
00140 int i;
00141
00142
00143
00144 theJob->myMutex.Lock();
00145
00146
00147
00148
00149 if (Status != Job_Cancel)
00150 {if (theJob->theProg->Run(&jobStream, theArgs[1], theArgs[2],
00151 theArgs[3], theArgs[4])) Status = Job_Cancel;
00152 else {theJob->myMutex.UnLock();
00153 lp = jobStream.GetLine();
00154 theJob->myMutex.Lock();
00155 if (Status != Job_Cancel)
00156 {Status = Job_Done;
00157 for (i = 0; i < numClients; i++)
00158 if (!Client[i].isSync) {sendResult(lp); break;}
00159 }
00160 }
00161 }
00162
00163
00164
00165
00166
00167 if (doRedrive)
00168 {if (theJob->numJobs > theJob->maxJobs) Redrive();
00169 theJob->numJobs--;
00170 }
00171
00172
00173
00174
00175 if (Status != Job_Cancel && numClients) theResult = lp;
00176 else {if (Status == Job_Cancel) sendResult(0, 1);
00177 jp = theJob->JobTable.Remove(JobNum);
00178 }
00179
00180
00181
00182
00183 theJob->myMutex.UnLock();
00184 if (jp) delete jp;
00185 }
00186
00187
00188
00189
00190
00191
00192
00193
00194 int XrdXrootdJob2Do::addClient(XrdXrootdResponse *rp, int opts)
00195 {
00196 XrdLink *lp = rp->theLink();
00197 unsigned int Inst = lp->Inst();
00198 int i;
00199
00200
00201
00202 if (numClients >= maxClients) verClient();
00203
00204
00205
00206 for (i = 0; i < numClients; i++)
00207 if (lp == Client[i].Link && Inst == Client[i].Inst) return 0;
00208
00209
00210
00211 if (numClients >= maxClients) return -1;
00212 Client[numClients].Link = lp;
00213 Client[numClients].Inst = Inst;
00214 if (opts & JOB_Sync) Client[numClients].isSync = 1;
00215 else {rp->StreamID(Client[numClients].streamid);
00216 Client[numClients].isSync = 0;
00217 }
00218 numClients++;
00219 JobMark = 0;
00220 return 1;
00221 }
00222
00223
00224
00225
00226
00227 void XrdXrootdJob2Do::delClient(XrdXrootdResponse *rp)
00228 {
00229 XrdLink *lp = rp->theLink();
00230 unsigned int Inst = lp->Inst();
00231 int i, j;
00232
00233
00234
00235 for (i = 0; i < numClients; i++)
00236 if (lp == Client[i].Link && Inst == Client[i].Inst)
00237 {for (j = i+1; j < numClients; j++) Client[i++] = Client[j];
00238 numClients--;
00239 break;
00240 }
00241 }
00242
00243
00244
00245
00246
00247
00248
00249
00250 XrdOucTList *XrdXrootdJob2Do::lstClient()
00251 {
00252 char State, buff[4096], *bp = buff;
00253 int bsz, i, k;
00254
00255
00256
00257 switch(Status)
00258 {case Job_Active: State = 'a'; break;
00259 case Job_Cancel: State = 'c'; break;
00260 case Job_Done: State = 'd'; break;
00261 case Job_Waiting: State = 'w'; break;
00262 default: State = 'u'; break;
00263 };
00264
00265
00266
00267 bp = buff + sprintf(buff, "<s>%c</s><conn>", State);
00268 bsz = sizeof(buff) - (bp - buff) - 8;
00269
00270
00271
00272 if (!numClients) bp++;
00273 else for (i = 0; i < numClients; i++)
00274 if (Client[i].Link && Client[i].Link->isInstance(Client[i].Inst))
00275 {if ((k = strlcpy(bp, Client[i].Link->ID, bsz)) >= bsz
00276 || (bsz -= k) < 1) {bp++; break;}
00277 bp += k; *bp = ' '; bp++; bsz--;
00278 }
00279
00280
00281
00282 if (*(bp-1) == ' ') bp--;
00283 strcpy(bp, "</conn>");
00284
00285
00286
00287 return new XrdOucTList(buff, bp-buff+7);
00288 }
00289
00290
00291
00292
00293
00294 int XrdXrootdJob2Do::verClient(int dodel)
00295 {
00296 int i, j, k;
00297
00298
00299
00300 for (i = 0; i < numClients; i++)
00301 if (!Client[i].Link->isInstance(Client[i].Inst))
00302 {k = i;
00303 for (j = i+1; j < numClients; j++,k++) Client[k] = Client[j];
00304 numClients--; i--;
00305 }
00306
00307
00308
00309 if (!numClients && dodel)
00310 {XrdXrootdJob2Do *jp = theJob->JobTable.Remove(JobNum);
00311 if (jp->Status == XrdXrootdJob2Do::Job_Waiting) theJob->numJobs--;
00312 delete jp;
00313 return 0;
00314 }
00315 return numClients;
00316 }
00317
00318
00319
00320
00321
00322 void XrdXrootdJob2Do::Redrive()
00323 {
00324 XrdXrootdJob2Do *jp;
00325 int Start = 0;
00326
00327
00328
00329
00330 while ((jp = theJob->JobTable.Apply(XrdXrootdJobWaiting, (void *)0, Start)))
00331 if (jp->verClient(jp->JobMark > 0)) break;
00332 else Start = jp->JobNum+1;
00333
00334
00335
00336 if (jp)
00337 {jp->Status = Job_Active; jp->doRedrive = 1;
00338 theJob->Sched->Schedule((XrdJob *)jp);
00339 }
00340 }
00341
00342
00343
00344
00345
00346 void XrdXrootdJob2Do::sendResult(char *lp, int caned)
00347 {
00348 const char *TraceID = "sendResult";
00349 const kXR_int32 Xcan = static_cast<kXR_int32>(htonl(kXR_Cancelled));
00350 const kXR_int32 Xbad = static_cast<kXR_int32>(htonl(kXR_ServerError));
00351 XrdXrootdReqID ReqID;
00352 struct iovec jobVec[6];
00353 XResponseType jobStat;
00354 const char *trc, *tre;
00355 kXR_int32 erc;
00356 int j, i, ovhd = 0, dlen = 0, n = 1;
00357
00358
00359
00360 if (lp)
00361 {jobStat = kXR_ok; trc = "ok";
00362 if (theArgs[0])
00363 { jobVec[n].iov_base = theArgs[0];
00364 dlen = jobVec[n].iov_len = strlen(theArgs[0]); n++;
00365 jobVec[n].iov_base = (char *)" ";
00366 dlen += jobVec[n].iov_len = 1; n++;
00367 }
00368 } else {
00369 jobStat = kXR_error; trc = "error";
00370 if (caned) {erc = Xcan; lp = (char *)"Cancelled by admin.";}
00371 else {erc = Xbad; lp = (char *)"Program failed.";}
00372 jobVec[n].iov_base = (char *)&erc;
00373 dlen = jobVec[n].iov_len = sizeof(erc); n++;
00374 ovhd = 1;
00375 }
00376 jobVec[n].iov_base = lp;
00377 dlen += jobVec[n].iov_len = strlen(lp)+ovhd; n++;
00378
00379
00380
00381 j = 0;
00382 for (i = 0; i < numClients; i++)
00383 {if (!Client[i].isSync)
00384 {ReqID.setID(Client[i].streamid,
00385 Client[i].Link->FDnum(), Client[i].Link->Inst());
00386 tre = (XrdXrootdResponse::Send(ReqID, jobStat, jobVec, n, dlen) < 0
00387 ? "skipped" : "sent");
00388 TRACE(RSP, tre <<" async " <<trc <<" to " <<Client[i].Link->ID);
00389 } else if (i != j) Client[j++] = Client[i];
00390 }
00391 numClients = j;
00392 }
00393
00394
00395
00396
00397
00398
00399
00400
00401 XrdXrootdJob::XrdXrootdJob(XrdScheduler *schp,
00402 XrdOucProg *pgm,
00403 const char *jname,
00404 int maxjobs)
00405 : XrdJob("Job Scheduler"),
00406 JobTable(maxjobs*3)
00407 {
00408
00409
00410 Sched = schp;
00411 theProg = pgm;
00412 JobName = strdup(jname);
00413 maxJobs = maxjobs;
00414 numJobs = 0;
00415
00416
00417
00418 schp->Schedule((XrdJob *)this, time(0) + (reScan));
00419 }
00420
00421
00422
00423
00424
00425
00426
00427
00428
00429 XrdXrootdJob::~XrdXrootdJob()
00430 {
00431 if (JobName) free(JobName);
00432 myMutex.Lock();
00433 Sched->Cancel((XrdJob *)this);
00434 myMutex.UnLock();
00435 }
00436
00437
00438
00439
00440
00441 int XrdXrootdJob::Cancel(const char *jkey, XrdXrootdResponse *resp)
00442 {
00443 XrdXrootdJob2Do *jp = 0;
00444 int i, jNum, jNext = 0, numcaned = 0;
00445
00446
00447
00448 myMutex.Lock();
00449
00450
00451
00452 if (jkey)
00453 {if ((jp = JobTable.Find(jkey)))
00454 {numcaned = 1;
00455 if (resp) {jp->delClient(resp);
00456 if (!jp->numClients) CleanUp(jp);
00457 }
00458 else CleanUp(jp);
00459 }
00460 myMutex.UnLock();
00461 return numcaned;
00462 }
00463
00464
00465
00466 while((jNum = JobTable.Next(jNext)) >= 0)
00467 {jp = JobTable.Item(jNum);
00468 if (resp)
00469 {i = jp->numClients;
00470 jp->delClient(resp);
00471 if (i != jp->numClients) numcaned++;
00472 if (!jp->numClients) CleanUp(jp);
00473 } else {
00474 CleanUp(jp);
00475 numcaned++;
00476 }
00477 }
00478
00479
00480
00481 myMutex.UnLock();
00482 return numcaned;
00483 }
00484
00485
00486
00487
00488
00489 void XrdXrootdJob::DoIt()
00490 {
00491 int jNum, jNext = 0;
00492 XrdXrootdJob2Do *jp;
00493
00494
00495
00496 while((jNum = JobTable.Next(jNext)) >= 0)
00497 {myMutex.Lock();
00498 if ((jp = JobTable.Item(jNum)))
00499 {if (jp->JobMark) {if (!jp->verClient()) CleanUp(jp);}
00500 else jp->JobMark = 1;
00501 }
00502 myMutex.UnLock();
00503 }
00504
00505
00506
00507 Sched->Schedule((XrdJob *)this, time(0) + (reScan));
00508 }
00509
00510
00511
00512
00513
00514
00515
00516 XrdOucTList *XrdXrootdJob::List()
00517 {
00518 char *jkey, buff[1024];
00519 int tlen, jNum, jNext = 0;
00520 XrdXrootdJob2Do *jp;
00521 XrdOucTList *tF = 0, *tL = 0, *tp;
00522
00523
00524
00525 while((jNum = JobTable.Next(jNext)) >= 0)
00526 {myMutex.Lock();
00527 if ((jp = JobTable.Item(jNum, &jkey)) && (tp = jp->lstClient()))
00528 {tlen = sprintf(buff, "<job id=\"%s\">%s", JobName, jkey);
00529 if (tL) tL->next = new XrdOucTList(buff, tlen, tp);
00530 else tF = new XrdOucTList(buff, tlen, tp);
00531 tL = tp->next = new XrdOucTList("</job>", 6);
00532 }
00533 myMutex.UnLock();
00534 }
00535
00536
00537
00538 return tF;
00539 }
00540
00541
00542
00543
00544
00545 int XrdXrootdJob::Schedule(const char *jkey,
00546 const char **args,
00547 XrdXrootdResponse *resp,
00548 int Opts)
00549 {
00550 XrdXrootdJob2Do *jp;
00551 const char *msg = "Job resources currently not available.";
00552 int jobNum, rc, isSync = Opts & JOB_Sync;
00553
00554
00555
00556 if (!jkey || !(*jkey))
00557 return resp->Send(kXR_ArgMissing, "Job target not specified.");
00558
00559
00560
00561 myMutex.Lock();
00562 if (!(Opts & JOB_Unique) && jkey && (jp = JobTable.Find(jkey)))
00563 {if (jp->Status == XrdXrootdJob2Do::Job_Done)
00564 {rc = sendResult(resp, args[0], jp);
00565 myMutex.UnLock();
00566 return rc;
00567 }
00568 if (jp->addClient(resp, Opts) < 0) isSync = 1;
00569 else msg = "Job scheduled.";
00570 } else {
00571 if ((jobNum = JobTable.Alloc()) < 0) isSync = 1;
00572 else {if ((jp = new XrdXrootdJob2Do(this, jobNum, args, resp, Opts)))
00573 {JobTable.Insert(jp, jkey, jobNum);
00574 if (numJobs < maxJobs)
00575 {Sched->Schedule((XrdJob *)jp);
00576 jp->Status = XrdXrootdJob2Do::Job_Active;
00577 jp->doRedrive = 1;
00578 }
00579 numJobs++; msg = "Job Scheduled";
00580 }
00581 }
00582 }
00583
00584
00585
00586 if (isSync) rc = resp->Send(kXR_wait, 30, msg);
00587 else rc = resp->Send(kXR_waitresp, 600, "Job scheduled.");
00588 myMutex.UnLock();
00589 return rc;
00590 }
00591
00592
00593
00594
00595
00596
00597
00598
00599 void XrdXrootdJob::CleanUp(XrdXrootdJob2Do *jp)
00600 {
00601 int theStatus = jp->Status;
00602
00603
00604
00605
00606
00607
00608 jp->Status = XrdXrootdJob2Do::Job_Cancel;
00609 if (theStatus == XrdXrootdJob2Do::Job_Waiting
00610 || theStatus == XrdXrootdJob2Do::Job_Done)
00611 Sched->Schedule((XrdJob *)jp);
00612 else if (theStatus == XrdXrootdJob2Do::Job_Active)
00613 jp->jobStream.Drain();
00614 if (theStatus == XrdXrootdJob2Do::Job_Waiting) numJobs--;
00615 }
00616
00617
00618
00619
00620
00621 int XrdXrootdJob::sendResult(XrdXrootdResponse *resp,
00622 const char *rpfx,
00623 XrdXrootdJob2Do *job)
00624 {
00625 struct iovec jobResp[4];
00626 int dlen, i, rc;
00627
00628
00629
00630 if (!(job->theResult)) rc = resp->Send(kXR_ServerError,"Program failed");
00631 else {if (!rpfx) {dlen = 0; i = 1;}
00632 else { jobResp[1].iov_base = (char *)rpfx;
00633 dlen = jobResp[1].iov_len = strlen(rpfx);
00634 jobResp[2].iov_base = (char *)" ";
00635 dlen += jobResp[2].iov_len = 1;
00636 i = 3;
00637 }
00638 jobResp[i].iov_base = job->theResult;
00639 dlen += jobResp[i].iov_len = strlen(job->theResult);
00640 rc = resp->Send(jobResp, i+1, dlen);
00641 }
00642
00643
00644
00645 job->delClient(resp);
00646 if (!job->numClients) CleanUp(job);
00647
00648
00649
00650 return rc;
00651 }