• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1syntax = "proto3";
2
3package tensorflow;
4
5import "tensorflow/compiler/xla/pjrt/distributed/protocol.proto";
6import "tensorflow/core/framework/device_attributes.proto";
7
8option go_package = "github.com/tensorflow/tensorflow/tensorflow/go/core/protobuf/for_core_protos_go_proto";
9
// Identifies one remote worker task in the cluster. A task is addressed by the
// name of the job it belongs to together with its task id.
message CoordinatedTask {
  // Name of the job this task belongs to.
  string job_name = 1;
  // Id of this task within the job named by `job_name`.
  int32 task_id = 2;
}
15
// Represents the state of a remote worker task.
enum CoordinatedTaskState {
  // Invalid default value; observing it indicates a bug.
  TASKSTATE_UNSPECIFIED = 0;
  // Agent-only state. While the agent is disconnected, the service has no way
  // of knowing whether the task is initialized or uninitialized.
  TASKSTATE_UNINITIALIZED = 1;
  // The task is disconnected from the service.
  // NOTE(review): presumably reached via ShutdownTask()/ResetTask(), both of
  // which are documented as disconnecting the task — confirm.
  TASKSTATE_DISCONNECTED = 2;
  // The task is connected to the service.
  // NOTE(review): presumably reached after a successful RegisterTask() —
  // confirm.
  TASKSTATE_CONNECTED = 3;
  // The task is in an error state. ReportErrorToService() sets the
  // service-side task state to error; ResetTask() disconnects a task in this
  // state so that it can reconnect via RegisterTask().
  TASKSTATE_ERROR = 4;
}
28
// Payload attached to the status of every coordination service error.
//
// The proto may be left empty when the error was triggered by the task's own
// agent calls, i.e. when it was not propagated by the service from another
// remote task.
message CoordinationServiceError {
  // Numbers 1 and 2 belonged to removed fields that described where the error
  // originated.
  reserved 1, 2;
  // True when the error was reported by the user through the agent API rather
  // than being an internal service error.
  bool is_reported_error = 3;
  // The task that hit the error. When unset, the error originated in the same
  // task that is processing it.
  CoordinatedTask source_task = 4;
}
42
// Device list reported by a TensorFlow runtime.
message TfDeviceList {
  // Attributes of each TensorFlow device.
  repeated DeviceAttributes devices = 1;
}

// Device list reported by an XLA runtime.
message XlaDeviceList {
  // Devices described by XLA's `GlobalTopologyProto`.
  xla.GlobalTopologyProto devices = 1;
}

// Represents device information from different runtimes. At most one of the
// runtime-specific lists is set.
message CoordinationServiceDeviceInfo {
  oneof type {
    // Devices described with TensorFlow `DeviceAttributes`.
    TfDeviceList tf = 1;
    // Devices described with XLA's `GlobalTopologyProto`.
    XlaDeviceList xla = 2;
  }
}
56
// Request and response messages for registering a task with the cluster
// leader. A task is uniquely identified by its `job_name`, `task_id` and
// `incarnation`; the leader answers with its own incarnation
// (`leader_incarnation`) so that the leader process can be identified.
message RegisterTaskRequest {
  // Numbers 1 and 2 belonged to removed fields that used to specify the task.
  reserved 1, 2;
  fixed64 incarnation = 3;
  // Number 4 was `local_device_attributes`, which was moved out of this
  // message into WaitForAllTasksRequest.
  reserved 4;
  CoordinatedTask source_task = 5;
}

message RegisterTaskResponse {
  fixed64 leader_incarnation = 1;
}
74
// Request and response messages for sending heartbeats.
message HeartbeatRequest {
  // Numbers 1 and 2 belonged to removed fields that used to specify the
  // remote task.
  reserved 1, 2;
  fixed64 incarnation = 3;
  CoordinatedTask source_task = 4;
}

message HeartbeatResponse {
  fixed64 leader_incarnation = 1;
  // When the cluster has failures, additional metadata in the response is
  // used to broadcast the error code and message to the other tasks.
}
88
// Request and response messages for waiting until every task is up.
message WaitForAllTasksRequest {
  // Numbers 1 and 2 belonged to removed fields that used to specify the
  // remote task.
  reserved 1, 2;
  // Number 3 belonged to a removed field that carried TF-only device info.
  reserved 3;
  // Every local device attribute of the task sending this request.
  CoordinationServiceDeviceInfo local_device_info = 4;
  CoordinatedTask source_task = 5;
}

