001/*
002 * HA-JDBC: High-Availability JDBC
003 * Copyright (c) 2004-2007 Paul Ferraro
004 * 
005 * This library is free software; you can redistribute it and/or modify it 
006 * under the terms of the GNU Lesser General Public License as published by the 
007 * Free Software Foundation; either version 2.1 of the License, or (at your 
008 * option) any later version.
009 * 
010 * This library is distributed in the hope that it will be useful, but WITHOUT
011 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 
012 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License 
013 * for more details.
014 * 
015 * You should have received a copy of the GNU Lesser General Public License
016 * along with this library; if not, write to the Free Software Foundation, 
017 * Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
018 * 
019 * Contact: ferraro@users.sourceforge.net
020 */
021package net.sf.hajdbc.sql;
022
023import java.lang.reflect.Method;
024import java.sql.Connection;
025import java.sql.ResultSet;
026import java.sql.SQLException;
027import java.sql.Statement;
028import java.util.ArrayList;
029import java.util.Collections;
030import java.util.LinkedList;
031import java.util.List;
032import java.util.Map;
033import java.util.Set;
034import java.util.SortedMap;
035import java.util.TreeSet;
036import java.util.concurrent.locks.Lock;
037
038import net.sf.hajdbc.Database;
039import net.sf.hajdbc.DatabaseProperties;
040import net.sf.hajdbc.LockManager;
041import net.sf.hajdbc.Messages;
042import net.sf.hajdbc.TableProperties;
043import net.sf.hajdbc.util.SQLExceptionFactory;
044import net.sf.hajdbc.util.reflect.Methods;
045
046/**
047 * @author Paul Ferraro
048 * @param <D> 
049 * @param <S> 
050 */
051@SuppressWarnings("nls")
052public abstract class AbstractStatementInvocationHandler<D, S extends Statement> extends AbstractChildInvocationHandler<D, Connection, S>
053{
054        private static final Set<Method> driverReadMethodSet = Methods.findMethods(Statement.class, "getFetchDirection", "getFetchSize", "getGeneratedKeys", "getMaxFieldSize", "getMaxRows", "getQueryTimeout", "getResultSetConcurrency", "getResultSetHoldability", "getResultSetType", "getUpdateCount", "getWarnings", "isClosed", "isPoolable");
055        private static final Set<Method> driverWriteMethodSet = Methods.findMethods(Statement.class, "clearWarnings", "setCursorName", "setEscapeProcessing", "setFetchDirection", "setFetchSize", "setMaxFieldSize", "setMaxRows", "setPoolable", "setQueryTimeout");
056        private static final Set<Method> executeMethodSet = Methods.findMethods(Statement.class, "execute(Update)?");
057        
058        private static final Method getConnectionMethod = Methods.getMethod(Statement.class, "getConnection");
059        private static final Method executeQueryMethod = Methods.getMethod(Statement.class, "executeQuery", String.class);
060        private static final Method clearBatchMethod = Methods.getMethod(Statement.class, "clearBatch");
061        private static final Method executeBatchMethod = Methods.getMethod(Statement.class, "executeBatch");
062        private static final Method getMoreResultsMethod = Methods.getMethod(Statement.class, "getMoreResults", Integer.TYPE);
063        private static final Method getResultSetMethod = Methods.getMethod(Statement.class, "getResultSet");
064        private static final Method addBatchMethod = Methods.getMethod(Statement.class, "addBatch", String.class);
065        private static final Method closeMethod = Methods.getMethod(Statement.class, "close");
066        
067        protected TransactionContext<D> transactionContext;
068        protected FileSupport fileSupport;
069        
070        private List<Invoker<D, S, ?>> invokerList = new LinkedList<Invoker<D, S, ?>>();
071        private List<String> sqlList = new LinkedList<String>();
072        
073        /**
074         * @param connection the parent connection of this statement
075         * @param proxy the parent invocation handler
076         * @param invoker the invoker that created this statement
077         * @param statementClass 
078         * @param statementMap a map of database to underlying statement
079         * @param transactionContext 
080         * @param fileSupport support object for streams
081         * @throws Exception
082         */
083        protected AbstractStatementInvocationHandler(Connection connection, SQLProxy<D, Connection> proxy, Invoker<D, Connection, S> invoker, Class<S> statementClass, Map<Database<D>, S> statementMap, TransactionContext<D> transactionContext, FileSupport fileSupport) throws Exception
084        {
085                super(connection, proxy, invoker, statementClass, statementMap);
086                
087                this.transactionContext = transactionContext;
088                this.fileSupport = fileSupport;
089        }
090
091        /**
092         * @see net.sf.hajdbc.sql.AbstractChildInvocationHandler#getInvocationStrategy(java.lang.Object, java.lang.reflect.Method, java.lang.Object[])
093         */
094        @Override
095        protected InvocationStrategy<D, S, ?> getInvocationStrategy(S statement, Method method, Object[] parameters) throws Exception
096        {
097                if (driverReadMethodSet.contains(method))
098                {
099                        return new DriverReadInvocationStrategy<D, S, Object>();
100                }
101                
102                if (driverWriteMethodSet.contains(method) || method.equals(closeMethod) || method.equals(addBatchMethod) || method.equals(clearBatchMethod))
103                {
104                        return new DriverWriteInvocationStrategy<D, S, Object>();
105                }
106                
107                if (executeMethodSet.contains(method))
108                {
109                        List<Lock> lockList = this.extractLocks((String) parameters[0]);
110                        
111                        return this.transactionContext.start(new LockingInvocationStrategy<D, S, Object>(new DatabaseWriteInvocationStrategy<D, S, Object>(this.cluster.getTransactionalExecutor()), lockList), this.getParent());
112                }
113                
114                if (method.equals(getConnectionMethod))
115                {
116                        return new InvocationStrategy<D, S, Connection>()
117                        {
118                                public Connection invoke(SQLProxy<D, S> proxy, Invoker<D, S, Connection> invoker) throws Exception
119                                {
120                                        return AbstractStatementInvocationHandler.this.getParent();
121                                }
122                        };
123                }
124                
125                if (method.equals(executeQueryMethod))
126                {
127                        String sql = (String) parameters[0];
128                        
129                        List<Lock> lockList = this.extractLocks(sql);
130                        
131                        int concurrency = statement.getResultSetConcurrency();
132                        boolean selectForUpdate = this.isSelectForUpdate(sql);
133                        
134                        if (lockList.isEmpty() && (concurrency == ResultSet.CONCUR_READ_ONLY) && !selectForUpdate)
135                        {
136                                return new LazyResultSetInvocationStrategy<D, S>(statement, this.transactionContext, this.fileSupport);
137                        }
138                        
139                        InvocationStrategy<D, S, ResultSet> strategy = new LockingInvocationStrategy<D, S, ResultSet>(new EagerResultSetInvocationStrategy<D, S>(this.cluster, statement, this.transactionContext, this.fileSupport), lockList);
140                        
141                        return selectForUpdate ? this.transactionContext.start(strategy, this.getParent()) : strategy;
142                }
143                
144                if (method.equals(executeBatchMethod))
145                {
146                        List<Lock> lockList = this.extractLocks(this.sqlList);
147                        
148                        return this.transactionContext.start(new LockingInvocationStrategy<D, S, Object>(new DatabaseWriteInvocationStrategy<D, S, Object>(this.cluster.getTransactionalExecutor()), lockList), this.getParent());
149                }
150                
151                if (method.equals(getMoreResultsMethod))
152                {
153                        if (parameters[0].equals(Statement.KEEP_CURRENT_RESULT))
154                        {
155                                return new DriverWriteInvocationStrategy<D, S, Object>();
156                        }
157                }
158                
159                if (method.equals(getResultSetMethod))
160                {
161                        if (statement.getResultSetConcurrency() == ResultSet.CONCUR_READ_ONLY)
162                        {
163                                return new LazyResultSetInvocationStrategy<D, S>(statement, this.transactionContext, this.fileSupport);
164                        }
165                        
166                        return new EagerResultSetInvocationStrategy<D, S>(this.cluster, statement, this.transactionContext, this.fileSupport);
167                }
168                
169                return super.getInvocationStrategy(statement, method, parameters);
170        }
171
172        /**
173         * @see net.sf.hajdbc.sql.AbstractChildInvocationHandler#isSQLMethod(java.lang.reflect.Method)
174         */
175        @Override
176        protected boolean isSQLMethod(Method method)
177        {
178                return method.equals(addBatchMethod) || method.equals(executeQueryMethod) || executeMethodSet.contains(method);
179        }
180
181        /**
182         * @see net.sf.hajdbc.sql.AbstractChildInvocationHandler#postInvoke(java.lang.Object, java.lang.reflect.Method, java.lang.Object[])
183         */
184        @Override
185        protected void postInvoke(S statement, Method method, Object[] parameters)
186        {
187                if (method.equals(addBatchMethod))
188                {
189                        this.sqlList.add((String) parameters[0]);
190                }
191                else if (method.equals(closeMethod))
192                {
193                        this.fileSupport.close();
194                        
195                        this.getParentProxy().removeChild(this);
196                }
197                else if (method.equals(clearBatchMethod) || method.equals(executeBatchMethod))
198                {
199                        this.sqlList.clear();
200                }
201        }
202
203        /**
204         * @see net.sf.hajdbc.sql.SQLProxy#handlePartialFailure(java.util.SortedMap, java.util.SortedMap)
205         */
206        @Override
207        public <R> SortedMap<Database<D>, R> handlePartialFailure(SortedMap<Database<D>, R> resultMap, SortedMap<Database<D>, Exception> exceptionMap) throws Exception
208        {
209                if (this.getParent().getAutoCommit())
210                {
211                        return super.handlePartialFailure(resultMap, exceptionMap);
212                }
213                
214                // If auto-commit is off, throw exception to give client the opportunity to rollback the transaction
215                Map<Boolean, List<Database<D>>> aliveMap = this.cluster.getAliveMap(exceptionMap.keySet());
216
217                List<Database<D>> aliveList = aliveMap.get(true);
218
219                int size = aliveList.size();
220                
221                // Assume successful databases are alive
222                aliveList.addAll(resultMap.keySet());
223                
224                this.detectClusterPanic(aliveMap);
225                
226                List<Database<D>> deadList = aliveMap.get(false);
227                
228                for (Database<D> database: deadList)
229                {
230                        if (this.cluster.deactivate(database, this.cluster.getStateManager()))
231                        {
232                                this.logger.error(Messages.getMessage(Messages.DATABASE_DEACTIVATED, database, this.cluster), exceptionMap.get(database));
233                        }
234                }
235
236                // If failed databases are all dead
237                if (size == 0)
238                {
239                        return resultMap;
240                }
241                
242                // Chain exceptions from alive databases
243                SQLException exception = SQLExceptionFactory.createSQLException(exceptionMap.get(aliveList.get(0)));
244                
245                for (Database<D> database: aliveList.subList(1, size))
246                {
247                        exception.setNextException(SQLExceptionFactory.createSQLException(exceptionMap.get(database)));
248                }
249                
250                throw exception;
251        }
252        
253        protected boolean isSelectForUpdate(String sql) throws SQLException
254        {
255                return this.getDatabaseProperties().supportsSelectForUpdate() ? this.cluster.getDialect().isSelectForUpdate(sql) : false;
256        }
257        
258        protected List<Lock> extractLocks(String sql) throws SQLException
259        {
260                return this.extractLocks(Collections.singletonList(sql));
261        }
262        
263        private List<Lock> extractLocks(List<String> sqlList) throws SQLException
264        {
265                Set<String> identifierSet = new TreeSet<String>();
266                
267                for (String sql: sqlList)
268                {
269                        if (this.cluster.isSequenceDetectionEnabled())
270                        {
271                                String sequence = this.cluster.getDialect().parseSequence(sql);
272                                
273                                if (sequence != null)
274                                {
275                                        identifierSet.add(sequence);
276                                }
277                        }
278                        
279                        if (this.cluster.isIdentityColumnDetectionEnabled())
280                        {
281                                String table = this.cluster.getDialect().parseInsertTable(sql);
282                                
283                                if (table != null)
284                                {
285                                        TableProperties tableProperties = this.getDatabaseProperties().findTable(table);
286                                        
287                                        if (!tableProperties.getIdentityColumns().isEmpty())
288                                        {
289                                                identifierSet.add(tableProperties.getName());
290                                        }
291                                }
292                        }
293                }
294
295                List<Lock> lockList = new ArrayList<Lock>(identifierSet.size());
296                
297                if (!identifierSet.isEmpty())
298                {
299                        LockManager lockManager = this.cluster.getLockManager();
300                        
301                        for (String identifier: identifierSet)
302                        {
303                                lockList.add(lockManager.writeLock(identifier));
304                        }
305                }
306                
307                return lockList;
308        }
309
310        protected DatabaseProperties getDatabaseProperties() throws SQLException
311        {
312                return this.cluster.getDatabaseMetaDataCache().getDatabaseProperties(this.getParent());
313        }
314        
315        /**
316         * @see net.sf.hajdbc.sql.AbstractChildInvocationHandler#close(java.lang.Object, java.lang.Object)
317         */
318        @Override
319        protected void close(Connection connection, S statement) throws SQLException
320        {
321                statement.close();
322        }
323
324        /**
325         * @see net.sf.hajdbc.sql.AbstractInvocationHandler#record(net.sf.hajdbc.sql.Invoker, java.lang.reflect.Method, java.lang.Object[])
326         */
327        @Override
328        protected void record(Invoker<D, S, ?> invoker, Method method, Object[] parameters)
329        {
330                if (this.isBatchMethod(method))
331                {
332                        synchronized (this.invokerList)
333                        {
334                                this.invokerList.add(invoker);
335                        }
336                }
337                else if (this.isEndBatchMethod(method))
338                {
339                        synchronized (this.invokerList)
340                        {
341                                this.invokerList.clear();
342                        }
343                }
344                else
345                {
346                        super.record(invoker, method, parameters);
347                }
348        }
349
350        /**
351         * @see net.sf.hajdbc.sql.AbstractInvocationHandler#isRecordable(java.lang.reflect.Method)
352         */
353        @Override
354        protected boolean isRecordable(Method method)
355        {
356                return driverWriteMethodSet.contains(method);
357        }
358
359        protected boolean isBatchMethod(Method method)
360        {
361                return method.equals(addBatchMethod);
362        }
363
364        protected boolean isEndBatchMethod(Method method)
365        {
366                return method.equals(clearBatchMethod) || method.equals(executeBatchMethod);
367        }
368        
369        /**
370         * @see net.sf.hajdbc.sql.AbstractInvocationHandler#replay(net.sf.hajdbc.Database, java.lang.Object)
371         */
372        @Override
373        protected void replay(Database<D> database, S statement) throws Exception
374        {
375                super.replay(database, statement);
376                
377                synchronized (this.invokerList)
378                {
379                        for (Invoker<D, S, ?> invoker: this.invokerList)
380                        {
381                                invoker.invoke(database, statement);
382                        }
383                }
384        }
385}