• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #include "sync/internal_api/public/attachments/attachment_service_impl.h"
6 
7 #include <iterator>
8 
9 #include "base/bind.h"
10 #include "base/message_loop/message_loop.h"
11 #include "base/thread_task_runner_handle.h"
12 #include "base/time/time.h"
13 #include "sync/api/attachments/attachment.h"
14 #include "sync/api/attachments/fake_attachment_store.h"
15 #include "sync/internal_api/public/attachments/fake_attachment_downloader.h"
16 #include "sync/internal_api/public/attachments/fake_attachment_uploader.h"
17 
18 namespace syncer {
19 
20 // GetOrDownloadAttachments starts multiple parallel DownloadAttachment calls.
21 // GetOrDownloadState tracks completion of these calls and posts callback for
22 // consumer once all attachments are either retrieved or reported unavailable.
23 class AttachmentServiceImpl::GetOrDownloadState
24     : public base::RefCounted<GetOrDownloadState>,
25       public base::NonThreadSafe {
26  public:
27   // GetOrDownloadState gets parameter from values passed to
28   // AttachmentService::GetOrDownloadAttachments.
29   // |attachment_ids| is a list of attachmens to retrieve.
30   // |callback| will be posted on current thread when all attachments retrieved
31   // or confirmed unavailable.
32   GetOrDownloadState(const AttachmentIdList& attachment_ids,
33                      const GetOrDownloadCallback& callback);
34 
35   // Attachment was just retrieved. Add it to retrieved attachments.
36   void AddAttachment(const Attachment& attachment);
37 
38   // Both reading from local store and downloading attachment failed.
39   // Add it to unavailable set.
40   void AddUnavailableAttachmentId(const AttachmentId& attachment_id);
41 
42  private:
43   friend class base::RefCounted<GetOrDownloadState>;
44   virtual ~GetOrDownloadState();
45 
46   // If all attachment requests completed then post callback to consumer with
47   // results.
48   void PostResultIfAllRequestsCompleted();
49 
50   GetOrDownloadCallback callback_;
51 
52   // Requests for these attachments are still in progress.
53   AttachmentIdSet in_progress_attachments_;
54 
55   AttachmentIdSet unavailable_attachments_;
56   scoped_ptr<AttachmentMap> retrieved_attachments_;
57 
58   DISALLOW_COPY_AND_ASSIGN(GetOrDownloadState);
59 };
60 
GetOrDownloadState(const AttachmentIdList & attachment_ids,const GetOrDownloadCallback & callback)61 AttachmentServiceImpl::GetOrDownloadState::GetOrDownloadState(
62     const AttachmentIdList& attachment_ids,
63     const GetOrDownloadCallback& callback)
64     : callback_(callback), retrieved_attachments_(new AttachmentMap()) {
65   std::copy(
66       attachment_ids.begin(),
67       attachment_ids.end(),
68       std::inserter(in_progress_attachments_, in_progress_attachments_.end()));
69   PostResultIfAllRequestsCompleted();
70 }
71 
~GetOrDownloadState()72 AttachmentServiceImpl::GetOrDownloadState::~GetOrDownloadState() {
73   DCHECK(CalledOnValidThread());
74 }
75 
AddAttachment(const Attachment & attachment)76 void AttachmentServiceImpl::GetOrDownloadState::AddAttachment(
77     const Attachment& attachment) {
78   DCHECK(CalledOnValidThread());
79   DCHECK(retrieved_attachments_->find(attachment.GetId()) ==
80          retrieved_attachments_->end());
81   retrieved_attachments_->insert(
82       std::make_pair(attachment.GetId(), attachment));
83   DCHECK(in_progress_attachments_.find(attachment.GetId()) !=
84          in_progress_attachments_.end());
85   in_progress_attachments_.erase(attachment.GetId());
86   PostResultIfAllRequestsCompleted();
87 }
88 
AddUnavailableAttachmentId(const AttachmentId & attachment_id)89 void AttachmentServiceImpl::GetOrDownloadState::AddUnavailableAttachmentId(
90     const AttachmentId& attachment_id) {
91   DCHECK(CalledOnValidThread());
92   DCHECK(unavailable_attachments_.find(attachment_id) ==
93          unavailable_attachments_.end());
94   unavailable_attachments_.insert(attachment_id);
95   DCHECK(in_progress_attachments_.find(attachment_id) !=
96          in_progress_attachments_.end());
97   in_progress_attachments_.erase(attachment_id);
98   PostResultIfAllRequestsCompleted();
99 }
100 
101 void
PostResultIfAllRequestsCompleted()102 AttachmentServiceImpl::GetOrDownloadState::PostResultIfAllRequestsCompleted() {
103   if (in_progress_attachments_.empty()) {
104     // All requests completed. Let's notify consumer.
105     GetOrDownloadResult result =
106         unavailable_attachments_.empty() ? GET_SUCCESS : GET_UNSPECIFIED_ERROR;
107     base::MessageLoop::current()->PostTask(
108         FROM_HERE,
109         base::Bind(callback_, result, base::Passed(&retrieved_attachments_)));
110   }
111 }
112 
AttachmentServiceImpl(scoped_refptr<AttachmentStore> attachment_store,scoped_ptr<AttachmentUploader> attachment_uploader,scoped_ptr<AttachmentDownloader> attachment_downloader,Delegate * delegate,const base::TimeDelta & initial_backoff_delay,const base::TimeDelta & max_backoff_delay)113 AttachmentServiceImpl::AttachmentServiceImpl(
114     scoped_refptr<AttachmentStore> attachment_store,
115     scoped_ptr<AttachmentUploader> attachment_uploader,
116     scoped_ptr<AttachmentDownloader> attachment_downloader,
117     Delegate* delegate,
118     const base::TimeDelta& initial_backoff_delay,
119     const base::TimeDelta& max_backoff_delay)
120     : attachment_store_(attachment_store),
121       attachment_uploader_(attachment_uploader.Pass()),
122       attachment_downloader_(attachment_downloader.Pass()),
123       delegate_(delegate),
124       weak_ptr_factory_(this) {
125   DCHECK(CalledOnValidThread());
126   DCHECK(attachment_store_.get());
127 
128   // TODO(maniscalco): Observe network connectivity change events.  When the
129   // network becomes disconnected, consider suspending queue dispatch.  When
130   // connectivity is restored, consider clearing any dispatch backoff (bug
131   // 411981).
132   upload_task_queue_.reset(new TaskQueue<AttachmentId>(
133       base::Bind(&AttachmentServiceImpl::BeginUpload,
134                  weak_ptr_factory_.GetWeakPtr()),
135       initial_backoff_delay,
136       max_backoff_delay));
137 
138   net::NetworkChangeNotifier::AddNetworkChangeObserver(this);
139 }
140 
~AttachmentServiceImpl()141 AttachmentServiceImpl::~AttachmentServiceImpl() {
142   DCHECK(CalledOnValidThread());
143   net::NetworkChangeNotifier::RemoveNetworkChangeObserver(this);
144 }
145 
146 // Static.
CreateForTest()147 scoped_ptr<syncer::AttachmentService> AttachmentServiceImpl::CreateForTest() {
148   scoped_refptr<syncer::AttachmentStore> attachment_store(
149       new syncer::FakeAttachmentStore(base::ThreadTaskRunnerHandle::Get()));
150   scoped_ptr<AttachmentUploader> attachment_uploader(
151       new FakeAttachmentUploader);
152   scoped_ptr<AttachmentDownloader> attachment_downloader(
153       new FakeAttachmentDownloader());
154   scoped_ptr<syncer::AttachmentService> attachment_service(
155       new syncer::AttachmentServiceImpl(attachment_store,
156                                         attachment_uploader.Pass(),
157                                         attachment_downloader.Pass(),
158                                         NULL,
159                                         base::TimeDelta(),
160                                         base::TimeDelta()));
161   return attachment_service.Pass();
162 }
163 
GetStore()164 AttachmentStore* AttachmentServiceImpl::GetStore() {
165   return attachment_store_.get();
166 }
167 
GetOrDownloadAttachments(const AttachmentIdList & attachment_ids,const GetOrDownloadCallback & callback)168 void AttachmentServiceImpl::GetOrDownloadAttachments(
169     const AttachmentIdList& attachment_ids,
170     const GetOrDownloadCallback& callback) {
171   DCHECK(CalledOnValidThread());
172   scoped_refptr<GetOrDownloadState> state(
173       new GetOrDownloadState(attachment_ids, callback));
174   attachment_store_->Read(attachment_ids,
175                           base::Bind(&AttachmentServiceImpl::ReadDone,
176                                      weak_ptr_factory_.GetWeakPtr(),
177                                      state));
178 }
179 
DropAttachments(const AttachmentIdList & attachment_ids,const DropCallback & callback)180 void AttachmentServiceImpl::DropAttachments(
181     const AttachmentIdList& attachment_ids,
182     const DropCallback& callback) {
183   DCHECK(CalledOnValidThread());
184   attachment_store_->Drop(attachment_ids,
185                           base::Bind(&AttachmentServiceImpl::DropDone,
186                                      weak_ptr_factory_.GetWeakPtr(),
187                                      callback));
188 }
189 
ReadDone(const scoped_refptr<GetOrDownloadState> & state,const AttachmentStore::Result & result,scoped_ptr<AttachmentMap> attachments,scoped_ptr<AttachmentIdList> unavailable_attachment_ids)190 void AttachmentServiceImpl::ReadDone(
191     const scoped_refptr<GetOrDownloadState>& state,
192     const AttachmentStore::Result& result,
193     scoped_ptr<AttachmentMap> attachments,
194     scoped_ptr<AttachmentIdList> unavailable_attachment_ids) {
195   // Add read attachments to result.
196   for (AttachmentMap::const_iterator iter = attachments->begin();
197        iter != attachments->end();
198        ++iter) {
199     state->AddAttachment(iter->second);
200   }
201 
202   AttachmentIdList::const_iterator iter = unavailable_attachment_ids->begin();
203   AttachmentIdList::const_iterator end = unavailable_attachment_ids->end();
204   if (attachment_downloader_.get()) {
205     // Try to download locally unavailable attachments.
206     for (; iter != end; ++iter) {
207       attachment_downloader_->DownloadAttachment(
208           *iter,
209           base::Bind(&AttachmentServiceImpl::DownloadDone,
210                      weak_ptr_factory_.GetWeakPtr(),
211                      state,
212                      *iter));
213     }
214   } else {
215     // No downloader so all locally unavailable attachments are unavailable.
216     for (; iter != end; ++iter) {
217       state->AddUnavailableAttachmentId(*iter);
218     }
219   }
220 }
221 
DropDone(const DropCallback & callback,const AttachmentStore::Result & result)222 void AttachmentServiceImpl::DropDone(const DropCallback& callback,
223                                      const AttachmentStore::Result& result) {
224   AttachmentService::DropResult drop_result =
225       AttachmentService::DROP_UNSPECIFIED_ERROR;
226   if (result == AttachmentStore::SUCCESS) {
227     drop_result = AttachmentService::DROP_SUCCESS;
228   }
229   // TODO(maniscalco): Deal with case where an error occurred (bug 361251).
230   base::MessageLoop::current()->PostTask(FROM_HERE,
231                                          base::Bind(callback, drop_result));
232 }
233 
UploadDone(const AttachmentUploader::UploadResult & result,const AttachmentId & attachment_id)234 void AttachmentServiceImpl::UploadDone(
235     const AttachmentUploader::UploadResult& result,
236     const AttachmentId& attachment_id) {
237   DCHECK(CalledOnValidThread());
238   switch (result) {
239     case AttachmentUploader::UPLOAD_SUCCESS:
240       upload_task_queue_->MarkAsSucceeded(attachment_id);
241       if (delegate_) {
242         delegate_->OnAttachmentUploaded(attachment_id);
243       }
244       break;
245     case AttachmentUploader::UPLOAD_TRANSIENT_ERROR:
246       upload_task_queue_->MarkAsFailed(attachment_id);
247       upload_task_queue_->AddToQueue(attachment_id);
248       break;
249     case AttachmentUploader::UPLOAD_UNSPECIFIED_ERROR:
250       // TODO(pavely): crbug/372622: Deal with UploadAttachment failures.
251       upload_task_queue_->MarkAsFailed(attachment_id);
252       break;
253   }
254 }
255 
DownloadDone(const scoped_refptr<GetOrDownloadState> & state,const AttachmentId & attachment_id,const AttachmentDownloader::DownloadResult & result,scoped_ptr<Attachment> attachment)256 void AttachmentServiceImpl::DownloadDone(
257     const scoped_refptr<GetOrDownloadState>& state,
258     const AttachmentId& attachment_id,
259     const AttachmentDownloader::DownloadResult& result,
260     scoped_ptr<Attachment> attachment) {
261   switch (result) {
262     case AttachmentDownloader::DOWNLOAD_SUCCESS:
263       state->AddAttachment(*attachment.get());
264       break;
265     case AttachmentDownloader::DOWNLOAD_TRANSIENT_ERROR:
266     case AttachmentDownloader::DOWNLOAD_UNSPECIFIED_ERROR:
267       state->AddUnavailableAttachmentId(attachment_id);
268       break;
269   }
270 }
271 
BeginUpload(const AttachmentId & attachment_id)272 void AttachmentServiceImpl::BeginUpload(const AttachmentId& attachment_id) {
273   DCHECK(CalledOnValidThread());
274   AttachmentIdList attachment_ids;
275   attachment_ids.push_back(attachment_id);
276   attachment_store_->Read(attachment_ids,
277                           base::Bind(&AttachmentServiceImpl::ReadDoneNowUpload,
278                                      weak_ptr_factory_.GetWeakPtr()));
279 }
280 
UploadAttachments(const AttachmentIdSet & attachment_ids)281 void AttachmentServiceImpl::UploadAttachments(
282     const AttachmentIdSet& attachment_ids) {
283   DCHECK(CalledOnValidThread());
284   if (!attachment_uploader_.get()) {
285     return;
286   }
287   AttachmentIdSet::const_iterator iter = attachment_ids.begin();
288   AttachmentIdSet::const_iterator end = attachment_ids.end();
289   for (; iter != end; ++iter) {
290     upload_task_queue_->AddToQueue(*iter);
291   }
292 }
293 
OnNetworkChanged(net::NetworkChangeNotifier::ConnectionType type)294 void AttachmentServiceImpl::OnNetworkChanged(
295     net::NetworkChangeNotifier::ConnectionType type) {
296   if (type != net::NetworkChangeNotifier::CONNECTION_NONE) {
297     upload_task_queue_->ResetBackoff();
298   }
299 }
300 
ReadDoneNowUpload(const AttachmentStore::Result & result,scoped_ptr<AttachmentMap> attachments,scoped_ptr<AttachmentIdList> unavailable_attachment_ids)301 void AttachmentServiceImpl::ReadDoneNowUpload(
302     const AttachmentStore::Result& result,
303     scoped_ptr<AttachmentMap> attachments,
304     scoped_ptr<AttachmentIdList> unavailable_attachment_ids) {
305   DCHECK(CalledOnValidThread());
306   if (!unavailable_attachment_ids->empty()) {
307     // TODO(maniscalco): We failed to read some attachments. What should we do
308     // now?
309     AttachmentIdList::const_iterator iter = unavailable_attachment_ids->begin();
310     AttachmentIdList::const_iterator end = unavailable_attachment_ids->end();
311     for (; iter != end; ++iter) {
312       upload_task_queue_->Cancel(*iter);
313     }
314   }
315 
316   AttachmentMap::const_iterator iter = attachments->begin();
317   AttachmentMap::const_iterator end = attachments->end();
318   for (; iter != end; ++iter) {
319     attachment_uploader_->UploadAttachment(
320         iter->second,
321         base::Bind(&AttachmentServiceImpl::UploadDone,
322                    weak_ptr_factory_.GetWeakPtr()));
323   }
324 }
325 
SetTimerForTest(scoped_ptr<base::Timer> timer)326 void AttachmentServiceImpl::SetTimerForTest(scoped_ptr<base::Timer> timer) {
327   upload_task_queue_->SetTimerForTest(timer.Pass());
328 }
329 
330 }  // namespace syncer
331