1   /*
2    * Copyright 2003 - 2013 The eFaps Team
3    *
4    * Licensed under the Apache License, Version 2.0 (the "License");
5    * you may not use this file except in compliance with the License.
6    * You may obtain a copy of the License at
7    *
8    *     http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS,
12   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   * See the License for the specific language governing permissions and
14   * limitations under the License.
15   *
16   * Revision:        $Rev$
17   * Last Changed:    $Date$
18   * Last Changed By: $Author$
19   */
20  
21  package org.efaps.jms;
22  
23  import java.util.HashMap;
24  import java.util.Map;
25  import java.util.UUID;
26  
27  import javax.jms.DeliveryMode;
28  import javax.jms.Destination;
29  import javax.jms.JMSException;
30  import javax.jms.MessageConsumer;
31  import javax.jms.MessageListener;
32  import javax.jms.MessageProducer;
33  import javax.jms.QueueConnection;
34  import javax.jms.QueueConnectionFactory;
35  import javax.jms.Session;
36  import javax.jms.Topic;
37  import javax.jms.TopicConnection;
38  import javax.jms.TopicConnectionFactory;
39  import javax.jms.TopicSession;
40  import javax.naming.Context;
41  import javax.naming.InitialContext;
42  import javax.naming.NamingException;
43  
44  import org.efaps.admin.EFapsSystemConfiguration;
45  import org.efaps.admin.KernelSettings;
46  import org.efaps.admin.common.SystemConfiguration;
47  import org.efaps.admin.datamodel.Type;
48  import org.efaps.admin.program.esjp.EFapsClassLoader;
49  import org.efaps.ci.CIAdminCommon;
50  import org.efaps.db.MultiPrintQuery;
51  import org.efaps.db.QueryBuilder;
52  import org.efaps.db.SelectBuilder;
53  import org.efaps.util.EFapsException;
54  import org.slf4j.Logger;
55  import org.slf4j.LoggerFactory;
56  
57  /**
58   * TODO comment!
59   *
60   * @author The eFaps Team
61   * @version $Id$
62   */
63  public final class JmsHandler
64  {
65      /**
66       * Key to a configuration in JMS-module.
67       */
68      public static final String ACTIVEJMS = "org.efaps.jms.ActiveJMS";
69  
70      /**
71       * Logger for this class.
72       */
73      private static final Logger LOG = LoggerFactory.getLogger(JmsHandler.class);
74  
75      /**
76       * Mapping of JNDI-Name of the ConnectionFactory to QueueConnection.
77       */
78      private static final Map<String, QueueConnection> QUEUE2QUECONN = new HashMap<String, QueueConnection>();
79  
80      /**
81       * Mapping of JNDI-Name of the ConnectionFactory to TopicConnection.
82       */
83      private static final Map<String, TopicConnection> TOPIC2QUECONN = new HashMap<String, TopicConnection>();
84  
85      /**
86       * Mapping of a name to a JmsDefinition.
87       */
88      private static final Map<String, JmsDefinition> NAME2DEF = new HashMap<String, JmsDefinition>();
89  
90  
91      /**
92       * Create Singelton.
93       */
94      private JmsHandler()
95      {
96      }
97  
98      /**
99       * Initialize the Jms.
100      * @throws EFapsException on error
101      */
102     public static void initialize()
103         throws EFapsException
104     {
105         try {
106             final SystemConfiguration config = EFapsSystemConfiguration.get();
107             if (config != null) {
108                 final int timeout = config.getAttributeValueAsInteger(KernelSettings.JMS_TIMEOOUT);
109                 if (timeout > 0) {
110                     JmsSession.setSessionTimeout(timeout);
111                 }
112             }
113 
114             //JMS-Configuration
115             final SystemConfiguration configJms = SystemConfiguration.get(
116                             UUID.fromString("65b6a1a8-c979-4471-b175-774593f1acd7"));
117             final boolean active = configJms != null && configJms.getAttributeValueAsBoolean(JmsHandler.ACTIVEJMS);
118 
119             if (active) {
120             // this check is necessary for first install and update
121                 if (CIAdminCommon.JmsAbstract.getType() != null) {
122                     // remove any existing
123                     JmsHandler.stop();
124                     final Context ctx = new InitialContext();
125                     final QueryBuilder queryBldr = new QueryBuilder(CIAdminCommon.JmsAbstract);
126                     final MultiPrintQuery multi = queryBldr.getPrint();
127                     multi.addAttribute(CIAdminCommon.JmsAbstract.Type,
128                                     CIAdminCommon.JmsAbstract.Name,
129                                     CIAdminCommon.JmsAbstract.ConnectionFactoryJNDI,
130                                     CIAdminCommon.JmsAbstract.DestinationJNDI);
131                     final SelectBuilder sel = new SelectBuilder().linkto(CIAdminCommon.JmsAbstract.ESJPLink)
132                                     .file().label();
133                     multi.addSelect(sel);
134                     if (multi.executeWithoutAccessCheck()) {
135                         JmsResourceConfig.getResourceConfig().init();
136                     }
137                     while (multi.next()) {
138                         final Type type = multi.<Type>getAttribute(CIAdminCommon.JmsAbstract.Type);
139                         final String connectionFactoryJNDI = multi
140                                         .<String>getAttribute(CIAdminCommon.JmsAbstract.ConnectionFactoryJNDI);
141                         final String destinationJNDI = multi
142                                         .<String>getAttribute(CIAdminCommon.JmsAbstract.DestinationJNDI);
143                         final String name = multi.<String>getAttribute(CIAdminCommon.JmsAbstract.Name);
144                         final String esjp = multi.<String>getSelect(sel);
145 
146                         Session session;
147                         if (type.isKindOf(CIAdminCommon.JmsQueueAbstract.getType())) {
148                             final QueueConnectionFactory queueConnectionFactory = (QueueConnectionFactory) ctx
149                                             .lookup(connectionFactoryJNDI);
150                             final QueueConnection queueConnection;
151                             if (JmsHandler.QUEUE2QUECONN.containsKey(connectionFactoryJNDI)) {
152                                 queueConnection = JmsHandler.QUEUE2QUECONN.get(connectionFactoryJNDI);
153                             } else {
154                                 queueConnection = queueConnectionFactory.createQueueConnection();
155                             }
156                             JmsHandler.QUEUE2QUECONN.put(connectionFactoryJNDI, queueConnection);
157                             session = queueConnection.createQueueSession(false,
158                                             Session.AUTO_ACKNOWLEDGE);
159                             queueConnection.start();
160                         } else {
161                             final TopicConnectionFactory topicConnectionFactory = (TopicConnectionFactory) ctx
162                                             .lookup(connectionFactoryJNDI);
163                             final TopicConnection topicConn;
164                             if (JmsHandler.TOPIC2QUECONN.containsKey(connectionFactoryJNDI)) {
165                                 topicConn = JmsHandler.TOPIC2QUECONN.get(connectionFactoryJNDI);
166                             } else {
167                                 topicConn = topicConnectionFactory.createTopicConnection();
168                             }
169                             JmsHandler.TOPIC2QUECONN.put(connectionFactoryJNDI, topicConn);
170                             if (type.isKindOf(CIAdminCommon.JmsTopicDurableConsumer.getType())) {
171                                 topicConn.setClientID(org.efaps.db.Context.getThreadContext().getPath() + ":" + name);
172                             }
173                             session = topicConn.createTopicSession(true, Session.AUTO_ACKNOWLEDGE);
174                             topicConn.start();
175                         }
176                         final Destination dest = (Destination) ctx.lookup(destinationJNDI);
177 
178                         if (type.isKindOf(CIAdminCommon.JmsQueueProducer.getType())
179                                         || type.isKindOf(CIAdminCommon.JmsTopicProducer.getType())) {
180                             final MessageProducer producer;
181                             if (type.isKindOf(CIAdminCommon.JmsQueueProducer.getType())) {
182                                 producer = session.createProducer(dest);
183                             } else {
184                                 producer = ((TopicSession) session).createPublisher((Topic) dest);
185                             }
186                             producer.setDeliveryMode(DeliveryMode.PERSISTENT);
187                             JmsHandler.NAME2DEF.put(name, new JmsDefinition(name, producer, session));
188                         } else {
189                             final MessageConsumer consumer;
190                             if (type.isKindOf(CIAdminCommon.JmsQueueConsumer.getType())) {
191                                 consumer = session.createConsumer(dest);
192                             } else if (type.isKindOf(CIAdminCommon.JmsTopicDurableConsumer.getType())) {
193                                 consumer = session.createDurableSubscriber((Topic) dest, name);
194                             } else {
195                                 consumer = ((TopicSession) session).createSubscriber((Topic) dest);
196                             }
197                             @SuppressWarnings("unchecked")
198                             final Class<? extends MessageListener> clazz = (Class<? extends MessageListener>) Class
199                                             .forName(esjp.trim(), false, EFapsClassLoader.getInstance());
200                             final MessageListener myListener = clazz.newInstance();
201                             consumer.setMessageListener(myListener);
202                         }
203 
204                     }
205                 }
206             }
207         } catch (final NamingException e) {
208             // this means that a definition cannot be found.
209             JmsHandler.LOG.warn("Naming exception for JMS", e);
210         } catch (final JMSException e) {
211             // this means normally that e.g. no jms server was found
212             JmsHandler.LOG.error("JMS not working!!!!!!!", e);
213         } catch (final ClassNotFoundException e) {
214             throw new EFapsException("ClassNotFoundException", e);
215         } catch (final InstantiationException e) {
216             throw new EFapsException("InstantiationException", e);
217         } catch (final IllegalAccessException e) {
218             throw new EFapsException("IllegalAccessException", e);
219         }
220     }
221 
222     /**
223      * @param _name anme of the definition
224      * @return  JmsDefinition for the given name
225      */
226     public static JmsDefinition getJmsDefinition(final String _name)
227     {
228         return JmsHandler.NAME2DEF.get(_name);
229     }
230 
231     /**
232      * @param _jmsDefinition JmsDefinition to add
233      */
234     public static void addJmsDefintion(final JmsDefinition _jmsDefinition)
235     {
236         JmsHandler.NAME2DEF.put(_jmsDefinition.getName(), _jmsDefinition);
237     }
238 
239     /**
240      * Stop the jms.
241      * @throws EFapsException on error
242      */
243     public static void stop()
244         throws EFapsException
245     {
246         for (final QueueConnection queCon : JmsHandler.QUEUE2QUECONN.values()) {
247             try {
248                 queCon.close();
249             } catch (final JMSException e) {
250                 throw new EFapsException("JMSException", e);
251             }
252         }
253         for (final TopicConnection queCon : JmsHandler.TOPIC2QUECONN.values()) {
254             try {
255                 queCon.close();
256             } catch (final JMSException e) {
257                 throw new EFapsException("JMSException", e);
258             }
259         }
260         JmsHandler.TOPIC2QUECONN.clear();
261         JmsHandler.QUEUE2QUECONN.clear();
262         JmsHandler.NAME2DEF.clear();
263     }
264 
265     /**
266      * A definition of Jms for eFaps.
267      */
268     public static class JmsDefinition
269     {
270 
271         /**
272          * Name of the Definition.
273          */
274         private final String name;
275 
276         /**
277          * the related MessageProducer.
278          */
279         private final MessageProducer messageProducer;
280 
281         /**
282          * The related Session.
283          */
284         private final Session session;
285 
286         /**
287          * @param _name     name
288          * @param _producer MessageProducer
289          * @param _session  Session
290          */
291         public JmsDefinition(final String _name,
292                              final MessageProducer _producer,
293                              final Session _session)
294         {
295             this.name = _name;
296             this.messageProducer = _producer;
297             this.session = _session;
298         }
299 
300         /**
301          * This is the getter method for instance variable {@link #session}.
302          *
303          * @return value of instance variable {@link #session}
304          * @see #parameters
305          */
306         public Session getSession()
307         {
308             return this.session;
309         }
310 
311         /**
312          * This is the getter method for instance variable {@link #messageProducer}.
313          *
314          * @return value of instance variable {@link #messageProducer}
315          * @see #parameters
316          */
317         public MessageProducer getMessageProducer()
318         {
319             return this.messageProducer;
320         }
321 
322         /**
323          * This is the getter method for instance variable {@link #name}.
324          *
325          * @return value of instance variable {@link #name}
326          * @see #parameters
327          */
328         public String getName()
329         {
330             return this.name;
331         }
332     }
333 }