1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21 package org.efaps.jms;
22
23 import java.util.HashMap;
24 import java.util.Map;
25 import java.util.UUID;
26
27 import javax.jms.DeliveryMode;
28 import javax.jms.Destination;
29 import javax.jms.JMSException;
30 import javax.jms.MessageConsumer;
31 import javax.jms.MessageListener;
32 import javax.jms.MessageProducer;
33 import javax.jms.QueueConnection;
34 import javax.jms.QueueConnectionFactory;
35 import javax.jms.Session;
36 import javax.jms.Topic;
37 import javax.jms.TopicConnection;
38 import javax.jms.TopicConnectionFactory;
39 import javax.jms.TopicSession;
40 import javax.naming.Context;
41 import javax.naming.InitialContext;
42 import javax.naming.NamingException;
43
44 import org.efaps.admin.EFapsSystemConfiguration;
45 import org.efaps.admin.KernelSettings;
46 import org.efaps.admin.common.SystemConfiguration;
47 import org.efaps.admin.datamodel.Type;
48 import org.efaps.admin.program.esjp.EFapsClassLoader;
49 import org.efaps.ci.CIAdminCommon;
50 import org.efaps.db.MultiPrintQuery;
51 import org.efaps.db.QueryBuilder;
52 import org.efaps.db.SelectBuilder;
53 import org.efaps.util.EFapsException;
54 import org.slf4j.Logger;
55 import org.slf4j.LoggerFactory;
56
57
58
59
60
61
62
63 public final class JmsHandler
64 {
65
66
67
68 public static final String ACTIVEJMS = "org.efaps.jms.ActiveJMS";
69
70
71
72
73 private static final Logger LOG = LoggerFactory.getLogger(JmsHandler.class);
74
75
76
77
78 private static final Map<String, QueueConnection> QUEUE2QUECONN = new HashMap<String, QueueConnection>();
79
80
81
82
83 private static final Map<String, TopicConnection> TOPIC2QUECONN = new HashMap<String, TopicConnection>();
84
85
86
87
88 private static final Map<String, JmsDefinition> NAME2DEF = new HashMap<String, JmsDefinition>();
89
90
91
92
93
94 private JmsHandler()
95 {
96 }
97
98
99
100
101
102 public static void initialize()
103 throws EFapsException
104 {
105 try {
106 final SystemConfiguration config = EFapsSystemConfiguration.get();
107 if (config != null) {
108 final int timeout = config.getAttributeValueAsInteger(KernelSettings.JMS_TIMEOOUT);
109 if (timeout > 0) {
110 JmsSession.setSessionTimeout(timeout);
111 }
112 }
113
114
115 final SystemConfiguration configJms = SystemConfiguration.get(
116 UUID.fromString("65b6a1a8-c979-4471-b175-774593f1acd7"));
117 final boolean active = configJms != null && configJms.getAttributeValueAsBoolean(JmsHandler.ACTIVEJMS);
118
119 if (active) {
120
121 if (CIAdminCommon.JmsAbstract.getType() != null) {
122
123 JmsHandler.stop();
124 final Context ctx = new InitialContext();
125 final QueryBuilder queryBldr = new QueryBuilder(CIAdminCommon.JmsAbstract);
126 final MultiPrintQuery multi = queryBldr.getPrint();
127 multi.addAttribute(CIAdminCommon.JmsAbstract.Type,
128 CIAdminCommon.JmsAbstract.Name,
129 CIAdminCommon.JmsAbstract.ConnectionFactoryJNDI,
130 CIAdminCommon.JmsAbstract.DestinationJNDI);
131 final SelectBuilder sel = new SelectBuilder().linkto(CIAdminCommon.JmsAbstract.ESJPLink)
132 .file().label();
133 multi.addSelect(sel);
134 if (multi.executeWithoutAccessCheck()) {
135 JmsResourceConfig.getResourceConfig().init();
136 }
137 while (multi.next()) {
138 final Type type = multi.<Type>getAttribute(CIAdminCommon.JmsAbstract.Type);
139 final String connectionFactoryJNDI = multi
140 .<String>getAttribute(CIAdminCommon.JmsAbstract.ConnectionFactoryJNDI);
141 final String destinationJNDI = multi
142 .<String>getAttribute(CIAdminCommon.JmsAbstract.DestinationJNDI);
143 final String name = multi.<String>getAttribute(CIAdminCommon.JmsAbstract.Name);
144 final String esjp = multi.<String>getSelect(sel);
145
146 Session session;
147 if (type.isKindOf(CIAdminCommon.JmsQueueAbstract.getType())) {
148 final QueueConnectionFactory queueConnectionFactory = (QueueConnectionFactory) ctx
149 .lookup(connectionFactoryJNDI);
150 final QueueConnection queueConnection;
151 if (JmsHandler.QUEUE2QUECONN.containsKey(connectionFactoryJNDI)) {
152 queueConnection = JmsHandler.QUEUE2QUECONN.get(connectionFactoryJNDI);
153 } else {
154 queueConnection = queueConnectionFactory.createQueueConnection();
155 }
156 JmsHandler.QUEUE2QUECONN.put(connectionFactoryJNDI, queueConnection);
157 session = queueConnection.createQueueSession(false,
158 Session.AUTO_ACKNOWLEDGE);
159 queueConnection.start();
160 } else {
161 final TopicConnectionFactory topicConnectionFactory = (TopicConnectionFactory) ctx
162 .lookup(connectionFactoryJNDI);
163 final TopicConnection topicConn;
164 if (JmsHandler.TOPIC2QUECONN.containsKey(connectionFactoryJNDI)) {
165 topicConn = JmsHandler.TOPIC2QUECONN.get(connectionFactoryJNDI);
166 } else {
167 topicConn = topicConnectionFactory.createTopicConnection();
168 }
169 JmsHandler.TOPIC2QUECONN.put(connectionFactoryJNDI, topicConn);
170 if (type.isKindOf(CIAdminCommon.JmsTopicDurableConsumer.getType())) {
171 topicConn.setClientID(org.efaps.db.Context.getThreadContext().getPath() + ":" + name);
172 }
173 session = topicConn.createTopicSession(true, Session.AUTO_ACKNOWLEDGE);
174 topicConn.start();
175 }
176 final Destination dest = (Destination) ctx.lookup(destinationJNDI);
177
178 if (type.isKindOf(CIAdminCommon.JmsQueueProducer.getType())
179 || type.isKindOf(CIAdminCommon.JmsTopicProducer.getType())) {
180 final MessageProducer producer;
181 if (type.isKindOf(CIAdminCommon.JmsQueueProducer.getType())) {
182 producer = session.createProducer(dest);
183 } else {
184 producer = ((TopicSession) session).createPublisher((Topic) dest);
185 }
186 producer.setDeliveryMode(DeliveryMode.PERSISTENT);
187 JmsHandler.NAME2DEF.put(name, new JmsDefinition(name, producer, session));
188 } else {
189 final MessageConsumer consumer;
190 if (type.isKindOf(CIAdminCommon.JmsQueueConsumer.getType())) {
191 consumer = session.createConsumer(dest);
192 } else if (type.isKindOf(CIAdminCommon.JmsTopicDurableConsumer.getType())) {
193 consumer = session.createDurableSubscriber((Topic) dest, name);
194 } else {
195 consumer = ((TopicSession) session).createSubscriber((Topic) dest);
196 }
197 @SuppressWarnings("unchecked")
198 final Class<? extends MessageListener> clazz = (Class<? extends MessageListener>) Class
199 .forName(esjp.trim(), false, EFapsClassLoader.getInstance());
200 final MessageListener myListener = clazz.newInstance();
201 consumer.setMessageListener(myListener);
202 }
203
204 }
205 }
206 }
207 } catch (final NamingException e) {
208
209 JmsHandler.LOG.warn("Naming exception for JMS", e);
210 } catch (final JMSException e) {
211
212 JmsHandler.LOG.error("JMS not working!!!!!!!", e);
213 } catch (final ClassNotFoundException e) {
214 throw new EFapsException("ClassNotFoundException", e);
215 } catch (final InstantiationException e) {
216 throw new EFapsException("InstantiationException", e);
217 } catch (final IllegalAccessException e) {
218 throw new EFapsException("IllegalAccessException", e);
219 }
220 }
221
222
223
224
225
226 public static JmsDefinition getJmsDefinition(final String _name)
227 {
228 return JmsHandler.NAME2DEF.get(_name);
229 }
230
231
232
233
234 public static void addJmsDefintion(final JmsDefinition _jmsDefinition)
235 {
236 JmsHandler.NAME2DEF.put(_jmsDefinition.getName(), _jmsDefinition);
237 }
238
239
240
241
242
243 public static void stop()
244 throws EFapsException
245 {
246 for (final QueueConnection queCon : JmsHandler.QUEUE2QUECONN.values()) {
247 try {
248 queCon.close();
249 } catch (final JMSException e) {
250 throw new EFapsException("JMSException", e);
251 }
252 }
253 for (final TopicConnection queCon : JmsHandler.TOPIC2QUECONN.values()) {
254 try {
255 queCon.close();
256 } catch (final JMSException e) {
257 throw new EFapsException("JMSException", e);
258 }
259 }
260 JmsHandler.TOPIC2QUECONN.clear();
261 JmsHandler.QUEUE2QUECONN.clear();
262 JmsHandler.NAME2DEF.clear();
263 }
264
265
266
267
268 public static class JmsDefinition
269 {
270
271
272
273
274 private final String name;
275
276
277
278
279 private final MessageProducer messageProducer;
280
281
282
283
284 private final Session session;
285
286
287
288
289
290
291 public JmsDefinition(final String _name,
292 final MessageProducer _producer,
293 final Session _session)
294 {
295 this.name = _name;
296 this.messageProducer = _producer;
297 this.session = _session;
298 }
299
300
301
302
303
304
305
306 public Session getSession()
307 {
308 return this.session;
309 }
310
311
312
313
314
315
316
317 public MessageProducer getMessageProducer()
318 {
319 return this.messageProducer;
320 }
321
322
323
324
325
326
327
328 public String getName()
329 {
330 return this.name;
331 }
332 }
333 }