1 /*----------------------------------------------------------------------------
   2 Name:      XmlBlasterAccessUnparsed.c
   3 Project:   xmlBlaster.org
   4 Copyright: xmlBlaster.org, see xmlBlaster-LICENSE file
   5 Comment:   Wraps raw socket connection to xmlBlaster
   6            Implements sync connection and async callback
   7            Needs pthread to compile (multi threading).
   8 Author:    "Marcel Ruff" <xmlBlaster@marcelruff.info>
   9 Compile:
  10   LINUX:   gcc -DXmlBlasterAccessUnparsedMain -D_ENABLE_STACK_TRACE_ -rdynamic -export-dynamic -Wall -pedantic -g -D_REENTRANT -I.. -o XmlBlasterAccessUnparsedMain XmlBlasterAccessUnparsed.c ../util/msgUtil.c ../util/Properties.c xmlBlasterSocket.c XmlBlasterConnectionUnparsed.c CallbackServerUnparsed.c -lpthread
  11            g++ -DXmlBlasterAccessUnparsedMain -DXMLBLASTER_C_COMPILE_AS_CPP -Wall -pedantic -g -D_REENTRANT -I.. -o XmlBlasterAccessUnparsedMain XmlBlasterAccessUnparsed.c ../util/msgUtil.c ../util/Properties.c xmlBlasterSocket.c XmlBlasterConnectionUnparsed.c CallbackServerUnparsed.c -lpthread
  12            icc -DXmlBlasterAccessUnparsedMain -D_ENABLE_STACK_TRACE_ -rdynamic -g -D_REENTRANT -I.. -o XmlBlasterAccessUnparsedMain XmlBlasterAccessUnparsed.c ../util/msgUtil.c ../util/Properties.c xmlBlasterSocket.c XmlBlasterConnectionUnparsed.c CallbackServerUnparsed.c -lpthread
  13   WIN:     cl /MT /W4 -DXmlBlasterAccessUnparsedMain -D_WINDOWS -I.. -I../pthreads /FeXmlBlasterAccessUnparsedMain.exe  XmlBlasterAccessUnparsed.c ..\util\msgUtil.c ..\util\Properties.c xmlBlasterSocket.c XmlBlasterConnectionUnparsed.c CallbackServerUnparsed.c ws2_32.lib pthreadVC2.lib
  14            (download pthread for Windows and WinCE from http://sources.redhat.com/pthreads-win32)
  15   Solaris: cc  -DXmlBlasterAccessUnparsedMain -v -Xc -g -D_REENTRANT -I.. -o XmlBlasterAccessUnparsedMain XmlBlasterAccessUnparsed.c ../util/msgUtil.c ../util/Properties.c xmlBlasterSocket.c XmlBlasterConnectionUnparsed.c CallbackServerUnparsed.c -lpthread -lsocket -lnsl
  16            CC  -DXmlBlasterAccessUnparsedMain -DXMLBLASTER_C_COMPILE_AS_CPP -g -D_REENTRANT -I.. -o XmlBlasterAccessUnparsedMain XmlBlasterAccessUnparsed.c ../util/msgUtil.c ../util/Properties.c xmlBlasterSocket.c XmlBlasterConnectionUnparsed.c CallbackServerUnparsed.c -lpthread -lsocket -lnsl
  17 
  18   Linux with libxmlBlasterC.so:
  19            gcc -DXmlBlasterAccessUnparsedMain -o XmlBlasterAccessUnparsedMain XmlBlasterAccessUnparsed.c  -L../../../lib -lxmlBlasterClientC -I.. -Wl,-rpath=../../../lib -D_REENTRANT  -lpthread
  20 See:       http://www.xmlblaster.org/xmlBlaster/doc/requirements/protocol.socket.html
  21 -----------------------------------------------------------------------------*/
  22 #include <stdio.h>
  23 #include <stdlib.h>
  24 #include <string.h>
  25 #if defined(WINCE)
  26 #  if defined(XB_USE_PTHREADS)
  27 #     include <pthreads/pthread.h>
  28 #  else
  29       /*#include <pthreads/need_errno.h> */
  30       static int errno=0; /* single threaded workaround*/
  31 #  endif
  32 #else
  33 #  include <errno.h>
  34 #  include <sys/types.h>
  35 #endif
  36 #include <socket/xmlBlasterSocket.h>
  37 #include <socket/xmlBlasterZlib.h>
  38 #include <XmlBlasterAccessUnparsed.h>
  39 #include <util/Timestampc.h>
  40 
  41 /**
  42  * Little helper to collect args for the new created thread
  43  */
  44 typedef struct Dll_Export UpdateContainer {
  45    XmlBlasterAccessUnparsed *xa;
  46    MsgUnitArr *msgUnitArrP;
  47    void *userData;
  48    XmlBlasterException exception;     /* Holding a clone from the original as the callback thread may use it for another message */
  49    SocketDataHolder socketDataHolder; /* Holding a clone from the original */
  50 } UpdateContainer;
  51 
  52 static bool initialize(XmlBlasterAccessUnparsed *xa, UpdateFp update, XmlBlasterException *exception);
  53 static char *xmlBlasterConnect(XmlBlasterAccessUnparsed *xa, const char * const qos, UpdateFp update, XmlBlasterException *exception);
  54 static bool xmlBlasterDisconnect(XmlBlasterAccessUnparsed *xa, const char * const qos, XmlBlasterException *exception);
  55 static char *xmlBlasterPublish(XmlBlasterAccessUnparsed *xa, MsgUnit *msgUnit, XmlBlasterException *exception);
  56 static QosArr *xmlBlasterPublishArr(XmlBlasterAccessUnparsed *xa, MsgUnitArr *msgUnitArr, XmlBlasterException *exception);
  57 static void xmlBlasterPublishOneway(XmlBlasterAccessUnparsed *xa, MsgUnitArr *msgUnitArr, XmlBlasterException *exception);
  58 static char *xmlBlasterSubscribe(XmlBlasterAccessUnparsed *xa, const char * const key, const char * qos, XmlBlasterException *exception);
  59 static QosArr *xmlBlasterUnSubscribe(XmlBlasterAccessUnparsed *xa, const char * const key, const char * qos, XmlBlasterException *exception);
  60 static QosArr *xmlBlasterErase(XmlBlasterAccessUnparsed *xa, const char * const key, const char * qos, XmlBlasterException *exception);
  61 static MsgUnitArr *xmlBlasterGet(XmlBlasterAccessUnparsed *xa, const char * const key, const char * qos, XmlBlasterException *exception);
  62 static char *xmlBlasterPing(XmlBlasterAccessUnparsed *xa, const char * const qos, XmlBlasterException *exception);
  63 static bool isConnected(XmlBlasterAccessUnparsed *xa);
  64 static void responseEvent(MsgRequestInfo *msgRequestInfoP, void /*SocketDataHolder*/ *socketDataHolder);
  65 static MsgRequestInfo *preSendEvent(MsgRequestInfo *msgRequestInfo, XmlBlasterException *exception);
  66 static MsgRequestInfo *postSendEvent(MsgRequestInfo *msgRequestInfo, XmlBlasterException *exception);
  67 static bool checkArgs(XmlBlasterAccessUnparsed *xa, const char *methodName, bool checkIsConnected, XmlBlasterException *exception);
  68 static void interceptUpdate(MsgUnitArr *msgUnitArr, void *userData, XmlBlasterException *xmlBlasterException, void/*SocketDataHolder*/ *socketDataHolder);
  69 static bool mutexUnlock(MsgRequestInfo *msgRequestInfoP, XmlBlasterException *exception);
  70 static ssize_t writenPlain(void *xa, const int fd, const char *ptr, const size_t nbytes);
  71 static ssize_t writenCompressed(void *xa, const int fd, const char *ptr, const size_t nbytes);
  72 static ssize_t readnPlain(void * userP, const int fd, char *ptr, const size_t nbytes, XmlBlasterNumReadFunc fpNumRead, void * userP2);
  73 static ssize_t readnCompressed(void *userP, const int fd, char *ptr, const size_t nbytes, XmlBlasterNumReadFunc fpNumRead, void * userP2);
  74 
  75 Dll_Export XmlBlasterAccessUnparsed *getXmlBlasterAccessUnparsed(int argc, const char* const* argv) {
  76    XmlBlasterAccessUnparsed * const xa = (XmlBlasterAccessUnparsed *)calloc(1, sizeof(XmlBlasterAccessUnparsed));
  77    if (xa == 0) return xa;
  78    xa->argc = argc;
  79    xa->argv = argv;
  80    xa->props = createProperties(xa->argc, xa->argv);
  81    if (xa->props == 0) {
  82       freeXmlBlasterAccessUnparsed(xa);
  83       return (XmlBlasterAccessUnparsed *)0;
  84    }
  85    xa->isInitialized = false;
  86    xa->isShutdown = false;
  87    xa->connectionP = 0;
  88    xa->callbackP = 0;
  89    xa->userObject = 0; /* A client can use this pointer to point to any client specific information */
  90    xa->userFp = 0;
  91    xa->connect = xmlBlasterConnect;
  92    xa->initialize = initialize;
  93    xa->disconnect = xmlBlasterDisconnect;
  94    xa->publish = xmlBlasterPublish;
  95    xa->publishArr = xmlBlasterPublishArr;
  96    xa->publishOneway = xmlBlasterPublishOneway;
  97    xa->subscribe = xmlBlasterSubscribe;
  98    xa->unSubscribe = xmlBlasterUnSubscribe;
  99    xa->erase = xmlBlasterErase;
 100    xa->get = xmlBlasterGet;
 101    xa->ping = xmlBlasterPing;
 102    xa->isConnected = isConnected;
 103    xa->logLevel = parseLogLevel(xa->props->getString(xa->props, "logLevel", "WARN"));
 104    xa->log = xmlBlasterDefaultLogging;
 105    xa->logUserP = 0;
 106    xa->clientsUpdateFp = 0;
 107    xa->callbackMultiThreaded = xa->props->getBool(xa->props, "plugin/socket/multiThreaded", true);
 108    xa->callbackMultiThreaded = xa->props->getBool(xa->props, "dispatch/callback/plugin/socket/multiThreaded", xa->callbackMultiThreaded);
 109    /*   xa->lowLevelAutoAck = xa->props->getBool(xa->props, "plugin/socket/lowLevelAutoAck", false); */
 110    /*   xa->lowLevelAutoAck = xa->props->getBool(xa->props, "dispatch/callback/plugin/socket/lowLevelAutoAck", xa->lowLevelAutoAck); */
 111    /* Currently forced to false: needs mutex and reference counter to not freeMsgUnitArr twice */
 112    xa->lowLevelAutoAck = false;
 113 
 114    /* We shouldn't do much logging here, as the caller had no chance to redirect it up to now */
 115    if (xa->callbackMultiThreaded == true) {
 116       if (xa->logLevel>=XMLBLASTER_LOG_DUMP)
 117          xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_DUMP, __FILE__, "Multi threaded callback delivery is activated with -plugin/socket/multiThreaded true");
 118       /*xa->callbackMultiThreaded = false;*/
 119    }
 120    /* stdint.h: # define INT32_MAX              (2147483647) */
 121    xa->responseTimeout = xa->props->getLong(xa->props, "plugin/socket/responseTimeout", 2147483647L); /* Before xmlBlaster 1.1: One minute (given in millis) */
 122    xa->responseTimeout = xa->props->getLong(xa->props, "dispatch/connection/plugin/socket/responseTimeout", xa->responseTimeout);
 123    /* ERROR HANDLING ? xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_WARN, __FILE__, "Your configuration '-plugin/socket/responseTimeout %s' is invalid", argv[iarg]); */
 124    memset(&xa->callbackThreadId, 0, sizeof(pthread_t));
 125    xa->threadCounter = 0;
 126 
 127    if (xa->logLevel>=XMLBLASTER_LOG_DUMP) xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_DUMP, __FILE__,
 128                                 "Created handle: -logLevel=%s -plugin/socket/responseTimeout=%ld",
 129                                 getLogLevelStr(xa->logLevel), xa->responseTimeout);
 130 
 131    /* See: http://www.llnl.gov/computing/tutorials/workshops/workshop/pthreads/MAIN.html */
 132    pthread_mutex_init(&xa->writenMutex, NULL); /* returns always 0 */
 133    pthread_mutex_init(&xa->readnMutex, NULL);
 134    return xa;
 135 }
 136 
 137 Dll_Export void freeXmlBlasterAccessUnparsed(XmlBlasterAccessUnparsed *xa)
 138 {
 139    int rc;
 140 
 141    if (xa == 0) {
 142       char *stack = getStackTrace(10);
 143       printf("[%s:%d] Please provide a valid XmlBlasterAccessUnparsed pointer to freeXmlBlasterAccessUnparsed() %s",
 144                 __FILE__, __LINE__, stack);
 145       free(stack);
 146       return;
 147    }
 148 
 149    if (xa->isShutdown) return; /* Avoid simultaneous multiple calls */
 150    xa->isShutdown = true;      /* Inhibit access to xa */
 151 
 152    if (xa->callbackP != 0) {
 153       xa->callbackP->shutdown(xa->callbackP);
 154    }
 155    if (xa->connectionP != 0) {
 156       xa->connectionP->shutdown(xa->connectionP);
 157    }
 158 
 159    if (xa->callbackP != 0) {
 160       /* Detach or join? On Linux both work fine. On Windows it blocks sometimes forever during join */
 161       const bool USE_DETACH_MODE = xa->props->getBool(xa->props, "plugin/socket/detachCbThread", true);
 162       int retVal;
 163       if (xa->callbackP->threadIsAlive && !USE_DETACH_MODE) {
 164          /* pthread_cancel() does not block. Who cleans up open resources? TODO: pthread_cleanup_push() */
 165          /* On Linux all works fine without pthread_cancel() but on Windows the later pthread_join() sometimes hangs without a pthread_cancel() */
 166          /*
 167          retVal = pthread_cancel(xa->callbackThreadId);
 168          if (retVal != 0) {
 169             xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_ERROR, __FILE__, "pthread_cancel problem return value is %d", retVal);
 170          }
 171          */
 172       }
 173 
 174       if (USE_DETACH_MODE) {
 175          /* Check if above xa->callbackP->shutdown(xa->callbackP) thread has finished: */
 176          /*bool hasTerminated = */xa->callbackP->waitOnCallbackThreadTermination(xa->callbackP, 2000);
 177 
 178          retVal = pthread_detach(xa->callbackThreadId); /* Frees resources (even if thread has died already), don't call multiple times on same thread! */
 179          if (retVal != 0) {
 180             xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_ERROR, __FILE__, "[%d] Detaching callback thread 0x%x failed with error number %d", __LINE__, get_pthread_id(xa->callbackThreadId), retVal);
 181          }
 182          else {
 183             if (xa->logLevel>=XMLBLASTER_LOG_TRACE) xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_TRACE, __FILE__,
 184                                           "pthread_detach(id=%ld) succeeded for callback server thread", get_pthread_id(xa->callbackThreadId));
 185          }
 186       }
 187       else { /* JOIN mode */
 188          retVal = pthread_join(xa->callbackThreadId, 0);
 189          if (retVal != 0) {
 190             xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_ERROR, __FILE__, "pthread_join problem return value is %d", retVal);
 191          }
 192          else {
 193             if (xa->logLevel>=XMLBLASTER_LOG_INFO) xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_INFO, __FILE__,
 194                                           "pthread_join(id=%ld) succeeded for callback server thread", get_pthread_id(xa->callbackThreadId));
 195          }
 196       }
 197 
 198       memset(&xa->callbackThreadId, 0, sizeof(pthread_t));
 199    }
 200 
 201    if (xa->logLevel>=XMLBLASTER_LOG_TRACE) xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, "freeXmlBlasterAccessUnparsed() conP=0x%x cbP=0x%x", xa->connectionP, xa->callbackP);
 202 
 203    {  /* Wait for any pending update() dispatcher threads to die */
 204       int i;
 205       int num = 1000;
 206       int interval = 10;
 207       for (i=0; i<num; i++) {
 208          if ((int)xa->threadCounter < 1)
 209             break;
 210          sleepMillis(interval);
 211          if (xa->logLevel>=XMLBLASTER_LOG_TRACE) xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_TRACE, __FILE__,
 212              "freeXmlBlasterAccessUnparsed(): Sleeping %d millis for update thread to join. %d/%d", interval, i, num);
 213       }
 214       if (i >= num) {
 215          if (xa->logLevel>=XMLBLASTER_LOG_ERROR) xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_ERROR, __FILE__,
 216              "freeXmlBlasterAccessUnparsed(): There are active callback threads in user code which didn't return after sleeping for %ld millis, we continue now to shutdown ...", (long)interval*num);
 217       }
 218    }
 219 
 220    if (xa->connectionP != 0) {
 221       freeXmlBlasterConnectionUnparsed(&xa->connectionP);
 222    }
 223 
 224    if (xa->callbackP != 0) {
 225       freeCallbackServerUnparsed(&xa->callbackP);
 226    }
 227 
 228    freeProperties(xa->props);
 229 
 230    rc = pthread_mutex_destroy(&xa->writenMutex); /* On Linux this does nothing, but returns an error code EBUSY if the mutex was locked */
 231    if (rc != 0) /* EBUSY=16 "Device or resource busy": char *strerror_r(int errnum, char *buf, size_t buflen); */
 232       xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_WARN, __FILE__, "pthread_mutex_destroy(writenMutex) returned %d, we ignore it", rc);
 233 
 234    rc = pthread_mutex_destroy(&xa->readnMutex);
 235    if (rc != 0) /* EBUSY */
 236       xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_WARN, __FILE__, "pthread_mutex_destroy(readnMutex) returned %d, we ignore it", rc);
 237 
 238    free(xa);
 239 }
 240 
 241 static bool initialize(XmlBlasterAccessUnparsed *xa, UpdateFp clientUpdateFp, XmlBlasterException *exception)
 242 {
 243    int threadRet = 0;
 244    const char *compressType = 0;
 245 
 246    if (checkArgs(xa, "initialize", false, exception) == false) return false;
 247 
 248    if (xa->isInitialized) {
 249       return true;
 250    }
 251 
 252    if (clientUpdateFp == 0) {
 253       xa->clientsUpdateFp = 0;
 254       xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_INFO, "",
 255         "Your callback UpdateFp pointer is NULL, we use our default callback handler");
 256    }
 257    else {
 258       xa->clientsUpdateFp = clientUpdateFp;
 259    }
 260 
 261    if (xa->connectionP) {
 262       freeXmlBlasterConnectionUnparsed(&xa->connectionP);
 263    }
 264    xa->connectionP = getXmlBlasterConnectionUnparsed(xa->argc, xa->argv);
 265    if (xa->connectionP == 0) {
 266       strncpy0(exception->errorCode, "resource.outOfMemory", XMLBLASTEREXCEPTION_ERRORCODE_LEN);
 267       SNPRINTF(exception->message, XMLBLASTEREXCEPTION_MESSAGE_LEN,
 268                "[%.100s:%d] Creating XmlBlasterConnectionUnparsed failed", __FILE__, __LINE__);
 269       if (xa->logLevel>=XMLBLASTER_LOG_TRACE) xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, exception->message);
 270       return false;
 271    }
 272    xa->connectionP->log = xa->log;
 273    xa->connectionP->logUserP = xa->logUserP;
 274    xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, "Created XmlBlasterConnectionUnparsed");
 275 
 276 
 277    /* Switch on compression? */
 278    compressType = xa->props->getString(xa->props, "plugin/socket/compress/type", "");
 279    compressType = xa->props->getString(xa->props, "dispatch/connection/plugin/socket/compress/type", compressType);
 280 
 281    if (!strcmp(compressType, "zlib:stream")) {
 282       xa->connectionP->writeToSocket.writeToSocketFuncP = writenCompressed;
 283       xa->connectionP->writeToSocket.userP = xa;
 284       xa->connectionP->readFromSocket.readFromSocketFuncP = readnCompressed;
 285       xa->connectionP->readFromSocket.userP = xa;
 286    }
 287    else {
 288       if (strcmp(compressType, "")) {
 289          xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, "Unsupported compression type 'plugin/socket/compress/type=%s', falling back to plain mode", compressType);
 290       }
 291       xa->connectionP->writeToSocket.writeToSocketFuncP = writenPlain;
 292       xa->connectionP->writeToSocket.userP = xa;
 293       xa->connectionP->readFromSocket.readFromSocketFuncP = readnPlain;
 294       xa->connectionP->readFromSocket.userP = xa;
 295    }
 296 
 297    if (xa->connectionP->initConnection(xa->connectionP, exception) == false) /* Establish low level IP connection */
 298       return false;
 299 
 300    /* the fourth arg 'xa' is returned as 'void *userData' in update() method */
 301    if (xa->callbackP != 0) {
 302       freeCallbackServerUnparsed(&xa->callbackP);
 303    }
 304    xa->callbackP = getCallbackServerUnparsed(xa->argc, xa->argv, interceptUpdate, xa);
 305    if (xa->callbackP == 0) {
 306       strncpy0(exception->errorCode, "resource.outOfMemory", XMLBLASTEREXCEPTION_ERRORCODE_LEN);
 307       SNPRINTF(exception->message, XMLBLASTEREXCEPTION_MESSAGE_LEN,
 308                "[%.100s:%d] Creating CallbackServerUnparsed failed", __FILE__, __LINE__);
 309       if (xa->logLevel>=XMLBLASTER_LOG_TRACE) xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, exception->message);
 310       freeXmlBlasterConnectionUnparsed(&xa->connectionP);
 311       return false;
 312    }
 313    xa->callbackP->log = xa->log;
 314    xa->callbackP->logUserP = xa->logUserP;
 315 
 316    if (!strcmp(compressType, "zlib:stream")) {
 317       xa->callbackP->writeToSocket.writeToSocketFuncP = writenCompressed;
 318       xa->callbackP->writeToSocket.userP = xa;
 319       xa->callbackP->readFromSocket.readFromSocketFuncP = readnCompressed;
 320       xa->callbackP->readFromSocket.userP = xa;
 321    }
 322    else {
 323       xa->callbackP->writeToSocket.writeToSocketFuncP = writenPlain;
 324       xa->callbackP->writeToSocket.userP = xa;
 325       xa->callbackP->readFromSocket.readFromSocketFuncP = readnPlain;
 326       xa->callbackP->readFromSocket.userP = xa;
 327    }
 328 
 329    xa->callbackP->useThisSocket(xa->callbackP, xa->connectionP->socketToXmlBlaster, xa->connectionP->socketToXmlBlasterUdp);
 330 
 331    xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_INFO, __FILE__,
 332           "Created CallbackServerUnparsed instance, creating on a separate thread a listener on socket://%s:%d...",
 333           (xa->callbackP->hostCB == 0) ? "" : xa->callbackP->hostCB, xa->callbackP->portCB);
 334 
 335    /* Register our callback funtion which is called just before sending a message */
 336    xa->connectionP->preSendEvent = preSendEvent;
 337    xa->connectionP->preSendEvent_userP = xa;
 338 
 339    /* Register our callback funtion which is called just after sending a message */
 340    xa->connectionP->postSendEvent = postSendEvent;
 341    xa->connectionP->postSendEvent_userP = xa;
 342 
 343    /* thread blocks on socket listener or on socket read (if useThisSocket) */
 344    threadRet = pthread_create(&xa->callbackThreadId, (const pthread_attr_t *)0, (void * (*)(void *))xa->callbackP->runCallbackServer, (void *)xa->callbackP);
 345    if (threadRet != 0) {
 346       strncpy0(exception->errorCode, "resource.tooManyThreads", XMLBLASTEREXCEPTION_ERRORCODE_LEN);
 347       SNPRINTF(exception->message, XMLBLASTEREXCEPTION_MESSAGE_LEN,
 348                "[%.100s:%d] Creating thread failed with error number %d",
 349                __FILE__, __LINE__, threadRet);
 350       if (xa->logLevel>=XMLBLASTER_LOG_TRACE) xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, exception->message);
 351       freeCallbackServerUnparsed(&xa->callbackP);
 352       freeXmlBlasterConnectionUnparsed(&xa->connectionP);
 353       return false;
 354    }
 355    /* bool hasStarted = */xa->callbackP->waitOnCallbackThreadAlive(xa->callbackP, 5000);
 356 
 357    xa->isInitialized = true;
 358    if (xa->logLevel>=XMLBLASTER_LOG_TRACE) xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_TRACE, __FILE__,
 359                                 "initialize() successful");
 360    return xa->isInitialized;
 361 }
 362 
 363 static bool isConnected(XmlBlasterAccessUnparsed *xa)
 364 {
 365    if (xa == 0 || xa->isShutdown || xa->connectionP == 0) {
 366       return false;
 367    }
 368    return xa->connectionP->isConnected(xa->connectionP);
 369 }
 370 
 371 /**
 372  * Callback from #XmlBlasterConnectionUnparsed just before a message is sent,
 373  * the msgRequestInfo contains the requestId used.
 374  * This is the clients calling thread.
 375  * @param msgRequestInfoP Contains some informations about the request, may not be NULL
 376  * @param exception May not be NULL
 377  * @return The same (or a manipulated/encrypted) msgRequestInfo, if NULL the exception is filled.
 378  *         If msgRequestInfoP->blob.data was changed and malloc()'d by you, the caller will free() it.
 379  *         If you return NULL you need to call removeResponseListener() to avoid a memory leak.
 380  */
 381 static MsgRequestInfo *preSendEvent(MsgRequestInfo *msgRequestInfoP, XmlBlasterException *exception)
 382 {
 383    bool retBool;
 384    int retInt;
 385    XmlBlasterAccessUnparsed * const xa = (XmlBlasterAccessUnparsed *)msgRequestInfoP->xa;
 386 
 387    /* if (!strcmp(XMLBLASTER_PUBLISH_ONEWAY, msgRequestInfoP->methodName)) */
 388    if (xbl_isOneway(MSG_TYPE_INVOKE, msgRequestInfoP->methodName))
 389       return msgRequestInfoP;
 390 
 391    /* ======== Initialize threading ====== */
 392    msgRequestInfoP->responseMutexIsValid = false; /* Only to remember if the client thread holds the lock */
 393 
 394    if (xa->logLevel>=XMLBLASTER_LOG_TRACE) xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_TRACE, __FILE__,
 395                                 "preSendEvent(%s) occurred", msgRequestInfoP->methodName);
 396    retBool = xa->callbackP->addResponseListener(xa->callbackP, msgRequestInfoP, responseEvent);
 397    if (retBool == false) {
 398       strncpy0(exception->errorCode, "user.internal", XMLBLASTEREXCEPTION_ERRORCODE_LEN);
 399       SNPRINTF(exception->message, XMLBLASTEREXCEPTION_MESSAGE_LEN,
 400                "[%.100s:%d] Couldn't register as response listener", __FILE__, __LINE__);
 401       if (xa->logLevel>=XMLBLASTER_LOG_TRACE) xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, exception->message);
 402       return (MsgRequestInfo *)0;
 403    }
 404 
 405    if (xa->logLevel>=XMLBLASTER_LOG_TRACE) xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_TRACE, __FILE__,
 406                   "preSendEvent(requestId=%s, msgRequestInfoP->responseBlob.dataLen=%d), entering lock",
 407                   msgRequestInfoP->requestIdStr, msgRequestInfoP->responseBlob.dataLen);
 408    pthread_mutex_init(&msgRequestInfoP->responseMutex, NULL); /* returns always 0 */
 409    if ((retInt = pthread_mutex_lock(&msgRequestInfoP->responseMutex)) != 0) {
 410       strncpy0(exception->errorCode, "user.internal", XMLBLASTEREXCEPTION_ERRORCODE_LEN);
 411       SNPRINTF(exception->message, XMLBLASTEREXCEPTION_MESSAGE_LEN,
 412                "[%.100s:%d] Error trying to lock responseMutex %d", __FILE__, __LINE__, retInt);
 413       if (xa->logLevel>=XMLBLASTER_LOG_TRACE) xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, exception->message);
 414       return (MsgRequestInfo *)0;
 415    }
 416    msgRequestInfoP->responseMutexIsValid = true; /* Only if the client thread holds the lock */
 417 
 418    return msgRequestInfoP;
 419 }
 420 
 421 /**
 422  * This function is called by the callback server when a response message arrived (after we send a request).
 423  * The xa->responseBlob->data is malloc()'d with the response string, you need to free it.
 424  * This method is executed by the callback server thread.
 425  * @param msgRequestInfoP May not be NULL
 426  * @param socketDataHolder is on the stack and does not need to be freed, the 'data' member is
 427  *        malloc()'d and must be freed by the caller.
 428  */
 429 static void responseEvent(MsgRequestInfo *msgRequestInfoP, void /*SocketDataHolder*/ *socketDataHolder) {
 430    int retVal;
 431    SocketDataHolder *s = (SocketDataHolder *)socketDataHolder;
 432    XmlBlasterAccessUnparsed *xa = (XmlBlasterAccessUnparsed *)msgRequestInfoP->xa;
 433 
 434    if (msgRequestInfoP == 0)
 435       return;
 436 
 437    if (msgRequestInfoP->responseMutexIsValid == false)
 438       return;
 439 
 440    if ((retVal = pthread_mutex_lock(&msgRequestInfoP->responseMutex)) != 0) {
 441       xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_ERROR, __FILE__, "Trying to lock responseMutex in responseEvent() failed %d", retVal);
 442       if (msgRequestInfoP->responseMutexIsValid == false)
 443          return;
 444    }
 445    if (xa->logLevel>=XMLBLASTER_LOG_TRACE) xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, "responseEvent() responseMutex is LOCKED");
 446 
 447    blobcpyAlloc(&msgRequestInfoP->responseBlob, s->blob.data, s->blob.dataLen);
 448    msgRequestInfoP->responseType = s->type;
 449 
 450    if ((retVal = pthread_cond_signal(&msgRequestInfoP->responseCond)) != 0) {
 451       if (retVal == EINVAL)
 452          xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_ERROR, __FILE__, "Trying to signal waiting thread in responseEvent() fails %d EINVAL: responseCond is not valid", retVal);
 453       else if (retVal == EFAULT)
 454          xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_ERROR, __FILE__, "Trying to signal waiting thread in responseEvent() fails %d EFAULT: responseCond points to illegal address", retVal);
 455      else
 456          xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_ERROR, __FILE__, "Trying to signal waiting thread in responseEvent() fails %d", retVal);
 457      /*return; we need to unlock the mutex */
 458    }
 459 
 460    if (xa->logLevel>=XMLBLASTER_LOG_TRACE) xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_TRACE, __FILE__,
 461                                 "responseEvent(requestId '%s', msgType=%c, dataLen=%d) occurred, wake up signal sent",
 462                                 s->requestId, msgRequestInfoP->responseType, msgRequestInfoP->responseBlob.dataLen);
 463 
 464    if ((retVal = pthread_mutex_unlock(&msgRequestInfoP->responseMutex)) != 0) {
 465       xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_ERROR, __FILE__, "Trying to unlock responseMutex in responseEvent() failed %d", retVal);
 466       /* return; */
 467    }
 468    if (xa->logLevel>=XMLBLASTER_LOG_TRACE) xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, "responseEvent() responseMutex is UNLOCKED");
 469 }
 470 
 471 /**
 472  * Callback function (wait for response) called directly after a message is sent.
 473  * @param msgRequestInfoP Contains some informations about the request, may not be NULL
 474  * @param exception May not be NULL
 475  * @return The returned string from a request is written into msgRequestInfoP->data,
 476  *         the caller needs to free() it.
 477  */
 478 static MsgRequestInfo *postSendEvent(MsgRequestInfo *msgRequestInfoP, XmlBlasterException *exception)
 479 {
 480    XmlBlasterAccessUnparsed *xa = (XmlBlasterAccessUnparsed *)msgRequestInfoP->xa;
 481    struct timespec abstime;
 482    bool useTimeout = false;
 483    int retVal, i;
 484 
 485    if (msgRequestInfoP->rollback) {
 486       xa->callbackP->removeResponseListener(xa->callbackP, msgRequestInfoP->requestIdStr);
 487       /* cb->shutdown(), cb->waitOnCallbackThreadTermination() */
 488       mutexUnlock(msgRequestInfoP, exception);
 489       return (MsgRequestInfo *)0;
 490    }
 491 
 492    if (xa->responseTimeout > 0 && getAbsoluteTime(xa->responseTimeout, &abstime) == true) {
 493       useTimeout = true;
 494    }
 495 
 496    if (xa->logLevel>=XMLBLASTER_LOG_TRACE) xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, "postSendEvent(requestId=%s) responseMutex is LOCKED, entering wait ...", msgRequestInfoP->requestIdStr);
 497 
 498    if ((retVal = pthread_cond_init(&msgRequestInfoP->responseCond, NULL)) != 0) {
 499       xa->callbackP->removeResponseListener(xa->callbackP, msgRequestInfoP->requestIdStr);
 500       strncpy0(exception->errorCode, "resource.exhaust", XMLBLASTEREXCEPTION_ERRORCODE_LEN); /* ErrorCode.RESOURCE_EXHAUST */
 501       SNPRINTF(exception->message, XMLBLASTEREXCEPTION_MESSAGE_LEN, "[%.100s:%d] pthread_cond_init() for '%s()' with requestId=%s returned %d.",
 502                __FILE__, __LINE__, msgRequestInfoP->methodName, msgRequestInfoP->requestIdStr, retVal);
 503       if (xa->logLevel>=XMLBLASTER_LOG_TRACE) xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, exception->message);
 504       return (MsgRequestInfo *)0;
 505    }
 506 
 507    /* Wait for response, the callback server delivers it */
 508    while (msgRequestInfoP->responseType == 0) { /* Protect for spurious wake ups (e.g. by SIGUSR1) */
 509       if (useTimeout == true) {
 510          int error = pthread_cond_timedwait(&msgRequestInfoP->responseCond, &msgRequestInfoP->responseMutex, &abstime);
 511          if (error == ETIMEDOUT) {
 512             /*
 513              * TODO: msgRequestInfoP is on the stack and if we now return
 514              * it will be invalid:
 515              * removeResponseListener() removes it from the callback thread
 516              * but what if the callback thread currently uses it?
 517              */
 518             xa->callbackP->removeResponseListener(xa->callbackP, msgRequestInfoP->requestIdStr);
 519             strncpy0(exception->errorCode, "communication.responseTimeout", XMLBLASTEREXCEPTION_ERRORCODE_LEN); /* ErrorCode.RESOURCE_EXHAUST */
 520             SNPRINTF(exception->message, XMLBLASTEREXCEPTION_MESSAGE_LEN, "[%.100s:%d] Waiting on response for '%s()' with requestId=%s timed out after blocking %ld millis",
 521                     __FILE__, __LINE__, msgRequestInfoP->methodName, msgRequestInfoP->requestIdStr, xa->responseTimeout);
 522             if (xa->logLevel>=XMLBLASTER_LOG_TRACE) xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, exception->message);
 523             return (MsgRequestInfo *)0;
 524          }
 525       }
 526       else {
 527          pthread_cond_wait(&msgRequestInfoP->responseCond, &msgRequestInfoP->responseMutex); /* Wakes up from responseEvent() */
 528          if (xa->logLevel>=XMLBLASTER_LOG_TRACE) xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_TRACE, __FILE__,
 529             "Wake up tread, response of length %d arrived", msgRequestInfoP->responseBlob.dataLen);
 530       }
 531    }
 532 
 533    for (i=0; i<10; i++) { /* Error recovery loop */
 534       if ((retVal = pthread_cond_destroy(&msgRequestInfoP->responseCond)) != 0) {
 535          if (retVal == EBUSY) { /* Is in use by another thread */
 536             xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_ERROR, __FILE__, "pthread_cond_destroy() for '%s()' with requestId=%s returned EBUSY=%d, we try again #%d/10",
 537                 msgRequestInfoP->methodName, msgRequestInfoP->requestIdStr, retVal, i);
 538             sleepMillis(10);
 539             continue;
 540          }
 541          else {
 542              xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_ERROR, __FILE__, "pthread_cond_destroy() for '%s()' with requestId=%s returned %d, we ignore it.",
 543                  msgRequestInfoP->methodName, msgRequestInfoP->requestIdStr, retVal);
 544          }
 545       }
 546       break;
 547    }
 548 
 549    msgRequestInfoP->blob.dataLen = msgRequestInfoP->responseBlob.dataLen;
 550    msgRequestInfoP->blob.data = msgRequestInfoP->responseBlob.data;
 551    msgRequestInfoP->responseBlob.dataLen = 0;
 552    msgRequestInfoP->responseBlob.data = 0; /* msgRequestInfoP->blob.data is now responsible to free() the data */
 553 
 554    if (xa->logLevel>=XMLBLASTER_LOG_TRACE)
 555       xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_TRACE, __FILE__,
 556          "Thread #%ld woke up in postSendEvent() for msgType=%c and dataLen=%d",
 557          msgRequestInfoP->requestIdStr, msgRequestInfoP->responseType, msgRequestInfoP->blob.dataLen);
 558 
 559 
 560    if (msgRequestInfoP->responseType == (char)MSG_TYPE_EXCEPTION) {
 561       convertToXmlBlasterException(&msgRequestInfoP->blob, exception, false);
 562       freeBlobHolderContent(&msgRequestInfoP->blob);
 563       msgRequestInfoP->responseType = 0;
 564       return (MsgRequestInfo *)0;
 565    }
 566 
 567    msgRequestInfoP->responseType = 0;
 568 
 569    /* if (xa->logLevel>=XMLBLASTER_LOG_TRACE) xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, "postSendEvent(requestId=%s) i woke up, entering unlock ...", msgRequestInfoP->requestIdStr); */
 570    if (mutexUnlock(msgRequestInfoP, exception) == false)
 571       return (MsgRequestInfo *)0;
 572 
 573    return msgRequestInfoP;
 574 }
 575 
 576 /**
 577  * Free lock.
 578  * @param msgRequestInfoP Transporting data
 579  * @param exception The exception struct, can be null
 580  * @return false on error, the exception struct is filled in this case and the lock is not released
 581  */
 582 static bool mutexUnlock(MsgRequestInfo *msgRequestInfoP, XmlBlasterException *exception) {
 583    XmlBlasterAccessUnparsed *xa = (XmlBlasterAccessUnparsed *)msgRequestInfoP->xa;
 584    int retVal;
 585    if (msgRequestInfoP->responseMutexIsValid == false)
 586       return true;
 587    msgRequestInfoP->responseMutexIsValid = false;
 588    if ((retVal = pthread_mutex_unlock(&msgRequestInfoP->responseMutex)) != 0) {
 589       char embeddedText[XMLBLASTEREXCEPTION_MESSAGE_LEN];
 590       if (exception == 0) {
 591          if ((retVal = pthread_mutex_destroy(&msgRequestInfoP->responseMutex)) != 0) {
 592             xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_ERROR, __FILE__, "pthread_mutex_destroy() for '%s()' with requestId=%s returned %d, we ignore it.",
 593                        msgRequestInfoP->methodName, msgRequestInfoP->requestIdStr, retVal);
 594          }
 595          return false;
 596       }
 597       if (*exception->errorCode != 0) {
 598          SNPRINTF(embeddedText, XMLBLASTEREXCEPTION_MESSAGE_LEN, "{%s:%s}", exception->errorCode, exception->message);
 599          if (xa->logLevel>=XMLBLASTER_LOG_TRACE) xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, "Ignoring embedded exception %s: %s", exception->errorCode, exception->message);
 600       }
 601       else
 602          *embeddedText = 0;
 603       strncpy0(exception->errorCode, "user.internal", XMLBLASTEREXCEPTION_ERRORCODE_LEN);
 604       SNPRINTF(exception->message, XMLBLASTEREXCEPTION_MESSAGE_LEN, "[%.100s:%d] ERROR trying to unlock responseMutex, return=%d. Embedded %s", __FILE__, __LINE__, retVal, embeddedText);
 605       if (xa->logLevel>=XMLBLASTER_LOG_TRACE) xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, exception->message);
 606 
 607       if ((retVal = pthread_mutex_destroy(&msgRequestInfoP->responseMutex)) != 0) {
 608          xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_ERROR, __FILE__, "pthread_mutex_destroy() for '%s()' with requestId=%s returned %d, we ignore it.",
 609                     msgRequestInfoP->methodName, msgRequestInfoP->requestIdStr, retVal);
 610       }
 611       return false;
 612    }
 613    if (xa->logLevel>=XMLBLASTER_LOG_TRACE) xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, "postSendEvent() responseMutex is UNLOCKED");
 614 
 615    if ((retVal = pthread_mutex_destroy(&msgRequestInfoP->responseMutex)) != 0) {
 616       xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_ERROR, __FILE__, "pthread_mutex_destroy() for '%s()' with requestId=%s returned %d, we ignore it.",
 617                  msgRequestInfoP->methodName, msgRequestInfoP->requestIdStr, retVal);
 618    }
 619    return true;
 620 }
 621 
 622 Dll_Export const char *xmlBlasterAccessUnparsedUsage(char *usage)
 623 {
 624    /* take care not to exceed XMLBLASTER_MAX_USAGE_LEN */
 625    SNPRINTF(usage, XMLBLASTER_MAX_USAGE_LEN, "%.800s%.800s%.400s", xmlBlasterConnectionUnparsedUsage(), callbackServerRawUsage(),
 626                   "\n   -plugin/socket/multiThreaded  [true]"
 627                   "\n                       If true the update() call to your client code is a separate thread."
 628                   "\n   -plugin/socket/responseTimeout  [60000 (one minute)]"
 629                   "\n                       The time in millis to wait on a response, 0 is forever."
 630                   "\n   -logLevel           ERROR | WARN | INFO | TRACE | DUMP [WARN]"
 631                   );
 632 
 633    return usage;
 634 }
 635 
 636 static char *xmlBlasterConnect(XmlBlasterAccessUnparsed *xa, const char * const qos,
 637                                UpdateFp clientUpdateFp, XmlBlasterException *exception)
 638 {
 639    char *response = 0;
 640    char *qos_;
 641 
 642    if (checkArgs(xa, "connect", false, exception) == false) return 0;
 643 
 644    /* Is allowed, we use our default handler in this case
 645    if (clientUpdateFp == 0) {
 646       strncpy0(exception->errorCode, "user.illegalargument", XMLBLASTEREXCEPTION_ERRORCODE_LEN);
 647       SNPRINTF(exception->message, XMLBLASTEREXCEPTION_MESSAGE_LEN, "[%.100s:%d] Please provide valid argument 'updateFp' to connect()", __FILE__, __LINE__);
 648       if (xa->logLevel>=XMLBLASTER_LOG_TRACE) xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, exception->message);
 649       return false;
 650    }
 651    */
 652 
 653    if (qos == 0) {
 654       strncpy0(exception->errorCode, "user.illegalargument", XMLBLASTEREXCEPTION_ERRORCODE_LEN);
 655       SNPRINTF(exception->message, XMLBLASTEREXCEPTION_MESSAGE_LEN, "[%.100s:%d] Please provide valid argument 'qos' to connect()", __FILE__, __LINE__);
 656       if (xa->logLevel>=XMLBLASTER_LOG_TRACE) xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, exception->message);
 657       return false;
 658    }
 659 
 660    if (initialize(xa, clientUpdateFp, exception) == false) {
 661       return false;
 662    }
 663 
 664    if (xa->logLevel>=XMLBLASTER_LOG_TRACE) xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, "Invoking connect()");
 665 
 666    if (strstr(qos, "<callback") != 0) {
 667       /* User has given us a callback address */
 668       qos_ = strcpyAlloc(qos);
 669    }
 670    else {
 671       /* We add the callback sequence with our tunnel callback host and port
 672          HACK: This is error prone depending on the given qos */
 673       const char *pos;
 674       enum { SIZE=1024 };
 675       char callbackQos[SIZE];
 676       snprintf0(callbackQos, SIZE,
 677                "<queue relating='callback'>" /* maxEntries='100' maxEntriesCache='100'>" */
 678                "  <callback type='SOCKET' sessionId='%s'>"
 679                "    socket://%.120s:%d"
 680                "  </callback>"
 681                "</queue>",
 682                "NoCallbackSessionId", xa->callbackP->hostCB, xa->callbackP->portCB);
 683       qos_ = (char *)calloc(strlen(qos) + SIZE, sizeof(char *));
 684       pos = strstr(qos, "</qos>");
 685       if (pos == 0) {
 686          strncpy0(exception->errorCode, "user.illegalargument", XMLBLASTEREXCEPTION_ERRORCODE_LEN);
 687          SNPRINTF(exception->message, XMLBLASTEREXCEPTION_MESSAGE_LEN, "[%.100s:%d] Please provide valid 'qos' markup to connect()", __FILE__, __LINE__);
 688          if (xa->logLevel>=XMLBLASTER_LOG_TRACE) xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, exception->message);
 689          return false;
 690       }
 691       strncpy0(qos_, qos, pos-qos+1);
 692       strncat0(qos_, callbackQos, SIZE-strlen(qos_));
 693       strncat0(qos_, "</qos>", 8);
 694    }
 695    if (xa->logLevel>=XMLBLASTER_LOG_TRACE) xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, "Connecting with qos=%s", qos_);
 696 
 697    /* Register our function responseEvent() to be notified when the response arrives,
 698       this is done by preSendEvent() callback called during connect() */
 699 
 700    response = xa->connectionP->connect(xa->connectionP, qos_, exception);
 701 
 702    free(qos_);
 703    /* freeBlobHolderContent(&xa->responseBlob); */
 704 
 705    /* The response was handled by a callback to postSendEvent */
 706 
 707    if (response == 0) return response;
 708 
 709    if (xa->logLevel>=XMLBLASTER_LOG_TRACE) xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_TRACE, __FILE__,
 710       "Got response for connect(secretSessionId=%s)", xa->connectionP->secretSessionId);
 711    return response;
 712 }
 713 
 714 static bool xmlBlasterDisconnect(XmlBlasterAccessUnparsed *xa, const char * const qos, XmlBlasterException *exception)
 715 {
 716    bool p;
 717    if (checkArgs(xa, "disconnect", true, exception) == false ) return 0;
 718    p = xa->connectionP->disconnect(xa->connectionP, qos, exception);
 719    return p;
 720 }
 721 
 722 /**
 723  * Publish a message to the server.
 724  * @see http://www.xmlblaster.org/xmlBlaster/doc/requirements/interface.publish.html
 725  * @see http://www.xmlblaster.org/xmlBlaster/doc/requirements/protocol.socket.html
 726  * @see XmlBlasterConnectionUnparsed#publish() for a function documentation
 727  */
 728 static char *xmlBlasterPublish(XmlBlasterAccessUnparsed *xa, MsgUnit *msgUnit, XmlBlasterException *exception)
 729 {
 730    char *p;
 731    if (checkArgs(xa, "publish", true, exception) == false ) return 0;
 732    p = xa->connectionP->publish(xa->connectionP, msgUnit, exception);
 733    return p;
 734 }
 735 
 736 /**
 737  * Publish a message array in a bulk to the server.
 738  * @see http://www.xmlblaster.org/xmlBlaster/doc/requirements/interface.publish.html
 739  * @see http://www.xmlblaster.org/xmlBlaster/doc/requirements/protocol.socket.html
 740  * @see XmlBlasterConnectionUnparsed#publishArr() for a function documentation
 741  */
 742 static QosArr *xmlBlasterPublishArr(XmlBlasterAccessUnparsed *xa, MsgUnitArr *msgUnitArr, XmlBlasterException *exception)
 743 {
 744    QosArr *p;
 745    if (checkArgs(xa, "publishArr", true, exception) == false ) return 0;
 746    p = xa->connectionP->publishArr(xa->connectionP, msgUnitArr, exception);
 747    return p;
 748 }
 749 
 750 /**
 751  * Publish a message array in a bulk to the server, we don't receive an ACK.
 752  * @see http://www.xmlblaster.org/xmlBlaster/doc/requirements/interface.publish.html
 753  * @see http://www.xmlblaster.org/xmlBlaster/doc/requirements/protocol.socket.html
 754  * @see XmlBlasterConnectionUnparsed#publishOneway() for a function documentation
 755  */
 756 static void xmlBlasterPublishOneway(XmlBlasterAccessUnparsed *xa, MsgUnitArr *msgUnitArr, XmlBlasterException *exception)
 757 {
 758    if (checkArgs(xa, "publishOneway", true, exception) == false ) return;
 759    xa->connectionP->publishOneway(xa->connectionP, msgUnitArr, exception);
 760 }
 761 
 762 /**
 763  * Subscribe a message.
 764  * @see http://www.xmlblaster.org/xmlBlaster/doc/requirements/interface.subscribe.html
 765  * @see http://www.xmlblaster.org/xmlBlaster/doc/requirements/protocol.socket.html
 766  */
 767 static char *xmlBlasterSubscribe(XmlBlasterAccessUnparsed *xa, const char * const key, const char * qos, XmlBlasterException *exception)
 768 {
 769    char *p;
 770    if (checkArgs(xa, "subscribe", true, exception) == false ) return 0;
 771    p = xa->connectionP->subscribe(xa->connectionP, key, qos, exception);
 772    return p;
 773 }
 774 
 775 /**
 776  * UnSubscribe a message from the server.
 777  * @return The raw QoS XML strings returned from xmlBlaster, only NULL if an exception is thrown
 778  *         You need to free it with freeQosArr() after usage
 779  * @see http://www.xmlblaster.org/xmlBlaster/doc/requirements/interface.unSubscribe.html
 780  * @see http://www.xmlblaster.org/xmlBlaster/doc/requirements/protocol.socket.html
 781  */
 782 static QosArr *xmlBlasterUnSubscribe(XmlBlasterAccessUnparsed *xa, const char * const key, const char * qos, XmlBlasterException *exception)
 783 {
 784    QosArr *p;
 785    if (checkArgs(xa, "unSubscribe", true, exception) == false ) return 0;
 786    p = xa->connectionP->unSubscribe(xa->connectionP, key, qos, exception);
 787    return p;
 788 }
 789 
 790 /**
 791  * Erase a message from the server.
 792  * @return A struct holding the raw QoS XML strings returned from xmlBlaster,
 793  *         only NULL if an exception is thrown.
 794  *         You need to freeQosArr() it
 795  * @see http://www.xmlblaster.org/xmlBlaster/doc/requirements/interface.erase.html
 796  * @see http://www.xmlblaster.org/xmlBlaster/doc/requirements/protocol.socket.html
 797  */
 798 static QosArr *xmlBlasterErase(XmlBlasterAccessUnparsed *xa, const char * const key, const char * qos, XmlBlasterException *exception)
 799 {
 800    QosArr *p;
 801    if (checkArgs(xa, "erase", true, exception) == false ) return 0;
 802    p = xa->connectionP->erase(xa->connectionP, key, qos, exception);
 803    return p;
 804 }
 805 
 806 /**
 807  * Ping the server.
 808  * @param xa The 'this' pointer
 809  * @param qos The QoS or 0
 810  * @param exception *errorCode!=0 on failure
 811  * @return The ping return QoS raw xml string, you need to free() it
 812  *         or 0 on failure (in which case *exception.errorCode!=0)
 813  * @see http://www.xmlblaster.org/xmlBlaster/doc/requirements/protocol.socket.html
 814  */
 815 static char *xmlBlasterPing(XmlBlasterAccessUnparsed *xa, const char * const qos, XmlBlasterException *exception)
 816 {
 817    char *p;
 818    if (checkArgs(xa, "ping", true, exception) == false ) return 0;
 819    p = xa->connectionP->ping(xa->connectionP, qos, exception);
 820    return p;
 821 }
 822 
 823 /**
 824  * Get a message.
 825  * @see http://www.xmlblaster.org/xmlBlaster/doc/requirements/interface.get.html
 826  * @see http://www.xmlblaster.org/xmlBlaster/doc/requirements/protocol.socket.html
 827  * @return NULL on error, please check exception in such a case
 828  */
 829 static MsgUnitArr *xmlBlasterGet(XmlBlasterAccessUnparsed *xa, const char * const key, const char * qos, XmlBlasterException *exception)
 830 {
 831    MsgUnitArr *msgUnitArr;
 832    if (checkArgs(xa, "get", true, exception) == false ) return 0;
 833    msgUnitArr = xa->connectionP->get(xa->connectionP, key, qos, exception);
 834    return msgUnitArr;
 835 }
 836 
 837 static bool checkArgs(XmlBlasterAccessUnparsed *xa, const char *methodName,
 838             bool checkIsConnected, XmlBlasterException *exception)
 839 {
 840    if (xa == 0) {
 841       char *stack = getStackTrace(10);
 842       if (exception == 0) {
 843          printf("[%s:%d] Please provide a valid XmlBlasterAccessUnparsed pointer to %s() %s",
 844                   __FILE__, __LINE__, methodName, stack);
 845       }
 846       else {
 847          strncpy0(exception->errorCode, "user.illegalArgument", XMLBLASTEREXCEPTION_ERRORCODE_LEN);
 848          SNPRINTF(exception->message, XMLBLASTEREXCEPTION_MESSAGE_LEN,
 849                   "[%.100s:%d] Please provide a valid XmlBlasterAccessUnparsed pointer to %.16s() %s",
 850                    __FILE__, __LINE__, methodName, stack);
 851          xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_ERROR, __FILE__, exception->message);
 852       }
 853       free(stack);
 854       return false;
 855    }
 856 
 857    if (exception == 0) {
 858       char *stack = getStackTrace(10);
 859       xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_ERROR, __FILE__, "[%s:%d] Please provide valid exception pointer to %s() %s",
 860               __FILE__, __LINE__, methodName, stack);
 861       free(stack);
 862       return false;
 863    }
 864 
 865    if (xa->isShutdown || (checkIsConnected && !xa->isConnected(xa))) {
 866       char *stack = getStackTrace(10);
 867       strncpy0(exception->errorCode, "communication.noConnection", XMLBLASTEREXCEPTION_ERRORCODE_LEN);
 868       SNPRINTF(exception->message, XMLBLASTEREXCEPTION_MESSAGE_LEN,
 869                "[%.100s:%d] Not connected to xmlBlaster, %s() failed %s",
 870                 __FILE__, __LINE__, methodName, stack);
 871       free(stack);
 872       xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_WARN, __FILE__, exception->message);
 873       return false;
 874    }
 875 
 876    initializeXmlBlasterException(exception);
 877 
 878    return true;
 879 }
 880 
 881 /**
 882  * Run by the new created thread, calls the clients update method.
 883  * Leaving this pthread start routine does an implicit pthread_exit().
 884  * @param container Holding all necessary informations, we free it when we are done
 885  * @return 0 on success, 1 on error. The return value is the exit value returned by pthread_join()
 886  */
 887 static int runUpdate(UpdateContainer *container)
 888 {
 889    XmlBlasterAccessUnparsed *xa = container->xa;
 890    MsgUnitArr *msgUnitArrP = container->msgUnitArrP;
 891    void *userData = container->userData;
 892    CallbackServerUnparsed *cb = (CallbackServerUnparsed*)userData;
 893    XmlBlasterException *exception = &container->exception;
 894    SocketDataHolder *socketDataHolder = &container->socketDataHolder;
 895    XMLBLASTER_C_bool retVal;
 896 
 897    if (xa->logLevel>=XMLBLASTER_LOG_TRACE) xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, "Entering runUpdate()");
 898 
 899    retVal = xa->clientsUpdateFp(msgUnitArrP, xa, exception);
 900 
 901    if (xa->lowLevelAutoAck) { /* returned already */
 902    }
 903    else {
 904       cb->sendResponseOrException(retVal, cb, socketDataHolder, msgUnitArrP, exception);
 905    }
 906 
 907    free(container);
 908 
 909    if (xa->logLevel>=XMLBLASTER_LOG_TRACE) xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_TRACE, __FILE__,
 910                                 "runUpdate: Update thread 0x%x is exiting", get_pthread_id(pthread_self()));
 911    xa->threadCounter--;
 912    return (retVal==true) ? 0 : 1;
 913 }
 914 
 915 /**
 916  * Here we receive the callback messages from xmlBlaster, create a thread and dispatch
 917  * it to the clients update.
 918  * @see UpdateFp in CallbackServerUnparsed.h
 919  */
 920 static void interceptUpdate(MsgUnitArr *msgUnitArrP, void *userData,
 921                             XmlBlasterException *exception, void /*SocketDataHolder*/ *socketDataHolder)
 922 {
 923    CallbackServerUnparsed *cb = (CallbackServerUnparsed*)userData;
 924    XmlBlasterAccessUnparsed *xa = (XmlBlasterAccessUnparsed *)cb->updateCbUserData;
 925 
 926    if (xa->clientsUpdateFp == 0) { /* Client has not registered an update() */
 927       size_t i;
 928       bool testException = false;
 929       bool success = true;
 930 
 931       for (i=0; i<msgUnitArrP->len; i++) {
 932          const char *key = msgUnitArrP->msgUnitArr[i].key;
 933          xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_INFO, __FILE__,
 934              "CALLBACK update() default handler: Asynchronous message update arrived:%s id=%s, we ignore it in this default handler\n",
 935              key, ((SocketDataHolder*)socketDataHolder)->requestId);
 936          msgUnitArrP->msgUnitArr[i].responseQos = strcpyAlloc("<qos><state id='OK'/></qos>");
 937          /* Return QoS: Everything is OK */
 938       }
 939       if (testException) {
 940          strncpy0(exception->errorCode, "user.clientCode",
 941                   XMLBLASTEREXCEPTION_ERRORCODE_LEN);
 942          strncpy0(exception->message, "I don't want these messages",
 943                   XMLBLASTEREXCEPTION_MESSAGE_LEN);
 944          success = false;
 945       }
 946       cb->sendResponseOrException(success, cb, socketDataHolder, msgUnitArrP, exception);
 947       return;
 948    }
 949 
 950    if (xa->logLevel>=XMLBLASTER_LOG_TRACE) xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, "interceptUpdate(): Received message");
 951 
 952    if (xa->callbackMultiThreaded == false) {
 953       XMLBLASTER_C_bool ret = xa->clientsUpdateFp(msgUnitArrP, xa, exception);
 954       cb->sendResponseOrException(ret, cb, socketDataHolder, msgUnitArrP, exception);
 955       return;
 956    }
 957 
 958    {
 959       pthread_t tid;
 960       int threadRet = 0;
 961       UpdateContainer *container = (UpdateContainer*)malloc(sizeof(UpdateContainer));
 962       pthread_attr_t attr;
 963 
 964       pthread_attr_init(&attr);
 965       /* Cleanup all resources after ending the thread, instead of calling pthread_join() */
 966       pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
 967 
 968       container->xa = xa;
 969       container->msgUnitArrP = msgUnitArrP;
 970       container->userData = userData;
 971       memcpy(&container->exception, exception, sizeof(XmlBlasterException));
 972       memcpy(&container->socketDataHolder, socketDataHolder, sizeof(SocketDataHolder)); /* The blob pointer is freed already by CallbackServerUnparsed */
 973 
 974       if (xa->lowLevelAutoAck) {
 975          size_t i;
 976          for (i=0; i<msgUnitArrP->len; i++) {
 977             msgUnitArrP->msgUnitArr[i].responseQos = strcpyAlloc("<qos><state id='OK'/></qos>");
 978          }
 979       }
 980 
 981       /*
 982         Guaranteed sequence:
 983         The server uses max one thread to deliver update() for each client
 984         If the update contains an array of messages those are handled as a
 985         complete bulk in the correct sequence here.
 986       */
 987 
 988       /* this thread will deliver the update message to the client code,
 989          Note: we need a thread pool cache for better performance */
 990       xa->threadCounter++;
 991       threadRet = pthread_create(&tid, &attr,
 992                         (void * (*)(void *))runUpdate, (void *)container);
 993       if (threadRet != 0) {
 994          XMLBLASTER_C_bool ret = false;
 995          free(container);
 996          strncpy0(exception->errorCode, "resource.tooManyThreads", XMLBLASTEREXCEPTION_ERRORCODE_LEN);
 997          SNPRINTF(exception->message, XMLBLASTEREXCEPTION_MESSAGE_LEN,
 998                   "[%.100s:%d] Creating thread failed with error number %d, we deliver the message in the same thread",
 999                   __FILE__, __LINE__, threadRet);
1000          xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_ERROR, __FILE__, exception->message);
1001          ret = xa->clientsUpdateFp(msgUnitArrP, xa, exception);
1002          cb->sendResponseOrException(ret, cb, socketDataHolder, msgUnitArrP, exception);
1003          xa->threadCounter--;
1004          pthread_attr_destroy(&attr);
1005          return;
1006       }
1007 
1008       /* Is done already with above PTHREAD_CREATE_DETACHED
1009          threadRet = pthread_detach(tid);
1010          if (threadRet != 0) {
1011             xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_ERROR, __FILE__, "[%d] Detaching thread failed with error number %d", __LINE__, threadRet);
1012          }
1013       */
1014 
1015       if (xa->logLevel>=XMLBLASTER_LOG_TRACE) xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_TRACE, __FILE__,
1016          "interceptUpdate: Received message and delegated it to a separate thread 0x%x to deliver", get_pthread_id(tid));
1017 
1018       pthread_attr_destroy(&attr);
1019    }
1020 
1021    if (xa->lowLevelAutoAck) {
1022       *exception->errorCode = 0;
1023       cb->sendResponseOrException(true, cb, socketDataHolder, msgUnitArrP, exception);
1024    }
1025 }
1026 
1027 /**
1028  * Write uncompressed to socket (thread safe)
1029  */
1030 static ssize_t writenPlain(void * userP, const int fd, const char *ptr, const size_t nbytes) {
1031    int rc;
1032    ssize_t ret;
1033 
1034    XmlBlasterAccessUnparsed *xa = (XmlBlasterAccessUnparsed *)userP;
1035 
1036    /* Start mutex */
1037    rc = pthread_mutex_lock(&xa->writenMutex);
1038    if (rc != 0) /* EINVAL */
1039       xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_ERROR, __FILE__, "pthread_mutex_lock(writenMutex) returned %d.", rc);
1040 
1041    /* Send data */
1042    ret = writen(fd, ptr, nbytes);
1043 
1044    /* End mutex */
1045    rc = pthread_mutex_unlock(&xa->writenMutex);
1046    if (rc != 0) /* EPERM */
1047       xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_ERROR, __FILE__, "pthread_mutex_unlock(writenMutex) returned %d.", rc);
1048 
1049    return ret;
1050 
1051 }
1052 
1053 /**
1054  * Compress data and send to socket.
1055  */
1056 static ssize_t writenCompressed(void *userP, const int fd, const char *ptr, const size_t nbytes) {
1057    int rc;
1058    ssize_t ret;
1059 
1060    XmlBlasterAccessUnparsed *xa = (XmlBlasterAccessUnparsed *)userP;
1061    if (xa->logLevel>=XMLBLASTER_LOG_TRACE) xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, "writenCompressed(%u)", nbytes);
1062 
1063    /* Start mutex */
1064    rc = pthread_mutex_lock(&xa->writenMutex);
1065    if (rc != 0) xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_ERROR, __FILE__, "pthread_mutex_lock(writenMutex) returned %d.", rc);
1066 
1067    /* Send data */
1068    ret = xmlBlaster_writenCompressed(xa->connectionP->zlibWriteBuf, fd, ptr, nbytes);
1069 
1070    /* End mutex */
1071    rc = pthread_mutex_unlock(&xa->writenMutex);
1072    if (rc != 0) xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_ERROR, __FILE__, "pthread_mutex_unlock(writenMutex) returned %d.", rc);
1073 
1074    return ret;
1075 }
1076 
1077 /**
1078  * Read uncompressed to socket (thread safe)
1079  */
1080 static ssize_t readnPlain(void * userP, const int fd, char *ptr, const size_t nbytes, XmlBlasterNumReadFunc fpNumRead, void *userP2) {
1081    int rc;
1082    ssize_t ret;
1083 
1084    XmlBlasterAccessUnparsed *xa = (XmlBlasterAccessUnparsed *)userP;
1085 
1086    rc = pthread_mutex_lock(&xa->readnMutex);
1087    if (rc != 0) xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_ERROR, __FILE__, "pthread_mutex_lock(readnMutex) returned %d.", rc);
1088 
1089    ret = readn(fd, ptr, nbytes, fpNumRead, userP2);
1090 
1091    rc = pthread_mutex_unlock(&xa->readnMutex);
1092    if (rc != 0) xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_ERROR, __FILE__, "pthread_mutex_unlock(readnMutex) returned %d.", rc);
1093 
1094    return ret;
1095 }
1096 
1097 /**
1098  * Read data from socket, uncompress it if necessary.
1099  */
1100 static ssize_t readnCompressed(void *userP, const int fd, char *ptr, const size_t nbytes, XmlBlasterNumReadFunc fpNumRead, void *userP2) {
1101    int rc;
1102    ssize_t ret;
1103 
1104    XmlBlasterAccessUnparsed *xa = (XmlBlasterAccessUnparsed *)userP;
1105    if (xa->logLevel>=XMLBLASTER_LOG_TRACE) xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_TRACE, __FILE__, "readnCompressed(%u)", nbytes);
1106 
1107    rc = pthread_mutex_lock(&xa->readnMutex);
1108    if (rc != 0) xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_ERROR, __FILE__, "pthread_mutex_lock(readnMutex) returned %d.", rc);
1109 
1110    ret = xmlBlaster_readnCompressed(xa->connectionP->zlibReadBuf, fd, ptr, nbytes, fpNumRead, userP2);
1111 
1112    rc = pthread_mutex_unlock(&xa->readnMutex);
1113    if (rc != 0) xa->log(xa->logUserP, xa->logLevel, XMLBLASTER_LOG_ERROR, __FILE__, "pthread_mutex_unlock(readnMutex) returned %d.", rc);
1114 
1115    return ret;
1116 }
1117 
1118 #ifdef XmlBlasterAccessUnparsedMain /* compile a standalone test program */
1119 
1120 /**
1121  * Here we receive the callback messages from xmlBlaster
1122  * FOR TESTING ONLY
1123  * @see UpdateFp in CallbackServerUnparsed.h
1124  */
1125 static bool myUpdate(MsgUnitArr *msgUnitArrP, void *userData, XmlBlasterException *xmlBlasterException)
1126 {
1127    size_t i;
1128    bool testException = false;
1129    if (userData != 0) ; /* to avoid compiler warning (we don't need it here) */
1130    for (i=0; i<msgUnitArrP->len; i++) {
1131       char *xml = messageUnitToXml(&msgUnitArrP->msgUnitArr[i]);
1132       printf("[client] CALLBACK update(): Asynchronous message update arrived:%s\n", xml);
1133       free(xml);
1134       msgUnitArrP->msgUnitArr[i].responseQos = strcpyAlloc("<qos><state id='OK'/></qos>");
1135       /* Return QoS: Everything is OK */
1136    }
1137    if (testException) {
1138       strncpy0(xmlBlasterException->errorCode, "user.clientCode",
1139                XMLBLASTEREXCEPTION_ERRORCODE_LEN);
1140       strncpy0(xmlBlasterException->message, "I don't want these messages",
1141                XMLBLASTEREXCEPTION_MESSAGE_LEN);
1142       return false;
1143    }
1144    return true;
1145 }
1146 
1147 /**
1148  * Invoke: XmlBlasterAccessUnparsedMain -logLevel TRACE  -numTests 10
1149  */
1150 int main(int argc, char** argv)
1151 {
1152    int ii;
1153    int numTests = 1;
1154    bool testCallInitialize = false;
1155 
1156    for (ii=0; ii < argc-1; ii++)
1157       if (strcmp(argv[ii], "-numTests") == 0) {
1158          if (strToInt(&numTests, argv[++ii]) == false)
1159             printf("[XmlBlasterAccessUnparsed] WARN '-numTests %s' is invalid\n", argv[ii]);
1160       }
1161 
1162    for (ii=0; ii<numTests; ii++) {
1163       int iarg;
1164       char *response = (char *)0;
1165       /*
1166        * callbackSessionId:
1167        * Is created by the client and used to validate callback messages in update.
1168        * This is sent on connect in ConnectQos.
1169        * (Is different from the xmlBlaster secret session ID)
1170        */
1171       const char *callbackSessionId = "topSecret";
1172       XmlBlasterException xmlBlasterException;
1173       XmlBlasterAccessUnparsed *xa = 0;
1174 
1175       /*
1176       const char *tmp = getStackTrace(20);
1177       printf("[client] stackTrace=%s\n", tmp);
1178       free(tmp);
1179       */
1180 
1181 #     ifdef PTHREAD_THREADS_MAX
1182          printf("[client] Try option '-help' if you need usage informations, max %d"
1183                 " threads per process are supported on this OS\n", PTHREAD_THREADS_MAX);
1184 #     else
1185          printf("[client] Try option '-help' if you need usage informations\n");
1186 #     endif
1187 
1188       for (iarg=0; iarg < argc; iarg++) {
1189          if (strcmp(argv[iarg], "-help") == 0 || strcmp(argv[iarg], "--help") == 0) {
1190             char usage[XMLBLASTER_MAX_USAGE_LEN];
1191             const char *pp =
1192             "\n   -logLevel            ERROR | WARN | INFO | TRACE | DUMP [WARN]"
1193             "\n   -numTests            How often to run the same tests [1]"
1194             "\n\nExample:"
1195             "\n   XmlBlasterAccessUnparsedMain -logLevel TRACE"
1196                  " -dispatch/connection/plugin/socket/hostname server.mars.universe";
1197             printf("Usage:\nXmlBlaster C SOCKET client %s\n%s%s\n",
1198                    getXmlBlasterVersion(), xmlBlasterAccessUnparsedUsage(usage), pp);
1199             exit(1);
1200          }
1201       }
1202 
1203       xa = getXmlBlasterAccessUnparsed(argc, argv);
1204 
1205       if (testCallInitialize) {
1206          if (xa->initialize(xa, myUpdate, &xmlBlasterException) == false) {
1207             printf("[client] Connection to xmlBlaster failed,"
1208                    " please start the server or check your configuration\n");
1209             freeXmlBlasterAccessUnparsed(xa);
1210             exit(1);
1211          }
1212       }
1213 
1214       {  /* connect */
1215          char connectQos[2048];
1216          char callbackQos[1024];
1217 
1218          if (testCallInitialize) {
1219             SNPRINTF(callbackQos, 1024,
1220                      "<queue relating='callback' maxEntries='100' maxEntriesCache='100'>"
1221                      "  <callback type='SOCKET' sessionId='%s'>"
1222                      "    socket://%.120s:%d"
1223                      "  </callback>"
1224                      "</queue>",
1225                      callbackSessionId, xa->callbackP->hostCB, xa->callbackP->portCB);
1226          }
1227          else
1228             *callbackQos = '\0';
1229 
1230          SNPRINTF(connectQos, 2048,
1231                 "<qos>"
1232                 " <securityService type='htpasswd' version='1.0'>"
1233                 "  <![CDATA["
1234                 "   <user>fritz</user>"
1235                 "   <passwd>secret</passwd>"
1236                 "  ]]>"
1237                 " </securityService>"
1238                 "%.1024s"
1239                 "</qos>", callbackQos);
1240 
1241          response = xa->connect(xa, connectQos, myUpdate, &xmlBlasterException);
1242          if (*xmlBlasterException.errorCode != 0) {
1243             printf("[client] Caught exception during connect errorCode=%s, message=%s\n",
1244                    xmlBlasterException.errorCode, xmlBlasterException.message);
1245             freeXmlBlasterAccessUnparsed(xa);
1246             exit(1);
1247          }
1248          free(response);
1249          printf("[client] Connected to xmlBlaster, do some tests ...\n");
1250       }
1251 
1252       response = xa->ping(xa, 0, &xmlBlasterException);
1253       if (response == (char *)0) {
1254          printf("[client] ERROR: Pinging a connected server failed: errorCode=%s, message=%s\n",
1255             xmlBlasterException.errorCode, xmlBlasterException.message);
1256       }
1257       else {
1258          printf("[client] Pinging a connected server, response=%s\n", response);
1259          free(response);
1260       }
1261 
1262       { /* subscribe ... */
1263          const char *key = "<key oid='HelloWorld'/>";
1264          const char *qos = "<qos/>";
1265          printf("[client] Subscribe message 'HelloWorld' ...\n");
1266          response = xa->subscribe(xa, key, qos, &xmlBlasterException);
1267          if (*xmlBlasterException.errorCode != 0) {
1268             printf("[client] Caught exception in subscribe errorCode=%s, message=%s\n",
1269                    xmlBlasterException.errorCode, xmlBlasterException.message);
1270             xa->disconnect(xa, 0, &xmlBlasterException);
1271             freeXmlBlasterAccessUnparsed(xa);
1272             exit(1);
1273          }
1274          printf("[client] Subscribe success, returned status is '%s'\n", response);
1275          free(response);
1276       }
1277 
1278       {  /* publish ... */
1279          MsgUnit msgUnit;
1280          printf("[client] Publishing message 'HelloWorld' ...\n");
1281          msgUnit.key = strcpyAlloc("<key oid='HelloWorld'/>");
1282          msgUnit.content = strcpyAlloc("Some message payload");
1283          msgUnit.contentLen = strlen(msgUnit.content);
1284          msgUnit.qos =strcpyAlloc("<qos><persistent/></qos>");
1285          response = xa->publish(xa, &msgUnit, &xmlBlasterException);
1286          freeMsgUnitData(&msgUnit);
1287          if (*xmlBlasterException.errorCode != 0) {
1288             printf("[client] Caught exception in publish errorCode=%s, message=%s\n",
1289                    xmlBlasterException.errorCode, xmlBlasterException.message);
1290             xa->disconnect(xa, 0, &xmlBlasterException);
1291             freeXmlBlasterAccessUnparsed(xa);
1292             exit(1);
1293          }
1294          printf("[client] Publish success, returned status is '%s'\n", response);
1295          free(response);
1296       }
1297 
1298       {  /* unSubscribe ... */
1299          const char *key = "<key oid='HelloWorld'/>";
1300          const char *qos = "<qos/>";
1301          printf("[client] UnSubscribe message 'HelloWorld' ...\n");
1302          response = xa->unSubscribe(xa, key, qos, &xmlBlasterException);
1303          if (response) {
1304             printf("[client] Unsubscribe success, returned status is '%s'\n", response);
1305             free(response);
1306          }
1307          else {
1308             printf("[client] Caught exception in unSubscribe errorCode=%s, message=%s\n",
1309                    xmlBlasterException.errorCode, xmlBlasterException.message);
1310             xa->disconnect(xa, 0, &xmlBlasterException);
1311             freeXmlBlasterAccessUnparsed(xa);
1312             exit(1);
1313          }
1314       }
1315 
1316       {  /* get synchnronous ... */
1317          size_t i;
1318          const char *key = "<key queryType='XPATH'>//key</key>";
1319          const char *qos = "<qos/>";
1320          MsgUnitArr *msgUnitArr;
1321          printf("[client] Get synchronous messages with XPath '//key' ...\n");
1322          msgUnitArr = xa->get(xa, key, qos, &xmlBlasterException);
1323          if (*xmlBlasterException.errorCode != 0) {
1324             printf("[client] Caught exception in get errorCode=%s, message=%s\n",
1325                    xmlBlasterException.errorCode, xmlBlasterException.message);
1326             xa->disconnect(xa, 0, &xmlBlasterException);
1327             freeXmlBlasterAccessUnparsed(xa);
1328             exit(1);
1329          }
1330          if (msgUnitArr != (MsgUnitArr *)0) {
1331             for (i=0; i<msgUnitArr->len; i++) {
1332                char *contentStr = strFromBlobAlloc(msgUnitArr->msgUnitArr[i].content,
1333                                                 msgUnitArr->msgUnitArr[i].contentLen);
1334                const char *dots = (msgUnitArr->msgUnitArr[i].contentLen > 96) ?
1335                                   " ..." : "";
1336                printf("\n[client] Received message#%u/%u:\n"
1337                       "-------------------------------------"
1338                       "%s\n <content>%.100s%s</content>%s\n"
1339                       "-------------------------------------\n",
1340                       i+1, msgUnitArr->len,
1341                       msgUnitArr->msgUnitArr[i].key,
1342                       contentStr, dots,
1343                       msgUnitArr->msgUnitArr[i].qos);
1344                free(contentStr);
1345             }
1346             freeMsgUnitArr(msgUnitArr);
1347          }
1348          else {
1349             printf("[client] Caught exception in get errorCode=%s, message=%s\n",
1350                    xmlBlasterException.errorCode, xmlBlasterException.message);
1351             xa->disconnect(xa, 0, &xmlBlasterException);
1352             freeXmlBlasterAccessUnparsed(xa);
1353             exit(1);
1354          }
1355       }
1356 
1357 
1358       {  /* erase ... */
1359          const char *key = "<key oid='HelloWorld'/>";
1360          const char *qos = "<qos/>";
1361          printf("[client] Erasing message 'HelloWorld' ...\n");
1362          response = xa->erase(xa, key, qos, &xmlBlasterException);
1363          if (*xmlBlasterException.errorCode != 0) {
1364             printf("[client] Caught exception in erase errorCode=%s, message=%s\n",
1365                    xmlBlasterException.errorCode, xmlBlasterException.message);
1366             xa->disconnect(xa, 0, &xmlBlasterException);
1367             freeXmlBlasterAccessUnparsed(xa);
1368             exit(1);
1369          }
1370          printf("[client] Erase success, returned status is '%s'\n", response);
1371          free(response);
1372       }
1373 
1374       if (xa->disconnect(xa, 0, &xmlBlasterException) == false) {
1375          printf("[client] Caught exception in disconnect, errorCode=%s, message=%s\n",
1376                 xmlBlasterException.errorCode, xmlBlasterException.message);
1377          freeXmlBlasterAccessUnparsed(xa);
1378          exit(1);
1379       }
1380 
1381       freeXmlBlasterAccessUnparsed(xa);
1382       if (numTests > 1) {
1383          printf("[client] Successfully finished test #%d from %d\n\n", ii, numTests);
1384       }
1385    }
1386    printf("[client] Good bye.\n");
1387    return 0; /*exit(0);*/
1388 }
1389 #endif /* #ifdef XmlBlasterAccessUnparsedMain */


syntax highlighted by Code2HTML, v. 0.9.1