• Skip to content
  • Skip to link menu
KDE 4.6 API Reference
  • KDE API Reference
  • KDE-PIM Libraries
  • KDE Home
  • Contact Us
 

akonadi

resourcebase.cpp
00001 /*
00002     Copyright (c) 2006 Till Adam <adam@kde.org>
00003     Copyright (c) 2007 Volker Krause <vkrause@kde.org>
00004 
00005     This library is free software; you can redistribute it and/or modify it
00006     under the terms of the GNU Library General Public License as published by
00007     the Free Software Foundation; either version 2 of the License, or (at your
00008     option) any later version.
00009 
00010     This library is distributed in the hope that it will be useful, but WITHOUT
00011     ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
00012     FITNESS FOR A PARTICULAR PURPOSE.  See the GNU Library General Public
00013     License for more details.
00014 
00015     You should have received a copy of the GNU Library General Public License
00016     along with this library; see the file COPYING.LIB.  If not, write to the
00017     Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
00018     02110-1301, USA.
00019 */
00020 
00021 #include "resourcebase.h"
00022 #include "agentbase_p.h"
00023 
00024 #include "resourceadaptor.h"
00025 #include "collectiondeletejob.h"
00026 #include "collectionsync_p.h"
00027 #include "dbusconnectionpool.h"
00028 #include "itemsync.h"
00029 #include "resourcescheduler_p.h"
00030 #include "tracerinterface.h"
00031 #include "xdgbasedirs_p.h"
00032 
00033 #include "changerecorder.h"
00034 #include "collectionfetchjob.h"
00035 #include "collectionfetchscope.h"
00036 #include "collectionmodifyjob.h"
00037 #include "itemfetchjob.h"
00038 #include "itemfetchscope.h"
00039 #include "itemmodifyjob.h"
00040 #include "itemmodifyjob_p.h"
00041 #include "session.h"
00042 #include "resourceselectjob_p.h"
00043 #include "monitor_p.h"
00044 #include "servermanager_p.h"
00045 
00046 #include <kaboutdata.h>
00047 #include <kcmdlineargs.h>
00048 #include <kdebug.h>
00049 #include <klocale.h>
00050 
00051 #include <QtCore/QDebug>
00052 #include <QtCore/QDir>
00053 #include <QtCore/QHash>
00054 #include <QtCore/QSettings>
00055 #include <QtCore/QTimer>
00056 #include <QtGui/QApplication>
00057 #include <QtDBus/QtDBus>
00058 
00059 using namespace Akonadi;
00060 
00061 class Akonadi::ResourceBasePrivate : public AgentBasePrivate
00062 {
00063   Q_OBJECT
00064   Q_CLASSINFO( "D-Bus Interface", "org.kde.dfaure" )
00065 
00066   public:
00067     ResourceBasePrivate( ResourceBase *parent )
00068       : AgentBasePrivate( parent ),
00069         scheduler( 0 ),
00070         mItemSyncer( 0 ),
00071         mItemSyncFetchScope( 0 ),
00072         mItemTransactionMode( ItemSync::SingleTransaction ),
00073         mCollectionSyncer( 0 ),
00074         mHierarchicalRid( false ),
00075         mUnemittedProgress( 0 )
00076     {
00077       Internal::setClientType( Internal::Resource );
00078       mStatusMessage = defaultReadyMessage();
00079       mProgressEmissionCompressor.setInterval( 1000 );
00080       mProgressEmissionCompressor.setSingleShot( true );
00081     }
00082 
00083     ~ResourceBasePrivate()
00084     {
00085       delete mItemSyncFetchScope;
00086     }
00087 
00088     Q_DECLARE_PUBLIC( ResourceBase )
00089 
00090     void delayedInit()
00091     {
00092       if ( !DBusConnectionPool::threadConnection().registerService( QLatin1String( "org.freedesktop.Akonadi.Resource." ) + mId ) ) {
00093         QString reason = DBusConnectionPool::threadConnection().lastError().message();
00094         if ( reason.isEmpty() ) {
00095           reason = QString::fromLatin1( "this service is probably running already." );
00096         }
00097         kError() << "Unable to register service at D-Bus: " << reason;
00098 
00099         if ( QThread::currentThread() == QCoreApplication::instance()->thread() )
00100           QCoreApplication::instance()->exit(1);
00101 
00102       } else {
00103         AgentBasePrivate::delayedInit();
00104       }
00105     }
00106 
00107     virtual void changeProcessed()
00108     {
00109       mChangeRecorder->changeProcessed();
00110       if ( !mChangeRecorder->isEmpty() )
00111         scheduler->scheduleChangeReplay();
00112       scheduler->taskDone();
00113     }
00114 
00115     void slotAbortRequested();
00116 
00117     void slotDeliveryDone( KJob* job );
00118     void slotCollectionSyncDone( KJob *job );
00119     void slotLocalListDone( KJob *job );
00120     void slotSynchronizeCollection( const Collection &col );
00121     void slotCollectionListDone( KJob *job );
00122     void slotSynchronizeCollectionAttributes( const Collection &col );
00123     void slotCollectionListForAttributesDone( KJob *job );
00124     void slotCollectionAttributesSyncDone( KJob *job );
00125 
00126     void slotItemSyncDone( KJob *job );
00127 
00128     void slotPercent( KJob* job, unsigned long percent );
00129     void slotDelayedEmitProgress();
00130     void slotDeleteResourceCollection();
00131     void slotDeleteResourceCollectionDone( KJob *job );
00132     void slotCollectionDeletionDone( KJob *job );
00133 
00134     void slotPrepareItemRetrieval( const Akonadi::Item &item );
00135     void slotPrepareItemRetrievalResult( KJob* job );
00136 
00137     void changeCommittedResult( KJob* job );
00138 
00139     void slotSessionReconnected()
00140     {
00141       Q_Q( ResourceBase );
00142 
00143       new ResourceSelectJob( q->identifier() );
00144     }
00145 
00146     void createItemSyncInstanceIfMissing()
00147     {
00148       Q_Q( ResourceBase );
00149       Q_ASSERT_X( scheduler->currentTask().type == ResourceScheduler::SyncCollection,
00150                   "createItemSyncInstance", "Calling items retrieval methods although no item retrieval is in progress" );
00151       if ( !mItemSyncer ) {
00152         mItemSyncer = new ItemSync( q->currentCollection() );
00153         mItemSyncer->setTransactionMode( mItemTransactionMode );
00154         if ( mItemSyncFetchScope )
00155           mItemSyncer->setFetchScope( *mItemSyncFetchScope );
00156         mItemSyncer->setProperty( "collection", QVariant::fromValue( q->currentCollection() ) );
00157         connect( mItemSyncer, SIGNAL( percent( KJob*, unsigned long ) ), q, SLOT( slotPercent( KJob*, unsigned long ) ) );
00158         connect( mItemSyncer, SIGNAL( result( KJob* ) ), q, SLOT( slotItemSyncDone( KJob* ) ) );
00159       }
00160       Q_ASSERT( mItemSyncer );
00161     }
00162 
00163   public Q_SLOTS:
00164     Q_SCRIPTABLE void dump()
00165     {
00166       scheduler->dump();
00167     }
00168 
00169     Q_SCRIPTABLE void clear()
00170     {
00171       scheduler->clear();
00172     }
00173 
00174   protected Q_SLOTS:
00175     // reimplementations from AgentbBasePrivate, containing sanity checks that only apply to resources
00176     // such as making sure that RIDs are present as well as translations of cross-resource moves
00177     // TODO: we could possibly add recovery code for no-RID notifications by re-enquing those to the change recorder
00178     // as the corresponding Add notifications, although that contains a risk of endless fail/retry loops
00179 
00180     void itemAdded(const Akonadi::Item& item, const Akonadi::Collection& collection)
00181     {
00182       if ( collection.remoteId().isEmpty() ) {
00183         changeProcessed();
00184         return;
00185       }
00186       AgentBasePrivate::itemAdded( item, collection );
00187     }
00188 
00189     void itemChanged(const Akonadi::Item& item, const QSet< QByteArray >& partIdentifiers)
00190     {
00191       if ( item.remoteId().isEmpty() ) {
00192         changeProcessed();
00193         return;
00194       }
00195       AgentBasePrivate::itemChanged( item, partIdentifiers );
00196     }
00197 
00198     // TODO move the move translation code from AgebtBasePrivate here, it's wrong for agents
00199     void itemMoved(const Akonadi::Item &item, const Akonadi::Collection &source, const Akonadi::Collection &destination)
00200     {
00201       if ( item.remoteId().isEmpty() || destination.remoteId().isEmpty() || destination == source ) {
00202         changeProcessed();
00203         return;
00204       }
00205       AgentBasePrivate::itemMoved( item, source, destination );
00206     }
00207 
00208     void itemRemoved(const Akonadi::Item& item)
00209     {
00210       if ( item.remoteId().isEmpty() ) {
00211         changeProcessed();
00212         return;
00213       }
00214       AgentBasePrivate::itemRemoved( item );
00215     }
00216 
00217     void collectionAdded(const Akonadi::Collection& collection, const Akonadi::Collection& parent)
00218     {
00219       if ( parent.remoteId().isEmpty() ) {
00220         changeProcessed();
00221         return;
00222       }
00223       AgentBasePrivate::collectionAdded( collection, parent );
00224     }
00225 
00226     void collectionChanged(const Akonadi::Collection& collection)
00227     {
00228       if ( collection.remoteId().isEmpty() ) {
00229         changeProcessed();
00230         return;
00231       }
00232       AgentBasePrivate::collectionChanged( collection );
00233     }
00234 
00235     void collectionChanged(const Akonadi::Collection& collection, const QSet< QByteArray >& partIdentifiers)
00236     {
00237       if ( collection.remoteId().isEmpty() ) {
00238         changeProcessed();
00239         return;
00240       }
00241       AgentBasePrivate::collectionChanged( collection, partIdentifiers );
00242     }
00243 
00244     // TODO move the move translation code from AgebtBasePrivate here, it's wrong for agents
00245     void collectionMoved(const Akonadi::Collection& collection, const Akonadi::Collection& source, const Akonadi::Collection& destination)
00246     {
00247       if ( collection.remoteId().isEmpty() || destination.remoteId().isEmpty() || source == destination ) {
00248         changeProcessed();
00249         return;
00250       }
00251       AgentBasePrivate::collectionMoved( collection, source, destination );
00252     }
00253 
00254     void collectionRemoved(const Akonadi::Collection& collection)
00255     {
00256       if ( collection.remoteId().isEmpty() ) {
00257         changeProcessed();
00258         return;
00259       }
00260       AgentBasePrivate::collectionRemoved( collection );
00261     }
00262 
00263   public:
00264     // synchronize states
00265     Collection currentCollection;
00266 
00267     ResourceScheduler *scheduler;
00268     ItemSync *mItemSyncer;
00269     ItemFetchScope *mItemSyncFetchScope;
00270     ItemSync::TransactionMode mItemTransactionMode;
00271     CollectionSync *mCollectionSyncer;
00272     bool mHierarchicalRid;
00273     QTimer mProgressEmissionCompressor;
00274     int mUnemittedProgress;
00275     QMap<Akonadi::Collection::Id, QVariantMap> mUnemittedAdvancedStatus;
00276 };
00277 
00278 ResourceBase::ResourceBase( const QString & id )
00279   : AgentBase( new ResourceBasePrivate( this ), id )
00280 {
00281   Q_D( ResourceBase );
00282 
00283   new Akonadi__ResourceAdaptor( this );
00284 
00285   d->scheduler = new ResourceScheduler( this );
00286 
00287   d->mChangeRecorder->setChangeRecordingEnabled( true );
00288   connect( d->mChangeRecorder, SIGNAL( changesAdded() ),
00289            d->scheduler, SLOT( scheduleChangeReplay() ) );
00290 
00291   d->mChangeRecorder->setResourceMonitored( d->mId.toLatin1() );
00292   d->mChangeRecorder->fetchCollection( true );
00293 
00294   connect( d->scheduler, SIGNAL( executeFullSync() ),
00295            SLOT( retrieveCollections() ) );
00296   connect( d->scheduler, SIGNAL( executeCollectionTreeSync() ),
00297            SLOT( retrieveCollections() ) );
00298   connect( d->scheduler, SIGNAL( executeCollectionSync( const Akonadi::Collection& ) ),
00299            SLOT( slotSynchronizeCollection( const Akonadi::Collection& ) ) );
00300   connect( d->scheduler, SIGNAL( executeCollectionAttributesSync( const Akonadi::Collection& ) ),
00301            SLOT( slotSynchronizeCollectionAttributes(Akonadi::Collection)) );
00302   connect( d->scheduler, SIGNAL( executeItemFetch( const Akonadi::Item&, const QSet<QByteArray>& ) ),
00303            SLOT( slotPrepareItemRetrieval(Akonadi::Item)) );
00304   connect( d->scheduler, SIGNAL( executeResourceCollectionDeletion() ),
00305            SLOT( slotDeleteResourceCollection() ) );
00306   connect( d->scheduler, SIGNAL( status( int, const QString& ) ),
00307            SIGNAL( status( int, const QString& ) ) );
00308   connect( d->scheduler, SIGNAL( executeChangeReplay() ),
00309            d->mChangeRecorder, SLOT( replayNext() ) );
00310   connect( d->scheduler, SIGNAL( fullSyncComplete() ), SIGNAL( synchronized() ) );
00311   connect( d->mChangeRecorder, SIGNAL( nothingToReplay() ), d->scheduler, SLOT( taskDone() ) );
00312   connect( d->mChangeRecorder, SIGNAL( collectionRemoved( const Akonadi::Collection& ) ),
00313            d->scheduler, SLOT( collectionRemoved( const Akonadi::Collection& ) ) );
00314   connect( this, SIGNAL( abortRequested() ), this, SLOT( slotAbortRequested() ) );
00315   connect( this, SIGNAL( synchronized() ), d->scheduler, SLOT( taskDone() ) );
00316   connect( this, SIGNAL( agentNameChanged( const QString& ) ),
00317            this, SIGNAL( nameChanged( const QString& ) ) );
00318 
00319   connect( &d->mProgressEmissionCompressor, SIGNAL( timeout() ),
00320            this, SLOT( slotDelayedEmitProgress() ) );
00321 
00322   d->scheduler->setOnline( d->mOnline );
00323   if ( !d->mChangeRecorder->isEmpty() )
00324     d->scheduler->scheduleChangeReplay();
00325 
00326   DBusConnectionPool::threadConnection().registerObject( QLatin1String( "/Debug" ), d, QDBusConnection::ExportScriptableSlots );
00327 
00328   new ResourceSelectJob( identifier() );
00329 
00330   connect( d->mChangeRecorder->session(), SIGNAL( reconnected() ), SLOT( slotSessionReconnected() ) );
00331 }
00332 
00333 ResourceBase::~ResourceBase()
00334 {
00335 }
00336 
00337 void ResourceBase::synchronize()
00338 {
00339   d_func()->scheduler->scheduleFullSync();
00340 }
00341 
00342 void ResourceBase::setName( const QString &name )
00343 {
00344   AgentBase::setAgentName( name );
00345 }
00346 
00347 QString ResourceBase::name() const
00348 {
00349   return AgentBase::agentName();
00350 }
00351 
00352 QString ResourceBase::parseArguments( int argc, char **argv )
00353 {
00354   QString identifier;
00355   if ( argc < 3 ) {
00356     kDebug() << "Not enough arguments passed...";
00357     exit( 1 );
00358   }
00359 
00360   for ( int i = 1; i < argc - 1; ++i ) {
00361     if ( QLatin1String( argv[ i ] ) == QLatin1String( "--identifier" ) )
00362       identifier = QLatin1String( argv[ i + 1 ] );
00363   }
00364 
00365   if ( identifier.isEmpty() ) {
00366     kDebug() << "Identifier argument missing";
00367     exit( 1 );
00368   }
00369 
00370   const QFileInfo fi( QString::fromLocal8Bit( argv[0] ) );
00371   // strip off full path and possible .exe suffix
00372   const QByteArray catalog = fi.baseName().toLatin1();
00373 
00374   KCmdLineArgs::init( argc, argv, identifier.toLatin1(), catalog,
00375                       ki18nc( "@title application name", "Akonadi Resource" ), "0.1",
00376                       ki18nc( "@title application description", "Akonadi Resource" ) );
00377 
00378   KCmdLineOptions options;
00379   options.add( "identifier <argument>",
00380                ki18nc( "@label commandline option", "Resource identifier" ) );
00381   KCmdLineArgs::addCmdLineOptions( options );
00382 
00383   return identifier;
00384 }
00385 
00386 int ResourceBase::init( ResourceBase *r )
00387 {
00388   QApplication::setQuitOnLastWindowClosed( false );
00389   KGlobal::locale()->insertCatalog( QLatin1String( "libakonadi" ) );
00390   int rv = kapp->exec();
00391   delete r;
00392   return rv;
00393 }
00394 
00395 void ResourceBasePrivate::slotAbortRequested()
00396 {
00397   Q_Q( ResourceBase );
00398 
00399   scheduler->cancelQueues();
00400   QMetaObject::invokeMethod( q, "abortActivity" );
00401 }
00402 
00403 void ResourceBase::itemRetrieved( const Item &item )
00404 {
00405   Q_D( ResourceBase );
00406   Q_ASSERT( d->scheduler->currentTask().type == ResourceScheduler::FetchItem );
00407   if ( !item.isValid() ) {
00408     d->scheduler->currentTask().sendDBusReplies( false );
00409     d->scheduler->taskDone();
00410     return;
00411   }
00412 
00413   Item i( item );
00414   QSet<QByteArray> requestedParts = d->scheduler->currentTask().itemParts;
00415   foreach ( const QByteArray &part, requestedParts ) {
00416     if ( !item.loadedPayloadParts().contains( part ) ) {
00417       kWarning() << "Item does not provide part" << part;
00418     }
00419   }
00420 
00421   ItemModifyJob *job = new ItemModifyJob( i );
00422   // FIXME: remove once the item with which we call retrieveItem() has a revision number
00423   job->disableRevisionCheck();
00424   connect( job, SIGNAL( result( KJob* ) ), SLOT( slotDeliveryDone( KJob* ) ) );
00425 }
00426 
00427 void ResourceBasePrivate::slotDeliveryDone(KJob * job)
00428 {
00429   Q_Q( ResourceBase );
00430   Q_ASSERT( scheduler->currentTask().type == ResourceScheduler::FetchItem );
00431   if ( job->error() ) {
00432     emit q->error( QLatin1String( "Error while creating item: " ) + job->errorString() );
00433   }
00434   scheduler->currentTask().sendDBusReplies( !job->error() );
00435   scheduler->taskDone();
00436 }
00437 
00438 void ResourceBase::collectionAttributesRetrieved( const Collection &collection )
00439 {
00440   Q_D( ResourceBase );
00441   Q_ASSERT( d->scheduler->currentTask().type == ResourceScheduler::SyncCollectionAttributes );
00442   if ( !collection.isValid() ) {
00443     emit attributesSynchronized( d->scheduler->currentTask().collection.id() );
00444     d->scheduler->taskDone();
00445     return;
00446   }
00447 
00448   CollectionModifyJob *job = new CollectionModifyJob( collection );
00449   connect( job, SIGNAL( result( KJob* ) ), SLOT( slotCollectionAttributesSyncDone( KJob* ) ) );
00450 }
00451 
00452 void ResourceBasePrivate::slotCollectionAttributesSyncDone(KJob * job)
00453 {
00454   Q_Q( ResourceBase );
00455   Q_ASSERT( scheduler->currentTask().type == ResourceScheduler::SyncCollectionAttributes );
00456   if ( job->error() ) {
00457     emit q->error( QLatin1String( "Error while updating collection: " ) + job->errorString() );
00458   }
00459   emit q->attributesSynchronized( scheduler->currentTask().collection.id() );
00460   scheduler->taskDone();
00461 }
00462 
00463 void ResourceBasePrivate::slotDeleteResourceCollection()
00464 {
00465   Q_Q( ResourceBase );
00466 
00467   CollectionFetchJob *job = new CollectionFetchJob( Collection::root(), CollectionFetchJob::FirstLevel );
00468   job->fetchScope().setResource( q->identifier() );
00469   connect( job, SIGNAL( result( KJob* ) ), q, SLOT( slotDeleteResourceCollectionDone( KJob* ) ) );
00470 }
00471 
00472 void ResourceBasePrivate::slotDeleteResourceCollectionDone( KJob *job )
00473 {
00474   Q_Q( ResourceBase );
00475   if ( job->error() ) {
00476     emit q->error( job->errorString() );
00477     scheduler->taskDone();
00478   } else {
00479     const CollectionFetchJob *fetchJob = static_cast<const CollectionFetchJob*>( job );
00480 
00481     if ( !fetchJob->collections().isEmpty() ) {
00482       CollectionDeleteJob *job = new CollectionDeleteJob( fetchJob->collections().first() );
00483       connect( job, SIGNAL( result( KJob* ) ), q, SLOT( slotCollectionDeletionDone( KJob* ) ) );
00484     } else {
00485       // there is no resource collection, so just ignore the request
00486       scheduler->taskDone();
00487     }
00488   }
00489 }
00490 
00491 void ResourceBasePrivate::slotCollectionDeletionDone( KJob *job )
00492 {
00493   Q_Q( ResourceBase );
00494   if ( job->error() ) {
00495     emit q->error( job->errorString() );
00496   }
00497 
00498   scheduler->taskDone();
00499 }
00500 
00501 void ResourceBase::changeCommitted( const Item& item )
00502 {
00503   Q_D( ResourceBase );
00504   ItemModifyJob *job = new ItemModifyJob( item );
00505   job->d_func()->setClean();
00506   job->disableRevisionCheck(); // TODO: remove, but where/how do we handle the error?
00507   job->setIgnorePayload( true ); // we only want to reset the dirty flag and update the remote id
00508   d->changeProcessed();
00509 }
00510 
00511 void ResourceBase::changeCommitted( const Collection &collection )
00512 {
00513   CollectionModifyJob *job = new CollectionModifyJob( collection );
00514   connect( job, SIGNAL( result( KJob* ) ), SLOT( changeCommittedResult( KJob* ) ) );
00515 }
00516 
00517 void ResourceBasePrivate::changeCommittedResult( KJob *job )
00518 {
00519   Q_Q( ResourceBase );
00520   if ( job->error() )
00521     emit q->error( i18nc( "@info", "Updating local collection failed: %1.", job->errorText() ) );
00522   mChangeRecorder->d_ptr->invalidateCache( static_cast<CollectionModifyJob*>( job )->collection() );
00523   changeProcessed();
00524 }
00525 
00526 bool ResourceBase::requestItemDelivery( qint64 uid, const QString & remoteId,
00527                                         const QString &mimeType, const QStringList &_parts )
00528 {
00529   Q_D( ResourceBase );
00530   if ( !isOnline() ) {
00531     emit error( i18nc( "@info", "Cannot fetch item in offline mode." ) );
00532     return false;
00533   }
00534 
00535   setDelayedReply( true );
00536   // FIXME: we need at least the revision number too
00537   Item item( uid );
00538   item.setMimeType( mimeType );
00539   item.setRemoteId( remoteId );
00540 
00541   QSet<QByteArray> parts;
00542   Q_FOREACH( const QString &str, _parts )
00543     parts.insert( str.toLatin1() );
00544 
00545   d->scheduler->scheduleItemFetch( item, parts, message().createReply() );
00546 
00547   return true;
00548 }
00549 
00550 void ResourceBase::collectionsRetrieved( const Collection::List & collections )
00551 {
00552   Q_D( ResourceBase );
00553   Q_ASSERT_X( d->scheduler->currentTask().type == ResourceScheduler::SyncCollectionTree ||
00554               d->scheduler->currentTask().type == ResourceScheduler::SyncAll,
00555               "ResourceBase::collectionsRetrieved()",
00556               "Calling collectionsRetrieved() although no collection retrieval is in progress" );
00557   if ( !d->mCollectionSyncer ) {
00558     d->mCollectionSyncer = new CollectionSync( identifier() );
00559     d->mCollectionSyncer->setHierarchicalRemoteIds( d->mHierarchicalRid );
00560     connect( d->mCollectionSyncer, SIGNAL( percent( KJob*, unsigned long ) ), SLOT( slotPercent( KJob*, unsigned long ) ) );
00561     connect( d->mCollectionSyncer, SIGNAL( result( KJob* ) ), SLOT( slotCollectionSyncDone( KJob* ) ) );
00562   }
00563   d->mCollectionSyncer->setRemoteCollections( collections );
00564 }
00565 
00566 void ResourceBase::collectionsRetrievedIncremental( const Collection::List & changedCollections,
00567                                                     const Collection::List & removedCollections )
00568 {
00569   Q_D( ResourceBase );
00570   Q_ASSERT_X( d->scheduler->currentTask().type == ResourceScheduler::SyncCollectionTree ||
00571               d->scheduler->currentTask().type == ResourceScheduler::SyncAll,
00572               "ResourceBase::collectionsRetrievedIncremental()",
00573               "Calling collectionsRetrievedIncremental() although no collection retrieval is in progress" );
00574   if ( !d->mCollectionSyncer ) {
00575     d->mCollectionSyncer = new CollectionSync( identifier() );
00576     d->mCollectionSyncer->setHierarchicalRemoteIds( d->mHierarchicalRid );
00577     connect( d->mCollectionSyncer, SIGNAL( percent( KJob*, unsigned long ) ), SLOT( slotPercent( KJob*, unsigned long ) ) );
00578     connect( d->mCollectionSyncer, SIGNAL( result( KJob* ) ), SLOT( slotCollectionSyncDone( KJob* ) ) );
00579   }
00580   d->mCollectionSyncer->setRemoteCollections( changedCollections, removedCollections );
00581 }
00582 
00583 void ResourceBase::setCollectionStreamingEnabled( bool enable )
00584 {
00585   Q_D( ResourceBase );
00586   Q_ASSERT_X( d->scheduler->currentTask().type == ResourceScheduler::SyncCollectionTree ||
00587               d->scheduler->currentTask().type == ResourceScheduler::SyncAll,
00588               "ResourceBase::setCollectionStreamingEnabled()",
00589               "Calling setCollectionStreamingEnabled() although no collection retrieval is in progress" );
00590   if ( !d->mCollectionSyncer ) {
00591     d->mCollectionSyncer = new CollectionSync( identifier() );
00592     d->mCollectionSyncer->setHierarchicalRemoteIds( d->mHierarchicalRid );
00593     connect( d->mCollectionSyncer, SIGNAL( percent( KJob*, unsigned long ) ), SLOT( slotPercent( KJob*, unsigned long ) ) );
00594     connect( d->mCollectionSyncer, SIGNAL( result( KJob* ) ), SLOT( slotCollectionSyncDone( KJob* ) ) );
00595   }
00596   d->mCollectionSyncer->setStreamingEnabled( enable );
00597 }
00598 
00599 void ResourceBase::collectionsRetrievalDone()
00600 {
00601   Q_D( ResourceBase );
00602   Q_ASSERT_X( d->scheduler->currentTask().type == ResourceScheduler::SyncCollectionTree ||
00603               d->scheduler->currentTask().type == ResourceScheduler::SyncAll,
00604               "ResourceBase::collectionsRetrievalDone()",
00605               "Calling collectionsRetrievalDone() although no collection retrieval is in progress" );
00606   // streaming enabled, so finalize the sync
00607   if ( d->mCollectionSyncer ) {
00608     d->mCollectionSyncer->retrievalDone();
00609   }
00610   // user did the sync himself, we are done now
00611   else {
00612     // FIXME: we need the same special case for SyncAll as in slotCollectionSyncDone here!
00613     d->scheduler->taskDone();
00614   }
00615 }
00616 
00617 void ResourceBasePrivate::slotCollectionSyncDone( KJob * job )
00618 {
00619   Q_Q( ResourceBase );
00620   mCollectionSyncer = 0;
00621   if ( job->error() ) {
00622     if ( job->error() != Job::UserCanceled )
00623       emit q->error( job->errorString() );
00624   } else {
00625     if ( scheduler->currentTask().type == ResourceScheduler::SyncAll ) {
00626       CollectionFetchJob *list = new CollectionFetchJob( Collection::root(), CollectionFetchJob::Recursive );
00627       list->setFetchScope( q->changeRecorder()->collectionFetchScope() );
00628       list->fetchScope().setResource( mId );
00629       q->connect( list, SIGNAL( result( KJob* ) ), q, SLOT( slotLocalListDone( KJob* ) ) );
00630       return;
00631     }
00632   }
00633   scheduler->taskDone();
00634 }
00635 
00636 void ResourceBasePrivate::slotLocalListDone( KJob * job )
00637 {
00638   Q_Q( ResourceBase );
00639   if ( job->error() ) {
00640     emit q->error( job->errorString() );
00641   } else {
00642     Collection::List cols = static_cast<CollectionFetchJob*>( job )->collections();
00643     foreach ( const Collection &col, cols ) {
00644       scheduler->scheduleSync( col );
00645     }
00646     scheduler->scheduleFullSyncCompletion();
00647   }
00648   scheduler->taskDone();
00649 }
00650 
00651 void ResourceBasePrivate::slotSynchronizeCollection( const Collection &col )
00652 {
00653   Q_Q( ResourceBase );
00654   currentCollection = col;
00655   // check if this collection actually can contain anything
00656   QStringList contentTypes = currentCollection.contentMimeTypes();
00657   contentTypes.removeAll( Collection::mimeType() );
00658   if ( !contentTypes.isEmpty() || (col.rights() & (Collection::CanLinkItem)) ) { // HACK to check for virtual collections
00659     emit q->status( AgentBase::Running, i18nc( "@info:status", "Syncing collection '%1'", currentCollection.name() ) );
00660     q->retrieveItems( currentCollection );
00661     return;
00662   }
00663   scheduler->taskDone();
00664 }
00665 
00666 void ResourceBasePrivate::slotSynchronizeCollectionAttributes( const Collection &col )
00667 {
00668   Q_Q( ResourceBase );
00669   QMetaObject::invokeMethod( q, "retrieveCollectionAttributes", Q_ARG( Akonadi::Collection, col ) );
00670 }
00671 
00672 void ResourceBasePrivate::slotPrepareItemRetrieval( const Akonadi::Item &item )
00673 {
00674   Q_Q( ResourceBase );
00675   ItemFetchJob *fetch = new ItemFetchJob( item, this );
00676   fetch->fetchScope().setAncestorRetrieval( q->changeRecorder()->itemFetchScope().ancestorRetrieval() );
00677   fetch->fetchScope().setCacheOnly( true );
00678 
00679   // copy list of attributes to fetch
00680   const QSet<QByteArray> attributes = q->changeRecorder()->itemFetchScope().attributes();
00681   foreach ( const QByteArray &attribute, attributes )
00682     fetch->fetchScope().fetchAttribute( attribute );
00683 
00684   q->connect( fetch, SIGNAL( result( KJob* ) ), SLOT( slotPrepareItemRetrievalResult( KJob* ) ) );
00685 }
00686 
00687 void ResourceBasePrivate::slotPrepareItemRetrievalResult( KJob* job )
00688 {
00689   Q_Q( ResourceBase );
00690   Q_ASSERT_X( scheduler->currentTask().type == ResourceScheduler::FetchItem,
00691             "ResourceBasePrivate::slotPrepareItemRetrievalResult()",
00692             "Preparing item retrieval although no item retrieval is in progress" );
00693   if ( job->error() ) {
00694     q->cancelTask( job->errorText() );
00695     return;
00696   }
00697   ItemFetchJob *fetch = qobject_cast<ItemFetchJob*>( job );
00698   if ( fetch->items().count() != 1 ) {
00699     q->cancelTask( i18n( "The requested item no longer exists" ) );
00700     return;
00701   }
00702   const Item item = fetch->items().first();
00703   const QSet<QByteArray> parts = scheduler->currentTask().itemParts;
00704   if ( !q->retrieveItem( item, parts ) )
00705     q->cancelTask();
00706 }
00707 
00708 void ResourceBase::itemsRetrievalDone()
00709 {
00710   Q_D( ResourceBase );
00711   // streaming enabled, so finalize the sync
00712   if ( d->mItemSyncer ) {
00713     d->mItemSyncer->deliveryDone();
00714   }
00715   // user did the sync himself, we are done now
00716   else {
00717     d->scheduler->taskDone();
00718   }
00719 }
00720 
00721 void ResourceBase::clearCache()
00722 {
00723   Q_D( ResourceBase );
00724   d->scheduler->scheduleResourceCollectionDeletion();
00725 }
00726 
00727 Collection ResourceBase::currentCollection() const
00728 {
00729   Q_D( const ResourceBase );
00730   Q_ASSERT_X( d->scheduler->currentTask().type == ResourceScheduler::SyncCollection ,
00731               "ResourceBase::currentCollection()",
00732               "Trying to access current collection although no item retrieval is in progress" );
00733   return d->currentCollection;
00734 }
00735 
00736 Item ResourceBase::currentItem() const
00737 {
00738   Q_D( const ResourceBase );
00739   Q_ASSERT_X( d->scheduler->currentTask().type == ResourceScheduler::FetchItem ,
00740               "ResourceBase::currentItem()",
00741               "Trying to access current item although no item retrieval is in progress" );
00742   return d->scheduler->currentTask().item;
00743 }
00744 
00745 void ResourceBase::synchronizeCollectionTree()
00746 {
00747   d_func()->scheduler->scheduleCollectionTreeSync();
00748 }
00749 
00750 void ResourceBase::cancelTask()
00751 {
00752   Q_D( ResourceBase );
00753   switch ( d->scheduler->currentTask().type ) {
00754     case ResourceScheduler::FetchItem:
00755       itemRetrieved( Item() ); // sends the error reply and
00756       break;
00757     case ResourceScheduler::ChangeReplay:
00758       d->changeProcessed();
00759       break;
00760     case ResourceScheduler::SyncCollectionTree:
00761     case ResourceScheduler::SyncAll:
00762       if ( d->mCollectionSyncer )
00763         d->mCollectionSyncer->rollback();
00764       else
00765         d->scheduler->taskDone();
00766       break;
00767     case ResourceScheduler::SyncCollection:
00768       if ( d->mItemSyncer )
00769         d->mItemSyncer->rollback();
00770       else
00771         d->scheduler->taskDone();
00772       break;
00773     default:
00774       d->scheduler->taskDone();
00775   }
00776 }
00777 
00778 void ResourceBase::cancelTask( const QString &msg )
00779 {
00780   cancelTask();
00781 
00782   emit error( msg );
00783 }
00784 
00785 void ResourceBase::deferTask()
00786 {
00787   Q_D( ResourceBase );
00788   d->scheduler->deferTask();
00789 }
00790 
00791 void ResourceBase::doSetOnline( bool state )
00792 {
00793   d_func()->scheduler->setOnline( state );
00794 }
00795 
00796 void ResourceBase::synchronizeCollection( qint64 collectionId )
00797 {
00798   synchronizeCollection( collectionId, false );
00799 }
00800 
00801 void ResourceBase::synchronizeCollection( qint64 collectionId, bool recursive )
00802 {
00803   CollectionFetchJob* job = new CollectionFetchJob( Collection( collectionId ), recursive ? CollectionFetchJob::Recursive : CollectionFetchJob::Base );
00804   job->setFetchScope( changeRecorder()->collectionFetchScope() );
00805   job->fetchScope().setResource( identifier() );
00806   job->setProperty( "recursive", recursive );
00807   connect( job, SIGNAL( result( KJob* ) ), SLOT( slotCollectionListDone( KJob* ) ) );
00808 }
00809 
00810 void ResourceBasePrivate::slotCollectionListDone( KJob *job )
00811 {
00812   if ( !job->error() ) {
00813     Collection::List list = static_cast<CollectionFetchJob*>( job )->collections();
00814     if ( !list.isEmpty() ) {
00815       if ( job->property( "recursive" ).toBool() ) {
00816         Q_FOREACH ( const Collection &collection, list ) {
00817           scheduler->scheduleSync( collection );
00818         }
00819       } else {
00820         scheduler->scheduleSync( list.first() );
00821       }
00822     }
00823   }
00824   // TODO: error handling
00825 }
00826 
00827 void ResourceBase::synchronizeCollectionAttributes( qint64 collectionId )
00828 {
00829   CollectionFetchJob* job = new CollectionFetchJob( Collection( collectionId ), CollectionFetchJob::Base );
00830   job->setFetchScope( changeRecorder()->collectionFetchScope() );
00831   job->fetchScope().setResource( identifier() );
00832   connect( job, SIGNAL( result( KJob* ) ), SLOT( slotCollectionListForAttributesDone( KJob* ) ) );
00833 }
00834 
00835 void ResourceBasePrivate::slotCollectionListForAttributesDone( KJob *job )
00836 {
00837   if ( !job->error() ) {
00838     Collection::List list = static_cast<CollectionFetchJob*>( job )->collections();
00839     if ( !list.isEmpty() ) {
00840       Collection col = list.first();
00841       scheduler->scheduleAttributesSync( col );
00842     }
00843   }
00844   // TODO: error handling
00845 }
00846 
00847 void ResourceBase::setTotalItems( int amount )
00848 {
00849   kDebug() << amount;
00850   Q_D( ResourceBase );
00851   setItemStreamingEnabled( true );
00852   d->mItemSyncer->setTotalItems( amount );
00853 }
00854 
00855 void ResourceBase::setItemStreamingEnabled( bool enable )
00856 {
00857   Q_D( ResourceBase );
00858   d->createItemSyncInstanceIfMissing();
00859   d->mItemSyncer->setStreamingEnabled( enable );
00860 }
00861 
00862 void ResourceBase::itemsRetrieved( const Item::List &items )
00863 {
00864   Q_D( ResourceBase );
00865   d->createItemSyncInstanceIfMissing();
00866   d->mItemSyncer->setFullSyncItems( items );
00867 }
00868 
00869 void ResourceBase::itemsRetrievedIncremental( const Item::List &changedItems, const Item::List &removedItems )
00870 {
00871   Q_D( ResourceBase );
00872   d->createItemSyncInstanceIfMissing();
00873   d->mItemSyncer->setIncrementalSyncItems( changedItems, removedItems );
00874 }
00875 
00876 void ResourceBasePrivate::slotItemSyncDone( KJob *job )
00877 {
00878   mItemSyncer = 0;
00879   Q_Q( ResourceBase );
00880   if ( job->error() && job->error() != Job::UserCanceled ) {
00881     emit q->error( job->errorString() );
00882   }
00883   scheduler->taskDone();
00884 }
00885 
00886 
00887 void ResourceBasePrivate::slotDelayedEmitProgress()
00888 {
00889   Q_Q( ResourceBase );
00890   emit q->percent( mUnemittedProgress );
00891 
00892   Q_FOREACH( const QVariantMap &statusMap, mUnemittedAdvancedStatus ) {
00893       emit q->advancedStatus( statusMap );
00894   }
00895   mUnemittedProgress = 0;
00896   mUnemittedAdvancedStatus.clear();
00897 }
00898 
00899 void ResourceBasePrivate::slotPercent( KJob *job, unsigned long percent )
00900 {
00901   Q_Q( ResourceBase );
00902 
00903   mUnemittedProgress = percent;
00904 
00905   const Collection collection = job->property( "collection" ).value<Collection>();
00906   if ( collection.isValid() ) {
00907     QVariantMap statusMap;
00908     statusMap.insert( QLatin1String( "key" ), QString::fromLatin1( "collectionSyncProgress" ) );
00909     statusMap.insert( QLatin1String( "collectionId" ), collection.id() );
00910     statusMap.insert( QLatin1String( "percent" ), static_cast<unsigned int>( percent ) );
00911 
00912     mUnemittedAdvancedStatus[collection.id()] = statusMap;
00913   }
00914   // deliver completion right away, intermediate progress at 1s intervals
00915   if ( percent == 100 ) {
00916     mProgressEmissionCompressor.stop();
00917     slotDelayedEmitProgress();
00918   } else if ( !mProgressEmissionCompressor.isActive() ) {
00919     mProgressEmissionCompressor.start();
00920   }
00921 }
00922 
00923 void ResourceBase::setHierarchicalRemoteIdentifiersEnabled( bool enable )
00924 {
00925   Q_D( ResourceBase );
00926   d->mHierarchicalRid = enable;
00927 }
00928 
00929 void ResourceBase::scheduleCustomTask( QObject *receiver, const char *method, const QVariant &argument, SchedulePriority priority )
00930 {
00931   Q_D( ResourceBase );
00932   d->scheduler->scheduleCustomTask( receiver, method, argument, priority );
00933 }
00934 
00935 void ResourceBase::taskDone()
00936 {
00937   Q_D( ResourceBase );
00938   d->scheduler->taskDone();
00939 }
00940 
00941 void ResourceBase::retrieveCollectionAttributes( const Collection &collection )
00942 {
00943   collectionAttributesRetrieved( collection );
00944 }
00945 
00946 void Akonadi::ResourceBase::abortActivity()
00947 {
00948 
00949 }
00950 
00951 void ResourceBase::setItemTransactionMode(ItemSync::TransactionMode mode)
00952 {
00953   Q_D( ResourceBase );
00954   d->mItemTransactionMode = mode;
00955 }
00956 
00957 void ResourceBase::setItemSynchronizationFetchScope(const ItemFetchScope& fetchScope)
00958 {
00959   Q_D( ResourceBase );
00960   if ( !d->mItemSyncFetchScope )
00961     d->mItemSyncFetchScope = new ItemFetchScope;
00962   *(d->mItemSyncFetchScope) = fetchScope;
00963 }
00964 
00965 #include "resourcebase.moc"
00966 #include "moc_resourcebase.cpp"

akonadi

Skip menu "akonadi"
  • Main Page
  • Modules
  • Namespace List
  • Class Hierarchy
  • Alphabetical List
  • Class List
  • File List
  • Namespace Members
  • Class Members
  • Related Pages

KDE-PIM Libraries

Skip menu "KDE-PIM Libraries"
  • akonadi
  •   contact
  •   kmime
  • kabc
  • kblog
  • kcal
  • kcalcore
  • kcalutils
  • kholidays
  • kimap
  • kioslave
  •   imap4
  •   mbox
  •   nntp
  • kldap
  • kmbox
  • kmime
  • kontactinterface
  • kpimidentities
  • kpimtextedit
  •   richtextbuilders
  • kpimutils
  • kresources
  • ktnef
  • kxmlrpcclient
  • mailtransport
  • microblog
  • qgpgme
  • syndication
  •   atom
  •   rdf
  •   rss2
Generated for KDE-PIM Libraries by doxygen 1.7.4
This website is maintained by Adriaan de Groot and Allen Winter.
KDE® and the K Desktop Environment® logo are registered trademarks of KDE e.V. | Legal