00001
00002
00003
00004
00005
00006
00007 #include <arpa/inet.h>
00008 #include <net/if.h>
00009 #include <sys/select.h>
00010 #include <cstdlib>
00011 #include "Xrd/XrdConfig.hh"
00012 #include "XrdOuc/XrdOucBonjour.hh"
00013 #include "XrdOuc/XrdOucFactoryBonjour.hh"
00014 #include "XrdSys/XrdSysError.hh"
00015 #include "Xrd/XrdInet.hh"
00016 #include "Xrd/XrdProtLoad.hh"
00017
00018
00019
00020
00021
00022 extern XrdConfig XrdConf;
00023 extern XrdSysError XrdLog;
00024 extern XrdInet *XrdNetTCP[];
00025
00026
00027
00028
00029
00030 void XrdOucAvahiBonjour::EntryGroupReply(AvahiEntryGroup *g,
00031 AvahiEntryGroupState state,
00032 void *userdata)
00033 {
00034 XrdOucBonjourRegisteredEntry * entry = (XrdOucBonjourRegisteredEntry *)userdata;
00035
00036 switch (state) {
00037 case AVAHI_ENTRY_GROUP_COLLISION: {
00038 char *n;
00039
00040
00041 n = avahi_alternative_service_name(entry->record->GetServiceName());
00042 entry->record->SetServiceName(n);
00043 XrdLog.Emsg("OucBonjour", "Renaming service to ", entry->record->GetServiceName());
00044 avahi_free(n);
00045
00046 RegisterEntries(entry);
00047 break;
00048 }
00049
00050 case AVAHI_ENTRY_GROUP_FAILURE:
00051
00052 XrdLog.Emsg("OucBonjour", avahi_strerror(avahi_client_errno(avahi_entry_group_get_client(g))));
00053 break;
00054 case AVAHI_ENTRY_GROUP_ESTABLISHED:
00055 case AVAHI_ENTRY_GROUP_UNCOMMITED:
00056 case AVAHI_ENTRY_GROUP_REGISTERING:
00057
00058 break;
00059 default:
00060 XrdLog.Emsg("OucBonjour", "Invalid Avahi group creation response callback");
00061 }
00062 }
00063
00064 void XrdOucAvahiBonjour::RegisterEntries(XrdOucBonjourRegisteredEntry * entry)
00065 {
00066 int ret;
00067 char *n;
00068 AvahiStringList *list = NULL;
00069
00070
00071 if (!entry->avahiRef.avahiEntryGroup) {
00072 entry->avahiRef.avahiEntryGroup = avahi_entry_group_new(entry->avahiRef.avahiClient,
00073 EntryGroupReply,
00074 (void *)entry);
00075 if (!entry->avahiRef.avahiEntryGroup) {
00076 XrdLog.Emsg("OucBonjour", "Unable to register service entries");
00077 XrdLog.Emsg("OucBonjour", avahi_strerror(avahi_client_errno(entry->avahiRef.avahiClient)));
00078 return;
00079 }
00080 }
00081
00082
00083 if (avahi_entry_group_is_empty(entry->avahiRef.avahiEntryGroup)) {
00084
00085 list = entry->record->GetTXTAvahiList();
00086
00087 if ((ret = avahi_entry_group_add_service_strlst(entry->avahiRef.avahiEntryGroup,
00088 AVAHI_IF_UNSPEC,
00089 AVAHI_PROTO_INET,
00090 (AvahiPublishFlags)0,
00091 entry->record->GetServiceName(),
00092 entry->record->GetRegisteredType(),
00093 entry->record->GetReplyDomain(),
00094 NULL,
00095 entry->port,
00096 list)) < 0) {
00097 if (ret == AVAHI_ERR_COLLISION) {
00098
00099
00100 n = avahi_alternative_service_name(entry->record->GetServiceName());
00101 entry->record->SetServiceName(n);
00102 XrdLog.Emsg("OucBonjour", "Renaming service to ", entry->record->GetServiceName());
00103 avahi_free(n);
00104
00105 RegisterEntries(entry);
00106 return;
00107 }
00108 }
00109
00110
00111
00112 if ((ret = avahi_entry_group_commit(entry->avahiRef.avahiEntryGroup)) < 0) {
00113 XrdLog.Emsg("OucBonjour", "Unable to commit entry registration ", avahi_strerror(ret));
00114 }
00115
00116
00117 if (list)
00118 avahi_string_list_free(list);
00119 }
00120 }
00121
00122 void XrdOucAvahiBonjour::RegisterReply(AvahiClient *c,
00123 AvahiClientState state,
00124 void * userdata)
00125 {
00126 XrdOucBonjourRegisteredEntry * entry = (XrdOucBonjourRegisteredEntry *)userdata;
00127
00128
00129 entry->avahiRef.avahiClient = c;
00130
00131 switch (state) {
00132 case AVAHI_CLIENT_S_RUNNING:
00133
00134 RegisterEntries(entry);
00135 break;
00136 case AVAHI_CLIENT_FAILURE:
00137 XrdLog.Emsg("OucBonjour", avahi_strerror(avahi_client_errno(c)));
00138 break;
00139 case AVAHI_CLIENT_S_COLLISION:
00140 case AVAHI_CLIENT_S_REGISTERING:
00141
00142 if (entry->avahiRef.avahiEntryGroup)
00143 avahi_entry_group_reset(entry->avahiRef.avahiEntryGroup);
00144 break;
00145 case AVAHI_CLIENT_CONNECTING:
00146
00147 break;
00148 default:
00149 XrdLog.Emsg("OucBonjour", "Invalid Avahi register response callback");
00150 }
00151 }
00152
00153 int XrdOucAvahiBonjour::RegisterService(XrdOucBonjourRecord &record, unsigned short port)
00154 {
00155 XrdOucBonjourRegisteredEntry * entry;
00156 int err;
00157
00158
00159 if (port == 0)
00160 port = (XrdNetTCP[0] == XrdNetTCP[XrdProtLoad::ProtoMax]
00161 ? -(XrdNetTCP[0]->Port()) : XrdNetTCP[0]->Port());
00162
00163
00164 entry = (XrdOucBonjourRegisteredEntry *)malloc(sizeof(XrdOucBonjourRegisteredEntry));
00165 if (!entry)
00166 return -1;
00167
00168 entry->record = new XrdOucBonjourRecord(record);
00169 entry->port = port;
00170 entry->avahiRef.avahiEntryGroup = NULL;
00171
00172
00173 entry->avahiRef.avahiClient = avahi_client_new(avahi_simple_poll_get(poller),
00174 (AvahiClientFlags)0,
00175 RegisterReply,
00176 (void *)entry,
00177 &err);
00178
00179 if (!entry->avahiRef.avahiClient) {
00180 XrdLog.Emsg("OucBonjour", err, "Regigster service", record.GetRegisteredType());
00181 XrdLog.Emsg("OucBonjour", err, avahi_strerror(err));
00182
00183 delete entry->record;
00184 free(entry);
00185 return -1;
00186 }
00187
00188
00189
00190 ListOfRegistrations.push_back(entry);
00191 return 0;
00192 }
00193
00194
00195
00196
00197
00198 void XrdOucAvahiBonjour::BrowseReply(AvahiServiceBrowser *b,
00199 AvahiIfIndex interface,
00200 AvahiProtocol protocol,
00201 AvahiBrowserEvent event,
00202 const char *name,
00203 const char *type,
00204 const char *domain,
00205 AvahiLookupResultFlags flags,
00206 void* userdata)
00207 {
00208 XrdOucBonjourSubscribedEntry * callbackID;
00209 XrdOucAvahiBonjour *instance;
00210 XrdOucAvahiBonjourSearchNode predicate(name);
00211 XrdOucBonjourResolutionEntry * toResolve;
00212
00213 callbackID = (XrdOucBonjourSubscribedEntry *)userdata;
00214
00215
00216 instance = &XrdOucAvahiBonjour::getInstance();
00217
00218 switch (event) {
00219 case AVAHI_BROWSER_FAILURE:
00220 XrdLog.Emsg("OucBonjour", avahi_strerror(avahi_client_errno(callbackID->client)));
00221 return;
00222
00223 case AVAHI_BROWSER_NEW:
00224
00225 instance->LockNodeList();
00226
00227 toResolve = (XrdOucBonjourResolutionEntry *)malloc(sizeof(XrdOucBonjourResolutionEntry));
00228 toResolve->node = new XrdOucBonjourNode(name, type, domain);
00229 toResolve->callbackID = callbackID;
00230
00231 XrdLog.Say("------ XrdOucBonjour: discovered a new node: ", name);
00232
00233
00234 if (!(avahi_service_resolver_new(callbackID->client, interface, protocol, name, type, domain, AVAHI_PROTO_INET, (AvahiLookupFlags)0, ResolveReply, toResolve)))
00235 XrdLog.Emsg("OucBonjour", avahi_strerror(avahi_client_errno(callbackID->client)));
00236
00237
00238 instance->UnLockNodeList();
00239
00240 break;
00241
00242 case AVAHI_BROWSER_REMOVE:
00243
00244
00245 instance->LockNodeList();
00246 instance->ListOfNodes.remove_if(predicate);
00247 instance->UnLockNodeList();
00248
00249 XrdLog.Say("------ XrdOucBonjour: the node ", name, " went out the network");
00250
00251
00252 callbackID->callback(callbackID->context);
00253
00254 break;
00255
00256 case AVAHI_BROWSER_ALL_FOR_NOW:
00257 case AVAHI_BROWSER_CACHE_EXHAUSTED:
00258 break;
00259 }
00260
00261 }
00262
00263 void XrdOucAvahiBonjour::ClientReply(AvahiClient *c,
00264 AvahiClientState state,
00265 void * userdata)
00266 {
00267 if (state == AVAHI_CLIENT_FAILURE) {
00268 XrdLog.Emsg("OucBonjour", avahi_strerror(avahi_client_errno(c)));
00269 }
00270 }
00271
00272 void * XrdOucAvahiBonjour::BrowseEventLoopThread(void * context)
00273 {
00274 AvahiSimplePoll *simple_poll = NULL;
00275 AvahiServiceBrowser *sb = NULL;
00276 int error;
00277 XrdOucBonjourSubscribedEntry * callbackID;
00278
00279 callbackID = (XrdOucBonjourSubscribedEntry *)context;
00280
00281
00282 if (!(simple_poll = avahi_simple_poll_new())) {
00283 XrdLog.Emsg("OucBonjour", "Failed to create the poller discovery object");
00284 return NULL;
00285 }
00286
00287
00288 callbackID->client = avahi_client_new(avahi_simple_poll_get(simple_poll), (AvahiClientFlags)0, ClientReply, callbackID, &error);
00289
00290
00291 if (!callbackID->client) {
00292 XrdLog.Emsg("OucBonjour", error, avahi_strerror(error));
00293 avahi_simple_poll_free(simple_poll);
00294 return NULL;
00295 }
00296
00297
00298 if (!(sb = avahi_service_browser_new(callbackID->client, AVAHI_IF_UNSPEC, AVAHI_PROTO_INET, callbackID->serviceType->c_str(), NULL, (AvahiLookupFlags)0, BrowseReply, callbackID))) {
00299 XrdLog.Emsg("OucBonjour", avahi_strerror(avahi_client_errno(callbackID->client)));
00300 avahi_client_free(callbackID->client);
00301 avahi_simple_poll_free(simple_poll);
00302 return NULL;
00303 }
00304
00305
00306 avahi_simple_poll_loop(simple_poll);
00307
00308 XrdLog.Emsg("OucBonjour", "Event loop thread terminated abnormally");
00309
00310 return NULL;
00311 }
00312
00313
00314
00315
00316
00317 int XrdOucAvahiBonjour::SubscribeForUpdates(const char * servicetype,
00318 XrdOucBonjourUpdateCallback callback,
00319 void * context)
00320 {
00321 pthread_t thread;
00322 XrdOucBonjourSubscribedEntry * callbackID = (XrdOucBonjourSubscribedEntry *)malloc(sizeof(XrdOucBonjourSubscribedEntry));
00323 callbackID->callback = callback;
00324 callbackID->context = context;
00325 callbackID->serviceType = new XrdOucString(servicetype);
00326 callbackID->client = NULL;
00327
00328
00329 return XrdSysThread::Run(&thread, BrowseEventLoopThread, callbackID);
00330 }
00331
00332
00333
00334
00335
00336 void XrdOucAvahiBonjour::ResolveReply(AvahiServiceResolver *r,
00337 AvahiIfIndex interface,
00338 AvahiProtocol protocol,
00339 AvahiResolverEvent event,
00340 const char *name,
00341 const char *type,
00342 const char *domain,
00343 const char *host_name,
00344 const AvahiAddress *address,
00345 uint16_t port,
00346 AvahiStringList *txt,
00347 AvahiLookupResultFlags flags,
00348 void* userdata)
00349 {
00350 XrdOucBonjourSubscribedEntry * callbackID;
00351 XrdOucBonjourResolutionEntry * toResolve;
00352 XrdOucAvahiBonjour *instance;
00353 AvahiStringList * iterator;
00354 char * key, * value, address_str[AVAHI_ADDRESS_STR_MAX];
00355 size_t size;
00356
00357 toResolve = static_cast<XrdOucBonjourResolutionEntry *>(userdata);
00358 callbackID = toResolve->callbackID;
00359
00360 switch (event) {
00361 case AVAHI_RESOLVER_FAILURE:
00362 XrdLog.Emsg("OucBonjour", avahi_strerror(avahi_client_errno(avahi_service_resolver_get_client(r))));
00363 break;
00364
00365 case AVAHI_RESOLVER_FOUND:
00366 instance = &XrdOucAvahiBonjour::getInstance();
00367 instance->LockNodeList();
00368
00369 if (avahi_nss_support()) {
00370
00371
00372 toResolve->node->SetHostName(host_name);
00373 } else {
00374
00375
00376 avahi_address_snprint(address_str, sizeof(address_str), address);
00377 toResolve->node->SetHostName(address_str);
00378 }
00379
00380 toResolve->node->SetPort(port);
00381
00382
00383 iterator = txt;
00384 while (iterator != NULL) {
00385
00386 avahi_string_list_get_pair(iterator, &key, &value, &size);
00387
00388 toResolve->node->GetBonjourRecord().AddTXTRecord(key, value);
00389
00390 avahi_free(key);
00391 if (value)
00392 avahi_free(value);
00393
00394 iterator = avahi_string_list_get_next(iterator);
00395 }
00396
00397
00398 instance->ListOfNodes.push_back(toResolve->node);
00399 instance->UnLockNodeList();
00400 }
00401
00402
00403
00404 avahi_service_resolver_free(r);
00405
00406
00407 free(toResolve);
00408
00409
00410 if (event == AVAHI_RESOLVER_FOUND)
00411 callbackID->callback(callbackID->context);
00412 }
00413
00414 int XrdOucAvahiBonjour::ResolveNodeInformation(XrdOucBonjourResolutionEntry * nodeAndCallback)
00415 {
00416
00417
00418 return 0;
00419 }
00420
00421
00422
00423
00424
00425 bool XrdOucAvahiBonjour::XrdOucAvahiBonjourSearchNode::operator()(XrdOucBonjourNode * value)
00426 {
00427 return strcmp(value->GetBonjourRecord().GetServiceName(), ServiceName) == 0;
00428 }
00429
00430 XrdOucAvahiBonjour * XrdOucAvahiBonjour::_Instance = NULL;
00431
00432 XrdSysMutex XrdOucAvahiBonjour::SingletonMutex;
00433
00434 XrdOucAvahiBonjour::XrdOucAvahiBonjour()
00435 {
00436 char *env = strdup("AVAHI_COMPAT_NOWARN=1");
00437 putenv(env);
00438 poller = avahi_simple_poll_new();
00439 }
00440
00441 XrdOucAvahiBonjour::~XrdOucAvahiBonjour()
00442 {
00443 if (poller)
00444 avahi_simple_poll_free(poller);
00445 }
00446
00447
00448
00449
00450
00451
00452 XrdOucAvahiBonjour &XrdOucAvahiBonjour::getInstance()
00453 {
00454
00455
00456 static XrdOucAvahiBonjourSingletonCleanup cleanGuard;
00457
00458 SingletonMutex.Lock();
00459 if (!_Instance)
00460 _Instance = new XrdOucAvahiBonjour();
00461 SingletonMutex.UnLock();
00462
00463 return *_Instance;
00464 }
00465
00466 XrdOucAvahiBonjour::XrdOucAvahiBonjourSingletonCleanup::~XrdOucAvahiBonjourSingletonCleanup()
00467 {
00468 SingletonMutex.Lock();
00469 if (_Instance) {
00470 delete XrdOucAvahiBonjour::_Instance;
00471 XrdOucAvahiBonjour::_Instance = NULL;
00472 }
00473 SingletonMutex.UnLock();
00474 }