00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016 #include "TGo4DataRunnable.h"
00017
00018 #include <iostream.h>
00019
00020 #include "Go4Log/TGo4Log.h"
00021 #include "Go4Queue/TGo4BufferQueue.h"
00022 #include "Go4ThreadManager/TGo4ThreadManager.h"
00023 #include "Go4Socket/TGo4SocketSignalHandler.h"
00024 #include "Go4Socket/TGo4Socket.h"
00025 #include "TGo4TaskHandler.h"
00026 #include "TGo4TaskHandlerAbortException.h"
00027 #include "TGo4Task.h"
00028 #include "Go4CommandsTaskHandler/Go4CommandsTaskHandler.h"
00029
00030 TGo4DataRunnable::TGo4DataRunnable(const char* name,
00031 TGo4ThreadManager* man,
00032 TGo4TaskHandler* hand,
00033 Bool_t receivermode)
00034 :TGo4TaskHandlerRunnable(name,man,hand,receivermode)
00035 {
00036 fxBufferQueue=dynamic_cast<TGo4BufferQueue*> (fxTaskHandler->GetDataQueue() );
00037 fxTransport=fxTaskHandler->GetDataTransport();
00038 }
00039
00040 TGo4DataRunnable::~TGo4DataRunnable()
00041 {
00042 }
00043
00044 Int_t TGo4DataRunnable::Run(void* ptr)
00045 {
00046 if(!CheckTransportOpen()) return 0;
00047 if(fbReceiverMode)
00048 {
00049 Int_t rev=fxTransport->ReceiveBuffer();
00050 if(rev>=0)
00051 {
00052 TBuffer* buf=const_cast<TBuffer*> (fxTransport->GetBuffer());
00053 Int_t val=0;
00054 if(CheckStopBuffer(buf,&val)) return 0;
00055 Go4EmergencyCommand_t comvalue= (Go4EmergencyCommand_t) (val);
00056 if(val>=0 && comvalue==kComQuit)
00057 {
00058
00059 GetThread()->Stop();
00060 TGo4Command* qcommand = new TGo4ComDisconnectSlave;
00061 TGo4Task* cli = dynamic_cast<TGo4Task*>(fxManager);
00062 if(cli) cli->SubmitLocalCommand(qcommand);
00063 return 0;
00064 }
00065 else
00066 {
00067 fxBufferQueue->AddBuffer(buf, kTRUE);
00068 }
00069 }
00070 else
00071 {
00072
00073 if (TGo4SocketSignalHandler::fgiLastSignal == SIGWINCH)
00074
00075 {
00076
00077 TGo4Log::Debug(" %s: caught SIGWINCH ",GetName());
00078 TGo4SocketSignalHandler::fgiLastSignal = 0;
00079
00080 }
00081 else if(fxManager->IsTerminating())
00082 {
00083
00084 TGo4Log::Debug("Receive Error in %s during threadmanager termination. Ignored.",GetName());
00085 GetThread()->Stop();
00086
00087 }
00088 else
00089 {
00090 TGo4Log::Debug(" !!!Receive Error in DataRunnable of Task %s!!!",
00091 fxTaskHandler->GetName());
00092
00093 throw TGo4TaskHandlerAbortException(this);
00094 }
00095
00096 }
00097
00098 }
00099 else
00100 {
00101
00102 TBuffer* buf= fxBufferQueue->WaitBuffer();
00103 if (buf)
00104
00105 {
00106 CheckStopBuffer(buf);
00107 fxTransport->SendBuffer(buf);
00108 fxBufferQueue->FreeBuffer(buf);
00109 }
00110 }
00111 return 0;
00112 }
00113
00114