syntax = "proto3";

package tensorflow.eager;

import "tensorflow/core/framework/attr_value.proto";
import "tensorflow/core/framework/device_attributes.proto";
import "tensorflow/core/framework/function.proto";
import "tensorflow/core/framework/tensor.proto";
import "tensorflow/core/framework/tensor_shape.proto";
import "tensorflow/core/framework/versions.proto";
import "tensorflow/core/protobuf/remote_tensor_handle.proto";
import "tensorflow/core/protobuf/tensorflow_server.proto";

option go_package = "github.com/tensorflow/tensorflow/tensorflow/go/core/protobuf/for_core_protos_go_proto";
16// A proto representation of an eager operation.
17message Operation {
18  // A unique identifier for the operation. Set by the client so that the client
19  // can uniquely identify the outputs of the scheduled operation.
20  //
21  // In the initial implementation, sending duplicate IDs has undefined
22  // behaviour, but additional constraints may be placed upon this in the
23  // future.
24  int64 id = 1;
25  string name = 2;
26
27  message Input {
28    oneof item {
29      RemoteTensorHandle remote_handle = 1;
30      TensorProto tensor = 2;
31    }
32  }
33
34  repeated Input op_inputs = 10;
35
36  // Control Operation IDs that will be respected when ops are re-ordered by
37  // async execution. If async execution (+ op re-ordering) is not enabled, this
38  // should have no effect.
39  repeated int64 control_op_ids = 4;
40  map<string, AttrValue> attrs = 5;
41  string device = 6;
42
43  // Indicates whether the op is a component of a multi-device function.
44  bool is_component_function = 7;
45  // Set when is_component_function is true. It's initially generated
46  // when we create an FunctionLibraryRuntime::Options (negative value) and used
47  // to create Rendezvous for function execution. All components of a
48  // multi-device function should use the same step id to make sure that they
49  // can communicate through Send/Recv ops.
50  int64 func_step_id = 8;
51  // Indicates whether the op is a function.
52  bool is_function = 9;
53
54  reserved 3;
55}
56
// A single unit of work that can be enqueued on a remote worker.
message QueueItem {
  // The remote executor should be able to handle either executing ops directly,
  // or releasing any unused tensor handles, since the tensor lifetime is
  // maintained by the client.
  oneof item {
    // Release the client's reference to this remote tensor handle.
    RemoteTensorHandle handle_to_decref = 1;
    // Execute an eager operation (see Operation).
    Operation operation = 2;
    // Directly send tensors to the remote worker (see SendTensorOp).
    SendTensorOp send_tensor = 3;
    // Takes a FunctionDef and makes it enqueueable on the remote worker.
    RegisterFunctionOp register_function = 4;
    // Clean up the per-step state of a multi-device function.
    CleanupFunctionOp cleanup_function = 5;
    // A remote executor is created to execute ops/functions asynchronously
    // enqueued in streaming call. Request with this item type waits for pending
    // nodes to finish on the remote executor and report status.
    SyncRemoteExecutorForStream sync_remote_executor_for_stream = 6;
    // Send a packed TensorHandle to the remote worker (see SendPackedHandleOp).
    SendPackedHandleOp send_packed_handle = 7;
  }
}
75
// Per-item response produced for each QueueItem in an EnqueueRequest.
message QueueResponse {
  // `shape` and `tensor` cannot be set in the same response.
  // Shapes of output tensors for creating remote TensorHandles.
  repeated TensorShapeProto shape = 1;
  // Optional. If set, represents the output devices of a function.
  repeated string device = 3;

  // Output tensors of a remote function. Set when Operation.id is invalid.
  repeated TensorProto tensor = 2;
}
86
// Request to initialize an eager context on a remote worker.
message CreateContextRequest {
  // Identifies the full cluster, and this particular worker's position within.
  ServerDef server_def = 1;

  // Whether the ops on the worker should be executed synchronously or
  // asynchronously. By default, ops are executed synchronously.
  bool async = 2;

  // Number of seconds to keep the context alive. If more than keep_alive_secs
  // has passed since a particular context has been communicated with, it will
  // be garbage collected.
  int64 keep_alive_secs = 3;

  // This is the version for all the ops that will be enqueued by the client.
  VersionDef version_def = 4;

  // Device attributes in the cluster.
  repeated DeviceAttributes cluster_device_attributes = 6;

  // The ID of the created context. This is usually a randomly generated number,
  // that will be used to identify the context in future requests to the
  // service. Contexts are not persisted through server restarts.
  // This ID will be used for all future communications as well. It is essential
  // that both ends use this ID for selecting a rendezvous to get everything to
  // match.
  fixed64 context_id = 7;

  // The view ID of the context.
  fixed64 context_view_id = 8;

  // For a multi device function, if false, eagerly copy all remote inputs to
  // the default function device; if true, lazily copy remote inputs to their
  // target devices after function instantiation to avoid redundant copies.
  bool lazy_copy_remote_function_inputs = 9;

  reserved 5;
}
124
// Response to CreateContext, describing the devices the worker exposes.
message CreateContextResponse {
  // List of devices that are locally accessible to the worker.
  repeated DeviceAttributes device_attributes = 2;

  reserved 1;
}
131
// Request to update an existing eager context, e.g. when the cluster
// membership changes.
message UpdateContextRequest {
  // Identifies the full cluster, and this particular worker's position within.
  ServerDef server_def = 1;

  // Device attributes in the cluster.
  // If this field is empty, it indicates that this is a simple update request
  // that only increments the cluster view ID and does not require changes to
  // the workers it connects to.
  repeated DeviceAttributes cluster_device_attributes = 2;

  // The ID of the context to be updated. A context with the specified ID must
  // already exist on the recipient server of this request.
  fixed64 context_id = 3;

  // The view ID of the context, which should be contiguously incremented when
  // updating the same context.
  fixed64 context_view_id = 4;
}
150
// Response to UpdateContext, describing the devices the worker exposes.
message UpdateContextResponse {
  // List of devices that are locally accessible to the worker.
  repeated DeviceAttributes device_attributes = 1;
}
155
// Request carrying a batch of QueueItems for a particular context.
message EnqueueRequest {
  // ID of the context on which to enqueue the items.
  fixed64 context_id = 1;

  // The items to execute or process, in order.
  repeated QueueItem queue = 3;
}
161
// Response to an EnqueueRequest.
message EnqueueResponse {
  // A single operation response for every item in the request.
  repeated QueueResponse queue_response = 1;
}
166
// Request to block until the given ops have finished on the remote worker.
message WaitQueueDoneRequest {
  // ID of the context whose queue should be waited on.
  fixed64 context_id = 1;

  // Ids to wait on. If empty, wait on everything currently pending.
  repeated int64 op_id = 2;
}
173
// Response to WaitQueueDone. Currently carries no payload.
message WaitQueueDoneResponse {
  // TODO(nareshmodi): Consider adding NodeExecStats here to be able to
  // propagate some stats.
}
178
// Request to run a single component function of a multi-device function.
message RunComponentFunctionRequest {
  // ID of the context in which to run the component function.
  fixed64 context_id = 1;

  // The component function to execute (Operation.is_component_function is
  // expected to describe this case).
  Operation operation = 2;

  // The output indices of its parent function.
  repeated int32 output_num = 3;
}
187
// Response to RunComponentFunction.
message RunComponentFunctionResponse {
  // Shapes of the component function's output tensors.
  repeated TensorShapeProto shape = 1;

  // Output tensors of the component function.
  repeated TensorProto tensor = 2;
}
193
// Request to keep a context alive (and/or validate its existence).
message KeepAliveRequest {
  // ID of the context to keep alive.
  fixed64 context_id = 1;
}
197
// Response to KeepAlive.
message KeepAliveResponse {
  // If the requested context_id is on the remote host, set the context view ID.
  fixed64 context_view_id = 1;
}
202
// Request to tear down a context on a remote worker.
message CloseContextRequest {
  // ID of the context to close.
  fixed64 context_id = 1;
  // View ID of the context to close.
  fixed64 context_view_id = 2;
}
207
// Response to CloseContext. Currently carries no payload.
message CloseContextResponse {}
209
// Registers a function on the remote worker so it can later be executed
// through an Operation with is_function set.
message RegisterFunctionOp {
  // The function to register.
  FunctionDef function_def = 1;

  // If true, it means that function_def is produced by graph partition during
  // multi-device function instantiation.
  bool is_component_function = 2;

  // All necessary FunctionDefs and GradientDefs to expand `function_def`.
  // When is_component_function is true, `function_def` could be a nested
  // function, since some nodes in its parent's function body could be
  // replaced with a new function by the graph optimization passes. No need to
  // add FunctionDefs here to the function cache in EagerContext since they
  // won't be executed as KernelAndDevices.
  FunctionDefLibrary library = 3;
}
225
// Cleanup the step state of a multi-device function (e.g. tensors buffered by
// a `Send` op but not picked up by its corresponding `Recv` op).
message CleanupFunctionOp {
  // Step ID whose per-step state should be released.
  int64 step_id = 1;
}
231
// Marker item with no payload; see QueueItem.sync_remote_executor_for_stream.
message SyncRemoteExecutorForStream {}
233
// Directly sends tensor values to the remote worker.
message SendTensorOp {
  // All remote tensors are identified by <Op ID, Output num>. To mimic this
  // situation when directly sending tensors, we include an "artificial" op ID
  // (which would have corresponded to the _Recv op when not using SendTensor).
  int64 op_id = 1;
  // The index within the repeated field is the output number that will help
  // uniquely identify (along with the above op_id) the particular tensor.
  repeated TensorProto tensors = 2;

  // The device on which the tensors should be resident.
  string device_name = 3;
}
246
// Send a packed TensorHandle to a remote worker.
message SendPackedHandleOp {
  // Op id of the remote packed TensorHandle.
  int64 op_id = 1;

  // A tensor value paired with the device that produced it.
  message LocalTensorHandle {
    TensorProto tensor = 1;
    // Device where the tensor is produced.
    string device = 2;
  }

  // One component of the packed handle: either an inline local tensor or a
  // handle to a tensor already resident on a remote worker.
  message Handle {
    oneof item {
      LocalTensorHandle local_handle = 1;
      RemoteTensorHandle remote_handle = 2;
    }
  }

  // The components that make up the packed handle, indexed by position.
  repeated Handle handles = 2;

  // The device on which the packed handle should be resident.
  string device_name = 3;
}
269
////////////////////////////////////////////////////////////////////////////////
//
// Eager Service defines a TensorFlow service that executes operations eagerly
// on a set of local devices, on behalf of a remote Eager executor.
//
// The service impl will keep track of the various clients and devices it has
// access to and allows the client to enqueue ops on any devices that it is able
// to access and schedule data transfers from/to any of the peers.
//
// A client can generate multiple contexts to be able to independently execute
// operations, but cannot share data between the two contexts.
//
// NOTE: Even though contexts generated by clients should be independent, the
// lower level tensorflow execution engine is not, so they might share some data
// (e.g. a Device's ResourceMgr).
//
////////////////////////////////////////////////////////////////////////////////
service EagerService {
  // This initializes the worker, informing it about the other workers in the
  // cluster and exchanging authentication tokens which will be used in all
  // other RPCs to detect whether the worker has restarted.
  rpc CreateContext(CreateContextRequest) returns (CreateContextResponse);

  // This updates the eager context on an existing worker when updating the set
  // of servers in a distributed eager cluster.
  rpc UpdateContext(UpdateContextRequest) returns (UpdateContextResponse);

  // This takes a list of Execute and DeleteTensorHandle operations and enqueues
  // (in async mode) or executes (in sync mode) them on the remote server.
  // All outputs of ops which were not explicitly deleted with
  // DeleteTensorHandle entries will be assumed to be alive and are usable by
  // future calls to Enqueue.
  rpc Enqueue(EnqueueRequest) returns (EnqueueResponse);

  // A streaming version of Enqueue.
  // Current server implementation sends one response per received request.
  // The benefit for using a streaming version is that subsequent requests
  // can be sent without waiting for a response to the previous request. This
  // synchronization is required in the regular Enqueue call because gRPC does
  // not guarantee to preserve request order.
  rpc StreamingEnqueue(stream EnqueueRequest) returns (stream EnqueueResponse);

  // Takes a set of op IDs and waits until those ops are done. Returns any error
  // in the stream so far.
  rpc WaitQueueDone(WaitQueueDoneRequest) returns (WaitQueueDoneResponse);

  // This takes an Eager operation and executes it in async mode on the remote
  // server. Different from EnqueueRequest, ops/functions sent through this
  // type of requests are allowed to execute in parallel and no ordering is
  // preserved by RPC stream or executor.
  // This request type should only be used for executing component functions.
  // Ordering of component functions should be enforced by their corresponding
  // main functions. The runtime ensures the following invariants for component
  // functions (CFs) and their main functions (MFs):
  // (1) MF1 -> MF2 ==> CF1 -> CF2 ("->" indicates order of execution);
  // (2) MF1 || MF2 ==> CF1 || CF2 ("||" indicates possible parallel execution);
  // (3) For CF1 and CF2 that come from the same MF, CF1 || CF2
  // For executing ops/main functions, use Enqueue or StreamingEnqueue instead
  // for correct ordering.
  rpc RunComponentFunction(RunComponentFunctionRequest)
      returns (RunComponentFunctionResponse);

  // Contexts are always created with a deadline and no RPCs within a deadline
  // will trigger a context garbage collection. KeepAlive calls can be used to
  // delay this. It can also be used to validate the existence of a context ID
  // on remote eager worker. If the context is on remote worker, return the same
  // ID and the current context view ID. This is useful for checking if the
  // remote worker (potentially with the same task name and hostname / port) is
  // replaced with a new process.
  rpc KeepAlive(KeepAliveRequest) returns (KeepAliveResponse);

  // Closes the context. No calls to other methods using the existing context ID
  // are valid after this.
  rpc CloseContext(CloseContextRequest) returns (CloseContextResponse);
}
345