syntax = "proto3";

package tensorflow;

import "tensorflow/compiler/xla/pjrt/distributed/protocol.proto";
import "tensorflow/core/framework/device_attributes.proto";

option go_package = "github.com/tensorflow/tensorflow/tensorflow/go/core/protobuf/for_core_protos_go_proto";

// Identifies a remote worker task by its job name and task id.
message CoordinatedTask {
  string job_name = 1;
  int32 task_id = 2;
}

// State of a remote worker task.
enum CoordinatedTaskState {
  // Invalid state; encountering it indicates a bug.
  TASKSTATE_UNSPECIFIED = 0;
  // Agent-only state. While the agent is disconnected, the service cannot
  // know whether the task is initialized or uninitialized.
  TASKSTATE_UNINITIALIZED = 1;
  TASKSTATE_DISCONNECTED = 2;
  TASKSTATE_CONNECTED = 3;
  TASKSTATE_ERROR = 4;
}

// Status payload attached to every coordination service error.
// Note: the proto may be left empty when the error is triggered by the task's
// own agent calls (i.e. it was not propagated by the service from another
// remote task).
message CoordinationServiceError {
  // Field numbers of removed fields that used to specify the error origin.
  reserved 1, 2;
  // True when the error was reported by the user through the agent API, as
  // opposed to being an internal service error.
  bool is_reported_error = 3;
  // The task that hit the error. When unset, the error originated from the
  // same task that is processing it.
  CoordinatedTask source_task = 4;
}

// Device information as represented by the different runtimes.
message TfDeviceList {
  repeated DeviceAttributes devices = 1;
}

message XlaDeviceList {
  xla.GlobalTopologyProto devices = 1;
}

message CoordinationServiceDeviceInfo {
  oneof type {
    TfDeviceList tf = 1;
    XlaDeviceList xla = 2;
  }
}

// Request and response messages for registering a task to the cluster leader.
// A task is uniquely identified by its `job_name`, `task_id` and
// `incarnation`. The leader replies with its own `incarnation` so that the
// leader process can be identified.
message RegisterTaskRequest {
  // Field numbers of removed fields that used to specify the task.
  reserved 1, 2;
  fixed64 incarnation = 3;
  // Field 4 used to be `local_device_attributes`; that field was moved from
  // this request message to WaitForAllTasksRequest.
  reserved 4;
  CoordinatedTask source_task = 5;
}

message RegisterTaskResponse {
  fixed64 leader_incarnation = 1;
}

// Request and response messages for sending heartbeats.
message HeartbeatRequest {
  // Field numbers of removed fields that used to specify the remote task.
  reserved 1, 2;
  fixed64 incarnation = 3;
  CoordinatedTask source_task = 4;
}

message HeartbeatResponse {
  fixed64 leader_incarnation = 1;
  // If there are failures in the cluster, additional metadata in the response
  // is used to broadcast the error code and message to other tasks.
}

// Request and response messages for waiting for all tasks.
message WaitForAllTasksRequest {
  // Field numbers of removed fields that used to specify the remote task.
  reserved 1, 2;
  // Field number of a removed field that specifically used TF device info.
  reserved 3;
  // All local device attributes of the request sender.
  CoordinationServiceDeviceInfo local_device_info = 4;
  CoordinatedTask source_task = 5;
}

message WaitForAllTasksResponse {
  fixed64 leader_incarnation = 1;
  // Field number of a removed field that specifically used TF device info.
  reserved 2;
  // All devices in the cluster.
  CoordinationServiceDeviceInfo cluster_device_info = 3;
}

// Request and response messages for disconnecting a task from the service.
message ShutdownTaskRequest {
  CoordinatedTask source_task = 1;
}

message ShutdownTaskResponse {}

// Request and response messages for resetting a task state in the service.
message ResetTaskRequest {
  CoordinatedTask source_task = 1;
}

message ResetTaskResponse {}

