akonadi
resourcescheduler.cpp
00001 /* 00002 Copyright (c) 2007 Volker Krause <vkrause@kde.org> 00003 00004 This library is free software; you can redistribute it and/or modify it 00005 under the terms of the GNU Library General Public License as published by 00006 the Free Software Foundation; either version 2 of the License, or (at your 00007 option) any later version. 00008 00009 This library is distributed in the hope that it will be useful, but WITHOUT 00010 ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 00011 FITNESS FOR A PARTICULAR PURPOSE. See the GNU Library General Public 00012 License for more details. 00013 00014 You should have received a copy of the GNU Library General Public License 00015 along with this library; see the file COPYING.LIB. If not, write to the 00016 Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 00017 02110-1301, USA. 00018 */ 00019 00020 #include "resourcescheduler_p.h" 00021 00022 #include "dbusconnectionpool.h" 00023 00024 #include <kdebug.h> 00025 #include <klocale.h> 00026 00027 #include <QtCore/QTimer> 00028 #include <QtDBus/QDBusInterface> 00029 #include <QtDBus/QDBusConnectionInterface> 00030 #include <boost/graph/graph_concepts.hpp> 00031 00032 using namespace Akonadi; 00033 00034 qint64 ResourceScheduler::Task::latestSerial = 0; 00035 static QDBusAbstractInterface *s_resourcetracker = 0; 00036 00037 //@cond PRIVATE 00038 00039 ResourceScheduler::ResourceScheduler( QObject *parent ) : 00040 QObject( parent ), 00041 mCurrentTasksQueue( -1 ), 00042 mOnline( false ) 00043 { 00044 } 00045 00046 void ResourceScheduler::scheduleFullSync() 00047 { 00048 Task t; 00049 t.type = SyncAll; 00050 TaskList& queue = queueForTaskType( t.type ); 00051 if ( queue.contains( t ) || mCurrentTask == t ) 00052 return; 00053 queue << t; 00054 signalTaskToTracker( t, "SyncAll" ); 00055 scheduleNext(); 00056 } 00057 00058 void ResourceScheduler::scheduleCollectionTreeSync() 00059 { 00060 Task t; 00061 t.type = SyncCollectionTree; 00062 TaskList& queue = queueForTaskType( t.type ); 00063 if ( queue.contains( t ) || mCurrentTask == t ) 00064 return; 00065 queue << t; 00066 signalTaskToTracker( t, "SyncCollectionTree" ); 00067 scheduleNext(); 00068 } 00069 00070 void ResourceScheduler::scheduleSync(const Collection & col) 00071 { 00072 Task t; 00073 t.type = SyncCollection; 00074 t.collection = col; 00075 TaskList& queue = queueForTaskType( t.type ); 00076 if ( queue.contains( t ) || mCurrentTask == t ) 00077 return; 00078 queue << t; 00079 signalTaskToTracker( t, "SyncCollection" ); 00080 scheduleNext(); 00081 } 00082 00083 void ResourceScheduler::scheduleAttributesSync( const Collection &collection ) 00084 { 00085 Task t; 00086 t.type = SyncCollectionAttributes; 00087 t.collection = collection; 00088 00089 TaskList& queue = queueForTaskType( t.type ); 00090 if ( queue.contains( t ) || mCurrentTask == t ) 00091 return; 00092 queue << t; 00093 signalTaskToTracker( t, "SyncCollectionAttributes" ); 00094 scheduleNext(); 00095 } 00096 00097 void ResourceScheduler::scheduleItemFetch(const Item & item, const QSet<QByteArray> &parts, const QDBusMessage & msg) 00098 { 00099 Task t; 00100 t.type = FetchItem; 00101 t.item = item; 00102 t.itemParts = parts; 00103 00104 // if the current task does already fetch the requested item, break here but 00105 // keep the dbus message, so we can send the reply later on 00106 if ( mCurrentTask == t ) { 00107 mCurrentTask.dbusMsgs << msg; 00108 return; 00109 } 00110 00111 // If this task is already in the queue, merge with it. 00112 TaskList& queue = queueForTaskType( t.type ); 00113 const int idx = queue.indexOf( t ); 00114 if ( idx != -1 ) { 00115 queue[ idx ].dbusMsgs << msg; 00116 return; 00117 } 00118 00119 t.dbusMsgs << msg; 00120 queue << t; 00121 signalTaskToTracker( t, "FetchItem" ); 00122 scheduleNext(); 00123 } 00124 00125 void ResourceScheduler::scheduleResourceCollectionDeletion() 00126 { 00127 Task t; 00128 t.type = DeleteResourceCollection; 00129 TaskList& queue = queueForTaskType( t.type ); 00130 if ( queue.contains( t ) || mCurrentTask == t ) 00131 return; 00132 queue << t; 00133 signalTaskToTracker( t, "DeleteResourceCollection" ); 00134 scheduleNext(); 00135 } 00136 00137 void ResourceScheduler::scheduleChangeReplay() 00138 { 00139 Task t; 00140 t.type = ChangeReplay; 00141 TaskList& queue = queueForTaskType( t.type ); 00142 // see ResourceBase::changeProcessed() for why we do not check for mCurrentTask == t here like in the other tasks 00143 if ( queue.contains( t ) ) 00144 return; 00145 queue << t; 00146 signalTaskToTracker( t, "ChangeReplay" ); 00147 scheduleNext(); 00148 } 00149 00150 void Akonadi::ResourceScheduler::scheduleFullSyncCompletion() 00151 { 00152 Task t; 00153 t.type = SyncAllDone; 00154 TaskList& queue = queueForTaskType( t.type ); 00155 // no compression here, all this does is emitting a D-Bus signal anyway, and compression can trigger races on the receiver side with the signal being lost 00156 queue << t; 00157 signalTaskToTracker( t, "SyncAllDone" ); 00158 scheduleNext(); 00159 } 00160 00161 void Akonadi::ResourceScheduler::scheduleCustomTask( QObject *receiver, const char* methodName, const QVariant &argument, ResourceBase::SchedulePriority priority ) 00162 { 00163 Task t; 00164 t.type = Custom; 00165 t.receiver = receiver; 00166 t.methodName = methodName; 00167 t.argument = argument; 00168 QueueType queueType = GenericTaskQueue; 00169 if ( priority == ResourceBase::AfterChangeReplay ) 00170 queueType = AfterChangeReplayQueue; 00171 else if ( priority == ResourceBase::Prepend ) 00172 queueType = PrependTaskQueue; 00173 TaskList& queue = mTaskList[ queueType ]; 00174 00175 if ( queue.contains( t ) ) 00176 return; 00177 00178 switch (priority) { 00179 case ResourceBase::Prepend: 00180 queue.prepend( t ); 00181 break; 00182 default: 00183 queue.append(t); 00184 break; 00185 } 00186 00187 signalTaskToTracker( t, "Custom-" + t.methodName ); 00188 scheduleNext(); 00189 } 00190 00191 void ResourceScheduler::taskDone() 00192 { 00193 if ( isEmpty() ) 00194 emit status( AgentBase::Idle, i18nc( "@info:status Application ready for work", "Ready" ) ); 00195 00196 if ( s_resourcetracker ) { 00197 QList<QVariant> argumentList; 00198 argumentList << QString::number( mCurrentTask.serial ) 00199 << QString(); 00200 s_resourcetracker->asyncCallWithArgumentList(QLatin1String( "jobEnded" ), argumentList); 00201 } 00202 00203 mCurrentTask = Task(); 00204 mCurrentTasksQueue = -1; 00205 scheduleNext(); 00206 } 00207 00208 void ResourceScheduler::deferTask() 00209 { 00210 if ( mCurrentTask.type == Invalid ) 00211 return; 00212 00213 if ( s_resourcetracker ) { 00214 QList<QVariant> argumentList; 00215 argumentList << QString::number( mCurrentTask.serial ) 00216 << QString(); 00217 s_resourcetracker->asyncCallWithArgumentList(QLatin1String( "jobEnded" ), argumentList); 00218 } 00219 00220 Task t = mCurrentTask; 00221 mCurrentTask = Task(); 00222 00223 Q_ASSERT( mCurrentTasksQueue >= 0 && mCurrentTasksQueue < NQueueCount ); 00224 mTaskList[mCurrentTasksQueue].prepend( t ); 00225 mCurrentTasksQueue = -1; 00226 00227 signalTaskToTracker( t, "DeferedTask" ); 00228 00229 scheduleNext(); 00230 } 00231 00232 bool ResourceScheduler::isEmpty() 00233 { 00234 for ( int i = 0; i < NQueueCount; ++i ) { 00235 if ( !mTaskList[i].isEmpty() ) 00236 return false; 00237 } 00238 return true; 00239 } 00240 00241 void ResourceScheduler::scheduleNext() 00242 { 00243 if ( mCurrentTask.type != Invalid || isEmpty() || !mOnline ) 00244 return; 00245 QTimer::singleShot( 0, this, SLOT( executeNext() ) ); 00246 } 00247 00248 void ResourceScheduler::executeNext() 00249 { 00250 if ( mCurrentTask.type != Invalid || isEmpty() ) 00251 return; 00252 00253 for ( int i = 0; i < NQueueCount; ++i ) { 00254 if ( !mTaskList[ i ].isEmpty() ) { 00255 mCurrentTask = mTaskList[ i ].takeFirst(); 00256 mCurrentTasksQueue = i; 00257 break; 00258 } 00259 } 00260 00261 if ( s_resourcetracker ) { 00262 QList<QVariant> argumentList; 00263 argumentList << QString::number( mCurrentTask.serial ); 00264 s_resourcetracker->asyncCallWithArgumentList(QLatin1String( "jobStarted" ), argumentList); 00265 } 00266 00267 switch ( mCurrentTask.type ) { 00268 case SyncAll: 00269 emit executeFullSync(); 00270 break; 00271 case SyncCollectionTree: 00272 emit executeCollectionTreeSync(); 00273 break; 00274 case SyncCollection: 00275 emit executeCollectionSync( mCurrentTask.collection ); 00276 break; 00277 case SyncCollectionAttributes: 00278 emit executeCollectionAttributesSync( mCurrentTask.collection ); 00279 break; 00280 case FetchItem: 00281 emit executeItemFetch( mCurrentTask.item, mCurrentTask.itemParts ); 00282 break; 00283 case DeleteResourceCollection: 00284 emit executeResourceCollectionDeletion(); 00285 break; 00286 case ChangeReplay: 00287 emit executeChangeReplay(); 00288 break; 00289 case SyncAllDone: 00290 emit fullSyncComplete(); 00291 break; 00292 case Custom: 00293 { 00294 bool success = QMetaObject::invokeMethod( mCurrentTask.receiver, mCurrentTask.methodName, Q_ARG(QVariant, mCurrentTask.argument) ); 00295 Q_ASSERT_X( success || !mCurrentTask.argument.isValid(), "ResourceScheduler::executeNext", "Valid argument was provided but the method wasn't found" ); 00296 if ( !success ) 00297 success = QMetaObject::invokeMethod( mCurrentTask.receiver, mCurrentTask.methodName ); 00298 00299 if ( !success ) 00300 kError() << "Could not invoke slot" << mCurrentTask.methodName << "on" << mCurrentTask.receiver << "with argument" << mCurrentTask.argument; 00301 break; 00302 } 00303 default: { 00304 kError() << "Unhandled task type" << mCurrentTask.type; 00305 dump(); 00306 Q_ASSERT( false ); 00307 } 00308 } 00309 } 00310 00311 ResourceScheduler::Task ResourceScheduler::currentTask() const 00312 { 00313 return mCurrentTask; 00314 } 00315 00316 void ResourceScheduler::setOnline(bool state) 00317 { 00318 if ( mOnline == state ) 00319 return; 00320 mOnline = state; 00321 if ( mOnline ) { 00322 scheduleNext(); 00323 } else { 00324 if ( mCurrentTask.type != Invalid ) { 00325 // abort running task 00326 queueForTaskType( mCurrentTask.type ).prepend( mCurrentTask ); 00327 mCurrentTask = Task(); 00328 mCurrentTasksQueue = -1; 00329 } 00330 // abort pending synchronous tasks, might take longer until the resource goes online again 00331 TaskList& itemFetchQueue = queueForTaskType( FetchItem ); 00332 for ( QList< Task >::iterator it = itemFetchQueue.begin(); it != itemFetchQueue.end(); ) { 00333 if ( (*it).type == FetchItem ) { 00334 (*it).sendDBusReplies( false ); 00335 it = itemFetchQueue.erase( it ); 00336 if ( s_resourcetracker ) { 00337 QList<QVariant> argumentList; 00338 argumentList << QString::number( mCurrentTask.serial ) 00339 << QLatin1String( "Job canceled." ); 00340 s_resourcetracker->asyncCallWithArgumentList( QLatin1String( "jobEnded" ), argumentList ); 00341 } 00342 } else { 00343 ++it; 00344 } 00345 } 00346 } 00347 } 00348 00349 void ResourceScheduler::signalTaskToTracker( const Task &task, const QByteArray &taskType ) 00350 { 00351 // if there's a job tracer running, tell it about the new job 00352 if ( !s_resourcetracker && DBusConnectionPool::threadConnection().interface()->isServiceRegistered(QLatin1String( "org.kde.akonadiconsole" ) ) ) { 00353 s_resourcetracker = new QDBusInterface( QLatin1String( "org.kde.akonadiconsole" ), 00354 QLatin1String( "/resourcesJobtracker" ), 00355 QLatin1String( "org.freedesktop.Akonadi.JobTracker" ), 00356 DBusConnectionPool::threadConnection(), 0 ); 00357 } 00358 00359 if ( s_resourcetracker ) { 00360 QList<QVariant> argumentList; 00361 argumentList << static_cast<AgentBase*>( parent() )->identifier() 00362 << QString::number( task.serial ) 00363 << QString() 00364 << QString::fromLatin1( taskType ); 00365 s_resourcetracker->asyncCallWithArgumentList(QLatin1String( "jobCreated" ), argumentList); 00366 } 00367 } 00368 00369 void ResourceScheduler::collectionRemoved( const Akonadi::Collection &collection ) 00370 { 00371 if ( !collection.isValid() ) // should not happen, but you never know... 00372 return; 00373 TaskList& queue = queueForTaskType( SyncCollection ); 00374 for ( QList<Task>::iterator it = queue.begin(); it != queue.end(); ) { 00375 if ( (*it).type == SyncCollection && (*it).collection == collection ) { 00376 it = queue.erase( it ); 00377 kDebug() << " erasing"; 00378 } else 00379 ++it; 00380 } 00381 } 00382 00383 void ResourceScheduler::Task::sendDBusReplies( bool success ) 00384 { 00385 Q_FOREACH( const QDBusMessage &msg, dbusMsgs ) { 00386 QDBusMessage reply( msg ); 00387 reply << success; 00388 DBusConnectionPool::threadConnection().send( reply ); 00389 } 00390 } 00391 00392 ResourceScheduler::QueueType ResourceScheduler::queueTypeForTaskType( TaskType type ) 00393 { 00394 switch( type ) { 00395 case ChangeReplay: 00396 return ChangeReplayQueue; 00397 case FetchItem: 00398 return ItemFetchQueue; 00399 default: 00400 return GenericTaskQueue; 00401 } 00402 } 00403 00404 ResourceScheduler::TaskList& ResourceScheduler::queueForTaskType( TaskType type ) 00405 { 00406 const QueueType qt = queueTypeForTaskType( type ); 00407 return mTaskList[ qt ]; 00408 } 00409 00410 void ResourceScheduler::dump() 00411 { 00412 kDebug() << "ResourceScheduler: Online:" << mOnline; 00413 kDebug() << " current task:" << mCurrentTask; 00414 for ( int i = 0; i < NQueueCount; ++i ) { 00415 const TaskList& queue = mTaskList[i]; 00416 kDebug() << " queue" << i << queue.size() << "tasks:"; 00417 for ( QList<Task>::const_iterator it = queue.begin(); it != queue.end(); ++it ) { 00418 kDebug() << " " << (*it); 00419 } 00420 } 00421 } 00422 00423 void ResourceScheduler::clear() 00424 { 00425 kDebug() << "Clearing ResourceScheduler queues:"; 00426 for ( int i = 0; i < NQueueCount; ++i ) { 00427 TaskList& queue = mTaskList[i]; 00428 queue.clear(); 00429 } 00430 mCurrentTask = Task(); 00431 mCurrentTasksQueue = -1; 00432 } 00433 00434 void Akonadi::ResourceScheduler::cancelQueues() 00435 { 00436 for ( int i = 0; i < NQueueCount; ++i ) { 00437 TaskList& queue = mTaskList[i]; 00438 if ( s_resourcetracker ) { 00439 foreach ( const Task &t, queue ) { 00440 QList<QVariant> argumentList; 00441 argumentList << QString::number( t.serial ) << QString(); 00442 s_resourcetracker->asyncCallWithArgumentList(QLatin1String( "jobEnded" ), argumentList); 00443 } 00444 } 00445 queue.clear(); 00446 } 00447 } 00448 00449 static const char s_taskTypes[][25] = { 00450 "Invalid", 00451 "SyncAll", 00452 "SyncCollectionTree", 00453 "SyncCollection", 00454 "FetchItem", 00455 "ChangeReplay", 00456 "DeleteResourceCollection", 00457 "SyncAllDone", 00458 "Custom" 00459 }; 00460 00461 QDebug Akonadi::operator<<( QDebug d, const ResourceScheduler::Task& task ) 00462 { 00463 d << task.serial << s_taskTypes[task.type]; 00464 if ( task.type != ResourceScheduler::Invalid ) { 00465 if ( task.collection.id() != -1 ) 00466 d << "collection" << task.collection.id(); 00467 if ( task.item.id() != -1 ) 00468 d << "item" << task.item.id(); 00469 if ( !task.methodName.isEmpty() ) 00470 d << task.methodName << task.argument; 00471 } 00472 return d; 00473 } 00474 00475 //@endcond 00476 00477 #include "resourcescheduler_p.moc"