/**
 * Copyright 2020-2021 Huawei Technologies Co., Ltd
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include <iomanip>
#include "minddata/dataset/engine/cache/cache_client.h"
#include "minddata/dataset/engine/cache/cache_request.h"
#include "minddata/dataset/engine/cache/cache_fbb.h"
#include "minddata/dataset/util/bit.h"
#include "minddata/dataset/util/task_manager.h"

namespace mindspore {
namespace dataset {
CacheClient::Builder::Builder()
    : session_id_(0), cache_mem_sz_(0), spill_(false), hostname_(""), port_(0), num_connections_(0), prefetch_size_(0) {
  std::shared_ptr<ConfigManager> cfg = GlobalContext::config_manager();
  hostname_ = cfg->cache_host();
  port_ = cfg->cache_port();
  num_connections_ = cfg->num_connections();  // number of async tcp/ip connections
  prefetch_size_ = cfg->prefetch_size();      // prefetch size
}

Status CacheClient::Builder::Build(std::shared_ptr<CacheClient> *out) {
  RETURN_UNEXPECTED_IF_NULL(out);
  RETURN_IF_NOT_OK(SanityCheck());
  *out = std::make_shared<CacheClient>(session_id_, cache_mem_sz_, spill_, hostname_, port_, num_connections_,
                                       prefetch_size_);
  return Status::OK();
}
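
// Illustrative usage sketch (not part of the original file). The setter names
// are assumed from the Builder's members (see cache_client.h); the values are
// hypothetical. Build() runs SanityCheck() before constructing the client.
//
//   CacheClient::Builder builder;
//   builder.SetSessionId(1).SetCacheMemSz(0).SetSpill(false);
//   std::shared_ptr<CacheClient> client;
//   Status rc = builder.Build(&client);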

Status CacheClient::Builder::SanityCheck() {
  CHECK_FAIL_RETURN_SYNTAX_ERROR(session_id_ > 0, "session id must be positive.");
  CHECK_FAIL_RETURN_SYNTAX_ERROR(cache_mem_sz_ >= 0, "cache memory size must not be negative (0 implies unlimited).");
  CHECK_FAIL_RETURN_SYNTAX_ERROR(num_connections_ > 0, "number of tcp/ip connections must be positive.");
  CHECK_FAIL_RETURN_SYNTAX_ERROR(prefetch_size_ > 0, "prefetch size must be positive.");
  CHECK_FAIL_RETURN_SYNTAX_ERROR(!hostname_.empty(), "hostname must not be empty.");
  CHECK_FAIL_RETURN_SYNTAX_ERROR(port_ >= kMinLegalPort, "port must be in the range [1025..65535].");
  CHECK_FAIL_RETURN_SYNTAX_ERROR(port_ <= kMaxLegalPort, "port must be in the range [1025..65535].");
  CHECK_FAIL_RETURN_SYNTAX_ERROR(hostname_ == "127.0.0.1",
                                 "currently the cache client must run on the same host as the cache server.");
  return Status::OK();
}

// Constructor
CacheClient::CacheClient(session_id_type session_id, uint64_t cache_mem_sz, bool spill, std::string hostname,
                         int32_t port, int32_t num_connections, int32_t prefetch_size)
    : cache_mem_sz_(cache_mem_sz),
      spill_(spill),
      server_connection_id_(0),
      client_id_(-1),
      local_bypass_(false),
      num_connections_(num_connections),
      prefetch_size_(prefetch_size),
      fetch_all_keys_(true) {
  cinfo_.set_session_id(session_id);
  comm_ = std::make_shared<CacheClientGreeter>(hostname, port, num_connections_);
}

CacheClient::~CacheClient() {
  cache_miss_keys_wp_.Set();
  // Manually release the async buffer because we need the comm layer.
  if (async_buffer_stream_) {
    Status rc = async_buffer_stream_->ReleaseBuffer();
    if (rc.IsError()) MS_LOG(ERROR) << rc;
  }
  if (client_id_ != -1) {
    try {
      // Send a message to the server, saying I am done.
      auto rq = std::make_shared<ConnectResetRequest>(server_connection_id_, client_id_);
      Status rc = PushRequest(rq);
      if (rc.IsOk()) {
        rc = rq->Wait();
        if (rc.IsOk()) {
          MS_LOG(INFO) << "Disconnected from the server successfully";
        }
      }
    } catch (const std::exception &e) {
      // Can't do anything in a destructor, so just log the error.
      MS_LOG(ERROR) << e.what();
    }
  }
  (void)comm_->ServiceStop();
}

// Print method for displaying cache details
void CacheClient::Print(std::ostream &out) const {
  out << "  Session id: " << session_id() << "\n  Cache crc: " << cinfo_.crc()
      << "\n  Server cache id: " << server_connection_id_ << "\n  Cache mem size: " << GetCacheMemSz()
      << "\n  Spilling: " << std::boolalpha << isSpill() << "\n  Number of rpc workers: " << GetNumConnections()
      << "\n  Prefetch size: " << GetPrefetchSize() << "\n  Local client support: " << std::boolalpha
      << SupportLocalClient();
}

std::string CacheClient::GetHostname() const { return comm_->GetHostname(); }
int32_t CacheClient::GetPort() const { return comm_->GetPort(); }

Status CacheClient::WriteRow(const TensorRow &row, row_id_type *row_id_from_server) const {
  auto rq = std::make_shared<CacheRowRequest>(this);
  RETURN_IF_NOT_OK(rq->SerializeCacheRowRequest(this, row));
  RETURN_IF_NOT_OK(PushRequest(rq));
  RETURN_IF_NOT_OK(rq->Wait());
  if (row_id_from_server != nullptr) {
    *row_id_from_server = rq->GetRowIdAfterCache();
  }
  return Status::OK();
}

Status CacheClient::AsyncWriteRow(const TensorRow &row) {
  if (async_buffer_stream_ == nullptr) {
    // No local shared memory is attached; the caller is expected to fall back to the synchronous WriteRow path.
    return Status(StatusCode::kMDNotImplementedYet);
  }
  RETURN_IF_NOT_OK(async_buffer_stream_->AsyncWrite(row));
  return Status::OK();
}

Status CacheClient::GetRows(const std::vector<row_id_type> &row_id, TensorTable *out) const {
  RETURN_UNEXPECTED_IF_NULL(out);
  auto rq = std::make_shared<BatchFetchRequest>(this, row_id);
  RETURN_IF_NOT_OK(PushRequest(rq));
  RETURN_IF_NOT_OK(rq->Wait());
  int64_t mem_addr = -1;  // Initialize so the check below is well defined even if RestoreRows fails early.
  Status rc = rq->RestoreRows(out, comm_->SharedMemoryBaseAddr(), &mem_addr);
  // Free the shared memory by sending a request back to the server.
  if (mem_addr != -1) {
    auto mfree_req = std::make_shared<FreeSharedBlockRequest>(server_connection_id_, client_id_, mem_addr);
    Status rc2 = PushRequest(mfree_req);
    // But we won't wait for the result for the sake of performance.
    if (rc.IsOk() && rc2.IsError()) {
      rc = rc2;
    }
  }
  return rc;
}

Status CacheClient::CreateCache(uint32_t tree_crc, bool generate_id) {
  UniqueLock lck(&mux_);
  // To create a cache, we identify ourselves to the server by:
  // - the shared session id
  // - a crc for the tree nodes from the cache downward
  // Pack these 2 into a single 64 bit request id.
  //
  // Consider this example:
  // tree1: tfreader --> map(decode) --> cache (session id = 1, crc = 123) --> batch
  // tree2: cifar10 --> map(rotate) --> cache (session id = 1, crc = 456) --> batch
  // These are different trees in a single session, but the user wants to share the cache.
  // This is not allowed because the data of these caches is different.
  //
  // Consider this example:
  // tree1: tfreader --> map(decode) --> cache (session id = 1, crc = 123) --> batch
  // tree2: tfreader --> map(decode) --> cache (session id = 1, crc = 123) --> map(rotate) --> batch
  // These are different trees in the same session, but the cached data is the same, so it is okay
  // to allow the sharing of this cache between these pipelines.

  // The CRC is computed by the tree prepare phase and passed to this function when creating the cache.
  // If we already have a server_connection_id_, then it means this same cache client has already been used
  // to create a cache and some other tree is trying to use the same cache.
  // That is allowed; however, the crc must match!
  if (server_connection_id_) {
    if (cinfo_.crc() != tree_crc) {
      RETURN_STATUS_UNEXPECTED("Cannot re-use a cache for a different tree!");
    }
    // Check the state of the server. For the non-mappable case where there is a build phase and a fetch phase,
    // we should skip the build phase.
    lck.Unlock();  // GetState will grab the mutex again. So unlock it to prevent deadlock.
    int8_t out;
    RETURN_IF_NOT_OK(GetState(&out));
    auto cache_state = static_cast<CacheServiceState>(out);
    if (cache_state == CacheServiceState::kFetchPhase ||
        (cache_state == CacheServiceState::kBuildPhase && cookie_.empty())) {
      return Status(StatusCode::kMDDuplicateKey, __LINE__, __FILE__,
                    "Not an error and we should bypass the build phase");
    }
    if (async_buffer_stream_) {
      // Reset the async buffer stream to its initial state. Any stale status and data would get cleaned up.
      RETURN_IF_NOT_OK(async_buffer_stream_->Reset());
    }
  } else {
    cinfo_.set_crc(tree_crc);  // It's really a new cache we're creating, so save our crc in the client.
    // Now execute the cache create request using this identifier and other configs
    CreateCacheRequest::CreateCacheFlag createFlag = CreateCacheRequest::CreateCacheFlag::kNone;
    if (spill_) {
      createFlag |= CreateCacheRequest::CreateCacheFlag::kSpillToDisk;
    }
    if (generate_id) {
      createFlag |= CreateCacheRequest::CreateCacheFlag::kGenerateRowId;
    }
    // Start the comm layer to receive reply
    RETURN_IF_NOT_OK(comm_->ServiceStart());
    // Initiate connection
    auto rq = std::make_shared<CreateCacheRequest>(this, cinfo_, cache_mem_sz_, createFlag);
    RETURN_IF_NOT_OK(PushRequest(rq));
    Status rc = rq->Wait();
    bool success = (rc.IsOk() || rc.StatusCode() == StatusCode::kMDDuplicateKey);
    // If we get kMDDuplicateKey, it just means we aren't the first one to create the cache,
    // and we will continue to parse the result.
    if (rc.StatusCode() == StatusCode::kMDDuplicateKey) {
      RETURN_IF_NOT_OK(rq->PostReply());
    }
    if (success) {
      // Attach to shared memory for local client
      RETURN_IF_NOT_OK(comm_->AttachToSharedMemory(&local_bypass_));
      if (local_bypass_) {
        async_buffer_stream_ = std::make_shared<AsyncBufferStream>();
        RETURN_IF_NOT_OK(async_buffer_stream_->Init(this));
      }
    }
    // We are not resetting the duplicate key return code; we are passing it back to the CacheOp. This will tell the
    // CacheOp to bypass the build phase.
    return rc;
  }
  return Status::OK();
}
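
// Flow sketch (illustration only, grounded in the comments above): a caller
// such as CacheOp would treat kMDDuplicateKey from CreateCache() as "cache
// already built, skip the build phase and go straight to fetching", e.g.:
//
//   Status rc = cache_client->CreateCache(tree_crc, /*generate_id=*/true);
//   if (rc.StatusCode() == StatusCode::kMDDuplicateKey) {
//     // Bypass the build phase; rows are already cached on the server.
//   } else if (rc.IsError()) {
//     return rc;
//   }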

Status CacheClient::DestroyCache() {
  UniqueLock lck(&mux_);
  auto rq = std::make_shared<DestroyCacheRequest>(server_connection_id_);
  RETURN_IF_NOT_OK(PushRequest(rq));
  RETURN_IF_NOT_OK(rq->Wait());
  return Status::OK();
}

Status CacheClient::GetStat(CacheServiceStat *stat) {
  SharedLock lck(&mux_);
  RETURN_UNEXPECTED_IF_NULL(stat);
  // GetStat has an external interface, so we have to make sure we have a valid connection id first.
  CHECK_FAIL_RETURN_UNEXPECTED(server_connection_id_ != 0, "GetStat called but the cache is not in use yet.");

  auto rq = std::make_shared<GetStatRequest>(server_connection_id_);
  RETURN_IF_NOT_OK(PushRequest(rq));
  RETURN_IF_NOT_OK(rq->Wait());
  rq->GetStat(stat);
  return Status::OK();
}

Status CacheClient::GetState(int8_t *out) {
  SharedLock lck(&mux_);
  RETURN_UNEXPECTED_IF_NULL(out);
  CHECK_FAIL_RETURN_UNEXPECTED(server_connection_id_ != 0, "GetState called but the cache is not in use yet.");
  auto rq = std::make_shared<GetCacheStateRequest>(server_connection_id_);
  RETURN_IF_NOT_OK(PushRequest(rq));
  RETURN_IF_NOT_OK(rq->Wait());
  *out = rq->GetState();
  return Status::OK();
}

Status CacheClient::CacheSchema(const std::unordered_map<std::string, int32_t> &map) {
  SharedLock lck(&mux_);
  auto rq = std::make_shared<CacheSchemaRequest>(server_connection_id_);
  RETURN_IF_NOT_OK(rq->SerializeCacheSchemaRequest(map));
  RETURN_IF_NOT_OK(PushRequest(rq));
  RETURN_IF_NOT_OK(rq->Wait());
  return Status::OK();
}

Status CacheClient::FetchSchema(std::unordered_map<std::string, int32_t> *map) {
  SharedLock lck(&mux_);
  RETURN_UNEXPECTED_IF_NULL(map);
  auto rq = std::make_shared<FetchSchemaRequest>(server_connection_id_);
  RETURN_IF_NOT_OK(PushRequest(rq));
  RETURN_IF_NOT_OK(rq->Wait());
  *map = rq->GetColumnMap();
  return Status::OK();
}

Status CacheClient::BuildPhaseDone() const {
  SharedLock lck(&mux_);
  auto rq = std::make_shared<BuildPhaseDoneRequest>(server_connection_id_, cookie());
  RETURN_IF_NOT_OK(PushRequest(rq));
  RETURN_IF_NOT_OK(rq->Wait());
  return Status::OK();
}

Status CacheClient::PushRequest(std::shared_ptr<BaseRequest> rq) const { return comm_->HandleRequest(std::move(rq)); }

void CacheClient::ServerRunningOutOfResources() {
  bool expected = true;
  if (fetch_all_keys_.compare_exchange_strong(expected, false)) {
    Status rc;
    // The server has run out of memory or disk space to cache any more rows.
    // First of all, we turn off the write mode on the server.
    auto toggle_write_mode_rq = std::make_shared<ToggleWriteModeRequest>(server_connection_id_, false);
    rc = PushRequest(toggle_write_mode_rq);
    if (rc.IsError()) {
      return;
    }
    // Wait until the server confirms the write mode has been turned off.
    rc = toggle_write_mode_rq->Wait();
    if (rc.IsError()) {
      return;
    }
    // Now we get a list of all the keys not cached at the server so
    // we can filter them out at the prefetch level.
    auto cache_miss_rq = std::make_shared<GetCacheMissKeysRequest>(server_connection_id_);
    rc = PushRequest(cache_miss_rq);
    if (rc.IsError()) {
      return;
    }
    rc = cache_miss_rq->Wait();
    if (rc.IsError()) {
      return;
    }
    // We get back a vector of row ids in [min,max] that are absent from the server.
    auto &row_id_buf = cache_miss_rq->reply_.result();
    auto p = flatbuffers::GetRoot<TensorRowIds>(row_id_buf.data());
    std::vector<row_id_type> row_ids;
    auto sz = p->row_id()->size();
    row_ids.reserve(sz);
    for (uint32_t i = 0; i < sz; ++i) {
      row_ids.push_back(p->row_id()->Get(i));
    }
    cache_miss_keys_ = std::make_unique<CacheMissKeys>(row_ids);
    // We are all set.
    cache_miss_keys_wp_.Set();
  }
}

CacheClient::CacheMissKeys::CacheMissKeys(const std::vector<row_id_type> &v) {
  // The vector is expected to contain at least two elements, laid out as [min, max, gap keys...].
  auto it = v.begin();
  min_ = *it;
  ++it;
  max_ = *it;
  ++it;
  while (it != v.end()) {
    gap_.insert(*it);
    ++it;
  }
  MS_LOG(INFO) << "# of cache miss keys between min(" << min_ << ") and max(" << max_ << ") is " << gap_.size();
}

bool CacheClient::CacheMissKeys::KeyIsCacheMiss(row_id_type key) {
  if (key > max_ || key < min_) {
    return true;
  } else if (key == min_ || key == max_) {
    return false;
  } else {
    auto it = gap_.find(key);
    return it != gap_.end();
  }
}
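
// Worked example (illustration only): given v = {2, 9, 4, 7}, the constructor
// yields min_ = 2, max_ = 9 and gap_ = {4, 7}. Then:
//   KeyIsCacheMiss(1)  -> true   (below min_)
//   KeyIsCacheMiss(2)  -> false  (equals min_, so it is cached)
//   KeyIsCacheMiss(4)  -> true   (in the gap set)
//   KeyIsCacheMiss(5)  -> false  (between min_ and max_, not in the gap set)
//   KeyIsCacheMiss(10) -> true   (above max_)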

CacheClient::AsyncBufferStream::AsyncBufferStream() : cc_(nullptr), offset_addr_(-1), cur_(0) {}

CacheClient::AsyncBufferStream::~AsyncBufferStream() {
  (void)vg_.ServiceStop();
  (void)ReleaseBuffer();
}

Status CacheClient::AsyncBufferStream::ReleaseBuffer() {
  if (offset_addr_ != -1) {
    auto mfree_req =
      std::make_shared<FreeSharedBlockRequest>(cc_->server_connection_id_, cc_->GetClientId(), offset_addr_);
    offset_addr_ = -1;
    RETURN_IF_NOT_OK(cc_->PushRequest(mfree_req));
    RETURN_IF_NOT_OK(mfree_req->Wait());
  }
  return Status::OK();
}

Status CacheClient::AsyncBufferStream::Init(CacheClient *cc) {
  cc_ = cc;
  // Allocate shared memory from the server
  auto mem_rq = std::make_shared<AllocateSharedBlockRequest>(cc_->server_connection_id_, cc_->GetClientId(),
                                                             kAsyncBufferSize * kNumAsyncBuffer);
  RETURN_IF_NOT_OK(cc->PushRequest(mem_rq));
  RETURN_IF_NOT_OK(mem_rq->Wait());
  offset_addr_ = mem_rq->GetAddr();
  // Now we need to add that to the base address of where we attach.
  auto base = cc->SharedMemoryBaseAddr();
  auto start = reinterpret_cast<int64_t>(base) + offset_addr_;
  for (auto i = 0; i < kNumAsyncBuffer; ++i) {
    // We only need to set the pointer during init. Other fields will be set dynamically.
    buf_arr_[i].buffer_ = reinterpret_cast<void *>(start + i * kAsyncBufferSize);
  }
  buf_arr_[0].bytes_avail_ = kAsyncBufferSize;
  buf_arr_[0].num_ele_ = 0;
  RETURN_IF_NOT_OK(vg_.ServiceStart());
  return Status::OK();
}
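
// Layout sketch (illustration only): the single allocation is carved into
// kNumAsyncBuffer fixed-size slots inside the shared memory segment, so
// buffer i starts at SharedMemoryBaseAddr() + offset_addr_ + i * kAsyncBufferSize:
//
//   base + offset_addr_
//   |<- kAsyncBufferSize ->|<- kAsyncBufferSize ->|     |
//   [     buf_arr_[0]     ][     buf_arr_[1]     ] ... [ buf_arr_[kNumAsyncBuffer - 1] ]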

Status CacheClient::AsyncBufferStream::AsyncWrite(const TensorRow &row) {
  std::vector<ReadableSlice> v;
  v.reserve(row.size() + 1);
  std::shared_ptr<flatbuffers::FlatBufferBuilder> fbb;
  RETURN_IF_NOT_OK(::mindspore::dataset::SerializeTensorRowHeader(row, &fbb));
  int64_t sz = fbb->GetSize();
  v.emplace_back(fbb->GetBufferPointer(), sz);
  for (const auto &ts : row) {
    sz += ts->SizeInBytes();
    v.emplace_back(ts->GetBuffer(), ts->SizeInBytes());
  }
  // If the row is too big to fit in one buffer, tell the caller to send it directly instead.
  if (sz > kAsyncBufferSize) {
    return Status(StatusCode::kMDNotImplementedYet);
  }
  std::unique_lock<std::mutex> lock(mux_);
  // Check for any error from the server side while we have the lock.
  RETURN_IF_NOT_OK(flush_rc_);
  AsyncWriter *asyncWriter = &buf_arr_[cur_];
  if (asyncWriter->bytes_avail_ < sz) {
    // Flush and switch to a new buffer while we have the lock.
    RETURN_IF_NOT_OK(SyncFlush(AsyncFlushFlag::kCallerHasXLock));
    // Refresh the pointer after we switch.
    asyncWriter = &buf_arr_[cur_];
  }
  RETURN_IF_NOT_OK(asyncWriter->Write(sz, v));
  return Status::OK();
}

Status CacheClient::AsyncBufferStream::SyncFlush(AsyncFlushFlag flag) {
  std::unique_lock lock(mux_, std::defer_lock_t());
  bool callerHasXLock = (flag & AsyncFlushFlag::kCallerHasXLock) == AsyncFlushFlag::kCallerHasXLock;
  if (!callerHasXLock) {
    lock.lock();
  }
  auto *asyncWriter = &buf_arr_[cur_];
  if (asyncWriter->num_ele_) {
    asyncWriter->rq.reset(
      new BatchCacheRowsRequest(cc_, offset_addr_ + cur_ * kAsyncBufferSize, asyncWriter->num_ele_));
    flush_rc_ = cc_->PushRequest(asyncWriter->rq);
    RETURN_IF_NOT_OK(flush_rc_);

    // If we are asked to wait, say this is the final flush, just wait for its completion.
    bool blocking = (flag & AsyncFlushFlag::kFlushBlocking) == AsyncFlushFlag::kFlushBlocking;
    if (blocking) {
      // Make sure we are done with all the buffers
      for (auto i = 0; i < kNumAsyncBuffer; ++i) {
        if (buf_arr_[i].rq) {
          Status rc = buf_arr_[i].rq->Wait();
          if (rc.IsError()) flush_rc_ = rc;
          buf_arr_[i].rq.reset();
        }
      }
    }
    // Switch to the next buffer while we have the lock.
    cur_ = (cur_ + 1) % kNumAsyncBuffer;
    asyncWriter = &buf_arr_[cur_];
    // Before we do anything with the new buffer, make sure the cache server is done with it, or we will corrupt
    // its content. This also picks up any error from a previous flush of that buffer.
    if (asyncWriter->rq) {
      // Save the result into a common area, so a worker can see it and quit.
      flush_rc_ = asyncWriter->rq->Wait();
      asyncWriter->rq.reset();
    }
    asyncWriter->bytes_avail_ = kAsyncBufferSize;
    asyncWriter->num_ele_ = 0;
  }

  return flush_rc_;
}

Status CacheClient::AsyncBufferStream::AsyncWriter::Write(int64_t sz, const std::vector<ReadableSlice> &v) {
  CHECK_FAIL_RETURN_UNEXPECTED(sz <= bytes_avail_, "Programming error: write size exceeds the available buffer space.");
  for (auto &p : v) {
    auto write_sz = p.GetSize();
    WritableSlice dest(reinterpret_cast<char *>(buffer_) + kAsyncBufferSize - bytes_avail_, write_sz);
    RETURN_IF_NOT_OK(WritableSlice::Copy(&dest, p));
    bytes_avail_ -= write_sz;
  }
  ++num_ele_;
  return Status::OK();
}
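
// Offset arithmetic example (illustration only; the 1024 is a hypothetical
// value, not the real kAsyncBufferSize): with kAsyncBufferSize = 1024 and
// bytes_avail_ = 1000, the next slice is copied to buffer_ + 24, i.e. right
// after the 24 bytes already written, and bytes_avail_ shrinks accordingly.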

Status CacheClient::AsyncBufferStream::Reset() {
  // Clean up previous running state to be prepared for a new run.
  cur_ = 0;
  flush_rc_ = Status::OK();
  for (auto i = 0; i < kNumAsyncBuffer; ++i) {
    buf_arr_[i].bytes_avail_ = kAsyncBufferSize;
    buf_arr_[i].num_ele_ = 0;
    buf_arr_[i].rq.reset();
  }
  return Status::OK();
}
}  // namespace dataset
}  // namespace mindspore