message WaitForAllTasksResponse {
  fixed64 leader_incarnation = 1;
  // Number 2 belonged to a removed field that carried TF-only device info.
  reserved 2;
  // Devices across the whole cluster.
  CoordinationServiceDeviceInfo cluster_device_info = 3;
}
107
// Request and response messages for disconnecting a task from the service.
message ShutdownTaskRequest {
  // NOTE(review): presumably the calling task, which is the one being
  // disconnected — confirm.
  CoordinatedTask source_task = 1;
}

message ShutdownTaskResponse {}
114
// Request and response messages for resetting a task's state in the service.
message ResetTaskRequest {
  // NOTE(review): presumably the calling task, whose state is reset —
  // confirm.
  CoordinatedTask source_task = 1;
}

message ResetTaskResponse {}
121
// Request and response messages for reporting an error to a task.
message ReportErrorToTaskRequest {
  // NOTE(review): presumably a status code, since `error_payload` is a
  // status payload — confirm.
  int32 error_code = 1;
  string error_message = 2;
  // Numbers 3 and 4 belonged to removed fields whose contents are now
  // embedded in the payload.
  reserved 3, 4;
  CoordinationServiceError error_payload = 5;
}

message ReportErrorToTaskResponse {}
132
// Request and response messages for reporting an error to the service
// instance.
message ReportErrorToServiceRequest {
  int32 error_code = 1;
  string error_message = 2;
  // Numbers 3 and 4 belonged to removed fields that used to specify where the
  // error originated.
  reserved 3, 4;
  // Task where the error originated.
  CoordinatedTask error_origin = 5;
}

message ReportErrorToServiceResponse {}
143
// A single configuration key-value pair.
//
// Keys are laid out like a Unix file system path: several levels of directory
// names joined by slash ('/') characters.
message KeyValueEntry {
  string key = 1;
  bytes value = 2;
}
151
// Request and response messages for inserting configuration key-value data.
message InsertKeyValueRequest {
  // The entry to insert.
  KeyValueEntry kv = 1;
}

message InsertKeyValueResponse {}
158
// Request and response messages for getting configuration key-value data.
// The GetKeyValue RPC blocks until the requested key becomes available.
message GetKeyValueRequest {
  string key = 1;
}

message GetKeyValueResponse {
  KeyValueEntry kv = 1;
}
167
// Request and response messages for the non-blocking TryGetKeyValue RPC, which
// returns an error instead of waiting when the requested key does not exist.
message TryGetKeyValueRequest {
  string key = 1;
}

message TryGetKeyValueResponse {
  KeyValueEntry kv = 1;
}
175
// Request and response messages for fetching every key-value whose key is
// prefixed with `directory_key`.
message GetKeyValueDirRequest {
  string directory_key = 1;
}

message GetKeyValueDirResponse {
  // NOTE(review): presumably echoes the requested directory key — confirm.
  string directory_key = 1;
  // Entries whose keys fall under `directory_key`.
  repeated KeyValueEntry kv = 2;
}
184
// Request and response messages for deleting configuration key-value data.
message DeleteKeyValueRequest {
  string key = 1;
  // When true, every key-value under `key` is deleted recursively.
  bool is_directory = 2;
}

message DeleteKeyValueResponse {}
193
// Request and response messages for generic synchronization barriers.
message BarrierRequest {
  // Identifier of the barrier.
  string barrier_id = 1;
  // Barrier timeout, in milliseconds.
  int64 barrier_timeout_in_ms = 2;
  // Tasks that will wait at the barrier. Leaving this empty means the entire
  // cluster participates in the barrier.
  repeated CoordinatedTask tasks = 3;
  // Task that is making the request.
  CoordinatedTask source_task = 4;
}

message BarrierResponse {}
206
// Request and response messages for cancelling generic sync barriers.
message CancelBarrierRequest {
  // Identifier of the barrier to cancel.
  string barrier_id = 1;
  // Task that is making the request.
  CoordinatedTask source_task = 2;
}

