00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
// Version-control identification string embedded in this translation unit.
const char *XrdOssAioCVSID =
      "$Id: XrdOssAio.cc 35287 2010-09-14 21:19:35Z ganis $";
00014
00015 #include <signal.h>
00016 #include <stdio.h>
00017 #include <unistd.h>
00018 #ifdef _POSIX_ASYNCHRONOUS_IO
00019 #ifdef __FreeBSD__
00020 #include <fcntl.h>
00021 #endif
00022 #ifdef __macos__
00023 #include <sys/aio.h>
00024 #else
00025 #include <aio.h>
00026 #endif
00027 #endif
00028
00029 #include "XrdOss/XrdOssApi.hh"
00030 #include "XrdOss/XrdOssTrace.hh"
00031 #include "XrdSys/XrdSysError.hh"
00032 #include "XrdSys/XrdSysPlatform.hh"
00033 #include "XrdSys/XrdSysPthread.hh"
00034 #include "XrdSfs/XrdSfsAio.hh"
00035
00036
00037
00038
00039
00040
00041
00042
00043
// When __macos__ is defined, drop the POSIX AIO feature macro so that every
// "#ifdef _POSIX_ASYNCHRONOUS_IO" section further down is compiled out and
// only the synchronous code paths remain.
#if defined(__macos__)
#  undef _POSIX_ASYNCHRONOUS_IO
#endif
00047
00048
00049
00050
00051
00052 extern XrdOucTrace OssTrace;
00053
00054
00055 extern XrdSysError OssEroute;
00056
00057 int XrdOssFile::AioFailure = 0;
00058
#ifdef _POSIX_ASYNCHRONOUS_IO
// Signals used to announce completion of asynchronous reads and writes.
// The two highest real-time signals are used when SIGRTMAX is available;
// otherwise SIGUSR1/SIGUSR2 are used. In both cases these are real objects
// rather than macros, because AioInit() hands their addresses to the
// signal-wait threads and the address of a macro-expanded constant cannot
// be taken.
#ifdef SIGRTMAX
const int OSS_AIO_READ_DONE  = SIGRTMAX-1;
const int OSS_AIO_WRITE_DONE = SIGRTMAX;
#else
const int OSS_AIO_READ_DONE  = SIGUSR1;
const int OSS_AIO_WRITE_DONE = SIGUSR2;
#endif
#endif
00068
00069
00070
00071
00072
00073
00074
00075
00076
00077
00078
00079 int XrdOssFile::Fsync(XrdSfsAio *aiop)
00080 {
00081
00082 #ifdef _POSIX_ASYNCHRONOUS_IO
00083 int rc;
00084
00085
00086
00087 if (XrdOssSys::AioAllOk)
00088 {aiop->sfsAio.aio_fildes = fd;
00089 aiop->sfsAio.aio_sigevent.sigev_signo = OSS_AIO_WRITE_DONE;
00090 aiop->TIdent = tident;
00091
00092
00093
00094 if (!(rc = aio_fsync(O_SYNC, &aiop->sfsAio))) return 0;
00095 if (errno != EAGAIN && errno != ENOSYS) return -errno;
00096
00097
00098
00099
00100 {int fcnt = AioFailure++;
00101 if ((fcnt & 0x3ff) == 1) OssEroute.Emsg("aio", errno, "fsync async");
00102 }
00103 }
00104 #endif
00105
00106
00107
00108 if ((aiop->Result = Fsync())) aiop->Result = -errno;
00109
00110
00111
00112 aiop->doneWrite();
00113 return 0;
00114 }
00115
00116
00117
00118
00119
00120
00121
00122
00123
00124
00125
00126
00127
00128
00129
00130
00131 int XrdOssFile::Read(XrdSfsAio *aiop)
00132 {
00133
00134 #ifdef _POSIX_ASYNCHRONOUS_IO
00135 EPNAME("AioRead");
00136 int rc;
00137
00138
00139
00140 if (XrdOssSys::AioAllOk)
00141 {aiop->sfsAio.aio_fildes = fd;
00142 aiop->sfsAio.aio_sigevent.sigev_signo = OSS_AIO_READ_DONE;
00143 aiop->TIdent = tident;
00144 TRACE(Debug, "Read " <<aiop->sfsAio.aio_nbytes <<'@'
00145 <<aiop->sfsAio.aio_offset <<" started; aiocb="
00146 <<std::hex <<aiop <<std::dec);
00147
00148
00149
00150 if (!(rc = aio_read(&aiop->sfsAio))) return 0;
00151 if (errno != EAGAIN && errno != ENOSYS) return -errno;
00152
00153
00154
00155
00156 {int fcnt = AioFailure++;
00157 if ((fcnt & 0x3ff) == 1) OssEroute.Emsg("aio", errno, "read async");
00158 }
00159 }
00160 #endif
00161
00162
00163
00164 aiop->Result = this->Read((void *)aiop->sfsAio.aio_buf,
00165 (off_t)aiop->sfsAio.aio_offset,
00166 (size_t)aiop->sfsAio.aio_nbytes);
00167
00168
00169
00170 aiop->doneRead();
00171 return 0;
00172 }
00173
00174
00175
00176
00177
00178
00179
00180
00181
00182
00183
00184
00185
00186
00187
00188
00189 int XrdOssFile::Write(XrdSfsAio *aiop)
00190 {
00191 #ifdef _POSIX_ASYNCHRONOUS_IO
00192 EPNAME("AioWrite");
00193 int rc;
00194
00195
00196
00197 if (XrdOssSys::AioAllOk)
00198 {aiop->sfsAio.aio_fildes = fd;
00199 aiop->sfsAio.aio_sigevent.sigev_signo = OSS_AIO_WRITE_DONE;
00200 aiop->TIdent = tident;
00201 TRACE(Debug, "Write " <<aiop->sfsAio.aio_nbytes <<'@'
00202 <<aiop->sfsAio.aio_offset <<" started; aiocb="
00203 <<std::hex <<aiop <<std::dec);
00204
00205
00206
00207 if (!(rc = aio_write(&aiop->sfsAio))) return 0;
00208 if (errno != EAGAIN && errno != ENOSYS) return -errno;
00209
00210
00211
00212
00213 {int fcnt = AioFailure++;
00214 if ((fcnt & 0x3ff) == 1) OssEroute.Emsg("Write",errno,"write async");
00215 }
00216 }
00217 #endif
00218
00219
00220
00221 aiop->Result = this->Write((const void *)aiop->sfsAio.aio_buf,
00222 (off_t)aiop->sfsAio.aio_offset,
00223 (size_t)aiop->sfsAio.aio_nbytes);
00224
00225
00226
00227 aiop->doneWrite();
00228 return 0;
00229 }
00230
00231
00232
00233
00234
00235
00236
00237
00238 int XrdOssSys::AioAllOk = 0;
00239
00240 #if defined(_POSIX_ASYNCHRONOUS_IO) && !defined(HAVE_SIGWTI)
00241
00242
00243 siginfo_t *XrdOssAioInfoR;
00244 siginfo_t *XrdOssAioInfoW;
00245 extern "C" {extern void XrdOssAioRSH(int, siginfo_t *, void *);}
00246 extern "C" {extern void XrdOssAioWSH(int, siginfo_t *, void *);}
00247 #endif
00248
00249
00250
00251
00252
00253
00254
00255
00256
00257
00258 int XrdOssSys::AioInit()
00259 {
00260 #if defined(_POSIX_ASYNCHRONOUS_IO)
00261 EPNAME("AioInit");
00262 extern void *XrdOssAioWait(void *carg);
00263 pthread_t tid;
00264 int retc;
00265
00266 #ifndef HAVE_SIGWTI
00267
00268
00269
00270
00271
00272 struct sigaction sa;
00273
00274 sa.sa_sigaction = XrdOssAioRSH;
00275 sa.sa_flags = SA_SIGINFO;
00276 sigemptyset(&sa.sa_mask);
00277 sigaddset(&sa.sa_mask, OSS_AIO_WRITE_DONE);
00278 if (sigaction(OSS_AIO_READ_DONE, &sa, NULL) < 0)
00279 {OssEroute.Emsg("AioInit", errno, "creating AIO read signal handler; "
00280 "AIO support terminated.");
00281 return 0;
00282 }
00283
00284 sa.sa_sigaction = XrdOssAioWSH;
00285 sa.sa_flags = SA_SIGINFO;
00286 sigemptyset(&sa.sa_mask);
00287 sigaddset(&sa.sa_mask, OSS_AIO_READ_DONE);
00288 if (sigaction(OSS_AIO_WRITE_DONE, &sa, NULL) < 0)
00289 {OssEroute.Emsg("AioInit", errno, "creating AIO write signal handler; "
00290 "AIO support terminated.");
00291 return 0;
00292 }
00293 #endif
00294
00295
00296
00297
00298
00299 if ((retc = XrdSysThread::Run(&tid, XrdOssAioWait,
00300 (void *)(&OSS_AIO_READ_DONE))) < 0)
00301 OssEroute.Emsg("AioInit", retc, "creating AIO read signal thread; "
00302 "AIO support terminated.");
00303 #ifdef __FreeBSD__
00304 else {DEBUG("started AIO read signal thread.");
00305 #else
00306 else {DEBUG("started AIO read signal thread; tid=" <<(unsigned int)tid);
00307 #endif
00308 if ((retc = XrdSysThread::Run(&tid, XrdOssAioWait,
00309 (void *)(&OSS_AIO_WRITE_DONE))) < 0)
00310 OssEroute.Emsg("AioInit", retc, "creating AIO write signal thread; "
00311 "AIO support terminated.");
00312 #ifdef __FreeBSD__
00313 else {DEBUG("started AIO write signal thread.");
00314 #else
00315 else {DEBUG("started AIO write signal thread; tid=" <<(unsigned int)tid);
00316 #endif
00317 AioAllOk = 1;
00318 }
00319 }
00320
00321
00322
00323 return AioAllOk;
00324 #else
00325 return 1;
00326 #endif
00327 }
00328
00329
00330
00331
00332
00333 void *XrdOssAioWait(void *mySigarg)
00334 {
00335 #ifdef _POSIX_ASYNCHRONOUS_IO
00336 EPNAME("AioWait");
00337 int mySignum = *((int *)mySigarg);
00338 const char *sigType = (mySignum == OSS_AIO_READ_DONE ? "read" : "write");
00339 const int isRead = (mySignum == OSS_AIO_READ_DONE);
00340 sigset_t mySigset;
00341 siginfo_t myInfo;
00342 XrdSfsAio *aiop;
00343 int rc, numsig;
00344 ssize_t retval;
00345 #ifndef HAVE_SIGWTI
00346 extern int sigwaitinfo(const sigset_t *set, siginfo_t *info);
00347 extern siginfo_t *XrdOssAioInfoR;
00348 extern siginfo_t *XrdOssAioInfoW;
00349
00350
00351
00352
00353
00354 if (isRead) XrdOssAioInfoR = &myInfo;
00355 else XrdOssAioInfoW = &myInfo;
00356
00357
00358
00359 sigfillset(&mySigset);
00360 sigdelset(&mySigset, mySignum);
00361 #else
00362
00363
00364
00365 sigemptyset(&mySigset);
00366 sigaddset(&mySigset, mySignum);
00367 #endif
00368
00369
00370
00371 do {do {numsig = sigwaitinfo((const sigset_t *)&mySigset, &myInfo);}
00372 while (numsig < 0 && errno == EINTR);
00373 if (numsig < 0)
00374 {OssEroute.Emsg("AioWait",errno,sigType,"wait for AIO signal");
00375 XrdOssSys::AioAllOk = 0;
00376 break;
00377 }
00378 if (numsig != mySignum || myInfo.si_code != SI_ASYNCIO)
00379 {char buff[80];
00380 sprintf(buff, "%d %d", myInfo.si_code, numsig);
00381 OssEroute.Emsg("AioWait", "received unexpected signal", buff);
00382 continue;
00383 }
00384
00385 #ifdef __macos__
00386 aiop = (XrdSfsAio *)myInfo.si_value.sigval_ptr;
00387 #else
00388 aiop = (XrdSfsAio *)myInfo.si_value.sival_ptr;
00389 #endif
00390
00391 while ((rc = aio_error(&aiop->sfsAio)) == EINPROGRESS) {}
00392 retval = (ssize_t)aio_return(&aiop->sfsAio);
00393
00394 DEBUG(sigType <<" completed for " <<aiop->TIdent <<"; rc=" <<rc
00395 <<" result=" <<retval <<" aiocb=" <<std::hex <<aiop <<std::dec);
00396
00397 if (retval < 0) aiop->Result = -rc;
00398 else aiop->Result = retval;
00399
00400 if (isRead) aiop->doneRead();
00401 else aiop->doneWrite();
00402 } while(1);
00403 #endif
00404 return (void *)0;
00405 }
00406
00407 #if defined( _POSIX_ASYNCHRONOUS_IO) && !defined(HAVE_SIGWTI)
00408
00409
00410
00411
00412
00413
00414
00415
// Replacement for sigwaitinfo() in builds without HAVE_SIGWTI.
//
// set  - signal mask installed while suspended (note: this is the mask for
//        sigsuspend(), not the set of signals to wait for).
// info - siginfo_t that the AIO signal handlers in this file fill in.
//
// Returns the si_signo value found in 'info' after sigsuspend() returns.
//
int sigwaitinfo(const sigset_t *set, siginfo_t *info)
{
// Suspend until a signal handler has run; the handler is what stores the
// signal details that are read below.
//
   sigsuspend(set);

   const int caughtSignal = info->si_signo;
   return caughtSignal;
}
00424
00425
00426
00427
00428
00429
00430
00431
// Signal handler for the AIO read-done signal (builds without HAVE_SIGWTI).
// Copies the delivered signal information into the siginfo_t that
// XrdOssAioInfoR points to, where the read wait thread picks it up.
//
extern "C"
{
void XrdOssAioRSH(int signum, siginfo_t *info, void *ucontext)
{
   extern siginfo_t *XrdOssAioInfoR;
   siginfo_t *target = XrdOssAioInfoR;

// Transfer only the fields the wait thread examines.
//
   target->si_signo = info->si_signo;
   target->si_errno = info->si_errno;
   target->si_code  = info->si_code;
#ifdef __macos__
   target->si_value.sigval_ptr = info->si_addr;
#else
   target->si_value.sival_ptr  = info->si_value.sival_ptr;
#endif
}
}
00452
00453
00454
00455
00456
00457
00458
00459
// Signal handler for the AIO write-done signal (builds without HAVE_SIGWTI).
// Copies the delivered signal information into the siginfo_t that
// XrdOssAioInfoW points to, where the write wait thread picks it up.
//
extern "C"
{
void XrdOssAioWSH(int signum, siginfo_t *info, void *ucontext)
{
   extern siginfo_t *XrdOssAioInfoW;
   siginfo_t *target = XrdOssAioInfoW;

// Transfer only the fields the wait thread examines.
//
   target->si_signo = info->si_signo;
   target->si_errno = info->si_errno;
   target->si_code  = info->si_code;
#ifdef __macos__
   target->si_value.sigval_ptr = info->si_addr;
#else
   target->si_value.sival_ptr  = info->si_value.sival_ptr;
#endif
}
}
00480 #endif