XrdOssAio.cc

Go to the documentation of this file.
00001 /******************************************************************************/
00002 /*                                                                            */
00003 /*                          X r d O s s A i o . c c                           */
00004 /*                                                                            */
00005 /* (c) 2004 by the Board of Trustees of the Leland Stanford, Jr., University  */
00006 /*                            All Rights Reserved                             */
00007 /*   Produced by Andrew Hanushevsky for Stanford University under contract    */
00008 /*              DE-AC03-76-SFO0515 with the Department of Energy              */
00009 /******************************************************************************/
00010   
00011 //         $Id: XrdOssAio.cc 35287 2010-09-14 21:19:35Z ganis $
00012 
00013 const char *XrdOssAioCVSID = "$Id: XrdOssAio.cc 35287 2010-09-14 21:19:35Z ganis $";
00014 
00015 #include <signal.h>
00016 #include <stdio.h>
00017 #include <unistd.h>
00018 #ifdef _POSIX_ASYNCHRONOUS_IO
00019 #ifdef __FreeBSD__
00020 #include <fcntl.h>
00021 #endif
00022 #ifdef __macos__
00023 #include <sys/aio.h>
00024 #else
00025 #include <aio.h>
00026 #endif
00027 #endif
00028 
00029 #include "XrdOss/XrdOssApi.hh"
00030 #include "XrdOss/XrdOssTrace.hh"
00031 #include "XrdSys/XrdSysError.hh"
00032 #include "XrdSys/XrdSysPlatform.hh"
00033 #include "XrdSys/XrdSysPthread.hh"
00034 #include "XrdSfs/XrdSfsAio.hh"
00035 
00036 // All AIO interfaces are defined here.
00037  
00038 
00039 // Currently we disable aio support for MacOS because it is way too
00040 // buggy and incomplete. The two major problems are:
00041 // 1) No implementation of sigwaitinfo(). Though we can simulate it...
00042 // 2) Event notification returns an incomplete siginfo structure.
00043 //
00044 #ifdef __macos__
00045 #undef _POSIX_ASYNCHRONOUS_IO
00046 #endif
00047 
00048 /******************************************************************************/
00049 /*                               G l o b a l s                                */
00050 /******************************************************************************/
00051   
00052 extern XrdOucTrace OssTrace;        // trace object defined elsewhere; presumably consumed by the TRACE/DEBUG macros -- confirm in XrdOssTrace.hh
00053 //define tident aiop->TIdent
00054 
00055 extern XrdSysError OssEroute;       // error router used for every Emsg() report issued in this file
00056 
00057 int   XrdOssFile::AioFailure = 0;   // number of aio requests that could not be queued; incremented without locking
00058 
#ifdef _POSIX_ASYNCHRONOUS_IO
// Signals used to report AIO completion: one for reads and one for writes
// (fsync requests are reported with the write signal). The two highest
// real-time signals are used when SIGRTMAX is defined; otherwise SIGUSR1 and
// SIGUSR2 are used. In both cases these are real objects, not macros, because
// AioInit() passes their addresses to the signal-wait threads and taking the
// address of a macro that expands to an integer constant does not compile.
//
#ifdef SIGRTMAX
const int OSS_AIO_READ_DONE  = SIGRTMAX-1;
const int OSS_AIO_WRITE_DONE = SIGRTMAX;
#else
const int OSS_AIO_READ_DONE  = SIGUSR1;
const int OSS_AIO_WRITE_DONE = SIGUSR2;
#endif
#endif
00068 
00069 /******************************************************************************/
00070 /*                                 F s y n c                                  */
00071 /******************************************************************************/
00072   
00073 /*
00074   Function: Async fsync() a file
00075 
00076   Input:    aiop      - A aio request object
00077 */
00078 
00079 int XrdOssFile::Fsync(XrdSfsAio *aiop)
00080 {
00081 
00082 #ifdef _POSIX_ASYNCHRONOUS_IO
00083    int rc;
00084 
00085 // Complete the aio request block and do the operation
00086 //
00087    if (XrdOssSys::AioAllOk)
00088       {aiop->sfsAio.aio_fildes = fd;
00089        aiop->sfsAio.aio_sigevent.sigev_signo  = OSS_AIO_WRITE_DONE;
00090        aiop->TIdent = tident;
00091 
00092       // Start the operation
00093       //
00094          if (!(rc = aio_fsync(O_SYNC, &aiop->sfsAio))) return 0;
00095          if (errno != EAGAIN && errno != ENOSYS) return -errno;
00096 
00097       // Aio failed keep track of the problem (msg every 1024 events). Note
00098       // that the handling of the counter is sloppy because we do not lock it.
00099       //
00100          {int fcnt = AioFailure++;
00101           if ((fcnt & 0x3ff) == 1) OssEroute.Emsg("aio", errno, "fsync async");
00102          }
00103      }
00104 #endif
00105 
00106 // Execute this request in a synchronous fashion
00107 //
00108    if ((aiop->Result = Fsync())) aiop->Result = -errno;
00109 
00110 // Simply call the write completion routine and return as if all went well
00111 //
00112    aiop->doneWrite();
00113    return 0;
00114 }
00115 
00116 /******************************************************************************/
00117 /*                                  R e a d                                   */
00118 /******************************************************************************/
00119 
00120 /*
00121   Function: Async read `blen' bytes from the associated file, placing in 'buff'
00122 
00123   Input:    aiop      - An aio request object
00124 
00125    Output:  <0 -> Operation failed, value is negative errno value.
00126             =0 -> Operation queued
00127             >0 -> Operation not queued, system resources unavailable or
00128                                         asynchronous I/O is not supported.
00129 */
00130   
00131 int XrdOssFile::Read(XrdSfsAio *aiop)
00132 {
00133 
00134 #ifdef _POSIX_ASYNCHRONOUS_IO
00135    EPNAME("AioRead");
00136    int rc;
00137 
00138 // Complete the aio request block and do the operation
00139 //
00140    if (XrdOssSys::AioAllOk)
00141       {aiop->sfsAio.aio_fildes = fd;
00142        aiop->sfsAio.aio_sigevent.sigev_signo  = OSS_AIO_READ_DONE;
00143        aiop->TIdent = tident;
00144        TRACE(Debug,  "Read " <<aiop->sfsAio.aio_nbytes <<'@'
00145                              <<aiop->sfsAio.aio_offset <<" started; aiocb="
00146                              <<std::hex <<aiop <<std::dec);
00147 
00148        // Start the operation
00149        //
00150           if (!(rc = aio_read(&aiop->sfsAio))) return 0;
00151           if (errno != EAGAIN && errno != ENOSYS) return -errno;
00152 
00153       // Aio failed keep track of the problem (msg every 1024 events). Note
00154       // that the handling of the counter is sloppy because we do not lock it.
00155       //
00156          {int fcnt = AioFailure++;
00157           if ((fcnt & 0x3ff) == 1) OssEroute.Emsg("aio", errno, "read async");
00158          }
00159      }
00160 #endif
00161 
00162 // Execute this request in a synchronous fashion
00163 //
00164    aiop->Result = this->Read((void *)aiop->sfsAio.aio_buf,
00165                               (off_t)aiop->sfsAio.aio_offset,
00166                              (size_t)aiop->sfsAio.aio_nbytes);
00167 
00168 // Simple call the read completion routine and return as if all went well
00169 //
00170    aiop->doneRead();
00171    return 0;
00172 }
00173 
00174 /******************************************************************************/
00175 /*                                 W r i t e                                  */
00176 /******************************************************************************/
00177   
00178 /*
00179   Function: Async write `blen' bytes from 'buff' into the associated file
00180 
00181   Input:    aiop      - An aio request object.
00182 
00183    Output:  <0 -> Operation failed, value is negative errno value.
00184             =0 -> Operation queued
00185             >0 -> Operation not queued, system resources unavailable or
00186                                         asynchronous I/O is not supported.
00187 */
00188   
00189 int XrdOssFile::Write(XrdSfsAio *aiop)
00190 {
00191 #ifdef _POSIX_ASYNCHRONOUS_IO
00192    EPNAME("AioWrite");
00193    int rc;
00194 
00195 // Complete the aio request block and do the operation
00196 //
00197    if (XrdOssSys::AioAllOk)
00198       {aiop->sfsAio.aio_fildes = fd;
00199        aiop->sfsAio.aio_sigevent.sigev_signo  = OSS_AIO_WRITE_DONE;
00200        aiop->TIdent = tident;
00201        TRACE(Debug, "Write " <<aiop->sfsAio.aio_nbytes <<'@'
00202                              <<aiop->sfsAio.aio_offset <<" started; aiocb="
00203                              <<std::hex <<aiop <<std::dec);
00204 
00205        // Start the operation
00206        //
00207           if (!(rc = aio_write(&aiop->sfsAio))) return 0;
00208           if (errno != EAGAIN && errno != ENOSYS) return -errno;
00209 
00210        // Aio failed keep track of the problem (msg every 1024 events). Note
00211        // that the handling of the counter is sloppy because we do not lock it.
00212        //
00213           {int fcnt = AioFailure++;
00214            if ((fcnt & 0x3ff) == 1) OssEroute.Emsg("Write",errno,"write async");
00215           }
00216       }
00217 #endif
00218 
00219 // Execute this request in a synchronous fashion
00220 //
00221    aiop->Result = this->Write((const void *)aiop->sfsAio.aio_buf,
00222                                      (off_t)aiop->sfsAio.aio_offset,
00223                                     (size_t)aiop->sfsAio.aio_nbytes);
00224 
00225 // Simply call the write completion routine and return as if all went well
00226 //
00227    aiop->doneWrite();
00228    return 0;
00229 }
00230 
00231 /******************************************************************************/
00232 /*                 X r d O s s S y s   A I O   M e t h o d s                  */
00233 /******************************************************************************/
00234 /******************************************************************************/
00235 /*                               G l o b a l s                                */
00236 /******************************************************************************/
00237 
00238 int   XrdOssSys::AioAllOk = 0;   // set to 1 by AioInit() once both signal threads have started; reset to 0 by XrdOssAioWait() when its wait fails
00239   
00240 #if defined(_POSIX_ASYNCHRONOUS_IO) && !defined(HAVE_SIGWTI)
00241 // The following is for sigwaitinfo() emulation
00242 //
00243 siginfo_t *XrdOssAioInfoR;   // where XrdOssAioRSH() stores the siginfo of a read-completion signal
00244 siginfo_t *XrdOssAioInfoW;   // where XrdOssAioWSH() stores the siginfo of a write-completion signal
00245 extern "C" {extern void XrdOssAioRSH(int, siginfo_t *, void *);}   // read-completion signal handler (defined below)
00246 extern "C" {extern void XrdOssAioWSH(int, siginfo_t *, void *);}   // write-completion signal handler (defined below)
00247 #endif
00248 
00249 /******************************************************************************/
00250 /*                               A i o I n i t                                */
00251 /******************************************************************************/
00252 /*
00253   Function: Initialize for AIO processing.
00254 
00255   Return:   True if successful, false otherwise.
00256 */
00257 
00258 int XrdOssSys::AioInit()
00259 {
00260 #if defined(_POSIX_ASYNCHRONOUS_IO)
00261    EPNAME("AioInit");
00262    extern void *XrdOssAioWait(void *carg);
00263    pthread_t tid;
00264    int retc;
00265 
00266 #ifndef HAVE_SIGWTI
00267 // For those platforms that do not have sigwaitinfo(), we provide the
00268 // appropriate emulation using a signal handler. We actually provide for
00269 // two handlers since we separate reads from writes. To emulate synchronous
00270 // signals, we prohibit one signal hander from interrupting another one.
00271 //
00272     struct sigaction sa;
00273 
00274     sa.sa_sigaction = XrdOssAioRSH;
00275     sa.sa_flags = SA_SIGINFO;
00276     sigemptyset(&sa.sa_mask);
00277     sigaddset(&sa.sa_mask, OSS_AIO_WRITE_DONE);
00278     if (sigaction(OSS_AIO_READ_DONE, &sa, NULL) < 0)
00279        {OssEroute.Emsg("AioInit", errno, "creating AIO read signal handler; "
00280                                  "AIO support terminated.");
00281         return 0;
00282        }
00283 
00284     sa.sa_sigaction = XrdOssAioWSH;
00285     sa.sa_flags = SA_SIGINFO;
00286     sigemptyset(&sa.sa_mask);
00287     sigaddset(&sa.sa_mask, OSS_AIO_READ_DONE);
00288     if (sigaction(OSS_AIO_WRITE_DONE, &sa, NULL) < 0)
00289        {OssEroute.Emsg("AioInit", errno, "creating AIO write signal handler; "
00290                                  "AIO support terminated.");
00291         return 0;
00292        }
00293 #endif
00294 
00295 // The AIO signal handler consists of two thread (one for read and one for
00296 // write) that synhronously wait for AIO events. We assume, blithely, that
00297 // the first two real-time signals have been blocked for all threads.
00298 //
00299    if ((retc = XrdSysThread::Run(&tid, XrdOssAioWait,
00300                                 (void *)(&OSS_AIO_READ_DONE))) < 0)
00301       OssEroute.Emsg("AioInit", retc, "creating AIO read signal thread; "
00302                                  "AIO support terminated.");
00303 #ifdef __FreeBSD__
00304       else {DEBUG("started AIO read signal thread.");
00305 #else
00306       else {DEBUG("started AIO read signal thread; tid=" <<(unsigned int)tid);
00307 #endif
00308             if ((retc = XrdSysThread::Run(&tid, XrdOssAioWait,
00309                                 (void *)(&OSS_AIO_WRITE_DONE))) < 0)
00310                OssEroute.Emsg("AioInit", retc, "creating AIO write signal thread; "
00311                                  "AIO support terminated.");
00312 #ifdef __FreeBSD__
00313                else {DEBUG("started AIO write signal thread.");
00314 #else
00315                else {DEBUG("started AIO write signal thread; tid=" <<(unsigned int)tid);
00316 #endif
00317                      AioAllOk = 1;
00318                     }
00319            }
00320 
00321 // All done
00322 //
00323    return AioAllOk;
00324 #else
00325    return 1;
00326 #endif
00327 }
00328 
00329 /******************************************************************************/
00330 /*                               A i o W a i t                                */
00331 /******************************************************************************/
00332   
00333 void *XrdOssAioWait(void *mySigarg)
00334 {
00335 #ifdef _POSIX_ASYNCHRONOUS_IO
00336    EPNAME("AioWait");
00337    int mySignum = *((int *)mySigarg);
00338    const char *sigType = (mySignum == OSS_AIO_READ_DONE ? "read" : "write");
00339    const int  isRead   = (mySignum == OSS_AIO_READ_DONE);
00340    sigset_t  mySigset;
00341    siginfo_t myInfo;
00342    XrdSfsAio *aiop;
00343    int rc, numsig;
00344    ssize_t retval;
00345 #ifndef HAVE_SIGWTI
00346    extern int sigwaitinfo(const sigset_t *set, siginfo_t *info);
00347    extern siginfo_t *XrdOssAioInfoR;
00348    extern siginfo_t *XrdOssAioInfoW;
00349 
00350 // We will catch one signal at a time. So, the address of siginfo_t can be
00351 // placed in a global area where the signal handler will find it. We have one
00352 // two places where this can go.
00353 //
00354    if (isRead) XrdOssAioInfoR = &myInfo;
00355       else XrdOssAioInfoW = &myInfo;
00356 
00357 // Initialize the signal we will be suspended for
00358 //
00359    sigfillset(&mySigset);
00360    sigdelset(&mySigset, mySignum);
00361 #else
00362 
00363 // Initialize the signal we will be waiting for
00364 //
00365    sigemptyset(&mySigset);
00366    sigaddset(&mySigset, mySignum);
00367 #endif
00368 
00369 // Simply wait for events and requeue the completed AIO operation
00370 //
00371    do {do {numsig = sigwaitinfo((const sigset_t *)&mySigset, &myInfo);}
00372           while (numsig < 0 && errno == EINTR);
00373        if (numsig < 0)
00374           {OssEroute.Emsg("AioWait",errno,sigType,"wait for AIO signal");
00375            XrdOssSys::AioAllOk = 0;
00376            break;
00377           }
00378        if (numsig != mySignum || myInfo.si_code != SI_ASYNCIO)
00379           {char buff[80];
00380            sprintf(buff, "%d %d", myInfo.si_code, numsig);
00381            OssEroute.Emsg("AioWait", "received unexpected signal", buff);
00382            continue;
00383           }
00384 
00385 #ifdef __macos__
00386        aiop = (XrdSfsAio *)myInfo.si_value.sigval_ptr;
00387 #else
00388        aiop = (XrdSfsAio *)myInfo.si_value.sival_ptr;
00389 #endif
00390 
00391        while ((rc = aio_error(&aiop->sfsAio)) == EINPROGRESS) {}
00392        retval = (ssize_t)aio_return(&aiop->sfsAio);
00393 
00394        DEBUG(sigType <<" completed for " <<aiop->TIdent <<"; rc=" <<rc 
00395              <<" result=" <<retval <<" aiocb=" <<std::hex <<aiop <<std::dec);
00396 
00397        if (retval < 0) aiop->Result = -rc;
00398           else         aiop->Result = retval;
00399 
00400        if (isRead) aiop->doneRead();
00401           else     aiop->doneWrite();
00402       } while(1);
00403 #endif
00404    return (void *)0;
00405 }
00406  
00407 #if defined( _POSIX_ASYNCHRONOUS_IO) && !defined(HAVE_SIGWTI)
00408 /******************************************************************************/
00409 /*                           s i g w a i t i n f o                            */
00410 /******************************************************************************/
00411   
// Emulation of sigwaitinfo() for platforms that lack it (e.g., MacOS). It is
// not as good as the kernel version and is specific to this file: it depends
// on the AIO signal handlers defined below to copy the delivered siginfo into
// the structure whose address XrdOssAioWait() published beforehand.
//
int sigwaitinfo(const sigset_t *set, siginfo_t *info)
{
// Suspend with the caller's mask in effect. That unblocks the awaited signal,
// so its handler runs and fills in the waiting siginfo before we resume.
//
   (void)sigsuspend(set);

// Report the signal number that the handler recorded
//
   const int caught = info->si_signo;
   return caught;
}
00424  
00425 /******************************************************************************/
00426 /*                          X r d O s s A i o R S H                           */
00427 /******************************************************************************/
00428   
// XrdOssAioRSH is the signal handler for AIO read completions. AioInit()
// installs it only on platforms that do not have sigwaitinfo().
//
extern "C"
{
void XrdOssAioRSH(int signum, siginfo_t *info, void *ucontext)
{
   extern siginfo_t *XrdOssAioInfoR;

// Getting here means an AIO read signal was delivered while the read signal
// thread had it unblocked, so the global read info pointer is expected to
// hold the address of that thread's siginfo structure.
//
   siginfo_t *const dest = XrdOssAioInfoR;

// Copy over the fields that the waiting thread examines
//
   dest->si_signo = info->si_signo;
   dest->si_errno = info->si_errno;
   dest->si_code  = info->si_code;
#ifdef __macos__
   dest->si_value.sigval_ptr = info->si_addr;
#else
   dest->si_value.sival_ptr = info->si_value.sival_ptr;
#endif
}
}
00452  
00453 /******************************************************************************/
00454 /*                          X r d O s s A i o W S H                           */
00455 /******************************************************************************/
00456   
// XrdOssAioWSH is the signal handler for AIO write completions. AioInit()
// installs it only on platforms that do not have sigwaitinfo().
//
extern "C"
{
void XrdOssAioWSH(int signum, siginfo_t *info, void *ucontext)
{
   extern siginfo_t *XrdOssAioInfoW;

// Getting here means an AIO write signal was delivered while the write signal
// thread had it unblocked, so the global write info pointer is expected to
// hold the address of that thread's siginfo structure.
//
   siginfo_t *const dest = XrdOssAioInfoW;

// Copy over the fields that the waiting thread examines
//
   dest->si_signo = info->si_signo;
   dest->si_errno = info->si_errno;
   dest->si_code  = info->si_code;
#ifdef __macos__
   dest->si_value.sigval_ptr = info->si_addr;
#else
   dest->si_value.sival_ptr = info->si_value.sival_ptr;
#endif
}
}
00480 #endif

Generated on Tue Jul 5 14:46:45 2011 for ROOT_528-00b_version by  doxygen 1.5.1