// Request and response messages for reporting errors to a task.
message ReportErrorToTaskRequest {
  int32 error_code = 1;
  string error_message = 2;
  // Field numbers of removed fields that are now embedded in the payload.
  reserved 3, 4;
  CoordinationServiceError error_payload = 5;
}

message ReportErrorToTaskResponse {}

// Request and response messages for reporting errors to the service instance.
message ReportErrorToServiceRequest {
  int32 error_code = 1;
  string error_message = 2;
  // Field numbers of removed fields that used to specify the error origin.
  reserved 3, 4;
  CoordinatedTask error_origin = 5;
}

message ReportErrorToServiceResponse {}

// A configuration key-value pair.
// Keys are structured like a Unix file system path: multiple levels of
// directory names separated by slash ('/') characters.
message KeyValueEntry {
  string key = 1;
  bytes value = 2;
}

// Request and response messages for inserting configuration key-value data.
message InsertKeyValueRequest {
  KeyValueEntry kv = 1;
}

message InsertKeyValueResponse {}

// Request and response messages for getting configuration key-value data.
message GetKeyValueRequest {
  string key = 1;
}

message GetKeyValueResponse {
  KeyValueEntry kv = 1;
}

message TryGetKeyValueRequest {
  string key = 1;
}

message TryGetKeyValueResponse {
  KeyValueEntry kv = 1;
}

message GetKeyValueDirRequest {
  string directory_key = 1;
}

message GetKeyValueDirResponse {
  string directory_key = 1;
  repeated KeyValueEntry kv = 2;
}

// Request and response messages for deleting configuration key-value data.
// When is_directory is true, key-values under `key` are deleted recursively.
message DeleteKeyValueRequest {
  string key = 1;
  bool is_directory = 2;
}

message DeleteKeyValueResponse {}

// Request and response messages for generic sync barriers.
message BarrierRequest {
  string barrier_id = 1;
  int64 barrier_timeout_in_ms = 2;
  // Tasks that will wait at the barrier. Leaving this unspecified implies
  // that the entire cluster participates in the barrier.
  repeated CoordinatedTask tasks = 3;
  // The task making the request.
  CoordinatedTask source_task = 4;
}

message BarrierResponse {}

// Request and response messages for cancelling generic sync barriers.
message CancelBarrierRequest {
  string barrier_id = 1;
  // The task making the request.
  CoordinatedTask source_task = 2;
}

message CancelBarrierResponse {}

