1 // xmlBlaster/demo/javaclients/HelloWorldPublish.java
2 package javaclients;
3
4 import java.io.UnsupportedEncodingException;
5 import java.util.Iterator;
6 import java.util.Map;
7 import java.util.Random;
8 import java.util.logging.Level;
9 import java.util.logging.Logger;
10
11 import org.xmlBlaster.client.I_Callback;
12 import org.xmlBlaster.client.I_ConnectionStateListener;
13 import org.xmlBlaster.client.I_XmlBlasterAccess;
14 import org.xmlBlaster.client.key.EraseKey;
15 import org.xmlBlaster.client.key.PublishKey;
16 import org.xmlBlaster.client.key.UpdateKey;
17 import org.xmlBlaster.client.qos.ConnectQos;
18 import org.xmlBlaster.client.qos.ConnectReturnQos;
19 import org.xmlBlaster.client.qos.DisconnectQos;
20 import org.xmlBlaster.client.qos.EraseQos;
21 import org.xmlBlaster.client.qos.PublishQos;
22 import org.xmlBlaster.client.qos.PublishReturnQos;
23 import org.xmlBlaster.client.qos.UpdateQos;
24 import org.xmlBlaster.util.FileLocator;
25 import org.xmlBlaster.util.Global;
26 import org.xmlBlaster.util.MsgUnit;
27 import org.xmlBlaster.util.SessionName;
28 import org.xmlBlaster.util.Timestamp;
29 import org.xmlBlaster.util.XmlBlasterException;
30 import org.xmlBlaster.util.def.Constants;
31 import org.xmlBlaster.util.def.MethodName;
32 import org.xmlBlaster.util.def.PriorityEnum;
33 import org.xmlBlaster.util.dispatch.ConnectionStateEnum;
34 import org.xmlBlaster.util.dispatch.I_PostSendListener;
35 import org.xmlBlaster.util.qos.TopicProperty;
36 import org.xmlBlaster.util.qos.address.Destination;
37 import org.xmlBlaster.util.qos.storage.HistoryQueueProperty;
38 import org.xmlBlaster.util.queuemsg.MsgQueueEntry;
39
40 /**
41 * This client connects to xmlBlaster and publishes a configurable amount of messages.
42 * <p>
43 * This is a nice client to experiment and play with xmlBlaster as there are many
44 * command line options to specify the type and amount of messages published.
45 * </p>
46 * <p>
47 * Try using 'java javaclients.HelloWorldSubscribe' in another window to subscribe to
48 * our messages.
49 * Further you can type 'd' in the window running xmlBlaster to get a server dump.
50 * </p>
51 *
52 * Invoke (after starting the xmlBlaster server):
53 * <pre>
54 *Publish manually 10 messages:
55 * java javaclients.HelloWorldPublish -interactive true -numPublish 10 -oid Hello -persistent true -erase true
56 *
57 *Publish automatically 10 messages and sleep 1 sec in between:
58 * java javaclients.HelloWorldPublish -interactive false -sleep 1000 -numPublish 10 -oid Hello -persistent true -erase true
59 *
60 *Publish automatically 10 different topics with different DOM entries:
61 * java javaclients.HelloWorldPublish -interactive false -numPublish 10 -oid Hello-%counter -clientTags "<org.xmlBlaster><demo-%counter/></org.xmlBlaster>"
62 *
63 *Login as joe/5 and send one persistent message:
64 * java javaclients.HelloWorldPublish -session.name joe/5 -passwd secret -persistent true -dump[HelloWorldPublish] true
65 *
66 *Send a PtP message:
67 * java javaclients.HelloWorldPublish -destination jack/17 -forceQueuing true -persistent true -subscribable true
68 *
69 *Add some client properties which will be send in the qos to the receivers:
70 * java javaclients.HelloWorldPublish -clientProperty[transactionId] 0x23345 -clientProperty[myName] jack
71 *creates a publish Qos containing:
72 * <clientProperty name='transactionId'>0x23345</clientProperty>
73 * <clientProperty name='myName'>jack</clientProperty>
74 * </pre>
75 * <p>
76 * If interactive is false, the sleep gives the number of millis to sleep before publishing the next message.
77 * </p>
78 * <p>
79 * If erase=false the message is not erase at the end, if disconnect=false we don't logout at the end.
80 * </p>
81 * <p>
82 * You can add '%counter' to the clientTags or the content string, each occurrence will be replaced
83 * by the current message number.
84 * </p>
85 * @see javaclients.HelloWorldSubscribe
86 * @see <a href="http://www.xmlBlaster.org/xmlBlaster/doc/requirements/interface.html" target="others">xmlBlaster interface</a>
87 */
88 public class HelloWorldPublish
89 {
90 private final Global glob;
91 private static Logger log = Logger.getLogger(HelloWorldPublish.class.getName());
92
93 public HelloWorldPublish(Global glob) {
94 this.glob = glob;
95
96 try {
97 boolean interactive = glob.getProperty().get("interactive", true);
98 boolean oneway = glob.getProperty().get("oneway", false);
99 long sleep = glob.getProperty().get("sleep", 1000L);
100 int numPublish = glob.getProperty().get("numPublish", 2000);
101 String oid = glob.getProperty().get("oid", "Hello"); // "HelloTopic_#%counter"
102 String domain = glob.getProperty().get("domain", (String)null);
103 String clientTags = glob.getProperty().get("clientTags", "<org.xmlBlaster><demo/></org.xmlBlaster>");
104 //String clientTags = glob.getProperty().get("clientTags", "<org.xmlBlaster><demo-%counter/></org.xmlBlaster>");
105 String contentStr = glob.getProperty().get("content", "Hi-%counter");
106 String contentFile = glob.getProperty().get("contentFile", (String)null);
107 PriorityEnum priority = PriorityEnum.toPriorityEnum(glob.getProperty().get("priority", PriorityEnum.NORM_PRIORITY.getInt()));
108 boolean persistent = glob.getProperty().get("persistent", true);
109 long lifeTime = glob.getProperty().get("lifeTime", -1L);
110 boolean forceUpdate = glob.getProperty().get("forceUpdate", true);
111 boolean forceDestroy = glob.getProperty().get("forceDestroy", false);
112 boolean readonly = glob.getProperty().get("readonly", false);
113 long destroyDelay = glob.getProperty().get("destroyDelay", 60000L);
114 boolean createDomEntry = glob.getProperty().get("createDomEntry", true);
115 boolean consumableQueue = glob.getProperty().get("consumableQueue", false);
116 long historyMaxMsg = glob.getProperty().get("queue/history/maxEntries", -1L);
117 boolean forceQueuing = glob.getProperty().get("forceQueuing", true);
118 boolean subscribable = glob.getProperty().get("subscribable", true);
119 String destination = glob.getProperty().get("destination", (String)null);
120 boolean erase = glob.getProperty().get("erase", true);
121 boolean disconnect = glob.getProperty().get("disconnect", true);
122 final boolean eraseTailback = glob.getProperty().get("eraseTailback", false);
123 int contentSize = glob.getProperty().get("contentSize", -1); // 2000000);
124 boolean eraseForceDestroy = glob.getProperty().get("erase.forceDestroy", false);
125 final String updateDumpToFile = glob.getProperty().get("update.dumpToFile", (String)null);
126 boolean connectPersistent = glob.getProperty().get("connect/qos/persistent", false);
127 String contentMime = glob.getProperty().get("contentMime", "text/xml");
128 String contentMimeExtended = glob.getProperty().get("contentMimeExtended", "1.0");
129
130 Map clientPropertyMap = glob.getProperty().get("clientProperty", (Map)null);
131 Map connectQosClientPropertyMap = glob.getProperty().get("connect/qos/clientProperty", (Map)null);
132
133 if (historyMaxMsg < 1 && !glob.getProperty().propertyExists("destroyDelay"))
134 destroyDelay = 24L*60L*60L*1000L; // Increase destroyDelay to one day if no history queue is used
135
136 log.info("Used settings are:");
137 log.info(" -interactive " + interactive);
138 log.info(" -sleep " + Timestamp.millisToNice(sleep));
139 log.info(" -oneway " + oneway);
140 log.info(" -erase " + erase);
141 log.info(" -disconnect " + disconnect);
142 log.info(" -eraseTailback " + eraseTailback);
143 log.info(" Pub/Sub settings");
144 log.info(" -numPublish " + numPublish);
145 log.info(" -oid " + oid);
146 log.info(" -contentMime " + contentMime);
147 log.info(" -contentMimeExtended " + contentMimeExtended);
148 log.info(" -clientTags " + clientTags);
149 log.info(" -domain " + ((domain==null)?"":domain));
150 if (contentSize >= 0) {
151 log.info(" -content [generated]");
152 log.info(" -contentSize " + contentSize);
153 }
154 //else if (contentFile != null && contentFile.length() > 0) {
155 // log.info(" -contentFile " + contentFile);
156 //}
157 else {
158 log.info(" -content " + contentStr);
159 log.info(" -contentSize " + contentStr.length());
160 log.info(" -contentFile " + contentFile);
161 }
162 log.info(" -priority " + priority.toString());
163 log.info(" -persistent " + persistent);
164 log.info(" -lifeTime " + Timestamp.millisToNice(lifeTime));
165 log.info(" -forceUpdate " + forceUpdate);
166 log.info(" -forceDestroy " + forceDestroy);
167 if (clientPropertyMap != null) {
168 Iterator it = clientPropertyMap.keySet().iterator();
169 while (it.hasNext()) {
170 String key = (String)it.next();
171 log.info(" -clientProperty["+key+"] " + clientPropertyMap.get(key).toString());
172 }
173 }
174 else {
175 log.info(" -clientProperty[] ");
176 }
177 log.info(" Topic settings");
178 log.info(" -readonly " + readonly);
179 log.info(" -destroyDelay " + Timestamp.millisToNice(destroyDelay));
180 log.info(" -createDomEntry " + createDomEntry);
181 log.info(" -queue/history/maxEntries " + historyMaxMsg);
182 log.info(" -consumableQueue " + consumableQueue);
183 log.info(" PtP settings");
184 log.info(" -subscribable " + subscribable);
185 log.info(" -forceQueuing " + forceQueuing);
186 log.info(" -destination " + destination);
187 log.info(" Erase settings");
188 log.info(" -erase.forceDestroy " + eraseForceDestroy);
189 log.info(" -erase.domain " + ((domain==null)?"":domain));
190 log.info(" Update settings");
191 log.info(" -update.dumpToFile " + updateDumpToFile);
192 log.info(" ConnectQos settings");
193 log.info(" -connect/qos/persistent " + connectPersistent);
194 if (connectQosClientPropertyMap != null) {
195 Iterator it = connectQosClientPropertyMap.keySet().iterator();
196 while (it.hasNext()) {
197 String key = (String)it.next();
198 log.info(" -connect/qos/clientProperty["+key+"] " + connectQosClientPropertyMap.get(key).toString());
199 }
200 }
201 else {
202 log.info(" -connect/qos/clientProperty[] ");
203 }
204 log.info("For more info please read:");
205 log.info(" http://www.xmlBlaster.org/xmlBlaster/doc/requirements/interface.publish.html");
206
207 I_XmlBlasterAccess con = glob.getXmlBlasterAccess();
208
209 // Handle lost server explicitly
210 con.registerConnectionListener(new I_ConnectionStateListener() {
211
212 public void reachedAlive(ConnectionStateEnum oldState,
213 I_XmlBlasterAccess connection) {
214 /*
215 ConnectReturnQos conRetQos = connection.getConnectReturnQos();
216 log.info("I_ConnectionStateListener: We were lucky, connected to " +
217 connection.getGlobal().getId() + " as " + conRetQos.getSessionName());
218 */
219 if (eraseTailback) {
220 log.info("Destroying " + connection.getQueue().getNumOfEntries() +
221 " client side tailback messages");
222 connection.getQueue().clear();
223 }
224 }
225 public void reachedPolling(ConnectionStateEnum oldState,
226 I_XmlBlasterAccess connection) {
227 log.warning("I_ConnectionStateListener: No connection to xmlBlaster server, we are polling ...");
228 }
229 public void reachedDead(ConnectionStateEnum oldState,
230 I_XmlBlasterAccess connection) {
231 log.warning("I_ConnectionStateListener: Connection from " +
232 connection.getGlobal().getId() + " to xmlBlaster is DEAD.");
233 //System.exit(1);
234 }
235 public void reachedAliveSync(ConnectionStateEnum oldState, I_XmlBlasterAccess connection) {
236 }
237
238 });
239
240 // This listener receives only events from asynchronously send messages from queue.
241 // e.g. after a reconnect when client side queued messages are delivered
242 con.registerPostSendListener(new I_PostSendListener() {
243 /**
244 * @see I_PostSendListener#postSend(MsgQueueEntry[])
245 */
246 public void postSend(MsgQueueEntry[] entries) {
247 try {
248 for (int i=0; i<entries.length; i++) {
249 if (MethodName.PUBLISH.equals(entries[i].getMethodName())) {
250 MsgUnit msg = entries[i].getMsgUnit();
251 PublishReturnQos retQos = (PublishReturnQos)entries[i].getReturnObj();
252 log.info("Send asynchronously message '" + msg.getKeyOid() + "' from queue: " + retQos.toXml());
253 }
254 else
255 log.info("Send asynchronously " + entries[i].getMethodName() + " message from queue");
256 }
257 } catch (Throwable e) {
258 e.printStackTrace();
259 }
260 }
261
262 /**
263 * @see I_PostSendListener#sendingFailed(MsgQueueEntry[], XmlBlasterException)
264 */
265 public boolean sendingFailed(MsgQueueEntry[] entries, XmlBlasterException ex) {
266 try {
267 for (int i=0; i<entries.length; i++) {
268 if (MethodName.PUBLISH.equals(entries[i].getMethodName())) {
269 MsgUnit msg = entries[i].getMsgUnit();
270 log.info("Send asynchronously message '" + msg.getKeyOid() + "' from queue failed: " + ex.getMessage());
271 }
272 else
273 log.info("Send asynchronously " + entries[i].getMethodName() + " message from queue");
274 }
275 } catch (Throwable e) {
276 e.printStackTrace();
277 }
278 //return true; // true: We have handled the case (safely stored the message) and it may be removed from connection queue
279 return false; // false: Default error handling: message remains in queue and we go to dead
280 }
281 });
282
283 // ConnectQos checks -session.name and -passwd from command line
284 log.info("============= CreatingConnectQos");
285 ConnectQos qos = new ConnectQos(glob);
286 if (connectPersistent) {
287 qos.setPersistent(connectPersistent);
288 }
289 // "__remoteProperties"
290 qos.getData().addClientProperty(Constants.CLIENTPROPERTY_REMOTEPROPERTIES, true);
291 if (connectQosClientPropertyMap != null) {
292 Iterator it = connectQosClientPropertyMap.keySet().iterator();
293 while (it.hasNext()) {
294 String key = (String)it.next();
295 qos.addClientProperty(key, connectQosClientPropertyMap.get(key).toString());
296 }
297 }
298 log.info("ConnectQos is " + qos.toXml());
299 ConnectReturnQos crq = con.connect(qos, new I_Callback() {
300 public String update(String cbSessionId, UpdateKey updateKey, byte[] content, UpdateQos updateQos) throws XmlBlasterException {
301 try {
302 if (updateDumpToFile == null) {
303 log.info("Received '" + updateKey.getOid() + "':" + new String(content, "UTF-8"));
304 }
305 else {
306 FileLocator.writeFile(updateDumpToFile, content);
307 log.info("Received '" + updateKey.getOid() + "' size = " + content.length + " dumped to file " + updateDumpToFile);
308 }
309 } catch (UnsupportedEncodingException e) {
310 log.severe("Update failed: " + e.toString());
311 }
312 return "";
313 }
314 }); // Login to xmlBlaster, register for updates
315 log.info("Connect success as " + crq.toXml());
316
317 org.xmlBlaster.util.StopWatch stopWatch = new org.xmlBlaster.util.StopWatch();
318 for(int i=0; true; i++) {
319 if (numPublish != -1)
320 if (i>=numPublish)
321 break;
322
323 String currCounter = ""+(i+1);
324 if (numPublish > 0) { // Add leading zeros to have nice justified numbers in dump
325 String tmp = ""+numPublish;
326 int curLen = currCounter.length();
327 currCounter = "";
328 for (int j=curLen; j<tmp.length(); j++) {
329 currCounter += "0";
330 }
331 currCounter += (i+1);
332 }
333
334 String currOid = org.xmlBlaster.util.ReplaceVariable.replaceAll(oid, "%counter", currCounter);
335
336 if (interactive) {
337 char ret = (char)Global.waitOnKeyboardHit("Hit 'b' to break, hit other key to publish '" + currOid + "' #" + currCounter + "/" + numPublish);
338 if (ret == 'b')
339 break;
340 }
341 else {
342 if (sleep > 0 && i > 0) {
343 try { Thread.sleep(sleep); } catch( InterruptedException e) {}
344 }
345 log.info("Publish '" + currOid + "' #" + currCounter + "/" + numPublish);
346 }
347
348 PublishKey pk = new PublishKey(glob, currOid, contentMime, contentMimeExtended);
349 if (domain != null) pk.setDomain(domain);
350 pk.setClientTags(org.xmlBlaster.util.ReplaceVariable.replaceAll(clientTags, "%counter", currCounter));
351 PublishQos pq = new PublishQos(glob);
352 pq.setPriority(priority);
353 pq.setPersistent(persistent);
354 pq.setLifeTime(lifeTime);
355 pq.setForceUpdate(forceUpdate);
356 pq.setForceDestroy(forceDestroy);
357 pq.setSubscribable(subscribable);
358 if (clientPropertyMap != null) {
359 Iterator it = clientPropertyMap.keySet().iterator();
360 while (it.hasNext()) {
361 String key = (String)it.next();
362 pq.addClientProperty(key, clientPropertyMap.get(key).toString());
363 }
364 //Example for a typed property:
365 //pq.getData().addClientProperty("ALONG", (new Long(12)));
366 }
367
368 if (i == 0) {
369 TopicProperty topicProperty = new TopicProperty(glob);
370 topicProperty.setDestroyDelay(destroyDelay);
371 topicProperty.setCreateDomEntry(createDomEntry);
372 topicProperty.setReadonly(readonly);
373 if (historyMaxMsg >= 0L) {
374 HistoryQueueProperty prop = new HistoryQueueProperty(this.glob, null);
375 prop.setMaxEntries(historyMaxMsg);
376 topicProperty.setHistoryQueueProperty(prop);
377 }
378 if (consumableQueue)
379 topicProperty.setMsgDistributor("ConsumableQueue,1.0");
380 pq.setTopicProperty(topicProperty);
381 log.info("Added TopicProperty on first publish: " + topicProperty.toXml());
382 }
383
384 if (destination != null) {
385 log.fine("Using destination: '" + destination + "'");
386 Destination dest = new Destination(glob, new SessionName(glob, destination));
387 dest.forceQueuing(forceQueuing);
388 pq.addDestination(dest);
389 }
390
391 byte[] content;
392 if (contentSize >= 0) {
393 content = new byte[contentSize];
394 Random random = new Random();
395 for (int j=0; j<content.length; j++) {
396 content[j] = (byte)(random.nextInt(96)+32);
397 //content[j] = (byte)('X');
398 //content[j] = (byte)(j % 255);
399 }
400 }
401 else if (contentFile != null && contentFile.length() > 0) {
402 content = FileLocator.readFile(contentFile);
403 }
404 else {
405 content = org.xmlBlaster.util.ReplaceVariable.replaceAll(contentStr, "%counter", ""+(i+1)).getBytes();
406 }
407
408 if (log.isLoggable(Level.FINEST)) log.finest("Going to parse publish message: " + pk.toXml() + " : " + content + " : " + pq.toXml());
409 MsgUnit msgUnit = new MsgUnit(pk, content, pq);
410 if (log.isLoggable(Level.FINEST)) log.finest("Going to publish message: " + msgUnit.toXml());
411
412 if (oneway) {
413 MsgUnit msgUnitArr[] = { msgUnit };
414 con.publishOneway(msgUnitArr);
415 log.info("#" + (i+1) + "/" + numPublish +
416 ": Published oneway message '" + msgUnit.getKeyOid() + "'");
417 }
418 else {
419 PublishReturnQos prq = con.publish(msgUnit);
420 if (log.isLoggable(Level.FINEST)) log.finest("Returned: " + prq.toXml());
421
422 log.info("#" + currCounter + "/" + numPublish +
423 ": Got status='" + prq.getState() +
424 (prq.getData().hasStateInfo()?"' '" + prq.getStateInfo():"") +
425 "' rcvTimestamp=" + prq.getRcvTimestamp() +
426 " for published message '" + prq.getKeyOid() + "'");
427 }
428 }
429 log.info("Elapsed since starting to publish: " + stopWatch.nice(numPublish));
430
431 if (erase) {
432 char ret = 0;
433 if (interactive) {
434 ret = (char)Global.waitOnKeyboardHit("Hit 'e' to erase topic '"+oid+"', or any other key to keep the topic");
435 }
436
437 if (ret == 0 || ret == 'e') {
438 EraseKey ek = new EraseKey(glob, oid);
439 if (domain != null) ek.setDomain(domain);
440 EraseQos eq = new EraseQos(glob);
441 eq.setForceDestroy(eraseForceDestroy);
442 if (log.isLoggable(Level.FINEST)) log.finest("Going to erase the topic: " + ek.toXml() + eq.toXml());
443 /*EraseReturnQos[] eraseArr =*/con.erase(ek, eq);
444 log.info("Erase success");
445 }
446 }
447
448 char ret = 0;
449 if (interactive) {
450 boolean hasQueued = con.getQueue().getNumOfEntries() > 0;
451 while (ret != 'l' && ret != 'd')
452 ret = (char)Global.waitOnKeyboardHit("Hit 'l' to leave server, 'd' to disconnect" + (hasQueued ? "(and destroy client side entries)" : ""));
453 }
454
455
456 if (ret == 0 || ret == 'd') {
457 DisconnectQos dq = new DisconnectQos(glob);
458 dq.clearClientQueue(true);
459 con.disconnect(dq);
460 log.info("Disconnected from server, all resources released");
461 }
462 else {
463 con.leaveServer(null);
464 ret = 0;
465 if (interactive) {
466 while (ret != 'q')
467 ret = (char)Global.waitOnKeyboardHit("Hit 'q' to quit");
468 }
469 log.info("Left server, our server side session remains, bye");
470 }
471 }
472 catch (XmlBlasterException e) {
473 log.severe(e.getMessage());
474 }
475 catch (Exception e) {
476 e.printStackTrace();
477 log.severe(e.toString());
478 }
479 }
480
481 /**
482 * Try
483 * <pre>
484 * java javaclients.HelloWorldPublish -help
485 * </pre>
486 * for usage help
487 */
488 public static void main(String args[]) {
489 Global glob = new Global();
490
491 if (glob.init(args) != 0) { // Get help with -help
492 System.out.println(glob.usage());
493 System.err.println("\nExample:");
494 System.err.println(" java javaclients.HelloWorldPublish -interactive false -sleep 1000 -numPublish 10 -oid Hello -persistent true -erase true\n");
495 System.err.println(" java javaclients.HelloWorldPublish -clientProperty[myString] Hello -clientProperty[correlationId] 100\n");
496 System.exit(1);
497 }
498
499 new HelloWorldPublish(glob);
500 }
501 }
syntax highlighted by Code2HTML, v. 0.9.1