001/* 002 * HA-JDBC: High-Availability JDBC 003 * Copyright (c) 2004-2007 Paul Ferraro 004 * 005 * This library is free software; you can redistribute it and/or modify it 006 * under the terms of the GNU Lesser General Public License as published by the 007 * Free Software Foundation; either version 2.1 of the License, or (at your 008 * option) any later version. 009 * 010 * This library is distributed in the hope that it will be useful, but WITHOUT 011 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 012 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License 013 * for more details. 014 * 015 * You should have received a copy of the GNU Lesser General Public License 016 * along with this library; if not, write to the Free Software Foundation, 017 * Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA 018 * 019 * Contact: ferraro@users.sourceforge.net 020 */ 021package net.sf.hajdbc.sync; 022 023import java.sql.Connection; 024import java.sql.PreparedStatement; 025import java.sql.ResultSet; 026import java.sql.SQLException; 027import java.sql.Statement; 028import java.util.Collection; 029import java.util.Collections; 030import java.util.concurrent.Callable; 031import java.util.concurrent.ExecutionException; 032import java.util.concurrent.ExecutorService; 033import java.util.concurrent.Future; 034 035import net.sf.hajdbc.Dialect; 036import net.sf.hajdbc.Messages; 037import net.sf.hajdbc.SynchronizationContext; 038import net.sf.hajdbc.SynchronizationStrategy; 039import net.sf.hajdbc.TableProperties; 040import net.sf.hajdbc.util.SQLExceptionFactory; 041import net.sf.hajdbc.util.Strings; 042 043import org.slf4j.Logger; 044import org.slf4j.LoggerFactory; 045 046/** 047 * Database-independent synchronization strategy that does full record transfer between two databases. 048 * This strategy is best used when there are <em>many</em> differences between the active database and the inactive database (i.e. very much out of sync). 049 * The following algorithm is used: 050 * <ol> 051 * <li>Drop the foreign keys on the inactive database (to avoid integrity constraint violations)</li> 052 * <li>For each database table: 053 * <ol> 054 * <li>Delete all rows in the inactive database table</li> 055 * <li>Query all rows on the active database table</li> 056 * <li>For each row in active database table: 057 * <ol> 058 * <li>Insert new row into inactive database table</li> 059 * </ol> 060 * </li> 061 * </ol> 062 * </li> 063 * <li>Re-create the foreign keys on the inactive database</li> 064 * <li>Synchronize sequences</li> 065 * </ol> 066 * @author Paul Ferraro 067 */ 068public class FullSynchronizationStrategy implements SynchronizationStrategy 069{ 070 private static Logger logger = LoggerFactory.getLogger(FullSynchronizationStrategy.class); 071 072 private int maxBatchSize = 100; 073 private int fetchSize = 0; 074 075 /** 076 * @see net.sf.hajdbc.SynchronizationStrategy#synchronize(net.sf.hajdbc.SynchronizationContext) 077 */ 078 @Override 079 public <D> void synchronize(SynchronizationContext<D> context) throws SQLException 080 { 081 Connection sourceConnection = context.getConnection(context.getSourceDatabase()); 082 Connection targetConnection = context.getConnection(context.getTargetDatabase()); 083 084 Dialect dialect = context.getDialect(); 085 ExecutorService executor = context.getExecutor(); 086 087 boolean autoCommit = targetConnection.getAutoCommit(); 088 089 targetConnection.setAutoCommit(true); 090 091 SynchronizationSupport.dropForeignKeys(context); 092 093 targetConnection.setAutoCommit(false); 094 095 try 096 { 097 for (TableProperties table: context.getSourceDatabaseProperties().getTables()) 098 { 099 String tableName = table.getName(); 100 Collection<String> columns = table.getColumns(); 101 102 String commaDelimitedColumns = Strings.join(columns, Strings.PADDED_COMMA); 103 104 final String selectSQL = "SELECT " + commaDelimitedColumns + " FROM " + tableName; //$NON-NLS-1$ //$NON-NLS-2$ 105 106 final Statement selectStatement = sourceConnection.createStatement(); 107 selectStatement.setFetchSize(this.fetchSize); 108 109 Callable<ResultSet> callable = new Callable<ResultSet>() 110 { 111 public ResultSet call() throws SQLException 112 { 113 return selectStatement.executeQuery(selectSQL); 114 } 115 }; 116 117 Future<ResultSet> future = executor.submit(callable); 118 119 String deleteSQL = dialect.getTruncateTableSQL(table); 120 121 logger.debug(deleteSQL); 122 123 Statement deleteStatement = targetConnection.createStatement(); 124 125 int deletedRows = deleteStatement.executeUpdate(deleteSQL); 126 127 logger.info(Messages.getMessage(Messages.DELETE_COUNT, deletedRows, tableName)); 128 129 deleteStatement.close(); 130 131 ResultSet resultSet = future.get(); 132 133 String insertSQL = "INSERT INTO " + tableName + " (" + commaDelimitedColumns + ") VALUES (" + Strings.join(Collections.nCopies(columns.size(), Strings.QUESTION), Strings.PADDED_COMMA) + ")"; //$NON-NLS-1$ //$NON-NLS-2$ //$NON-NLS-3$ //$NON-NLS-4$ 134 135 logger.debug(insertSQL); 136 137 PreparedStatement insertStatement = targetConnection.prepareStatement(insertSQL); 138 int statementCount = 0; 139 140 while (resultSet.next()) 141 { 142 int index = 0; 143 144 for (String column: columns) 145 { 146 index += 1; 147 148 int type = dialect.getColumnType(table.getColumnProperties(column)); 149 150 Object object = SynchronizationSupport.getObject(resultSet, index, type); 151 152 if (resultSet.wasNull()) 153 { 154 insertStatement.setNull(index, type); 155 } 156 else 157 { 158 insertStatement.setObject(index, object, type); 159 } 160 } 161 162 insertStatement.addBatch(); 163 statementCount += 1; 164 165 if ((statementCount % this.maxBatchSize) == 0) 166 { 167 insertStatement.executeBatch(); 168 insertStatement.clearBatch(); 169 } 170 171 insertStatement.clearParameters(); 172 } 173 174 if ((statementCount % this.maxBatchSize) > 0) 175 { 176 insertStatement.executeBatch(); 177 } 178 179 logger.info(Messages.getMessage(Messages.INSERT_COUNT, statementCount, tableName)); 180 181 insertStatement.close(); 182 selectStatement.close(); 183 184 targetConnection.commit(); 185 } 186 } 187 catch (InterruptedException e) 188 { 189 SynchronizationSupport.rollback(targetConnection); 190 191 throw SQLExceptionFactory.createSQLException(e); 192 } 193 catch (ExecutionException e) 194 { 195 SynchronizationSupport.rollback(targetConnection); 196 197 throw SQLExceptionFactory.createSQLException(e.getCause()); 198 } 199 catch (SQLException e) 200 { 201 SynchronizationSupport.rollback(targetConnection); 202 203 throw e; 204 } 205 206 targetConnection.setAutoCommit(true); 207 208 SynchronizationSupport.restoreForeignKeys(context); 209 210 SynchronizationSupport.synchronizeIdentityColumns(context); 211 SynchronizationSupport.synchronizeSequences(context); 212 213 targetConnection.setAutoCommit(autoCommit); 214 } 215 216 /** 217 * @return the fetchSize. 218 */ 219 public int getFetchSize() 220 { 221 return this.fetchSize; 222 } 223 224 /** 225 * @param fetchSize the fetchSize to set. 226 */ 227 public void setFetchSize(int fetchSize) 228 { 229 this.fetchSize = fetchSize; 230 } 231 232 /** 233 * @return the maxBatchSize. 234 */ 235 public int getMaxBatchSize() 236 { 237 return this.maxBatchSize; 238 } 239 240 /** 241 * @param maxBatchSize the maxBatchSize to set. 242 */ 243 public void setMaxBatchSize(int maxBatchSize) 244 { 245 this.maxBatchSize = maxBatchSize; 246 } 247}