1 /**
2 * Copyright 2020-2021 Huawei Technologies Co., Ltd
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 #include <iomanip>
18 #include "minddata/dataset/engine/cache/cache_client.h"
19 #include "minddata/dataset/engine/cache/cache_request.h"
20 #include "minddata/dataset/engine/cache/cache_fbb.h"
21 #include "minddata/dataset/util/bit.h"
22 #include "minddata/dataset/util/task_manager.h"
23
24 namespace mindspore {
25 namespace dataset {
Builder()26 CacheClient::Builder::Builder()
27 : session_id_(0), cache_mem_sz_(0), spill_(false), hostname_(""), port_(0), num_connections_(0), prefetch_size_(0) {
28 std::shared_ptr<ConfigManager> cfg = GlobalContext::config_manager();
29 hostname_ = cfg->cache_host();
30 port_ = cfg->cache_port();
31 num_connections_ = cfg->num_connections(); // number of async tcp/ip connections
32 prefetch_size_ = cfg->prefetch_size(); // prefetch size
33 }
34
Build(std::shared_ptr<CacheClient> * out)35 Status CacheClient::Builder::Build(std::shared_ptr<CacheClient> *out) {
36 RETURN_UNEXPECTED_IF_NULL(out);
37 RETURN_IF_NOT_OK(SanityCheck());
38 *out = std::make_shared<CacheClient>(session_id_, cache_mem_sz_, spill_, hostname_, port_, num_connections_,
39 prefetch_size_);
40 return Status::OK();
41 }
42
SanityCheck()43 Status CacheClient::Builder::SanityCheck() {
44 CHECK_FAIL_RETURN_SYNTAX_ERROR(session_id_ > 0, "session id must be positive.");
45 CHECK_FAIL_RETURN_SYNTAX_ERROR(cache_mem_sz_ >= 0, "cache memory size must not be negative (0 implies unlimited).");
46 CHECK_FAIL_RETURN_SYNTAX_ERROR(num_connections_ > 0, "number of tcp/ip connections must be positive.");
47 CHECK_FAIL_RETURN_SYNTAX_ERROR(prefetch_size_ > 0, "prefetch size must be positive.");
48 CHECK_FAIL_RETURN_SYNTAX_ERROR(!hostname_.empty(), "hostname must not be empty.");
49 CHECK_FAIL_RETURN_SYNTAX_ERROR(port_ >= kMinLegalPort, "Port must be in range (1025..65535).");
50 CHECK_FAIL_RETURN_SYNTAX_ERROR(port_ <= kMaxLegalPort, "Port must be in range (1025..65535).");
51 CHECK_FAIL_RETURN_SYNTAX_ERROR(hostname_ == "127.0.0.1",
52 "now cache client has to be on the same host with cache server.");
53 return Status::OK();
54 }
55
56 // Constructor
CacheClient(session_id_type session_id,uint64_t cache_mem_sz,bool spill,std::string hostname,int32_t port,int32_t num_connections,int32_t prefetch_size)57 CacheClient::CacheClient(session_id_type session_id, uint64_t cache_mem_sz, bool spill, std::string hostname,
58 int32_t port, int32_t num_connections, int32_t prefetch_size)
59 : cache_mem_sz_(cache_mem_sz),
60 spill_(spill),
61 server_connection_id_(0),
62 client_id_(-1),
63 local_bypass_(false),
64 num_connections_(num_connections),
65 prefetch_size_(prefetch_size),
66 fetch_all_keys_(true) {
67 cinfo_.set_session_id(session_id);
68 comm_ = std::make_shared<CacheClientGreeter>(hostname, port, num_connections_);
69 }
70
~CacheClient()71 CacheClient::~CacheClient() {
72 cache_miss_keys_wp_.Set();
73 // Manually release the async buffer because we need the comm layer.
74 if (async_buffer_stream_) {
75 Status rc = async_buffer_stream_->ReleaseBuffer();
76 if (rc.IsError()) MS_LOG(ERROR) << rc;
77 }
78 if (client_id_ != -1) {
79 try {
80 // Send a message to the server, saying I am done.
81 auto rq = std::make_shared<ConnectResetRequest>(server_connection_id_, client_id_);
82 Status rc = PushRequest(rq);
83 if (rc.IsOk()) {
84 rc = rq->Wait();
85 if (rc.IsOk()) {
86 MS_LOG(INFO) << "Disconnect from server successful";
87 }
88 }
89 } catch (const std::exception &e) {
90 // Can't do anything in destructor. So just log the error.
91 MS_LOG(ERROR) << e.what();
92 }
93 }
94 (void)comm_->ServiceStop();
95 }
96
97 // print method for display cache details
Print(std::ostream & out) const98 void CacheClient::Print(std::ostream &out) const {
99 out << " Session id: " << session_id() << "\n Cache crc: " << cinfo_.crc()
100 << "\n Server cache id: " << server_connection_id_ << "\n Cache mem size: " << GetCacheMemSz()
101 << "\n Spilling: " << std::boolalpha << isSpill() << "\n Number of rpc workers: " << GetNumConnections()
102 << "\n Prefetch size: " << GetPrefetchSize() << "\n Local client support: " << std::boolalpha
103 << SupportLocalClient();
104 }
105
GetHostname() const106 std::string CacheClient::GetHostname() const { return comm_->GetHostname(); }
GetPort() const107 int32_t CacheClient::GetPort() const { return comm_->GetPort(); }
108
WriteRow(const TensorRow & row,row_id_type * row_id_from_server) const109 Status CacheClient::WriteRow(const TensorRow &row, row_id_type *row_id_from_server) const {
110 auto rq = std::make_shared<CacheRowRequest>(this);
111 RETURN_IF_NOT_OK(rq->SerializeCacheRowRequest(this, row));
112 RETURN_IF_NOT_OK(PushRequest(rq));
113 RETURN_IF_NOT_OK(rq->Wait());
114 if (row_id_from_server != nullptr) {
115 *row_id_from_server = rq->GetRowIdAfterCache();
116 }
117 return Status::OK();
118 }
119
AsyncWriteRow(const TensorRow & row)120 Status CacheClient::AsyncWriteRow(const TensorRow &row) {
121 if (async_buffer_stream_ == nullptr) {
122 return Status(StatusCode::kMDNotImplementedYet);
123 }
124 RETURN_IF_NOT_OK(async_buffer_stream_->AsyncWrite(row));
125 return Status::OK();
126 }
127
GetRows(const std::vector<row_id_type> & row_id,TensorTable * out) const128 Status CacheClient::GetRows(const std::vector<row_id_type> &row_id, TensorTable *out) const {
129 RETURN_UNEXPECTED_IF_NULL(out);
130 auto rq = std::make_shared<BatchFetchRequest>(this, row_id);
131 RETURN_IF_NOT_OK(PushRequest(rq));
132 RETURN_IF_NOT_OK(rq->Wait());
133 int64_t mem_addr;
134 Status rc = rq->RestoreRows(out, comm_->SharedMemoryBaseAddr(), &mem_addr);
135 // Free the memory by sending a request back to the server.
136 if (mem_addr != -1) {
137 auto mfree_req = std::make_shared<FreeSharedBlockRequest>(server_connection_id_, client_id_, mem_addr);
138 Status rc2 = PushRequest(mfree_req);
139 // But we won't wait for the result for the sake of performance.
140 if (rc.IsOk() && rc2.IsError()) {
141 rc = rc2;
142 }
143 }
144 return rc;
145 }
146
CreateCache(uint32_t tree_crc,bool generate_id)147 Status CacheClient::CreateCache(uint32_t tree_crc, bool generate_id) {
148 UniqueLock lck(&mux_);
149 // To create a cache, we identify ourself at the client by:
150 // - the shared session id
151 // - a crc for the tree nodes from the cache downward
152 // Pack these 2 into a single 64 bit request id
153 //
154 // Consider this example:
155 // tree1: tfreader --> map(decode) --> cache (session id = 1, crc = 123) --> batch
156 // tree2: cifar10 --> map(rotate) --> cache (session id = 1, crc = 456) --> batch
157 // These are different trees in a single session, but the user wants to share the cache.
158 // This is not allowed because the data of these caches are different.
159 //
160 // Consider this example:
161 // tree1: tfreader --> map(decode) --> cache (session id = 1, crc = 123) --> batch
162 // tree2: tfreader --> map(decode) --> cache (session id = 1, crc = 123) --> map(rotate) --> batch
163 // These are different trees in the same session, but the cached data is the same, so it is okay
164 // to allow the sharing of this cache between these pipelines.
165
166 // The CRC is computed by the tree prepare phase and passed to this function when creating the cache.
167 // If we already have a server_connection_id_, then it means this same cache client has already been used
168 // to create a cache and some other tree is trying to use the same cache.
169 // That is allowed, however the crc better match!
170 if (server_connection_id_) {
171 if (cinfo_.crc() != tree_crc) {
172 RETURN_STATUS_UNEXPECTED("Cannot re-use a cache for a different tree!");
173 }
174 // Check the state of the server. For non-mappable case where there is a build phase and a fetch phase, we should
175 // skip the build phase.
176 lck.Unlock(); // GetStat will grab the mutex again. So unlock it to prevent deadlock.
177 int8_t out;
178 RETURN_IF_NOT_OK(GetState(&out));
179 auto cache_state = static_cast<CacheServiceState>(out);
180 if (cache_state == CacheServiceState::kFetchPhase ||
181 (cache_state == CacheServiceState::kBuildPhase && cookie_.empty())) {
182 return Status(StatusCode::kMDDuplicateKey, __LINE__, __FILE__,
183 "Not an error and we should bypass the build phase");
184 }
185 if (async_buffer_stream_) {
186 // Reset the async buffer stream to its initial state. Any stale status and data would get cleaned up.
187 RETURN_IF_NOT_OK(async_buffer_stream_->Reset());
188 }
189 } else {
190 cinfo_.set_crc(tree_crc); // It's really a new cache we're creating so save our crc in the client
191 // Now execute the cache create request using this identifier and other configs
192 CreateCacheRequest::CreateCacheFlag createFlag = CreateCacheRequest::CreateCacheFlag::kNone;
193 if (spill_) {
194 createFlag |= CreateCacheRequest::CreateCacheFlag::kSpillToDisk;
195 }
196 if (generate_id) {
197 createFlag |= CreateCacheRequest::CreateCacheFlag::kGenerateRowId;
198 }
199 // Start the comm layer to receive reply
200 RETURN_IF_NOT_OK(comm_->ServiceStart());
201 // Initiate connection
202 auto rq = std::make_shared<CreateCacheRequest>(this, cinfo_, cache_mem_sz_, createFlag);
203 RETURN_IF_NOT_OK(PushRequest(rq));
204 Status rc = rq->Wait();
205 bool success = (rc.IsOk() || rc.StatusCode() == StatusCode::kMDDuplicateKey);
206 // If we get kDuplicateKey, it just means we aren't the first one to create the cache,
207 // and we will continue to parse the result.
208 if (rc.StatusCode() == StatusCode::kMDDuplicateKey) {
209 RETURN_IF_NOT_OK(rq->PostReply());
210 }
211 if (success) {
212 // Attach to shared memory for local client
213 RETURN_IF_NOT_OK(comm_->AttachToSharedMemory(&local_bypass_));
214 if (local_bypass_) {
215 async_buffer_stream_ = std::make_shared<AsyncBufferStream>();
216 RETURN_IF_NOT_OK(async_buffer_stream_->Init(this));
217 }
218 }
219 // We are not resetting the Duplicate key return code. We are passing it back to the CacheOp. This will tell the
220 // CacheOp to bypass the build phase.
221 return rc;
222 }
223 return Status::OK();
224 }
225
DestroyCache()226 Status CacheClient::DestroyCache() {
227 UniqueLock lck(&mux_);
228 auto rq = std::make_shared<DestroyCacheRequest>(server_connection_id_);
229 RETURN_IF_NOT_OK(PushRequest(rq));
230 RETURN_IF_NOT_OK(rq->Wait());
231 return Status::OK();
232 }
233
GetStat(CacheServiceStat * stat)234 Status CacheClient::GetStat(CacheServiceStat *stat) {
235 SharedLock lck(&mux_);
236 RETURN_UNEXPECTED_IF_NULL(stat);
237 // GetStat has an external interface, so we have to make sure we have a valid connection id first
238 CHECK_FAIL_RETURN_UNEXPECTED(server_connection_id_ != 0, "GetStat called but the cache is not in use yet.");
239
240 auto rq = std::make_shared<GetStatRequest>(server_connection_id_);
241 RETURN_IF_NOT_OK(PushRequest(rq));
242 RETURN_IF_NOT_OK(rq->Wait());
243 rq->GetStat(stat);
244 return Status::OK();
245 }
246
GetState(int8_t * out)247 Status CacheClient::GetState(int8_t *out) {
248 SharedLock lck(&mux_);
249 RETURN_UNEXPECTED_IF_NULL(out);
250 CHECK_FAIL_RETURN_UNEXPECTED(server_connection_id_ != 0, "GetState called but the cache is not in use yet.");
251 auto rq = std::make_shared<GetCacheStateRequest>(server_connection_id_);
252 RETURN_IF_NOT_OK(PushRequest(rq));
253 RETURN_IF_NOT_OK(rq->Wait());
254 *out = rq->GetState();
255 return Status::OK();
256 }
257
CacheSchema(const std::unordered_map<std::string,int32_t> & map)258 Status CacheClient::CacheSchema(const std::unordered_map<std::string, int32_t> &map) {
259 SharedLock lck(&mux_);
260 auto rq = std::make_shared<CacheSchemaRequest>(server_connection_id_);
261 RETURN_IF_NOT_OK(rq->SerializeCacheSchemaRequest(map));
262 RETURN_IF_NOT_OK(PushRequest(rq));
263 RETURN_IF_NOT_OK(rq->Wait());
264 return Status::OK();
265 }
266
FetchSchema(std::unordered_map<std::string,int32_t> * map)267 Status CacheClient::FetchSchema(std::unordered_map<std::string, int32_t> *map) {
268 SharedLock lck(&mux_);
269 RETURN_UNEXPECTED_IF_NULL(map);
270 auto rq = std::make_shared<FetchSchemaRequest>(server_connection_id_);
271 RETURN_IF_NOT_OK(PushRequest(rq));
272 RETURN_IF_NOT_OK(rq->Wait());
273 *map = rq->GetColumnMap();
274 return Status::OK();
275 }
276
BuildPhaseDone() const277 Status CacheClient::BuildPhaseDone() const {
278 SharedLock lck(&mux_);
279 auto rq = std::make_shared<BuildPhaseDoneRequest>(server_connection_id_, cookie());
280 RETURN_IF_NOT_OK(PushRequest(rq));
281 RETURN_IF_NOT_OK(rq->Wait());
282 return Status::OK();
283 }
284
PushRequest(std::shared_ptr<BaseRequest> rq) const285 Status CacheClient::PushRequest(std::shared_ptr<BaseRequest> rq) const { return comm_->HandleRequest(std::move(rq)); }
286
ServerRunningOutOfResources()287 void CacheClient::ServerRunningOutOfResources() {
288 bool expected = true;
289 if (fetch_all_keys_.compare_exchange_strong(expected, false)) {
290 Status rc;
291 // Server runs out of memory or disk space to cache any more rows.
292 // First of all, we will turn off the locking.
293 auto toggle_write_mode_rq = std::make_shared<ToggleWriteModeRequest>(server_connection_id_, false);
294 rc = PushRequest(toggle_write_mode_rq);
295 if (rc.IsError()) {
296 return;
297 }
298 // Wait until we can toggle the state of the server to non-locking
299 rc = toggle_write_mode_rq->Wait();
300 if (rc.IsError()) {
301 return;
302 }
303 // Now we get a list of all the keys not cached at the server so
304 // we can filter out at the prefetch level.
305 auto cache_miss_rq = std::make_shared<GetCacheMissKeysRequest>(server_connection_id_);
306 rc = PushRequest(cache_miss_rq);
307 if (rc.IsError()) {
308 return;
309 }
310 rc = cache_miss_rq->Wait();
311 if (rc.IsError()) {
312 return;
313 }
314 // We will get back a vector of row id between [min,max] that are absent in the server.
315 auto &row_id_buf = cache_miss_rq->reply_.result();
316 auto p = flatbuffers::GetRoot<TensorRowIds>(row_id_buf.data());
317 std::vector<row_id_type> row_ids;
318 auto sz = p->row_id()->size();
319 row_ids.reserve(sz);
320 for (uint32_t i = 0; i < sz; ++i) {
321 row_ids.push_back(p->row_id()->Get(i));
322 }
323 cache_miss_keys_ = std::make_unique<CacheMissKeys>(row_ids);
324 // We are all set.
325 cache_miss_keys_wp_.Set();
326 }
327 }
328
CacheMissKeys(const std::vector<row_id_type> & v)329 CacheClient::CacheMissKeys::CacheMissKeys(const std::vector<row_id_type> &v) {
330 auto it = v.begin();
331 min_ = *it;
332 ++it;
333 max_ = *it;
334 ++it;
335 while (it != v.end()) {
336 gap_.insert(*it);
337 ++it;
338 }
339 MS_LOG(INFO) << "# of cache miss keys between min(" << min_ << ") and max(" << max_ << ") is " << gap_.size();
340 }
341
KeyIsCacheMiss(row_id_type key)342 bool CacheClient::CacheMissKeys::KeyIsCacheMiss(row_id_type key) {
343 if (key > max_ || key < min_) {
344 return true;
345 } else if (key == min_ || key == max_) {
346 return false;
347 } else {
348 auto it = gap_.find(key);
349 return it != gap_.end();
350 }
351 }
352
AsyncBufferStream()353 CacheClient::AsyncBufferStream::AsyncBufferStream() : cc_(nullptr), offset_addr_(-1), cur_(0) {}
354
~AsyncBufferStream()355 CacheClient::AsyncBufferStream::~AsyncBufferStream() {
356 (void)vg_.ServiceStop();
357 (void)ReleaseBuffer();
358 }
359
ReleaseBuffer()360 Status CacheClient::AsyncBufferStream::ReleaseBuffer() {
361 if (offset_addr_ != -1) {
362 auto mfree_req =
363 std::make_shared<FreeSharedBlockRequest>(cc_->server_connection_id_, cc_->GetClientId(), offset_addr_);
364 offset_addr_ = -1;
365 RETURN_IF_NOT_OK(cc_->PushRequest(mfree_req));
366 RETURN_IF_NOT_OK(mfree_req->Wait());
367 }
368 return Status::OK();
369 }
370
Init(CacheClient * cc)371 Status CacheClient::AsyncBufferStream::Init(CacheClient *cc) {
372 cc_ = cc;
373 // Allocate shared memory from the server
374 auto mem_rq = std::make_shared<AllocateSharedBlockRequest>(cc_->server_connection_id_, cc_->GetClientId(),
375 kAsyncBufferSize * kNumAsyncBuffer);
376 RETURN_IF_NOT_OK(cc->PushRequest(mem_rq));
377 RETURN_IF_NOT_OK(mem_rq->Wait());
378 offset_addr_ = mem_rq->GetAddr();
379 // Now we need to add that to the base address of where we attach.
380 auto base = cc->SharedMemoryBaseAddr();
381 auto start = reinterpret_cast<int64_t>(base) + offset_addr_;
382 for (auto i = 0; i < kNumAsyncBuffer; ++i) {
383 // We only need to set the pointer during init. Other fields will be set dynamically.
384 buf_arr_[i].buffer_ = reinterpret_cast<void *>(start + i * kAsyncBufferSize);
385 }
386 buf_arr_[0].bytes_avail_ = kAsyncBufferSize;
387 buf_arr_[0].num_ele_ = 0;
388 RETURN_IF_NOT_OK(vg_.ServiceStart());
389 return Status::OK();
390 }
391
AsyncWrite(const TensorRow & row)392 Status CacheClient::AsyncBufferStream::AsyncWrite(const TensorRow &row) {
393 std::vector<ReadableSlice> v;
394 v.reserve(row.size() + 1);
395 std::shared_ptr<flatbuffers::FlatBufferBuilder> fbb;
396 RETURN_IF_NOT_OK(::mindspore::dataset::SerializeTensorRowHeader(row, &fbb));
397 int64_t sz = fbb->GetSize();
398 v.emplace_back(fbb->GetBufferPointer(), sz);
399 for (const auto &ts : row) {
400 sz += ts->SizeInBytes();
401 v.emplace_back(ts->GetBuffer(), ts->SizeInBytes());
402 }
403 // If the size is too big, tell the user to send it directly.
404 if (sz > kAsyncBufferSize) {
405 return Status(StatusCode::kMDNotImplementedYet);
406 }
407 std::unique_lock<std::mutex> lock(mux_);
408 // Check error from the server side while we have the lock;
409 RETURN_IF_NOT_OK(flush_rc_);
410 AsyncWriter *asyncWriter = &buf_arr_[cur_];
411 if (asyncWriter->bytes_avail_ < sz) {
412 // Flush and switch to a new buffer while we have the lock.
413 RETURN_IF_NOT_OK(SyncFlush(AsyncFlushFlag::kCallerHasXLock));
414 // Refresh the pointer after we switch
415 asyncWriter = &buf_arr_[cur_];
416 }
417 RETURN_IF_NOT_OK(asyncWriter->Write(sz, v));
418 return Status::OK();
419 }
420
SyncFlush(AsyncFlushFlag flag)421 Status CacheClient::AsyncBufferStream::SyncFlush(AsyncFlushFlag flag) {
422 std::unique_lock lock(mux_, std::defer_lock_t());
423 bool callerHasXLock = (flag & AsyncFlushFlag::kCallerHasXLock) == AsyncFlushFlag::kCallerHasXLock;
424 if (!callerHasXLock) {
425 lock.lock();
426 }
427 auto *asyncWriter = &buf_arr_[cur_];
428 if (asyncWriter->num_ele_) {
429 asyncWriter->rq.reset(
430 new BatchCacheRowsRequest(cc_, offset_addr_ + cur_ * kAsyncBufferSize, asyncWriter->num_ele_));
431 flush_rc_ = cc_->PushRequest(asyncWriter->rq);
432 RETURN_IF_NOT_OK(flush_rc_);
433
434 // If we are asked to wait, say this is the final flush, just wait for its completion.
435 bool blocking = (flag & AsyncFlushFlag::kFlushBlocking) == AsyncFlushFlag::kFlushBlocking;
436 if (blocking) {
437 // Make sure we are done with all the buffers
438 for (auto i = 0; i < kNumAsyncBuffer; ++i) {
439 if (buf_arr_[i].rq) {
440 Status rc = buf_arr_[i].rq->Wait();
441 if (rc.IsError()) flush_rc_ = rc;
442 buf_arr_[i].rq.reset();
443 }
444 }
445 }
446 // Prepare for the next buffer.
447 cur_ = (cur_ + 1) % kNumAsyncBuffer;
448 asyncWriter = &buf_arr_[cur_];
449 // Update the cur_ while we have the lock.
450 // Before we do anything, make sure the cache server has done with this buffer, or we will corrupt its content
451 // Also we can also pick up any error from previous flush.
452 if (asyncWriter->rq) {
453 // Save the result into a common area, so worker can see it and quit.
454 flush_rc_ = asyncWriter->rq->Wait();
455 asyncWriter->rq.reset();
456 }
457 asyncWriter->bytes_avail_ = kAsyncBufferSize;
458 asyncWriter->num_ele_ = 0;
459 }
460
461 return flush_rc_;
462 }
463
Write(int64_t sz,const std::vector<ReadableSlice> & v)464 Status CacheClient::AsyncBufferStream::AsyncWriter::Write(int64_t sz, const std::vector<ReadableSlice> &v) {
465 CHECK_FAIL_RETURN_UNEXPECTED(sz <= bytes_avail_, "Programming error");
466 for (auto &p : v) {
467 auto write_sz = p.GetSize();
468 WritableSlice dest(reinterpret_cast<char *>(buffer_) + kAsyncBufferSize - bytes_avail_, write_sz);
469 RETURN_IF_NOT_OK(WritableSlice::Copy(&dest, p));
470 bytes_avail_ -= write_sz;
471 }
472 ++num_ele_;
473 return Status::OK();
474 }
475
Reset()476 Status CacheClient::AsyncBufferStream::Reset() {
477 // Clean up previous running state to be prepared for a new run.
478 cur_ = 0;
479 flush_rc_ = Status::OK();
480 for (auto i = 0; i < kNumAsyncBuffer; ++i) {
481 buf_arr_[i].bytes_avail_ = kAsyncBufferSize;
482 buf_arr_[i].num_ele_ = 0;
483 buf_arr_[i].rq.reset();
484 }
485 return Status::OK();
486 }
487 } // namespace dataset
488 } // namespace mindspore
489