1 // xmlBlaster/demo/javaclients/HelloWorldPublish.java
  2 package javaclients;
  3 
  4 import java.io.UnsupportedEncodingException;
  5 import java.util.Iterator;
  6 import java.util.Map;
  7 import java.util.Random;
  8 import java.util.logging.Level;
  9 import java.util.logging.Logger;
 10 
 11 import org.xmlBlaster.client.I_Callback;
 12 import org.xmlBlaster.client.I_ConnectionStateListener;
 13 import org.xmlBlaster.client.I_XmlBlasterAccess;
 14 import org.xmlBlaster.client.key.EraseKey;
 15 import org.xmlBlaster.client.key.PublishKey;
 16 import org.xmlBlaster.client.key.UpdateKey;
 17 import org.xmlBlaster.client.qos.ConnectQos;
 18 import org.xmlBlaster.client.qos.ConnectReturnQos;
 19 import org.xmlBlaster.client.qos.DisconnectQos;
 20 import org.xmlBlaster.client.qos.EraseQos;
 21 import org.xmlBlaster.client.qos.PublishQos;
 22 import org.xmlBlaster.client.qos.PublishReturnQos;
 23 import org.xmlBlaster.client.qos.UpdateQos;
 24 import org.xmlBlaster.util.FileLocator;
 25 import org.xmlBlaster.util.Global;
 26 import org.xmlBlaster.util.MsgUnit;
 27 import org.xmlBlaster.util.SessionName;
 28 import org.xmlBlaster.util.Timestamp;
 29 import org.xmlBlaster.util.XmlBlasterException;
 30 import org.xmlBlaster.util.def.Constants;
 31 import org.xmlBlaster.util.def.MethodName;
 32 import org.xmlBlaster.util.def.PriorityEnum;
 33 import org.xmlBlaster.util.dispatch.ConnectionStateEnum;
 34 import org.xmlBlaster.util.dispatch.I_PostSendListener;
 35 import org.xmlBlaster.util.qos.TopicProperty;
 36 import org.xmlBlaster.util.qos.address.Destination;
 37 import org.xmlBlaster.util.qos.storage.HistoryQueueProperty;
 38 import org.xmlBlaster.util.queuemsg.MsgQueueEntry;
 39 
 40 /**
 41  * This client connects to xmlBlaster and publishes a configurable amount of messages.
 42  * <p>
 43  * This is a nice client to experiment and play with xmlBlaster as there are many
 44  * command line options to specify the type and amount of messages published.
 45  * </p>
 46  * <p>
 47  * Try using 'java javaclients.HelloWorldSubscribe' in another window to subscribe to
 48  * our messages.
 49  * Further you can type 'd' in the window running xmlBlaster to get a server dump.
 50  * </p>
 51  *
 52  * Invoke (after starting the xmlBlaster server):
 53  * <pre>
 54  *Publish manually 10 messages:
 55  * java javaclients.HelloWorldPublish -interactive true -numPublish 10 -oid Hello -persistent true -erase true
 56  *
 57  *Publish automatically 10 messages and sleep 1 sec in between:
 58  * java javaclients.HelloWorldPublish -interactive false -sleep 1000 -numPublish 10 -oid Hello -persistent true -erase true
 59  *
 60  *Publish automatically 10 different topics with different DOM entries:
 61  * java javaclients.HelloWorldPublish -interactive false -numPublish 10 -oid Hello-%counter -clientTags "<org.xmlBlaster><demo-%counter/></org.xmlBlaster>"
 62  *
 63  *Login as joe/5 and send one persistent message:
 64  * java javaclients.HelloWorldPublish -session.name joe/5 -passwd secret -persistent true -dump[HelloWorldPublish] true
 65  *
 66  *Send a PtP message:
 67  * java javaclients.HelloWorldPublish -destination jack/17 -forceQueuing true -persistent true -subscribable true
 68  *
 69  *Add some client properties which will be sent in the qos to the receivers:
 70  * java javaclients.HelloWorldPublish -clientProperty[transactionId] 0x23345 -clientProperty[myName] jack
 71  *creates a publish Qos containing:
 72  *   &lt;clientProperty name='transactionId'>0x23345&lt;/clientProperty>
 73  *   &lt;clientProperty name='myName'>jack&lt;/clientProperty>
 74  * </pre>
 75  * <p>
 76  * If interactive is false, the sleep gives the number of millis to sleep before publishing the next message.
 77  * </p>
 78  * <p>
 79  * If erase=false the message is not erased at the end; if disconnect=false we don't log out at the end.
 80  * </p>
 81  * <p>
 82  * You can add '%counter' to the clientTags or the content string, each occurrence will be replaced
 83  * by the current message number.
 84  * </p>
 85  * @see javaclients.HelloWorldSubscribe
 86  * @see <a href="http://www.xmlBlaster.org/xmlBlaster/doc/requirements/interface.html" target="others">xmlBlaster interface</a>
 87  */
 88 public class HelloWorldPublish
 89 {
 90    private final Global glob;
 91    private static Logger log = Logger.getLogger(HelloWorldPublish.class.getName());
 92 
 93    public HelloWorldPublish(Global glob) {
 94       this.glob = glob;
 95 
 96       try {
 97          boolean interactive = glob.getProperty().get("interactive", true);
 98          boolean oneway = glob.getProperty().get("oneway", false);
 99          long sleep = glob.getProperty().get("sleep", 1000L);
100          int numPublish = glob.getProperty().get("numPublish", 2000);
101          String oid = glob.getProperty().get("oid", "Hello");  // "HelloTopic_#%counter"
102          String domain = glob.getProperty().get("domain", (String)null);
103          String clientTags = glob.getProperty().get("clientTags", "<org.xmlBlaster><demo/></org.xmlBlaster>");
104          //String clientTags = glob.getProperty().get("clientTags", "<org.xmlBlaster><demo-%counter/></org.xmlBlaster>");
105          String contentStr = glob.getProperty().get("content", "Hi-%counter");
106          String contentFile = glob.getProperty().get("contentFile", (String)null);
107          PriorityEnum priority = PriorityEnum.toPriorityEnum(glob.getProperty().get("priority", PriorityEnum.NORM_PRIORITY.getInt()));
108          boolean persistent = glob.getProperty().get("persistent", true);
109          long lifeTime = glob.getProperty().get("lifeTime", -1L);
110          boolean forceUpdate = glob.getProperty().get("forceUpdate", true);
111          boolean forceDestroy = glob.getProperty().get("forceDestroy", false);
112          boolean readonly = glob.getProperty().get("readonly", false);
113          long destroyDelay = glob.getProperty().get("destroyDelay", 60000L);
114          boolean createDomEntry = glob.getProperty().get("createDomEntry", true);
115          boolean consumableQueue = glob.getProperty().get("consumableQueue", false);
116          long historyMaxMsg = glob.getProperty().get("queue/history/maxEntries", -1L);
117          boolean forceQueuing = glob.getProperty().get("forceQueuing", true);
118          boolean subscribable = glob.getProperty().get("subscribable", true);
119          String destination = glob.getProperty().get("destination", (String)null);
120          boolean erase = glob.getProperty().get("erase", true);
121          boolean disconnect = glob.getProperty().get("disconnect", true);
122          final boolean eraseTailback = glob.getProperty().get("eraseTailback", false);
123          int contentSize = glob.getProperty().get("contentSize", -1); // 2000000);
124          boolean eraseForceDestroy = glob.getProperty().get("erase.forceDestroy", false);
125          final String updateDumpToFile = glob.getProperty().get("update.dumpToFile", (String)null);
126          boolean connectPersistent = glob.getProperty().get("connect/qos/persistent", false);
127          String contentMime = glob.getProperty().get("contentMime", "text/xml");
128          String contentMimeExtended = glob.getProperty().get("contentMimeExtended", "1.0");
129 
130          Map clientPropertyMap = glob.getProperty().get("clientProperty", (Map)null);
131          Map connectQosClientPropertyMap = glob.getProperty().get("connect/qos/clientProperty", (Map)null);
132 
133          if (historyMaxMsg < 1 && !glob.getProperty().propertyExists("destroyDelay"))
134             destroyDelay = 24L*60L*60L*1000L; // Increase destroyDelay to one day if no history queue is used
135 
136          log.info("Used settings are:");
137          log.info("   -interactive    " + interactive);
138          log.info("   -sleep          " + Timestamp.millisToNice(sleep));
139          log.info("   -oneway         " + oneway);
140          log.info("   -erase          " + erase);
141          log.info("   -disconnect     " + disconnect);
142          log.info("   -eraseTailback  " + eraseTailback);
143          log.info(" Pub/Sub settings");
144          log.info("   -numPublish     " + numPublish);
145          log.info("   -oid            " + oid);
146          log.info("   -contentMime    " + contentMime);
147          log.info("   -contentMimeExtended " + contentMimeExtended);
148          log.info("   -clientTags     " + clientTags);
149          log.info("   -domain         " + ((domain==null)?"":domain));
150          if (contentSize >= 0) {
151             log.info("   -content        [generated]");
152             log.info("   -contentSize    " + contentSize);
153          }
154          //else if (contentFile != null && contentFile.length() > 0) {
155          //   log.info("   -contentFile    " + contentFile);
156          //}
157          else {
158             log.info("   -content        " + contentStr);
159             log.info("   -contentSize    " + contentStr.length());
160             log.info("   -contentFile    " + contentFile);
161          }
162          log.info("   -priority       " + priority.toString());
163          log.info("   -persistent     " + persistent);
164          log.info("   -lifeTime       " + Timestamp.millisToNice(lifeTime));
165          log.info("   -forceUpdate    " + forceUpdate);
166          log.info("   -forceDestroy   " + forceDestroy);
167          if (clientPropertyMap != null) {
168             Iterator it = clientPropertyMap.keySet().iterator();
169             while (it.hasNext()) {
170                String key = (String)it.next();
171                log.info("   -clientProperty["+key+"]   " + clientPropertyMap.get(key).toString());
172             }
173          }
174          else {
175             log.info("   -clientProperty[]   ");
176          }
177          log.info(" Topic settings");
178          log.info("   -readonly       " + readonly);
179          log.info("   -destroyDelay   " + Timestamp.millisToNice(destroyDelay));
180          log.info("   -createDomEntry " + createDomEntry);
181          log.info("   -queue/history/maxEntries " + historyMaxMsg);
182          log.info("   -consumableQueue " + consumableQueue);
183          log.info(" PtP settings");
184          log.info("   -subscribable   " + subscribable);
185          log.info("   -forceQueuing   " + forceQueuing);
186          log.info("   -destination    " + destination);
187          log.info(" Erase settings");
188          log.info("   -erase.forceDestroy " + eraseForceDestroy);
189          log.info("   -erase.domain   " + ((domain==null)?"":domain));
190          log.info(" Update settings");
191          log.info("   -update.dumpToFile " + updateDumpToFile);
192          log.info(" ConnectQos settings");
193          log.info("   -connect/qos/persistent " + connectPersistent);
194          if (connectQosClientPropertyMap != null) {
195             Iterator it = connectQosClientPropertyMap.keySet().iterator();
196             while (it.hasNext()) {
197                String key = (String)it.next();
198                log.info("   -connect/qos/clientProperty["+key+"]   " + connectQosClientPropertyMap.get(key).toString());
199             }
200          }
201          else {
202             log.info("   -connect/qos/clientProperty[]   ");
203          }
204          log.info("For more info please read:");
205          log.info("   http://www.xmlBlaster.org/xmlBlaster/doc/requirements/interface.publish.html");
206 
207          I_XmlBlasterAccess con = glob.getXmlBlasterAccess();
208 
209          // Handle lost server explicitly
210          con.registerConnectionListener(new I_ConnectionStateListener() {
211 
212                public void reachedAlive(ConnectionStateEnum oldState,
213                                         I_XmlBlasterAccess connection) {
214                   /*
215                   ConnectReturnQos conRetQos = connection.getConnectReturnQos();
216                   log.info("I_ConnectionStateListener: We were lucky, connected to " +
217                      connection.getGlobal().getId() + " as " + conRetQos.getSessionName());
218                      */
219                   if (eraseTailback) {
220                      log.info("Destroying " + connection.getQueue().getNumOfEntries() +
221                                   " client side tailback messages");
222                      connection.getQueue().clear();
223                   }
224                }
225                public void reachedPolling(ConnectionStateEnum oldState,
226                                           I_XmlBlasterAccess connection) {
227                   log.warning("I_ConnectionStateListener: No connection to xmlBlaster server, we are polling ...");
228                }
229                public void reachedDead(ConnectionStateEnum oldState,
230                                        I_XmlBlasterAccess connection) {
231                   log.warning("I_ConnectionStateListener: Connection from " +
232                           connection.getGlobal().getId() + " to xmlBlaster is DEAD.");
233                   //System.exit(1);
234                }
235                public void reachedAliveSync(ConnectionStateEnum oldState, I_XmlBlasterAccess connection) {
236                }
237 
238             });
239 
240          // This listener receives only events from asynchronously send messages from queue.
241          // e.g. after a reconnect when client side queued messages are delivered
242          con.registerPostSendListener(new I_PostSendListener() {
243             /**
244              * @see I_PostSendListener#postSend(MsgQueueEntry[])
245              */
246             public void postSend(MsgQueueEntry[] entries) {
247                try {
248                   for (int i=0; i<entries.length; i++) {
249                      if (MethodName.PUBLISH.equals(entries[i].getMethodName())) { 
250                         MsgUnit msg = entries[i].getMsgUnit();
251                         PublishReturnQos retQos = (PublishReturnQos)entries[i].getReturnObj();
252                         log.info("Send asynchronously message '" + msg.getKeyOid() + "' from queue: " + retQos.toXml());
253                      }
254                      else
255                         log.info("Send asynchronously " + entries[i].getMethodName() + " message from queue");
256                   }
257                } catch (Throwable e) {
258                   e.printStackTrace();
259                }
260             }
261 
262             /**
263              * @see I_PostSendListener#sendingFailed(MsgQueueEntry[], XmlBlasterException)
264              */
265             public boolean sendingFailed(MsgQueueEntry[] entries, XmlBlasterException ex) {
266                try {
267                   for (int i=0; i<entries.length; i++) {
268                      if (MethodName.PUBLISH.equals(entries[i].getMethodName())) { 
269                         MsgUnit msg = entries[i].getMsgUnit();
270                         log.info("Send asynchronously message '" + msg.getKeyOid() + "' from queue failed: " + ex.getMessage());
271                      }
272                      else
273                         log.info("Send asynchronously " + entries[i].getMethodName() + " message from queue");
274                   }
275                } catch (Throwable e) {
276                   e.printStackTrace();
277                }
278                //return true; // true: We have handled the case (safely stored the message) and it may be removed from connection queue
279                return false; // false: Default error handling: message remains in queue and we go to dead
280             }
281          });
282 
283          // ConnectQos checks -session.name and -passwd from command line
284          log.info("============= CreatingConnectQos");
285          ConnectQos qos = new ConnectQos(glob);
286          if (connectPersistent) {
287             qos.setPersistent(connectPersistent);
288          }
289          // "__remoteProperties"
290          qos.getData().addClientProperty(Constants.CLIENTPROPERTY_REMOTEPROPERTIES, true);
291          if (connectQosClientPropertyMap != null) {
292             Iterator it = connectQosClientPropertyMap.keySet().iterator();
293             while (it.hasNext()) {
294                String key = (String)it.next();
295                qos.addClientProperty(key, connectQosClientPropertyMap.get(key).toString());
296             }
297          }
298          log.info("ConnectQos is " + qos.toXml());
299          ConnectReturnQos crq = con.connect(qos, new I_Callback() {
300          public String update(String cbSessionId, UpdateKey updateKey, byte[] content, UpdateQos updateQos) throws XmlBlasterException {
301             try {
302                if (updateDumpToFile == null) {
303                   log.info("Received '" + updateKey.getOid() + "':" + new String(content, "UTF-8"));
304                }
305                else {
306                   FileLocator.writeFile(updateDumpToFile, content);
307                   log.info("Received '" + updateKey.getOid() + "' size = " + content.length + " dumped to file " + updateDumpToFile);
308                }
309             } catch (UnsupportedEncodingException e) {
310                log.severe("Update failed: " + e.toString());
311             }
312             return "";
313          }
314          });  // Login to xmlBlaster, register for updates
315          log.info("Connect success as " + crq.toXml());
316 
317          org.xmlBlaster.util.StopWatch stopWatch = new org.xmlBlaster.util.StopWatch();
318          for(int i=0; true; i++) {
319             if (numPublish != -1)
320                if (i>=numPublish)
321                   break;
322 
323             String currCounter = ""+(i+1);
324             if (numPublish > 0) { // Add leading zeros to have nice justified numbers in dump
325                String tmp = ""+numPublish;
326                int curLen = currCounter.length();
327                currCounter = "";
328                for (int j=curLen; j<tmp.length(); j++) {
329                   currCounter += "0";
330                }
331                currCounter += (i+1);
332             }
333 
334             String currOid = org.xmlBlaster.util.ReplaceVariable.replaceAll(oid, "%counter", currCounter);
335 
336             if (interactive) {
337                char ret = (char)Global.waitOnKeyboardHit("Hit 'b' to break, hit other key to publish '" + currOid + "' #" + currCounter + "/" + numPublish);
338                if (ret == 'b')
339                   break;
340             }
341             else {
342                if (sleep > 0 && i > 0) {
343                   try { Thread.sleep(sleep); } catch( InterruptedException e) {}
344                }
345                log.info("Publish '" + currOid + "' #" + currCounter + "/" + numPublish);
346             }
347 
348             PublishKey pk = new PublishKey(glob, currOid, contentMime, contentMimeExtended);
349             if (domain != null) pk.setDomain(domain);
350             pk.setClientTags(org.xmlBlaster.util.ReplaceVariable.replaceAll(clientTags, "%counter", currCounter));
351             PublishQos pq = new PublishQos(glob);
352             pq.setPriority(priority);
353             pq.setPersistent(persistent);
354             pq.setLifeTime(lifeTime);
355             pq.setForceUpdate(forceUpdate);
356             pq.setForceDestroy(forceDestroy);
357             pq.setSubscribable(subscribable);
358             if (clientPropertyMap != null) {
359                Iterator it = clientPropertyMap.keySet().iterator();
360                while (it.hasNext()) {
361                   String key = (String)it.next();
362                   pq.addClientProperty(key, clientPropertyMap.get(key).toString());
363                }
364                //Example for a typed property:
365                //pq.getData().addClientProperty("ALONG", (new Long(12)));
366             }
367 
368             if (i == 0) {
369                TopicProperty topicProperty = new TopicProperty(glob);
370                topicProperty.setDestroyDelay(destroyDelay);
371                topicProperty.setCreateDomEntry(createDomEntry);
372                topicProperty.setReadonly(readonly);
373                if (historyMaxMsg >= 0L) {
374                   HistoryQueueProperty prop = new HistoryQueueProperty(this.glob, null);
375                   prop.setMaxEntries(historyMaxMsg);
376                   topicProperty.setHistoryQueueProperty(prop);
377                }
378                if (consumableQueue)
379                   topicProperty.setMsgDistributor("ConsumableQueue,1.0");
380                pq.setTopicProperty(topicProperty);
381                log.info("Added TopicProperty on first publish: " + topicProperty.toXml());
382             }
383 
384             if (destination != null) {
385                log.fine("Using destination: '" + destination + "'");
386                Destination dest = new Destination(glob, new SessionName(glob, destination));
387                dest.forceQueuing(forceQueuing);
388                pq.addDestination(dest);
389             }
390 
391             byte[] content;
392             if (contentSize >= 0) {
393                content = new byte[contentSize];
394                Random random = new Random();
395                for (int j=0; j<content.length; j++) {
396                   content[j] = (byte)(random.nextInt(96)+32);
397                   //content[j] = (byte)('X');
398                   //content[j] = (byte)(j % 255);
399                }
400             }
401             else if (contentFile != null && contentFile.length() > 0) {
402                content = FileLocator.readFile(contentFile);
403             }
404             else {
405                content = org.xmlBlaster.util.ReplaceVariable.replaceAll(contentStr, "%counter", ""+(i+1)).getBytes();
406             }
407 
408             if (log.isLoggable(Level.FINEST)) log.finest("Going to parse publish message: " + pk.toXml() + " : " + content + " : " + pq.toXml());
409             MsgUnit msgUnit = new MsgUnit(pk, content, pq);
410             if (log.isLoggable(Level.FINEST)) log.finest("Going to publish message: " + msgUnit.toXml());
411 
412             if (oneway) {
413                MsgUnit msgUnitArr[] = { msgUnit };
414                con.publishOneway(msgUnitArr);
415                log.info("#" + (i+1) + "/" + numPublish +
416                          ": Published oneway message '" + msgUnit.getKeyOid() + "'");
417             }
418             else {
419                PublishReturnQos prq = con.publish(msgUnit);
420                if (log.isLoggable(Level.FINEST)) log.finest("Returned: " + prq.toXml());
421 
422                log.info("#" + currCounter + "/" + numPublish +
423                          ": Got status='" + prq.getState() +
424                          (prq.getData().hasStateInfo()?"' '" + prq.getStateInfo():"") +
425                          "' rcvTimestamp=" + prq.getRcvTimestamp() +
426                          " for published message '" + prq.getKeyOid() + "'");
427             }
428          }
429          log.info("Elapsed since starting to publish: " + stopWatch.nice(numPublish));
430 
431          if (erase) {
432             char ret = 0;
433             if (interactive) {
434                ret = (char)Global.waitOnKeyboardHit("Hit 'e' to erase topic '"+oid+"', or any other key to keep the topic");
435             }
436 
437             if (ret == 0 || ret == 'e') {
438                EraseKey ek = new EraseKey(glob, oid);
439                if (domain != null) ek.setDomain(domain);
440                EraseQos eq = new EraseQos(glob);
441                eq.setForceDestroy(eraseForceDestroy);
442                if (log.isLoggable(Level.FINEST)) log.finest("Going to erase the topic: " + ek.toXml() + eq.toXml());
443                /*EraseReturnQos[] eraseArr =*/con.erase(ek, eq);
444                log.info("Erase success");
445             }
446          }
447 
448          char ret = 0;
449          if (interactive) {
450             boolean hasQueued = con.getQueue().getNumOfEntries() > 0;
451             while (ret != 'l' && ret != 'd')
452                ret = (char)Global.waitOnKeyboardHit("Hit 'l' to leave server, 'd' to disconnect" + (hasQueued ? "(and destroy client side entries)" : ""));
453          }
454 
455 
456          if (ret == 0 || ret == 'd') {
457             DisconnectQos dq = new DisconnectQos(glob);
458             dq.clearClientQueue(true);
459             con.disconnect(dq);
460             log.info("Disconnected from server, all resources released");
461          }
462          else {
463             con.leaveServer(null);
464             ret = 0;
465             if (interactive) {
466                while (ret != 'q')
467                   ret = (char)Global.waitOnKeyboardHit("Hit 'q' to quit");
468             }
469             log.info("Left server, our server side session remains, bye");
470          }
471       }
472       catch (XmlBlasterException e) {
473          log.severe(e.getMessage());
474       }
475       catch (Exception e) {
476          e.printStackTrace();
477          log.severe(e.toString());
478       }
479    }
480 
481    /**
482     * Try
483     * <pre>
484     *   java javaclients.HelloWorldPublish -help
485     * </pre>
486     * for usage help
487     */
488    public static void main(String args[]) {
489       Global glob = new Global();
490 
491       if (glob.init(args) != 0) { // Get help with -help
492          System.out.println(glob.usage());
493          System.err.println("\nExample:");
494          System.err.println("  java javaclients.HelloWorldPublish -interactive false -sleep 1000 -numPublish 10 -oid Hello -persistent true -erase true\n");
495          System.err.println("  java javaclients.HelloWorldPublish  -clientProperty[myString] Hello -clientProperty[correlationId] 100\n");
496          System.exit(1);
497       }
498 
499       new HelloWorldPublish(glob);
500    }
501 }


syntax highlighted by Code2HTML, v. 0.9.1