1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "sync/internal_api/public/attachments/attachment_service_impl.h"
6
7 #include <iterator>
8
9 #include "base/bind.h"
10 #include "base/message_loop/message_loop.h"
11 #include "base/thread_task_runner_handle.h"
12 #include "base/time/time.h"
13 #include "sync/api/attachments/attachment.h"
14 #include "sync/api/attachments/fake_attachment_store.h"
15 #include "sync/internal_api/public/attachments/fake_attachment_downloader.h"
16 #include "sync/internal_api/public/attachments/fake_attachment_uploader.h"
17
18 namespace syncer {
19
20 // GetOrDownloadAttachments starts multiple parallel DownloadAttachment calls.
21 // GetOrDownloadState tracks completion of these calls and posts callback for
22 // consumer once all attachments are either retrieved or reported unavailable.
23 class AttachmentServiceImpl::GetOrDownloadState
24 : public base::RefCounted<GetOrDownloadState>,
25 public base::NonThreadSafe {
26 public:
27 // GetOrDownloadState gets parameter from values passed to
28 // AttachmentService::GetOrDownloadAttachments.
29 // |attachment_ids| is a list of attachmens to retrieve.
30 // |callback| will be posted on current thread when all attachments retrieved
31 // or confirmed unavailable.
32 GetOrDownloadState(const AttachmentIdList& attachment_ids,
33 const GetOrDownloadCallback& callback);
34
35 // Attachment was just retrieved. Add it to retrieved attachments.
36 void AddAttachment(const Attachment& attachment);
37
38 // Both reading from local store and downloading attachment failed.
39 // Add it to unavailable set.
40 void AddUnavailableAttachmentId(const AttachmentId& attachment_id);
41
42 private:
43 friend class base::RefCounted<GetOrDownloadState>;
44 virtual ~GetOrDownloadState();
45
46 // If all attachment requests completed then post callback to consumer with
47 // results.
48 void PostResultIfAllRequestsCompleted();
49
50 GetOrDownloadCallback callback_;
51
52 // Requests for these attachments are still in progress.
53 AttachmentIdSet in_progress_attachments_;
54
55 AttachmentIdSet unavailable_attachments_;
56 scoped_ptr<AttachmentMap> retrieved_attachments_;
57
58 DISALLOW_COPY_AND_ASSIGN(GetOrDownloadState);
59 };
60
GetOrDownloadState(const AttachmentIdList & attachment_ids,const GetOrDownloadCallback & callback)61 AttachmentServiceImpl::GetOrDownloadState::GetOrDownloadState(
62 const AttachmentIdList& attachment_ids,
63 const GetOrDownloadCallback& callback)
64 : callback_(callback), retrieved_attachments_(new AttachmentMap()) {
65 std::copy(
66 attachment_ids.begin(),
67 attachment_ids.end(),
68 std::inserter(in_progress_attachments_, in_progress_attachments_.end()));
69 PostResultIfAllRequestsCompleted();
70 }
71
~GetOrDownloadState()72 AttachmentServiceImpl::GetOrDownloadState::~GetOrDownloadState() {
73 DCHECK(CalledOnValidThread());
74 }
75
AddAttachment(const Attachment & attachment)76 void AttachmentServiceImpl::GetOrDownloadState::AddAttachment(
77 const Attachment& attachment) {
78 DCHECK(CalledOnValidThread());
79 DCHECK(retrieved_attachments_->find(attachment.GetId()) ==
80 retrieved_attachments_->end());
81 retrieved_attachments_->insert(
82 std::make_pair(attachment.GetId(), attachment));
83 DCHECK(in_progress_attachments_.find(attachment.GetId()) !=
84 in_progress_attachments_.end());
85 in_progress_attachments_.erase(attachment.GetId());
86 PostResultIfAllRequestsCompleted();
87 }
88
AddUnavailableAttachmentId(const AttachmentId & attachment_id)89 void AttachmentServiceImpl::GetOrDownloadState::AddUnavailableAttachmentId(
90 const AttachmentId& attachment_id) {
91 DCHECK(CalledOnValidThread());
92 DCHECK(unavailable_attachments_.find(attachment_id) ==
93 unavailable_attachments_.end());
94 unavailable_attachments_.insert(attachment_id);
95 DCHECK(in_progress_attachments_.find(attachment_id) !=
96 in_progress_attachments_.end());
97 in_progress_attachments_.erase(attachment_id);
98 PostResultIfAllRequestsCompleted();
99 }
100
101 void
PostResultIfAllRequestsCompleted()102 AttachmentServiceImpl::GetOrDownloadState::PostResultIfAllRequestsCompleted() {
103 if (in_progress_attachments_.empty()) {
104 // All requests completed. Let's notify consumer.
105 GetOrDownloadResult result =
106 unavailable_attachments_.empty() ? GET_SUCCESS : GET_UNSPECIFIED_ERROR;
107 base::MessageLoop::current()->PostTask(
108 FROM_HERE,
109 base::Bind(callback_, result, base::Passed(&retrieved_attachments_)));
110 }
111 }
112
AttachmentServiceImpl(scoped_refptr<AttachmentStore> attachment_store,scoped_ptr<AttachmentUploader> attachment_uploader,scoped_ptr<AttachmentDownloader> attachment_downloader,Delegate * delegate,const base::TimeDelta & initial_backoff_delay,const base::TimeDelta & max_backoff_delay)113 AttachmentServiceImpl::AttachmentServiceImpl(
114 scoped_refptr<AttachmentStore> attachment_store,
115 scoped_ptr<AttachmentUploader> attachment_uploader,
116 scoped_ptr<AttachmentDownloader> attachment_downloader,
117 Delegate* delegate,
118 const base::TimeDelta& initial_backoff_delay,
119 const base::TimeDelta& max_backoff_delay)
120 : attachment_store_(attachment_store),
121 attachment_uploader_(attachment_uploader.Pass()),
122 attachment_downloader_(attachment_downloader.Pass()),
123 delegate_(delegate),
124 weak_ptr_factory_(this) {
125 DCHECK(CalledOnValidThread());
126 DCHECK(attachment_store_.get());
127
128 // TODO(maniscalco): Observe network connectivity change events. When the
129 // network becomes disconnected, consider suspending queue dispatch. When
130 // connectivity is restored, consider clearing any dispatch backoff (bug
131 // 411981).
132 upload_task_queue_.reset(new TaskQueue<AttachmentId>(
133 base::Bind(&AttachmentServiceImpl::BeginUpload,
134 weak_ptr_factory_.GetWeakPtr()),
135 initial_backoff_delay,
136 max_backoff_delay));
137
138 net::NetworkChangeNotifier::AddNetworkChangeObserver(this);
139 }
140
~AttachmentServiceImpl()141 AttachmentServiceImpl::~AttachmentServiceImpl() {
142 DCHECK(CalledOnValidThread());
143 net::NetworkChangeNotifier::RemoveNetworkChangeObserver(this);
144 }
145
146 // Static.
CreateForTest()147 scoped_ptr<syncer::AttachmentService> AttachmentServiceImpl::CreateForTest() {
148 scoped_refptr<syncer::AttachmentStore> attachment_store(
149 new syncer::FakeAttachmentStore(base::ThreadTaskRunnerHandle::Get()));
150 scoped_ptr<AttachmentUploader> attachment_uploader(
151 new FakeAttachmentUploader);
152 scoped_ptr<AttachmentDownloader> attachment_downloader(
153 new FakeAttachmentDownloader());
154 scoped_ptr<syncer::AttachmentService> attachment_service(
155 new syncer::AttachmentServiceImpl(attachment_store,
156 attachment_uploader.Pass(),
157 attachment_downloader.Pass(),
158 NULL,
159 base::TimeDelta(),
160 base::TimeDelta()));
161 return attachment_service.Pass();
162 }
163
GetStore()164 AttachmentStore* AttachmentServiceImpl::GetStore() {
165 return attachment_store_.get();
166 }
167
GetOrDownloadAttachments(const AttachmentIdList & attachment_ids,const GetOrDownloadCallback & callback)168 void AttachmentServiceImpl::GetOrDownloadAttachments(
169 const AttachmentIdList& attachment_ids,
170 const GetOrDownloadCallback& callback) {
171 DCHECK(CalledOnValidThread());
172 scoped_refptr<GetOrDownloadState> state(
173 new GetOrDownloadState(attachment_ids, callback));
174 attachment_store_->Read(attachment_ids,
175 base::Bind(&AttachmentServiceImpl::ReadDone,
176 weak_ptr_factory_.GetWeakPtr(),
177 state));
178 }
179
DropAttachments(const AttachmentIdList & attachment_ids,const DropCallback & callback)180 void AttachmentServiceImpl::DropAttachments(
181 const AttachmentIdList& attachment_ids,
182 const DropCallback& callback) {
183 DCHECK(CalledOnValidThread());
184 attachment_store_->Drop(attachment_ids,
185 base::Bind(&AttachmentServiceImpl::DropDone,
186 weak_ptr_factory_.GetWeakPtr(),
187 callback));
188 }
189
ReadDone(const scoped_refptr<GetOrDownloadState> & state,const AttachmentStore::Result & result,scoped_ptr<AttachmentMap> attachments,scoped_ptr<AttachmentIdList> unavailable_attachment_ids)190 void AttachmentServiceImpl::ReadDone(
191 const scoped_refptr<GetOrDownloadState>& state,
192 const AttachmentStore::Result& result,
193 scoped_ptr<AttachmentMap> attachments,
194 scoped_ptr<AttachmentIdList> unavailable_attachment_ids) {
195 // Add read attachments to result.
196 for (AttachmentMap::const_iterator iter = attachments->begin();
197 iter != attachments->end();
198 ++iter) {
199 state->AddAttachment(iter->second);
200 }
201
202 AttachmentIdList::const_iterator iter = unavailable_attachment_ids->begin();
203 AttachmentIdList::const_iterator end = unavailable_attachment_ids->end();
204 if (attachment_downloader_.get()) {
205 // Try to download locally unavailable attachments.
206 for (; iter != end; ++iter) {
207 attachment_downloader_->DownloadAttachment(
208 *iter,
209 base::Bind(&AttachmentServiceImpl::DownloadDone,
210 weak_ptr_factory_.GetWeakPtr(),
211 state,
212 *iter));
213 }
214 } else {
215 // No downloader so all locally unavailable attachments are unavailable.
216 for (; iter != end; ++iter) {
217 state->AddUnavailableAttachmentId(*iter);
218 }
219 }
220 }
221
DropDone(const DropCallback & callback,const AttachmentStore::Result & result)222 void AttachmentServiceImpl::DropDone(const DropCallback& callback,
223 const AttachmentStore::Result& result) {
224 AttachmentService::DropResult drop_result =
225 AttachmentService::DROP_UNSPECIFIED_ERROR;
226 if (result == AttachmentStore::SUCCESS) {
227 drop_result = AttachmentService::DROP_SUCCESS;
228 }
229 // TODO(maniscalco): Deal with case where an error occurred (bug 361251).
230 base::MessageLoop::current()->PostTask(FROM_HERE,
231 base::Bind(callback, drop_result));
232 }
233
UploadDone(const AttachmentUploader::UploadResult & result,const AttachmentId & attachment_id)234 void AttachmentServiceImpl::UploadDone(
235 const AttachmentUploader::UploadResult& result,
236 const AttachmentId& attachment_id) {
237 DCHECK(CalledOnValidThread());
238 switch (result) {
239 case AttachmentUploader::UPLOAD_SUCCESS:
240 upload_task_queue_->MarkAsSucceeded(attachment_id);
241 if (delegate_) {
242 delegate_->OnAttachmentUploaded(attachment_id);
243 }
244 break;
245 case AttachmentUploader::UPLOAD_TRANSIENT_ERROR:
246 upload_task_queue_->MarkAsFailed(attachment_id);
247 upload_task_queue_->AddToQueue(attachment_id);
248 break;
249 case AttachmentUploader::UPLOAD_UNSPECIFIED_ERROR:
250 // TODO(pavely): crbug/372622: Deal with UploadAttachment failures.
251 upload_task_queue_->MarkAsFailed(attachment_id);
252 break;
253 }
254 }
255
DownloadDone(const scoped_refptr<GetOrDownloadState> & state,const AttachmentId & attachment_id,const AttachmentDownloader::DownloadResult & result,scoped_ptr<Attachment> attachment)256 void AttachmentServiceImpl::DownloadDone(
257 const scoped_refptr<GetOrDownloadState>& state,
258 const AttachmentId& attachment_id,
259 const AttachmentDownloader::DownloadResult& result,
260 scoped_ptr<Attachment> attachment) {
261 switch (result) {
262 case AttachmentDownloader::DOWNLOAD_SUCCESS:
263 state->AddAttachment(*attachment.get());
264 break;
265 case AttachmentDownloader::DOWNLOAD_TRANSIENT_ERROR:
266 case AttachmentDownloader::DOWNLOAD_UNSPECIFIED_ERROR:
267 state->AddUnavailableAttachmentId(attachment_id);
268 break;
269 }
270 }
271
BeginUpload(const AttachmentId & attachment_id)272 void AttachmentServiceImpl::BeginUpload(const AttachmentId& attachment_id) {
273 DCHECK(CalledOnValidThread());
274 AttachmentIdList attachment_ids;
275 attachment_ids.push_back(attachment_id);
276 attachment_store_->Read(attachment_ids,
277 base::Bind(&AttachmentServiceImpl::ReadDoneNowUpload,
278 weak_ptr_factory_.GetWeakPtr()));
279 }
280
UploadAttachments(const AttachmentIdSet & attachment_ids)281 void AttachmentServiceImpl::UploadAttachments(
282 const AttachmentIdSet& attachment_ids) {
283 DCHECK(CalledOnValidThread());
284 if (!attachment_uploader_.get()) {
285 return;
286 }
287 AttachmentIdSet::const_iterator iter = attachment_ids.begin();
288 AttachmentIdSet::const_iterator end = attachment_ids.end();
289 for (; iter != end; ++iter) {
290 upload_task_queue_->AddToQueue(*iter);
291 }
292 }
293
OnNetworkChanged(net::NetworkChangeNotifier::ConnectionType type)294 void AttachmentServiceImpl::OnNetworkChanged(
295 net::NetworkChangeNotifier::ConnectionType type) {
296 if (type != net::NetworkChangeNotifier::CONNECTION_NONE) {
297 upload_task_queue_->ResetBackoff();
298 }
299 }
300
ReadDoneNowUpload(const AttachmentStore::Result & result,scoped_ptr<AttachmentMap> attachments,scoped_ptr<AttachmentIdList> unavailable_attachment_ids)301 void AttachmentServiceImpl::ReadDoneNowUpload(
302 const AttachmentStore::Result& result,
303 scoped_ptr<AttachmentMap> attachments,
304 scoped_ptr<AttachmentIdList> unavailable_attachment_ids) {
305 DCHECK(CalledOnValidThread());
306 if (!unavailable_attachment_ids->empty()) {
307 // TODO(maniscalco): We failed to read some attachments. What should we do
308 // now?
309 AttachmentIdList::const_iterator iter = unavailable_attachment_ids->begin();
310 AttachmentIdList::const_iterator end = unavailable_attachment_ids->end();
311 for (; iter != end; ++iter) {
312 upload_task_queue_->Cancel(*iter);
313 }
314 }
315
316 AttachmentMap::const_iterator iter = attachments->begin();
317 AttachmentMap::const_iterator end = attachments->end();
318 for (; iter != end; ++iter) {
319 attachment_uploader_->UploadAttachment(
320 iter->second,
321 base::Bind(&AttachmentServiceImpl::UploadDone,
322 weak_ptr_factory_.GetWeakPtr()));
323 }
324 }
325
SetTimerForTest(scoped_ptr<base::Timer> timer)326 void AttachmentServiceImpl::SetTimerForTest(scoped_ptr<base::Timer> timer) {
327 upload_task_queue_->SetTimerForTest(timer.Pass());
328 }
329
330 } // namespace syncer
331