22 #include <netinet/in.h>
27 #ifndef __NO_MULTICAST__
132 QP()->Post_Recv(fPool->GetRecvWR(bufid));
136 strcpy((
char*) fPool->GetSendBufferLocation(bufid), connid.c_str());
137 QP()->Post_Send(fPool->GetSendWR(bufid, connid.length()+1));
149 fLocalCmd.ReplyBool(res);
151 fIbContext.Release();
160 if (fConnected)
return -1;
162 EOUT(
"HANDSHAKE is timedout");
172 EOUT(
"Wrong buffer id %u", bufid);
178 EOUT(
"Connection request disappear");
183 const char* connid = (
const char*) fPool->GetSendBufferLocation(bufid);
185 if (req.
GetConnId().compare(connid)!=0) {
186 EOUT(
"AAAAA !!!!! Mismatch with connid %s %s", connid, req.
GetConnId().c_str());
193 VerbsNetworkInetrface* addon =
new VerbsNetworkInetrface(fIbContext, TakeQP());
205 EOUT(
"Wrong buffer id %u", bufid);
211 const char* connid = (
const char*) fPool->GetBufferLocation(bufid);
213 if (req.
GetConnId().compare(connid)!=0) {
214 EOUT(
"AAAAA !!!!! Mismatch with connid %s %s", connid, req.
GetConnId().c_str());
230 EOUT(
"VerbsProtocolAddon error");
240 fAllocateIndividualCQ(false)
245 EOUT(
"FATAL. Cannot start VERBS device");
258 DOUT5(
"verbs::Device::~Device()");
263 ibv_qp_type qp_type = IBV_QPT_RC;
265 if (conn_type>0) qp_type = (ibv_qp_type) conn_type;
267 thrd = MakeThread(thrd_name.c_str(),
true);
271 if (thrd_ptr == 0)
return 0;
273 unsigned input_size(0), output_size(0);
279 int num_send_seg = fIbContext.max_sge() - 1;
280 if (conn_type==IBV_QPT_UD) num_send_seg = fIbContext.max_sge() - 5;
281 if (num_send_seg<2) num_send_seg = 2;
284 port_cq, output_size + 1, num_send_seg,
285 port_cq, input_size + 1, 2);
287 if (port_qp->
qp()==0) {
299 if (!thrd.null() || !force)
return thrd;
312 if (!maddr.empty()) {
315 if (thrdname.empty())
327 EOUT(
"Cannot convert address %s to ibv_gid type", maddr.c_str());
333 EOUT(
"Addresses not the same: %s - %s", maddr.c_str(), buf.c_str());
363 EOUT(
"No port is available for the request");
370 QueuePair* port_qp = CreatePortQP(req.
GetConnThread(), port, 0, thrd);
373 std::string portid =
dabc::format(
"%04X:%08X:%08X", (
unsigned) fIbContext.lid(), (
unsigned) port_qp->qp_num(), (
unsigned) port_qp->local_psn());
374 DOUT0(
"CREATE CONNECTION %s", portid.c_str());
376 ProtocolAddon* addon =
new ProtocolAddon(port_qp);
377 addon->fPortThrd << thrd;
387 addon->fReqItem = reqitem;
389 addon->fIbContext = fIbContext;
406 ProtocolAddon* proto =
dynamic_cast<ProtocolAddon*
> (prot_ref());
409 EOUT(
"SOMETHING WRONG - NO PROTOCOL addon for the connection request");
413 std::string remoteid;
421 if (sscanf(remoteid.c_str(),
"%X:%X:%X", &proto->fRemoteLID, &proto->fRemoteQPN, &proto->fRemotePSN)!=3) {
422 EOUT(
"Cannot decode remote id string %s", remoteid.c_str());
434 DOUT0(
"CONNECT TO REMOTE %04x:%08x:%08x - %s", proto->fRemoteLID, proto->fRemoteQPN, proto->fRemotePSN, remoteid.c_str());
437 if (proto->QP()->Connect(proto->fRemoteLID, proto->fRemoteQPN, proto->fRemotePSN)) {
439 proto->fPool =
new verbs::MemoryPool(fIbContext,
"HandshakePool", 1, 1024,
false);
441 proto->fLocalCmd = cmd;
444 proto->fPortThrd.MakeWorkerFor(proto);
453 EOUT(
"Request from connection manager in undefined situation progress = %d ???", req.
progress());
467 if (cmd.
IsName(dabc::CmdConnectionManagerHandle::CmdName())) {
469 cmd_res = HandleManagerConnectionRequest(cmd);
487 if (sscanf(s.c_str(),
488 "%2X:%2X:%2X:%2X:%2X:%2X:%2X:%2X:%2X:%2X:%2X:%2X:%2X:%2X:%2X:%2X",
489 raw, raw+1, raw+2, raw+3, raw+4, raw+5, raw+6, raw+7,
490 raw+8, raw+9, raw+10, raw+11, raw+12, raw+13, raw+14, raw+15) != 16)
return false;
491 for (
unsigned n=0;n<16;n++)
499 for (
unsigned n=0;n<16;n++)
504 dabc::formats(res,
"%02X:%02X:%02X:%02X:%02X:%02X:%02X:%02X:%02X:%02X:%02X:%02X:%02X:%02X:%02X:%02X",
505 raw[0], raw[1], raw[2], raw[3], raw[4], raw[5], raw[6], raw[7],
506 raw[8], raw[9], raw[10], raw[11], raw[12], raw[13], raw[14], raw[15]);
static const char * ReqArg()
Represents command with its arguments.
std::string GetStr(const std::string &name, const std::string &dflt="") const
@ progrDoingConnect
at this state device should drive connection itself and inform about completion or failure
@ progrDoingInit
state when record should be prepared by device
Full description of connection request.
std::string GetConnId() const
std::string GetConnInfo()
void SetCustomData(Reference ref)
Reference TakeCustomData()
std::string GetServerId() const
void ReplyRemoteCommand(bool res)
std::string GetClientId() const
void SetClientId(const std::string &id)
void SetServerId(const std::string &id)
std::string GetConnThread() const
Thread name for transport.
double GetConnTimeout() const
time required to establish connection, if expired connection will be switched to "failed" state
Reference GetPort() const
bool IsServerSide() const
Indicates if local node in connection is server or client.
virtual int ExecuteCommand(Command cmd)
Main method where commands are executed.
Parameter FindPar(const std::string &parname)
ThreadRef FindThread(const std::string &name, const std::string &required_class="")
ThreadRef CreateThread(const std::string &thrdname, const std::string &classname="", const std::string &devname="")
WorkerRef GetModule() const
static bool Make(const ConnectionRequest &req, WorkerAddon *addon, const std::string &devthrdname="")
static void GetRequiredQueuesSizes(const PortRef &port, unsigned &input_size, unsigned &output_size)
unsigned NumReferences()
Return number of references on the object.
const char * GetName() const
Returns name of the object, thread safe
Reference on the dabc::Port class
bool IsInput() const
Returns true if it is input port.
std::string AsStr(const std::string &dflt="") const
Reference on the arbitrary object
void Release()
Releases reference on the object.
const char * GetName() const
Return name of referenced object, if object not assigned, returns "---".
bool IsName(const char *name) const
Returns true if object name is the same as specified one.
bool null() const
Returns true if reference contains nullptr.
Reference on the dabc::Thread class
Base class for transport implementations.
virtual void OnThreadAssigned()
std::string ThreadName() const
Returns thread name of worker assigned.
RecordField Cfg(const std::string &name, Command cmd=nullptr) const
Returns configuration record of specified name.
bool MakeThreadForWorker(const std::string &thrdname="")
Creates appropriate thread for worker and assign worker to the thread.
Wrapper for IB VERBS completion queue
Reference to verbs::Context
bool OpenVerbs(bool withmulticast=false, const char *devicename=0, int ibport=-1)
virtual double ProcessTimeout(double last_diff)
virtual int ExecuteCommand(dabc::Command cmd)
Main method where commands are executed.
virtual dabc::Transport * CreateTransport(dabc::Command cmd, const dabc::Reference &port)
QueuePair * CreatePortQP(const std::string &thrd_name, dabc::Reference port, int conn_type, dabc::ThreadRef &thrd)
dabc::ThreadRef MakeThread(const char *name, bool force=false)
int HandleManagerConnectionRequest(dabc::Command cmd)
static bool fThreadSafeVerbs
Device(const std::string &name)
Special memory pool, which automatically includes PoolRegistry.
Addon to establish and verify QP connection with remote node
dabc::ThreadRef fPortThrd
virtual double ProcessTimeout(double last_diff)
virtual void VerbsProcessOperError(uint32_t)
ProtocolAddon(QueuePair *qp)
virtual void OnThreadAssigned()
virtual void VerbsProcessRecvCompl(uint32_t)
virtual void VerbsProcessSendCompl(uint32_t)
dabc::Command fLocalCmd
command which should be replied when connection established or failed
Represent VERBS queue pair functionality.
struct ibv_qp * qp() const
Implementation of NetworkTransport for VERBS.
void formats(std::string &sbuf, const char *fmt,...)
std::string format(const char *fmt,...)
const char * xmlThreadAttr
Support of InfiniBand verbs.
const char * xmlMcastAddr
std::string ConvertGidToStr(ibv_gid &gid)
bool ConvertStrToGid(const std::string &s, ibv_gid &gid)
const int LoopBackBufferSize
const int LoopBackQueueSize