// Coordination Service is a TensorFlow service that controls and coordinates
// distributed execution in a cluster of multiple tasks.
//
// Depending on the role of the current task, the service keeps track of the
// cluster configuration and the state of the cluster members or of the
// leader. The distributed runtime uses this service to coordinate and perform
// cluster initialization, to check the healthiness of tasks, and to propagate
// error messages to the cluster.
service CoordinationService {
  // Registers a task with the coordination service so that the service starts
  // tracking the liveness of the task. The RPC blocks and returns only once
  // registration with the service succeeds, or an error happens during the
  // registration process.
  rpc RegisterTask(RegisterTaskRequest) returns (RegisterTaskResponse);

  // Heartbeat message from a task to the coordination service. A task sends
  // heartbeats to refresh its timestamp on the leader so that it does not
  // become stale. The RPC responds immediately after the timestamp on the
  // leader has been refreshed.
  rpc Heartbeat(HeartbeatRequest) returns (HeartbeatResponse);

  // Waits for all tasks in the cluster to be up and running. The request is
  // answered only when all tasks have registered, or when some error occurs.
  rpc WaitForAllTasks(WaitForAllTasksRequest) returns (WaitForAllTasksResponse);

  // Disconnects the task from the service. If `shutdown_barrier_timeout_in_ms`
  // is specified in the config, blocks until all tasks reach the barrier and
  // then disconnects them together. If the barrier times out, the tasks
  // already at the barrier still disconnect, while an error is reported to
  // the tasks that did not reach the barrier in time.
  rpc ShutdownTask(ShutdownTaskRequest) returns (ShutdownTaskResponse);

  // Disconnects the task from the service if it is in an ERROR state, which
  // allows it to reconnect via RegisterTask() in the future.
  rpc ResetTask(ResetTaskRequest) returns (ResetTaskResponse);

  // Reports an error to the task. The RPC permanently puts the receiving
  // coordination service agent instance into the error state.
  // TODO(b/195990880): Consider splitting this into a different RPC service.
  rpc ReportErrorToTask(ReportErrorToTaskRequest)
      returns (ReportErrorToTaskResponse);

  // Reports a task error to the coordination service. The RPC sets the
  // service-side task state to error and propagates the error to the other
  // tasks in the cluster.
  rpc ReportErrorToService(ReportErrorToServiceRequest)
      returns (ReportErrorToServiceResponse);

  // Inserts a configuration key-value that will be accessible to all cluster
  // tasks. The key can be formatted as a hierarchical Unix file path. The
  // coordination service key-value store should only be used for cluster
  // configuration data.
  rpc InsertKeyValue(InsertKeyValueRequest) returns (InsertKeyValueResponse);

  // Gets a configuration key-value. The request blocks until the key-value
  // data becomes available (i.e., is set by a task in the cluster).
  rpc GetKeyValue(GetKeyValueRequest) returns (GetKeyValueResponse);

  // Gets a configuration key-value. The request does not block; it returns an
  // error if the requested key does not exist.
  rpc TryGetKeyValue(TryGetKeyValueRequest) returns (TryGetKeyValueResponse);

  // Same as GetKeyValue, but returns all values whose keys are prefixed with
  // the directory key.
  rpc GetKeyValueDir(GetKeyValueDirRequest) returns (GetKeyValueDirResponse);

  // Deletes a configuration key-value. If is_directory is set in the request,
  // recursively cleans up all key-values under the path specified by `key`.
  rpc DeleteKeyValue(DeleteKeyValueRequest) returns (DeleteKeyValueResponse);

  // Blocks until all (or a subset of) tasks are at the barrier, or the
  // barrier fails.
  //
  // `barrier_id` should be unique across barriers. Once the barrier has
  // passed or failed, subsequent calls do not block and immediately respond
  // with the previous response.
  //
  // The first WaitAtBarrier() call received by the service for a particular
  // barrier id is special: it determines the barrier deadline based on the
  // timeout duration.
  // However, if subsequent calls by different agents specify a different set
  // of `tasks` for the same `barrier_id`, the barrier fails instantly.
  // NOTE(review): WaitAtBarrier() presumably names the agent-side call that
  // issues this Barrier RPC - confirm.
  //
  // If no tasks are specified (default), the barrier blocks for all the
  // connected tasks.
  //
  // Possible service errors:
  //   - DeadlineExceeded: Timed out waiting for the specified tasks at the
  //       barrier. The deadline is the server timestamp at which the first
  //       WaitAtBarrier() is received, plus the timeout duration.
  //   - Cancelled: One of the tasks called CancelBarrier().
  //   - Aborted: Service is shutting down.
  //   - Internal: Any participating task is in ERROR state.
  //   - InvalidArgument: (1) Conflicting tasks specified by different agents
  //       for the same barrier, (2) one of the participating tasks is not in
  //       the cluster, or (3) the task making the request is not included in
  //       the list of participating tasks.
  rpc Barrier(BarrierRequest) returns (BarrierResponse);

  // Aborts the barrier if it is ongoing.
  // Current and future WaitAtBarrier() calls with the same id will return a
  // CANCELLED error status.
  // Possible service errors:
  //   - FailedPrecondition: Barrier has already been passed.
  rpc CancelBarrier(CancelBarrierRequest) returns (CancelBarrierResponse);
}