1 /* Copyright 2017 The TensorFlow Authors. All Rights Reserved. 2 3 Licensed under the Apache License, Version 2.0 (the "License"); 4 you may not use this file except in compliance with the License. 5 You may obtain a copy of the License at 6 7 http://www.apache.org/licenses/LICENSE-2.0 8 9 Unless required by applicable law or agreed to in writing, software 10 distributed under the License is distributed on an "AS IS" BASIS, 11 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 See the License for the specific language governing permissions and 13 limitations under the License. 14 ==============================================================================*/ 15 16 #ifndef TENSORFLOW_CORE_DISTRIBUTED_RUNTIME_MESSAGE_WRAPPERS_H_ 17 #define TENSORFLOW_CORE_DISTRIBUTED_RUNTIME_MESSAGE_WRAPPERS_H_ 18 19 #include "tensorflow/core/framework/allocator.h" 20 #include "tensorflow/core/framework/cost_graph.pb.h" 21 #include "tensorflow/core/framework/graph.pb.h" 22 #include "tensorflow/core/framework/step_stats.pb.h" 23 #include "tensorflow/core/framework/tensor.h" 24 #include "tensorflow/core/framework/tensor.pb.h" 25 #include "tensorflow/core/framework/versions.pb.h" 26 #include "tensorflow/core/protobuf/config.pb.h" 27 #include "tensorflow/core/protobuf/master.pb.h" 28 #include "tensorflow/core/protobuf/worker.pb.h" 29 30 namespace tensorflow { 31 32 //////////////////////////////////////////////////////////////////////////////// 33 // 34 // Wrapper classes for the `MasterService.RunStep` request message. 35 // 36 // The `RunStepRequest` message can contain potentially large tensor 37 // data as part of its `feed` submessages. Here we provide specialized 38 // wrappers that avoid copying the tensor data wherever possible. 39 // 40 // See `RunStepRequest` in tensorflow/core/protobuf/master.proto for the 41 // protocol buffer definition. 42 // 43 //////////////////////////////////////////////////////////////////////////////// 44 45 // Abstract interface for an immutable RunStepRequest message. 46 // 47 // This interface is typically used by server-side components in the 48 // TensorFlow master. 49 class RunStepRequestWrapper { 50 public: ~RunStepRequestWrapper()51 virtual ~RunStepRequestWrapper() {} 52 53 // REQUIRED: session_handle must be returned by a CreateSession call 54 // to the same master service. 55 virtual const string& session_handle() const = 0; 56 57 // Partial run handle (optional). If specified, this will be a partial run 58 // execution, run up to the specified fetches. 59 virtual const string& partial_run_handle() const = 0; 60 61 // Tensors to be fed in the step. Each feed is a named tensor. 62 virtual size_t num_feeds() const = 0; 63 virtual const string& feed_name(size_t i) const = 0; 64 65 // Stores the content of the feed value at index `i` in `tensor`. 66 virtual Status FeedValue(size_t i, Tensor* out_tensor) const = 0; 67 virtual Status FeedValue(size_t i, TensorProto* out_tensor) const = 0; 68 69 // Fetches. A list of tensor names. The caller expects a tensor to 70 // be returned for each fetch[i] (see RunStepResponse.tensor). The 71 // order of specified fetches does not change the execution order. 72 virtual size_t num_fetches() const = 0; 73 virtual const string& fetch_name(size_t i) const = 0; 74 75 // Target Nodes. A list of node names. The named nodes will be run 76 // to but their outputs will not be fetched. 77 virtual size_t num_targets() const = 0; 78 virtual const string& target_name(size_t i) const = 0; 79 80 // Options for the run call. 81 virtual const RunOptions& options() const = 0; 82 83 // If true then some errors, e.g., execution errors that have long 84 // error messages, may return an OK RunStepResponse with the actual 85 // error saved in the status_code/status_error_message fields of the 86 // response body. This is a workaround since the RPC subsystem may 87 // truncate long metadata messages. 88 virtual bool store_errors_in_response_body() const = 0; 89 90 // Unique identifier for this request. Every RunGraphRequest must have a 91 // unique request_id, and retried RunGraphRequests must have the same 92 // request_id. If request_id is zero, retry detection is disabled. 93 virtual int64 request_id() const = 0; 94 95 // Returns a human-readable representation of this message for debugging. 96 virtual string DebugString() const = 0; 97 98 // Returns the wrapped data as a protocol buffer message. 99 virtual const RunStepRequest& ToProto() const = 0; 100 }; 101 102 // Abstract interface for a mutable RunStepRequest message. 103 // 104 // See `RunStepRequestWrapper` above for a description of the fields. 105 class MutableRunStepRequestWrapper : public RunStepRequestWrapper { 106 public: 107 virtual void set_session_handle(const string& handle) = 0; 108 virtual void set_partial_run_handle(const string& handle) = 0; 109 virtual void add_feed(const string& name, const Tensor& value) = 0; 110 virtual void add_fetch(const string& name) = 0; 111 virtual void add_target(const string& name) = 0; 112 virtual RunOptions* mutable_options() = 0; 113 virtual void set_store_errors_in_response_body(bool store_errors) = 0; 114 }; 115 116 // Specialized (and mutable) wrapper for RunStep requests between a client and 117 // master in the same address space. 118 class InMemoryRunStepRequest : public MutableRunStepRequestWrapper { 119 public: 120 // RunStepRequestWrapper methods. 121 const string& session_handle() const override; 122 const string& partial_run_handle() const override; 123 size_t num_feeds() const override; 124 const string& feed_name(size_t i) const override; 125 Status FeedValue(size_t i, Tensor* out_tensor) const override; 126 Status FeedValue(size_t i, TensorProto* out_tensor) const override; 127 size_t num_fetches() const override; 128 const string& fetch_name(size_t i) const override; 129 size_t num_targets() const override; 130 const string& target_name(size_t i) const override; 131 const RunOptions& options() const override; 132 string DebugString() const override; 133 const RunStepRequest& ToProto() const override; 134 bool store_errors_in_response_body() const override; 135 int64 request_id() const override; 136 137 // MutableRunStepRequestWrapper methods. 138 void set_session_handle(const string& handle) override; 139 void set_partial_run_handle(const string& handle) override; 140 void add_feed(const string& name, const Tensor& value) override; 141 void add_fetch(const string& name) override; 142 void add_target(const string& name) override; 143 RunOptions* mutable_options() override; 144 void set_store_errors_in_response_body(bool store_errors) override; 145 146 private: 147 string session_handle_; 148 string partial_run_handle_; 149 gtl::InlinedVector<std::pair<string, Tensor>, 4> feeds_; 150 gtl::InlinedVector<string, 4> fetches_; 151 gtl::InlinedVector<string, 4> targets_; 152 RunOptions options_; 153 bool store_errors_in_response_body_ = false; 154 155 // Holds a cached and owned representation of the proto 156 // representation of this request, if needed, so that `ToProto()` 157 // can return a const RunStepRequest&. 158 // NOTE(mrry): Although calls to `ToProto()` on this class are 159 // expected to be rare, retaining ownership of the returned message 160 // makes it easier to return a reference from the proto-backed 161 // representations. 162 mutable std::unique_ptr<RunStepRequest> proto_version_; 163 }; 164 165 // Wrapper for mutable RunStep requests that uses a protobuf message. 166 // 167 // This wrapper class should be used for RunStep requests between a 168 // client and master in different address spaces. 169 class MutableProtoRunStepRequest : public MutableRunStepRequestWrapper { 170 public: 171 // RunStepRequestWrapper methods. 172 const string& session_handle() const override; 173 const string& partial_run_handle() const override; 174 size_t num_feeds() const override; 175 const string& feed_name(size_t i) const override; 176 Status FeedValue(size_t i, Tensor* out_tensor) const override; 177 Status FeedValue(size_t i, TensorProto* out_tensor) const override; 178 size_t num_fetches() const override; 179 const string& fetch_name(size_t i) const override; 180 size_t num_targets() const override; 181 const string& target_name(size_t i) const override; 182 const RunOptions& options() const override; 183 string DebugString() const override; 184 const RunStepRequest& ToProto() const override; 185 bool store_errors_in_response_body() const override; 186 int64 request_id() const override; 187 188 // MutableRunStepRequestWrapper methods. 189 void set_session_handle(const string& handle) override; 190 void set_partial_run_handle(const string& handle) override; 191 void add_feed(const string& name, const Tensor& value) override; 192 void add_fetch(const string& name) override; 193 void add_target(const string& name) override; 194 RunOptions* mutable_options() override; 195 void set_store_errors_in_response_body(bool store_errors) override; 196 197 private: 198 RunStepRequest request_; 199 friend class MasterInterface; 200 }; 201 202 // Wrapper for immutable RunStep requests that use a non-owned 203 // protobuf message. 204 // 205 // This interface is typically used by server-side components in the 206 // TensorFlow master, where the incoming message is a (possibly const) 207 // `RunStepRequest*`. 208 class ProtoRunStepRequest : public RunStepRequestWrapper { 209 public: 210 ProtoRunStepRequest(const RunStepRequest* request); 211 212 // RunStepRequestWrapper methods. 213 const string& session_handle() const override; 214 const string& partial_run_handle() const override; 215 size_t num_feeds() const override; 216 const string& feed_name(size_t i) const override; 217 Status FeedValue(size_t i, Tensor* out_tensor) const override; 218 Status FeedValue(size_t i, TensorProto* out_tensor) const override; 219 size_t num_fetches() const override; 220 const string& fetch_name(size_t i) const override; 221 size_t num_targets() const override; 222 const string& target_name(size_t i) const override; 223 const RunOptions& options() const override; 224 string DebugString() const override; 225 const RunStepRequest& ToProto() const override; 226 bool store_errors_in_response_body() const override; 227 int64 request_id() const override; 228 229 private: 230 const RunStepRequest* const request_; // Not owned. 231 }; 232 233 //////////////////////////////////////////////////////////////////////////////// 234 // 235 // Wrapper classes for the `WorkerService.RunGraph` request message. 236 // 237 // The `RunGraphRequest` message can contain potentially large tensor 238 // data as part of its `send` submessages. Here we provide specialized 239 // wrappers that avoid copying the tensor data wherever possible. 240 // 241 // See `RunGraphRequest` in tensorflow/core/protobuf/worker.proto for the 242 // protocol buffer definition. 243 // 244 //////////////////////////////////////////////////////////////////////////////// 245 246 // Abstract interface for an immutable RunGraphRequest message. 247 // 248 // This interface is typically used by server-side components in the 249 // TensorFlow worker. 250 class RunGraphRequestWrapper { 251 public: ~RunGraphRequestWrapper()252 virtual ~RunGraphRequestWrapper() {} 253 254 // The session handle used to register the graph. If empty, a single global 255 // namespace is used. 256 virtual const string& session_handle() const = 0; 257 258 // Set to true if `CreateWorkerSession` was called for `session_handle`. 259 virtual bool create_worker_session_called() const = 0; 260 261 // REQUIRED: graph_handle must be returned by a RegisterGraph call 262 // to the same WorkerService. 263 virtual const string& graph_handle() const = 0; 264 265 // A unique ID to distinguish different runs of the same graph. 266 // 267 // The master generates a global unique `step_id` to distinguish 268 // different runs of the graph computation. Subgraphs communicate 269 // (e.g., send/recv ops) with each other using `step_id` to 270 // distinguish tensors generated by different runs. 271 virtual int64 step_id() const = 0; 272 273 // Options for this step. 274 virtual const ExecutorOpts& exec_opts() const = 0; 275 276 // Sends the tensors in "send" into the graph before the run. 277 virtual size_t num_sends() const = 0; 278 virtual const string& send_key(size_t i) const = 0; 279 virtual Status SendValue(size_t i, Tensor* out_tensor) const = 0; 280 281 // Fetches the keys into `RunGraphResponse.recv` after the run. 282 virtual size_t num_recvs() const = 0; 283 virtual const string& recv_key(size_t i) const = 0; 284 285 // True if the RunGraphRequest is a partial run request. 286 virtual bool is_partial() const = 0; 287 288 // True if this is the last partial run request in a sequence of requests. 289 virtual bool is_last_partial_run() const = 0; 290 291 // If true then some errors, e.g., execution errors that have long 292 // error messages, may return an OK RunStepResponse with the actual 293 // error saved in the status_code/status_error_message fields of the 294 // response body. This is a workaround since the RPC subsystem may 295 // truncate long metadata messages. 296 virtual bool store_errors_in_response_body() const = 0; 297 298 virtual int64 request_id() const = 0; 299 300 // Returns the wrapped data as a protocol buffer message. 301 virtual const RunGraphRequest& ToProto() const = 0; 302 }; 303 304 // Abstract interface for a mutable RunGraphRequest message. 305 // 306 // See `RunGraphRequestWrapper` above for a description of the fields. 307 class MutableRunGraphRequestWrapper : public RunGraphRequestWrapper { 308 public: 309 virtual void set_session_handle(const string& handle) = 0; 310 virtual void set_create_worker_session_called(bool called) = 0; 311 virtual void set_graph_handle(const string& handle) = 0; 312 virtual void set_step_id(int64_t step_id) = 0; 313 virtual ExecutorOpts* mutable_exec_opts() = 0; 314 315 // Stores the i^{th} feed value in `run_step_request` in this 316 // request with the given `send_key`. 317 virtual Status AddSendFromRunStepRequest( 318 const RunStepRequestWrapper& run_step_request, size_t i, 319 const string& send_key) = 0; 320 virtual Status AddSendFromRunCallableRequest( 321 const RunCallableRequest& run_callable_request, size_t i, 322 const string& send_key) = 0; 323 324 virtual void add_recv_key(const string& recv_key) = 0; 325 virtual void set_is_partial(bool is_partial) = 0; 326 virtual void set_is_last_partial_run(bool is_last_partial_run) = 0; 327 virtual void set_store_errors_in_response_body(bool store_errors) = 0; 328 virtual void set_request_id(int64_t request_id) = 0; 329 }; 330 331 class InMemoryRunGraphRequest : public MutableRunGraphRequestWrapper { 332 public: 333 // RunGraphRequestWrapper methods. 334 const string& session_handle() const override; 335 const string& graph_handle() const override; 336 bool create_worker_session_called() const override; 337 int64 step_id() const override; 338 const ExecutorOpts& exec_opts() const override; 339 size_t num_sends() const override; 340 const string& send_key(size_t i) const override; 341 Status SendValue(size_t i, Tensor* out_tensor) const override; 342 size_t num_recvs() const override; 343 const string& recv_key(size_t i) const override; 344 bool is_partial() const override; 345 bool is_last_partial_run() const override; 346 const RunGraphRequest& ToProto() const override; 347 bool store_errors_in_response_body() const override; 348 int64 request_id() const override; 349 350 // MutableRunGraphRequestWrapper methods. 351 void set_session_handle(const string& handle) override; 352 void set_create_worker_session_called(bool called) override; 353 void set_graph_handle(const string& handle) override; 354 void set_step_id(int64_t step_id) override; 355 ExecutorOpts* mutable_exec_opts() override; 356 Status AddSendFromRunStepRequest( 357 const RunStepRequestWrapper& run_step_request, size_t i, 358 const string& send_key) override; 359 Status AddSendFromRunCallableRequest( 360 const RunCallableRequest& run_callable_request, size_t i, 361 const string& send_key) override; 362 void add_recv_key(const string& recv_key) override; 363 void set_is_partial(bool is_partial) override; 364 void set_is_last_partial_run(bool is_last_partial_run) override; 365 void set_store_errors_in_response_body(bool store_errors) override; 366 void set_request_id(int64_t request_id) override; 367 368 private: 369 string session_handle_; 370 bool create_worker_session_called_ = false; 371 string graph_handle_; 372 int64 step_id_; 373 ExecutorOpts exec_opts_; 374 gtl::InlinedVector<std::pair<string, Tensor>, 4> sends_; 375 gtl::InlinedVector<string, 4> recvs_; 376 bool is_partial_ = false; 377 bool is_last_partial_run_ = false; 378 bool store_errors_in_response_body_ = false; 379 int64 request_id_ = 0; 380 381 // Holds a cached and owned representation of the proto 382 // representation of this request, if needed, so that `ToProto()` 383 // can return a const RunGraphRequest&. 384 // NOTE(mrry): Although calls to `ToProto()` on this class are 385 // expected to be rare, retaining ownership of the returned message 386 // makes it easier to return a reference from the proto-backed 387 // representations. 388 mutable std::unique_ptr<RunGraphRequest> proto_version_; 389 }; 390 391 class MutableProtoRunGraphRequest : public MutableRunGraphRequestWrapper { 392 public: 393 // RunGraphRequestWrapper methods. 394 const string& session_handle() const override; 395 bool create_worker_session_called() const override; 396 const string& graph_handle() const override; 397 int64 step_id() const override; 398 const ExecutorOpts& exec_opts() const override; 399 size_t num_sends() const override; 400 const string& send_key(size_t i) const override; 401 Status SendValue(size_t i, Tensor* out_tensor) const override; 402 size_t num_recvs() const override; 403 const string& recv_key(size_t i) const override; 404 bool is_partial() const override; 405 bool is_last_partial_run() const override; 406 bool store_errors_in_response_body() const override; 407 int64 request_id() const override; 408 const RunGraphRequest& ToProto() const override; 409 410 // MutableRunGraphRequestWrapper methods. 411 void set_session_handle(const string& handle) override; 412 void set_create_worker_session_called(bool called) override; 413 void set_graph_handle(const string& handle) override; 414 void set_step_id(int64_t step_id) override; 415 ExecutorOpts* mutable_exec_opts() override; 416 Status AddSendFromRunStepRequest( 417 const RunStepRequestWrapper& run_step_request, size_t i, 418 const string& send_key) override; 419 Status AddSendFromRunCallableRequest( 420 const RunCallableRequest& run_callable_request, size_t i, 421 const string& send_key) override; 422 void add_recv_key(const string& recv_key) override; 423 void set_is_partial(bool is_partial) override; 424 void set_is_last_partial_run(bool is_last_partial_run) override; 425 void set_store_errors_in_response_body(bool store_errors) override; 426 void set_request_id(int64_t request_id) override; 427 428 private: 429 RunGraphRequest request_; 430 }; 431 432 class ProtoRunGraphRequest : public RunGraphRequestWrapper { 433 public: 434 ProtoRunGraphRequest(const RunGraphRequest* request); 435 436 // RunGraphRequestWrapper methods. 437 const string& session_handle() const override; 438 bool create_worker_session_called() const override; 439 const string& graph_handle() const override; 440 int64 step_id() const override; 441 const ExecutorOpts& exec_opts() const override; 442 size_t num_sends() const override; 443 const string& send_key(size_t i) const override; 444 Status SendValue(size_t i, Tensor* out_tensor) const override; 445 size_t num_recvs() const override; 446 const string& recv_key(size_t i) const override; 447 bool is_partial() const override; 448 bool is_last_partial_run() const override; 449 bool store_errors_in_response_body() const override; 450 int64 request_id() const override; 451 const RunGraphRequest& ToProto() const override; 452 453 private: 454 const RunGraphRequest* const request_; // Not owned. 455 }; 456 457 //////////////////////////////////////////////////////////////////////////////// 458 // 459 // Wrapper classes for the `WorkerService.RunGraph` response message. 460 // 461 // The `RunGraphResponse` message can contain potentially large tensor 462 // data as part of its `recv` submessages. Here we provide specialized 463 // wrappers that avoid copying the tensor data wherever possible. 464 // 465 // See `RunGraphResponse` in tensorflow/core/protobuf/worker.proto for the 466 // protocol buffer definition. 467 // 468 //////////////////////////////////////////////////////////////////////////////// 469 470 // Abstract interface for a mutable RunGraphResponse message. 471 // 472 // Note that there is no corresponding (immutable) 473 // RunGraphResponseWrapper class, because the RunGraphResponse object 474 // is always used as a mutable pointer. 475 class MutableRunGraphResponseWrapper { 476 public: ~MutableRunGraphResponseWrapper()477 virtual ~MutableRunGraphResponseWrapper() {} 478 479 // A list of tensors corresponding to those requested by 480 // `RunGraphRequest.recv_key`. 481 virtual size_t num_recvs() const = 0; 482 virtual const string& recv_key(size_t i) const = 0; 483 // NOTE: The following methods may perform a destructive read, for 484 // efficiency. 485 virtual Status RecvValue(size_t i, TensorProto* out_tensor) = 0; 486 virtual Status RecvValue(size_t i, Tensor* out_tensor) = 0; 487 virtual void AddRecv(const string& key, const Tensor& value) = 0; 488 489 // Submessages that store performance statistics about the subgraph 490 // execution, if necessary. 491 virtual StepStats* mutable_step_stats() = 0; 492 virtual CostGraphDef* mutable_cost_graph() = 0; 493 virtual size_t num_partition_graphs() const = 0; 494 virtual GraphDef* mutable_partition_graph(size_t i) = 0; 495 virtual void AddPartitionGraph(const GraphDef& partition_graph) = 0; 496 497 // Returned status if requested. 498 virtual errors::Code status_code() const = 0; 499 virtual const string& status_error_message() const = 0; 500 virtual void set_status(const Status& status) = 0; 501 502 protected: 503 // Returns a mutable protobuf message that represents the contents of 504 // this wrapper, for passing to an RPC subsystem that will populate 505 // the message. 506 // 507 // NOTE: Only `WorkerInterface` subclasses may call this method. The 508 // `InMemoryRunGraphResponse` subclass does not implement this 509 // method, and attempts to call it will fail with a fatal 510 // error. However, as long as callers always call 511 // `WorkerInterface::RunGraphAsync()` with a wrapper object returned 512 // from `WorkerInterface::CreateRunGraphResponse()` called on the 513 // *same* WorkerInterface object, this error will never trigger. 514 virtual RunGraphResponse* get_proto() = 0; 515 friend class WorkerInterface; 516 }; 517 518 class InMemoryRunGraphResponse : public MutableRunGraphResponseWrapper { 519 public: 520 // MutableRunGraphResponseWrapper methods. 521 size_t num_recvs() const override; 522 const string& recv_key(size_t i) const override; 523 Status RecvValue(size_t i, TensorProto* out_tensor) override; 524 Status RecvValue(size_t i, Tensor* out_tensor) override; 525 void AddRecv(const string& key, const Tensor& value) override; 526 StepStats* mutable_step_stats() override; 527 CostGraphDef* mutable_cost_graph() override; 528 size_t num_partition_graphs() const override; 529 GraphDef* mutable_partition_graph(size_t i) override; 530 void AddPartitionGraph(const GraphDef& partition_graph) override; 531 errors::Code status_code() const override; 532 const string& status_error_message() const override; 533 void set_status(const Status& status) override; 534 535 protected: 536 // NOTE: This method is not implemented. See 537 // MutableRunGraphResponseWrapper for an explanation. 538 RunGraphResponse* get_proto() override; 539 540 private: 541 gtl::InlinedVector<std::pair<string, Tensor>, 4> recvs_; 542 StepStats step_stats_; 543 CostGraphDef cost_graph_; 544 std::vector<GraphDef> partition_graphs_; 545 // Store the code and message separately so that they can be updated 546 // independently by setters. 547 Status status_; 548 }; 549 550 // Proto-based message wrapper for use on the client side of the RunGraph RPC. 551 class OwnedProtoRunGraphResponse : public MutableRunGraphResponseWrapper { 552 public: 553 // MutableRunGraphResponseWrapper methods. 554 size_t num_recvs() const override; 555 const string& recv_key(size_t i) const override; 556 Status RecvValue(size_t i, TensorProto* out_tensor) override; 557 Status RecvValue(size_t i, Tensor* out_tensor) override; 558 void AddRecv(const string& key, const Tensor& value) override; 559 StepStats* mutable_step_stats() override; 560 CostGraphDef* mutable_cost_graph() override; 561 size_t num_partition_graphs() const override; 562 GraphDef* mutable_partition_graph(size_t i) override; 563 void AddPartitionGraph(const GraphDef& partition_graph) override; 564 errors::Code status_code() const override; 565 const string& status_error_message() const override; 566 void set_status(const Status& status) override; 567 568 protected: 569 RunGraphResponse* get_proto() override; 570 571 private: 572 RunGraphResponse response_; 573 }; 574 575 // Proto-based message wrapper for use on the server side of the RunGraph RPC. 576 class NonOwnedProtoRunGraphResponse : public MutableRunGraphResponseWrapper { 577 public: 578 NonOwnedProtoRunGraphResponse(RunGraphResponse* response); 579 580 // MutableRunGraphResponseWrapper methods. 581 size_t num_recvs() const override; 582 const string& recv_key(size_t i) const override; 583 Status RecvValue(size_t i, TensorProto* out_tensor) override; 584 Status RecvValue(size_t i, Tensor* out_tensor) override; 585 void AddRecv(const string& key, const Tensor& value) override; 586 StepStats* mutable_step_stats() override; 587 CostGraphDef* mutable_cost_graph() override; 588 size_t num_partition_graphs() const override; 589 GraphDef* mutable_partition_graph(size_t i) override; 590 void AddPartitionGraph(const GraphDef& partition_graph) override; 591 errors::Code status_code() const override; 592 const string& status_error_message() const override; 593 void set_status(const Status& status) override; 594 595 protected: 596 RunGraphResponse* get_proto() override; 597 598 private: 599 RunGraphResponse* const response_; 600 }; 601 602 //////////////////////////////////////////////////////////////////////////////// 603 // 604 // Wrapper classes for the `MasterService.RunStep` response message. 605 // 606 // The `RunStepResponse` message can contain potentially large tensor 607 // data as part of its `tensor` submessages. Here we provide specialized 608 // wrappers that avoid copying the tensor data wherever possible. 609 // 610 // See `RunStepResponse` in tensorflow/core/protobuf/master.proto for the 611 // protocol buffer definition. 612 // 613 //////////////////////////////////////////////////////////////////////////////// 614 615 // Abstract interface for a mutable RunStepResponse message. 616 // 617 // Note that there is no corresponding (immutable) 618 // RunStepResponseWrapper class, because the RunStepResponse object is 619 // always used as a mutable pointer. 620 class MutableRunStepResponseWrapper { 621 public: 622 virtual ~MutableRunStepResponseWrapper(); 623 624 // The values of the tensors whose fetching was requested in the 625 // RunStep call. 626 // 627 // NOTE: The order of the returned tensors may or may not match 628 // the fetch order specified in RunStepRequest. 629 virtual size_t num_tensors() const = 0; 630 virtual const string& tensor_name(size_t i) const = 0; 631 virtual Status TensorValue(size_t i, Tensor* out_tensor) const = 0; 632 633 // Stores the i^{th} recv value in `run_graph_response` in this 634 // response with the given `name`. 635 virtual Status AddTensorFromRunGraphResponse( 636 const string& name, MutableRunGraphResponseWrapper* run_graph_response, 637 size_t i) = 0; 638 639 // Returned metadata if requested in the options. 640 virtual const RunMetadata& metadata() const = 0; 641 virtual RunMetadata* mutable_metadata() = 0; 642 643 // Returned status if requested. 644 virtual errors::Code status_code() const = 0; 645 virtual const string& status_error_message() const = 0; 646 virtual void set_status(const Status& status) = 0; 647 648 protected: 649 // Returns a mutable protobuf message that represents the contents of 650 // this wrapper, for passing to an RPC subsystem that will populate 651 // the message. 652 // 653 // NOTE: Only `MasterInterface` subclasses may call this method. The 654 // `InMemoryRunStepResponse` subclass does not implement this 655 // method, and attempts to call it will fail with a fatal 656 // error. However, as long as callers always call 657 // `MasterInterface::RunStep()` with a wrapper object returned 658 // from `MasterInterface::CreateRunStepResponse()` called on the 659 // *same* MasterInterface object, this error will never trigger. 660 virtual RunStepResponse* get_proto() = 0; 661 friend class MasterInterface; 662 }; 663 664 class InMemoryRunStepResponse : public MutableRunStepResponseWrapper { 665 public: 666 // MutableRunStepResponseWrapper methods. 667 size_t num_tensors() const override; 668 const string& tensor_name(size_t i) const override; 669 Status TensorValue(size_t i, Tensor* out_tensor) const override; 670 Status AddTensorFromRunGraphResponse( 671 const string& name, MutableRunGraphResponseWrapper* run_graph_response, 672 size_t i) override; 673 const RunMetadata& metadata() const override; 674 RunMetadata* mutable_metadata() override; 675 errors::Code status_code() const override; 676 const string& status_error_message() const override; 677 void set_status(const Status& status) override; 678 679 protected: 680 // NOTE: This method is not implemented. See 681 // MutableRunGraphResponseWrapper for an explanation. 682 RunStepResponse* get_proto() override; 683 684 private: 685 gtl::InlinedVector<std::pair<string, Tensor>, 4> tensors_; 686 RunMetadata metadata_; 687 // Store the code and message separately so that they can be updated 688 // independently by setters. 689 Status status_; 690 }; 691 692 // Proto-based message wrapper for use on the client side of the RunStep RPC. 693 class OwnedProtoRunStepResponse : public MutableRunStepResponseWrapper { 694 public: 695 // MutableRunStepResponseWrapper methods. 696 size_t num_tensors() const override; 697 const string& tensor_name(size_t i) const override; 698 Status TensorValue(size_t i, Tensor* out_tensor) const override; 699 Status AddTensorFromRunGraphResponse( 700 const string& name, MutableRunGraphResponseWrapper* run_graph_response, 701 size_t i) override; 702 const RunMetadata& metadata() const override; 703 RunMetadata* mutable_metadata() override; 704 errors::Code status_code() const override; 705 const string& status_error_message() const override; 706 void set_status(const Status& status) override; 707 708 protected: 709 RunStepResponse* get_proto() override; 710 711 private: 712 RunStepResponse response_; 713 }; 714 715 // Proto-based message wrapper for use on the server side of the RunStep RPC. 716 class NonOwnedProtoRunStepResponse : public MutableRunStepResponseWrapper { 717 public: 718 NonOwnedProtoRunStepResponse(RunStepResponse* response); 719 720 // MutableRunStepResponseWrapper methods. 721 size_t num_tensors() const override; 722 const string& tensor_name(size_t i) const override; 723 Status TensorValue(size_t i, Tensor* out_tensor) const override; 724 Status AddTensorFromRunGraphResponse( 725 const string& name, MutableRunGraphResponseWrapper* run_graph_response, 726 size_t i) override; 727 const RunMetadata& metadata() const override; 728 RunMetadata* mutable_metadata() override; 729 errors::Code status_code() const override; 730 const string& status_error_message() const override; 731 void set_status(const Status& status) override; 732 733 protected: 734 RunStepResponse* get_proto() override; 735 736 private: 737 RunStepResponse* response_; // Not owned. 738 }; 739 740 bool ParseTensorProtoToTensor(const TensorProto& tensor_proto, 741 Tensor* out_tensor); 742 743 } // namespace tensorflow 744 745 #endif // TENSORFLOW_CORE_DISTRIBUTED_RUNTIME_MESSAGE_WRAPPERS_H_ 746