/*
 * Copyright 2003 - 2013 The eFaps Team
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * Revision:        $Rev$
 * Last Changed:    $Date$
 * Last Changed By: $Author$
 */

package org.efaps.db.store;

import java.io.IOException;
import java.io.InputStream;
import java.sql.Blob;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;

import javax.transaction.xa.Xid;

import org.efaps.db.Context;
import org.efaps.db.Instance;
import org.efaps.db.databases.AbstractDatabase;
import org.efaps.db.transaction.ConnectionResource;
import org.efaps.db.wrapper.SQLPart;
import org.efaps.db.wrapper.SQLSelect;
import org.efaps.util.EFapsException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * The class implements the {@link Resource} interface for SQL blobs. For each
 * file id a new JDBC store resource must be created.
 *
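 * <p>A minimal sketch of the plain JDBC round trip this resource builds on.
 * The table and column names are the constants defined below; the variables
 * <code>con</code>, <code>in</code>, <code>size</code> and
 * <code>generalId</code> are purely illustrative, and the real implementation
 * quotes identifiers via the database type and obtains its connection from a
 * {@link ConnectionResource}:</p>
 *
 * <pre>{@code
 * // store: stream the content into the blob column
 * try (PreparedStatement stmt = con.prepareStatement(
 *         "update T_CMGENSTOREJDBC set FILECONTENT = ? where ID = ?")) {
 *     stmt.setBinaryStream(1, in, (int) size);
 *     stmt.setLong(2, generalId);
 *     stmt.execute();
 * }
 *
 * // retrieve: read the blob column back as a binary stream
 * try (Statement stmt = con.createStatement();
 *      ResultSet rs = stmt.executeQuery(
 *          "select FILECONTENT from T_CMGENSTOREJDBC where ID = " + generalId)) {
 *     if (rs.next()) {
 *         InputStream content = rs.getBinaryStream(1);
 *         // consume the stream before the statement is closed
 *     }
 * }
 * }</pre>
 *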
 * @author The eFaps Team
 * @version $Id$
 */
public class JDBCStoreResource
    extends AbstractStoreResource
{
    /**
     * Name of the table the content is stored in.
     */
    public static final String TABLENAME_STORE = "T_CMGENSTOREJDBC";

    /**
     * Name of the column the content is stored in.
     */
    public static final String COLNAME_FILECONTENT = "FILECONTENT";

    /**
     * Logging instance used in this class.
     */
    private static final Logger LOG = LoggerFactory.getLogger(JDBCStoreResource.class);

    /**
     * Method called to initialize this StoreResource.
     *
     * @param _instance     instance of the object this StoreResource is wanted
     *                      for
     * @param _store        Store this resource belongs to
     * @throws EFapsException on error
     * @see org.efaps.db.store.Resource#initialize(Instance, Store)
     */
    @Override
    public void initialize(final Instance _instance,
                           final Store _store)
        throws EFapsException
    {
        super.initialize(_instance, _store);
    }

    /**
     * {@inheritDoc}
     */
    @Override
    protected int add2Select(final SQLSelect _select)
    {
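        // add the JDBC store table via a left join over the general id; one
        // additional column (the "ID" of the joined table) is selected, which
        // presumably backs the getExist()[1] check in insertDefaults, hence 1
        // is returned as the number of added columns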
        _select.column(2, "ID").leftJoin(JDBCStoreResource.TABLENAME_STORE, 2, "ID", 0, "ID");
        return 1;
    }

    /**
     * {@inheritDoc}
     */
    @Override
    protected void insertDefaults()
        throws EFapsException
    {
        super.insertDefaults();
        if (!getExist()[1] && getGeneralID() != null) {
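            // no row exists yet in the JDBC store table for this general id,
            // so an empty row is inserted; the blob column itself is filled
            // later by write(...)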
            try {
                final ConnectionResource res = Context.getThreadContext().getConnectionResource();
                final Connection con = res.getConnection();
                Context.getDbType().newInsert(JDBCStoreResource.TABLENAME_STORE, "ID", false)
                                .column("ID", getGeneralID())
                                .execute(con);
                res.commit();
            } catch (final SQLException e) {
                throw new EFapsException(JDBCStoreResource.class, "insertDefaults", e);
            }
        }
    }

    /**
     * The method writes the content (from the input stream) into an SQL blob.
     *
     * @param _in       input stream with the content of the file
     * @param _size     length of the content (or negative, meaning that the
     *                  length is not known; then the content gets the length
     *                  of the readable bytes from the input stream)
     * @param _fileName name of the file
     * @return size of the written content
     * @throws EFapsException on error
     */
    @Override
    public long write(final InputStream _in,
                      final long _size,
                      final String _fileName)
        throws EFapsException
    {
        long size = 0;
        ConnectionResource res = null;
        try {
            res = Context.getThreadContext().getConnectionResource();

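            // build an update statement of the form
            // "update T_CMGENSTOREJDBC set FILECONTENT = ? where ID = <generalId>"
            // using the SQL keywords and identifier quoting of the current
            // database type; the content is streamed into the single bind
            // parameter further below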
            final AbstractDatabase<?> db = Context.getDbType();
            final StringBuilder cmd = new StringBuilder().append(db.getSQLPart(SQLPart.UPDATE)).append(" ")
                    .append(db.getTableQuote()).append(JDBCStoreResource.TABLENAME_STORE)
                    .append(db.getTableQuote())
                    .append(" ").append(db.getSQLPart(SQLPart.SET)).append(" ")
                    .append(db.getColumnQuote())
                    .append(JDBCStoreResource.COLNAME_FILECONTENT)
                    .append(db.getColumnQuote()).append(db.getSQLPart(SQLPart.EQUAL)).append("? ")
                    .append(db.getSQLPart(SQLPart.WHERE)).append(" ")
                    .append(db.getColumnQuote()).append("ID").append(db.getColumnQuote())
                    .append(db.getSQLPart(SQLPart.EQUAL)).append(getGeneralID());

            final PreparedStatement stmt = res.getConnection().prepareStatement(cmd.toString());
            try {
                stmt.setBinaryStream(1, _in, (int) _size);
                stmt.execute();
            } finally {
                stmt.close();
            }
            size = _size;
            res.commit();
        } catch (final EFapsException e) {
            if (res != null) {
                res.abort();
            }
            throw e;
        } catch (final SQLException e) {
            if (res != null) {
                res.abort();
            }
            JDBCStoreResource.LOG.error("write of content failed", e);
            throw new EFapsException(JDBCStoreResource.class, "write.SQLException", e);
        }
        setFileInfo(_fileName, size);
        return size;
    }

    /**
     * Deletes the stored content. Nothing is done here for the JDBC store
     * resource.
     */
    @Override
    public void delete()
    {
    }

    /**
     * Returns the input stream for the file.
     *
     * @return input stream of the file with the content
     * @throws EFapsException on error
     */
    @Override
    public InputStream read()
        throws EFapsException
    {
        StoreResourceInputStream in = null;
        ConnectionResource res = null;
        try {
            res = Context.getThreadContext().getConnectionResource();

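            // select the blob column for this general id; the connection
            // resource is handed over to the returned input stream and is only
            // committed once that stream is closed (see
            // JDBCStoreResourceInputStream#beforeClose)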
            final Statement stmt = res.getConnection().createStatement();
            final StringBuilder cmd = new StringBuilder()
                .append("select ").append(JDBCStoreResource.COLNAME_FILECONTENT).append(" ")
                .append("from ").append(JDBCStoreResource.TABLENAME_STORE).append(" ")
                .append("where ID =").append(getGeneralID());
            final ResultSet resultSet = stmt.executeQuery(cmd.toString());
            if (resultSet.next()) {
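                // if the database type cannot hand out the content as a binary
                // stream directly from the result set, the Blob itself is
                // fetched and wrapped (see the JDBCStoreResourceInputStream
                // constructors)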
                if (Context.getDbType().supportsBinaryInputStream())  {
                    in = new JDBCStoreResourceInputStream(this,
                                                          res,
                                                          resultSet.getBinaryStream(1));
                } else  {
                    in = new JDBCStoreResourceInputStream(this,
                                                          res,
                                                          resultSet.getBlob(1));
                }
            }
            resultSet.close();
            stmt.close();
        } catch (final IOException e) {
            JDBCStoreResource.LOG.error("read of content failed", e);
            throw new EFapsException(JDBCStoreResource.class, "read.SQLException", e);
        } catch (final SQLException e) {
            JDBCStoreResource.LOG.error("read of content failed", e);
            throw new EFapsException(JDBCStoreResource.class, "read.SQLException", e);
        } finally {
            if (in == null && res != null) {
                res.abort();
            }
        }
        return in;
    }

    /**
     * Asks the resource manager to prepare for a transaction commit of the
     * transaction specified in xid (used for two-phase commit).
     *
     * @param _xid  global transaction identifier (not used, because each file
     *              with the file id gets a new JDBC store resource instance)
     * @return always 0
     */
    @Override
    public int prepare(final Xid _xid)
    {
        if (JDBCStoreResource.LOG.isDebugEnabled()) {
            JDBCStoreResource.LOG.debug("prepare (xid = " + _xid + ")");
        }
        return 0;
    }

    /**
     * The method is called from the transaction manager when the transaction
     * is committed.<br/> Nothing is to be done here, because the commit is
     * handled by the {@link ConnectionResource} instance.
     *
     * @param _xid  global transaction identifier (not used, because each file
     *              with the file id gets a new JDBC store resource instance)
     * @param _onePhase <i>true</i> if it is a one-phase commit
     *                  (not used)
     */
    @Override
    public void commit(final Xid _xid,
                       final boolean _onePhase)
    {
        if (JDBCStoreResource.LOG.isDebugEnabled()) {
            JDBCStoreResource.LOG.debug("commit (xid = " + _xid + ", one phase = " + _onePhase + ")");
        }
    }

    /**
     * The method is called from the transaction manager if the transaction
     * must be rolled back.<br/> Nothing is to be done here, because the
     * rollback is handled by the {@link ConnectionResource} instance.
     *
     * @param _xid  global transaction identifier (not used, because each file
     *              with the file id gets a new JDBC store resource instance)
     */
    @Override
    public void rollback(final Xid _xid)
    {
        if (JDBCStoreResource.LOG.isDebugEnabled()) {
            JDBCStoreResource.LOG.debug("rollback (xid = " + _xid + ")");
        }
    }

    /**
     * Tells the resource manager to forget about a heuristically completed
     * transaction branch.
     *
     * @param _xid  global transaction identifier (not used, because each file
     *              with the file id gets a new JDBC store resource instance)
     */
    @Override
    public void forget(final Xid _xid)
    {
        if (JDBCStoreResource.LOG.isDebugEnabled()) {
            JDBCStoreResource.LOG.debug("forget (xid = " + _xid + ")");
        }
    }

    /**
     * Obtains the current transaction timeout value set for this XAResource
     * instance.
     *
     * @return always 0
     */
    @Override
    public int getTransactionTimeout()
    {
        if (JDBCStoreResource.LOG.isDebugEnabled()) {
            JDBCStoreResource.LOG.debug("getTransactionTimeout");
        }
        return 0;
    }

    /**
     * Obtains a list of prepared transaction branches from a resource manager.
     *
     * @param _flag flag
     * @return always <code>null</code>
     */
    @Override
    public Xid[] recover(final int _flag)
    {
        if (JDBCStoreResource.LOG.isDebugEnabled()) {
            JDBCStoreResource.LOG.debug("recover (flag = " + _flag + ")");
        }
        return null;
    }

    /**
     * Sets the current transaction timeout value for this XAResource instance.
     *
     * @param _seconds number of seconds
     * @return always <i>true</i>
     */
    @Override
    public boolean setTransactionTimeout(final int _seconds)
    {
        if (JDBCStoreResource.LOG.isDebugEnabled()) {
            JDBCStoreResource.LOG.debug("setTransactionTimeout (seconds = " + _seconds + ")");
        }
        return true;
    }

    /**
     * This class implements an InputStream to read bytes from a
     * {@link java.sql.Blob} whose binary stream does not support the
     * available method correctly (e.g. the Oracle JDBC driver always
     * returns 0).
     *
     * TODO: available must be long (because of the maximum integer value)!
     */
    private class BlobInputStream
        extends InputStream
    {
        /**
         * Stores the blob for this input stream.
         */
        private final Blob blob;

        /**
         * The actual InputStream.
         */
        private final InputStream in;

        /**
         * Holds the available bytes in the input stream.
         */
        private int available;

        /**
         * @param _blob blob to be read
         * @throws SQLException on error with blob
         */
        protected BlobInputStream(final Blob _blob)
            throws SQLException
        {
            this.blob = _blob;
            this.in = _blob.getBinaryStream();
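            // remember the total number of bytes up front, because available()
            // of the stream returned by the blob may always report 0 (see the
            // class comment)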
            this.available = (int) this.blob.length();
        }

        /**
         * @see java.io.InputStream#read()
         * @return next byte of data, or <code>-1</code> if the end of the
         *         stream is reached
         * @throws IOException on error
         */
        @Override
        public int read()
            throws IOException
        {
            final int ret = this.in.read();
            if (ret >= 0) {
                this.available--;
            }
            return ret;
        }

        /**
         * @see java.io.InputStream#read(byte[])
         * @param _bytes buffer the bytes are read into
         * @return number of bytes read into the buffer, or <code>-1</code> if
         *         the end of the stream is reached
         * @throws IOException on error
         */
        @Override
        public int read(final byte[] _bytes)
            throws IOException
        {
            int length = -1;
            if (this.available > 0)  {
                length = this.in.read(_bytes);
                if (length > 0)  {
                    this.available = this.available - length;
                }
            }
            return length;
        }

        /**
         * @see java.io.InputStream#available()
         * @return number of bytes that can still be read from this stream
         * @throws IOException on error
         */
        @Override
        public int available()
            throws IOException
        {
            return this.available;
        }
    }

    /**
     * Extends the {@link StoreResourceInputStream} so that the related
     * {@link ConnectionResource} is committed once the stream is closed.
     */
    private class JDBCStoreResourceInputStream
        extends StoreResourceInputStream
    {
        /**
         * The connection resource.
         */
        private final ConnectionResource res;

        /**
         * @param _storeRe  store resource itself
         * @param _res      connection resource
         * @param _blob     blob with the input stream
         * @throws SQLException on error with blob
         * @throws IOException on error with inputstream
         */
        protected JDBCStoreResourceInputStream(final AbstractStoreResource _storeRe,
                                               final ConnectionResource _res,
                                               final Blob _blob)
            throws IOException, SQLException
        {
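            // if the binary stream of the blob reports available() correctly
            // it is used directly, otherwise it is wrapped into a
            // BlobInputStream which tracks the remaining bytes itself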
            super(_storeRe,
                  Context.getDbType().supportsBlobInputStreamAvailable()
                          ? _blob.getBinaryStream()
                          : new BlobInputStream(_blob));
            this.res = _res;
        }

        /**
         * @param _storeRes store resource itself
         * @param _res      connection resource
         * @param _in       binary input stream (from the blob)
         * @throws IOException on error
         */
        protected JDBCStoreResourceInputStream(final AbstractStoreResource _storeRes,
                                               final ConnectionResource _res,
                                               final InputStream _in)
            throws IOException
        {
            super(_storeRes, _in);
            this.res = _res;
        }

        /**
         * Commits the connection resource before the stream is closed.
         *
         * @throws IOException on error
         */
        @Override
        protected void beforeClose()
            throws IOException
        {
            super.beforeClose();
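            // the connection resource was kept open while the content was
            // streamed to the caller; commit it now that the stream is closed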
            try {
                if (this.res.isOpened()) {
                    this.res.commit();
                }
            } catch (final EFapsException e) {
                throw new IOException("commit of connection resource not possible", e);
            }
        }
    }
}