message CancelBarrierResponse {}
215
// Coordination Service defines a TensorFlow service that controls and
// coordinates distributed execution in a cluster of multiple tasks.
//
// The service keeps track of the cluster configuration and the state of cluster
// members or the leader depending on the role of the current task. The
// distributed runtime leverages this service to coordinate and perform cluster
// initialization, check the health of tasks, and propagate error messages to
// the cluster.
service CoordinationService {
  // Registers a task with the coordination service so that the service starts
  // tracking the task's liveness. The RPC blocks and returns only once the
  // task has registered successfully, or an error occurs while registering.
  rpc RegisterTask(RegisterTaskRequest) returns (RegisterTaskResponse);

  // Heartbeat sent from a task to the coordination service. It refreshes the
  // task's timestamp on the leader so that the task does not become stale.
  // The RPC responds immediately after the timestamp has been refreshed.
  rpc Heartbeat(HeartbeatRequest) returns (HeartbeatResponse);

  // Waits for all tasks in the cluster to be up and running. The RPC returns
  // only once all tasks have registered, or when some error occurs.
  rpc WaitForAllTasks(WaitForAllTasksRequest) returns (WaitForAllTasksResponse);

  // Disconnects the task from the service. If `shutdown_barrier_timeout_in_ms`
  // is specified in the config, blocks until all tasks reach the barrier
  // before disconnecting together. If the barrier times out, tasks at the
  // barrier still disconnect, while an error is reported to the tasks that did
  // not reach the barrier on time.
  rpc ShutdownTask(ShutdownTaskRequest) returns (ShutdownTaskResponse);

  // Disconnects the task from the service if it is in an ERROR state, thereby
  // allowing it to reconnect via RegisterTask() in the future.
  rpc ResetTask(ResetTaskRequest) returns (ResetTaskResponse);

  // Reports an error to the task. The RPC permanently puts the receiving
  // coordination service agent instance into an error state.
  // TODO(b/195990880): Consider splitting this into a different RPC service.
  rpc ReportErrorToTask(ReportErrorToTaskRequest)
      returns (ReportErrorToTaskResponse);

  // Reports a task error to the coordination service. The RPC sets the
  // service-side task state to error, and propagates the error to the other
  // tasks in the cluster.
  rpc ReportErrorToService(ReportErrorToServiceRequest)
      returns (ReportErrorToServiceResponse);

  // Inserts a configuration key-value that will be accessible to all cluster
  // tasks. The key can be formatted as a Unix file path with hierarchy. The
  // coordination service key-value store should only be used for cluster
  // configuration data.
  rpc InsertKeyValue(InsertKeyValueRequest) returns (InsertKeyValueResponse);

  // Gets a configuration key-value. The request blocks until the key-value
  // data becomes available (i.e., is set by a task in the cluster).
  rpc GetKeyValue(GetKeyValueRequest) returns (GetKeyValueResponse);

  // Gets a configuration key-value. The request does not block, but returns
  // an error if the requested key does not exist.
  rpc TryGetKeyValue(TryGetKeyValueRequest) returns (TryGetKeyValueResponse);

  // Same as GetKeyValue, but returns all values whose keys are prefixed with
  // the directory key.
  rpc GetKeyValueDir(GetKeyValueDirRequest) returns (GetKeyValueDirResponse);

  // Deletes a configuration key-value. If `is_directory` is set in the
  // request, recursively cleans up all key-values under the path specified by
  // `key`.
  rpc DeleteKeyValue(DeleteKeyValueRequest) returns (DeleteKeyValueResponse);

  // Blocks until all (or a subset of) tasks are at the barrier or the barrier
  // fails.
  //
  // NOTE(review): the text below refers to "WaitAtBarrier() calls";
  // presumably these are the agent-side calls that issue this Barrier RPC —
  // confirm.
  //
  // `barrier_id` should be unique across barriers. Once the barrier has passed
  // or failed, subsequent calls will not block, and immediately respond with
  // the previous response.
  //
  // The first WaitAtBarrier() call received by the service for a particular
  // barrier id is special in that it determines the barrier deadline based on
  // the timeout duration.
  // However, if subsequent calls by different agents specify a different set
  // of `tasks` for the same `barrier_id`, the barrier will fail instantly.
  //
  // If no tasks are specified (default), the barrier will block for all the
  // connected tasks.
  // NOTE(review): BarrierRequest.tasks describes the empty case as "the entire
  // cluster is participating"; confirm which set is actually used.
  //
  // Possible service errors:
  //   - DeadlineExceeded: Timed out waiting for specified tasks at the barrier.
  //      Deadline is determined by the server timestamp when it receives the
  //      first WaitAtBarrier() + timeout duration.
  //   - Cancelled: One of the tasks called CancelBarrier().
  //   - Aborted: Service is shutting down.
  //   - Internal: Any participating task is in ERROR state.
  //   - InvalidArgument: (1) Conflicting tasks specified by different agents
  //       for the same barrier, (2) one of the participating tasks is not in
  //       the cluster, or (3) task making the request is not included in the
  //       list of participating tasks.
  rpc Barrier(BarrierRequest) returns (BarrierResponse);

  // Aborts the barrier if it is ongoing.
  // Current and future WaitAtBarrier() calls with the same id will return a
  // CANCELLED error status.
  // Possible service errors:
  //   - FailedPrecondition: Barrier has already been passed.
  rpc CancelBarrier(CancelBarrierRequest) returns (CancelBarrierResponse);
}
319