# Generated by the gRPC Python protocol compiler plugin. DO NOT EDIT! """Client and server classes corresponding to protobuf-defined services.""" import grpc from chromiumos.longrunning import operations_pb2 as chromiumos_dot_longrunning_dot_operations__pb2 from chromiumos.test.api import dut_service_pb2 as chromiumos_dot_test_dot_api_dot_dut__service__pb2 class DutServiceStub(object): """Provides network based access to a device under test for remote command execution and device state/identity retrieval. """ def __init__(self, channel): """Constructor. Args: channel: A grpc.Channel. """ self.ExecCommand = channel.unary_stream( '/chromiumos.test.api.DutService/ExecCommand', request_serializer=chromiumos_dot_test_dot_api_dot_dut__service__pb2.ExecCommandRequest.SerializeToString, response_deserializer=chromiumos_dot_test_dot_api_dot_dut__service__pb2.ExecCommandResponse.FromString, ) self.FetchCrashes = channel.unary_stream( '/chromiumos.test.api.DutService/FetchCrashes', request_serializer=chromiumos_dot_test_dot_api_dot_dut__service__pb2.FetchCrashesRequest.SerializeToString, response_deserializer=chromiumos_dot_test_dot_api_dot_dut__service__pb2.FetchCrashesResponse.FromString, ) self.Restart = channel.unary_unary( '/chromiumos.test.api.DutService/Restart', request_serializer=chromiumos_dot_test_dot_api_dot_dut__service__pb2.RestartRequest.SerializeToString, response_deserializer=chromiumos_dot_longrunning_dot_operations__pb2.Operation.FromString, ) self.DetectDeviceConfigId = channel.unary_stream( '/chromiumos.test.api.DutService/DetectDeviceConfigId', request_serializer=chromiumos_dot_test_dot_api_dot_dut__service__pb2.DetectDeviceConfigIdRequest.SerializeToString, response_deserializer=chromiumos_dot_test_dot_api_dot_dut__service__pb2.DetectDeviceConfigIdResponse.FromString, ) self.FetchFile = channel.unary_stream( '/chromiumos.test.api.DutService/FetchFile', request_serializer=chromiumos_dot_test_dot_api_dot_dut__service__pb2.FetchFileRequest.SerializeToString, response_deserializer=chromiumos_dot_test_dot_api_dot_dut__service__pb2.File.FromString, ) self.Cache = channel.unary_unary( '/chromiumos.test.api.DutService/Cache', request_serializer=chromiumos_dot_test_dot_api_dot_dut__service__pb2.CacheRequest.SerializeToString, response_deserializer=chromiumos_dot_longrunning_dot_operations__pb2.Operation.FromString, ) self.ForceReconnect = channel.unary_unary( '/chromiumos.test.api.DutService/ForceReconnect', request_serializer=chromiumos_dot_test_dot_api_dot_dut__service__pb2.ForceReconnectRequest.SerializeToString, response_deserializer=chromiumos_dot_longrunning_dot_operations__pb2.Operation.FromString, ) class DutServiceServicer(object): """Provides network based access to a device under test for remote command execution and device state/identity retrieval. """ def ExecCommand(self, request, context): """ExecCommand runs a command on a DUT. The working directory is /. A tty is not spawned for the command. The user and group is root. All signals have their default dispositions and are not masked. The umask is set to 0. The environment contains: TERM=dumb PATH=/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin:/opt/bin LANG=en_US.UTF-8 USER=root HOME=/root The environment MAY also contain SSH client variables. The environment SHALL NOT contain variables not mentioned above. If the stream is interrupted, the implementation MAY attempt to stop the command by sending SIGINT, SIGHUP, SIGTERM, or SIGKILL. """ context.set_code(grpc.StatusCode.UNIMPLEMENTED) context.set_details('Method not implemented!') raise NotImplementedError('Method not implemented!') def FetchCrashes(self, request, context): """FetchCrashes gets a stream of all crash reports currently on the DUT. The stream returned may split up a crash over multiple `FetchCrashesResponse` protos. See the definition of that proto for details. This call is read-only: it doesn't delete the crashes that it reads. """ context.set_code(grpc.StatusCode.UNIMPLEMENTED) context.set_details('Method not implemented!') raise NotImplementedError('Method not implemented!') def Restart(self, request, context): """Restart simply reboots a DUT and returns when done. This is necessary as we need to refresh our connection to the DUT at restart, and this allows a signaling to the server that the connection will be severed. """ context.set_code(grpc.StatusCode.UNIMPLEMENTED) context.set_details('Method not implemented!') raise NotImplementedError('Method not implemented!') def DetectDeviceConfigId(self, request, context): """Scans the live device to determine device config identifiers. The returned scan config can then be used to reverse lookup the actual DeviceConfigId values and corresponding configs. """ context.set_code(grpc.StatusCode.UNIMPLEMENTED) context.set_details('Method not implemented!') raise NotImplementedError('Method not implemented!') def FetchFile(self, request, context): """Fetch a file or dir from the device. The files will be returned via a tar'd bytestream. """ context.set_code(grpc.StatusCode.UNIMPLEMENTED) context.set_details('Method not implemented!') raise NotImplementedError('Method not implemented!') def Cache(self, request, context): """Downloads files from GS to the DUT The files downloaded may be decompressed in this layer to save cycles (and space) in the DUT. This utilizes the cacheForDUT endpoint to download the files. """ context.set_code(grpc.StatusCode.UNIMPLEMENTED) context.set_details('Method not implemented!') raise NotImplementedError('Method not implemented!') def ForceReconnect(self, request, context): """Used to reestablish connection to DUT in case of drops. This is needed in case the connection to the DUT is lost and we need to keep the connection. Previously this was done by the service, but as we try to accomplish true microservice functionality (i.e.: no side-effects) we removed it and gave the option for the user to reconnect if needed with whichever algorithm they prefer. """ context.set_code(grpc.StatusCode.UNIMPLEMENTED) context.set_details('Method not implemented!') raise NotImplementedError('Method not implemented!') def add_DutServiceServicer_to_server(servicer, server): rpc_method_handlers = { 'ExecCommand': grpc.unary_stream_rpc_method_handler( servicer.ExecCommand, request_deserializer=chromiumos_dot_test_dot_api_dot_dut__service__pb2.ExecCommandRequest.FromString, response_serializer=chromiumos_dot_test_dot_api_dot_dut__service__pb2.ExecCommandResponse.SerializeToString, ), 'FetchCrashes': grpc.unary_stream_rpc_method_handler( servicer.FetchCrashes, request_deserializer=chromiumos_dot_test_dot_api_dot_dut__service__pb2.FetchCrashesRequest.FromString, response_serializer=chromiumos_dot_test_dot_api_dot_dut__service__pb2.FetchCrashesResponse.SerializeToString, ), 'Restart': grpc.unary_unary_rpc_method_handler( servicer.Restart, request_deserializer=chromiumos_dot_test_dot_api_dot_dut__service__pb2.RestartRequest.FromString, response_serializer=chromiumos_dot_longrunning_dot_operations__pb2.Operation.SerializeToString, ), 'DetectDeviceConfigId': grpc.unary_stream_rpc_method_handler( servicer.DetectDeviceConfigId, request_deserializer=chromiumos_dot_test_dot_api_dot_dut__service__pb2.DetectDeviceConfigIdRequest.FromString, response_serializer=chromiumos_dot_test_dot_api_dot_dut__service__pb2.DetectDeviceConfigIdResponse.SerializeToString, ), 'FetchFile': grpc.unary_stream_rpc_method_handler( servicer.FetchFile, request_deserializer=chromiumos_dot_test_dot_api_dot_dut__service__pb2.FetchFileRequest.FromString, response_serializer=chromiumos_dot_test_dot_api_dot_dut__service__pb2.File.SerializeToString, ), 'Cache': grpc.unary_unary_rpc_method_handler( servicer.Cache, request_deserializer=chromiumos_dot_test_dot_api_dot_dut__service__pb2.CacheRequest.FromString, response_serializer=chromiumos_dot_longrunning_dot_operations__pb2.Operation.SerializeToString, ), 'ForceReconnect': grpc.unary_unary_rpc_method_handler( servicer.ForceReconnect, request_deserializer=chromiumos_dot_test_dot_api_dot_dut__service__pb2.ForceReconnectRequest.FromString, response_serializer=chromiumos_dot_longrunning_dot_operations__pb2.Operation.SerializeToString, ), } generic_handler = grpc.method_handlers_generic_handler( 'chromiumos.test.api.DutService', rpc_method_handlers) server.add_generic_rpc_handlers((generic_handler,)) # This class is part of an EXPERIMENTAL API. class DutService(object): """Provides network based access to a device under test for remote command execution and device state/identity retrieval. """ @staticmethod def ExecCommand(request, target, options=(), channel_credentials=None, call_credentials=None, insecure=False, compression=None, wait_for_ready=None, timeout=None, metadata=None): return grpc.experimental.unary_stream(request, target, '/chromiumos.test.api.DutService/ExecCommand', chromiumos_dot_test_dot_api_dot_dut__service__pb2.ExecCommandRequest.SerializeToString, chromiumos_dot_test_dot_api_dot_dut__service__pb2.ExecCommandResponse.FromString, options, channel_credentials, insecure, call_credentials, compression, wait_for_ready, timeout, metadata) @staticmethod def FetchCrashes(request, target, options=(), channel_credentials=None, call_credentials=None, insecure=False, compression=None, wait_for_ready=None, timeout=None, metadata=None): return grpc.experimental.unary_stream(request, target, '/chromiumos.test.api.DutService/FetchCrashes', chromiumos_dot_test_dot_api_dot_dut__service__pb2.FetchCrashesRequest.SerializeToString, chromiumos_dot_test_dot_api_dot_dut__service__pb2.FetchCrashesResponse.FromString, options, channel_credentials, insecure, call_credentials, compression, wait_for_ready, timeout, metadata) @staticmethod def Restart(request, target, options=(), channel_credentials=None, call_credentials=None, insecure=False, compression=None, wait_for_ready=None, timeout=None, metadata=None): return grpc.experimental.unary_unary(request, target, '/chromiumos.test.api.DutService/Restart', chromiumos_dot_test_dot_api_dot_dut__service__pb2.RestartRequest.SerializeToString, chromiumos_dot_longrunning_dot_operations__pb2.Operation.FromString, options, channel_credentials, insecure, call_credentials, compression, wait_for_ready, timeout, metadata) @staticmethod def DetectDeviceConfigId(request, target, options=(), channel_credentials=None, call_credentials=None, insecure=False, compression=None, wait_for_ready=None, timeout=None, metadata=None): return grpc.experimental.unary_stream(request, target, '/chromiumos.test.api.DutService/DetectDeviceConfigId', chromiumos_dot_test_dot_api_dot_dut__service__pb2.DetectDeviceConfigIdRequest.SerializeToString, chromiumos_dot_test_dot_api_dot_dut__service__pb2.DetectDeviceConfigIdResponse.FromString, options, channel_credentials, insecure, call_credentials, compression, wait_for_ready, timeout, metadata) @staticmethod def FetchFile(request, target, options=(), channel_credentials=None, call_credentials=None, insecure=False, compression=None, wait_for_ready=None, timeout=None, metadata=None): return grpc.experimental.unary_stream(request, target, '/chromiumos.test.api.DutService/FetchFile', chromiumos_dot_test_dot_api_dot_dut__service__pb2.FetchFileRequest.SerializeToString, chromiumos_dot_test_dot_api_dot_dut__service__pb2.File.FromString, options, channel_credentials, insecure, call_credentials, compression, wait_for_ready, timeout, metadata) @staticmethod def Cache(request, target, options=(), channel_credentials=None, call_credentials=None, insecure=False, compression=None, wait_for_ready=None, timeout=None, metadata=None): return grpc.experimental.unary_unary(request, target, '/chromiumos.test.api.DutService/Cache', chromiumos_dot_test_dot_api_dot_dut__service__pb2.CacheRequest.SerializeToString, chromiumos_dot_longrunning_dot_operations__pb2.Operation.FromString, options, channel_credentials, insecure, call_credentials, compression, wait_for_ready, timeout, metadata) @staticmethod def ForceReconnect(request, target, options=(), channel_credentials=None, call_credentials=None, insecure=False, compression=None, wait_for_ready=None, timeout=None, metadata=None): return grpc.experimental.unary_unary(request, target, '/chromiumos.test.api.DutService/ForceReconnect', chromiumos_dot_test_dot_api_dot_dut__service__pb2.ForceReconnectRequest.SerializeToString, chromiumos_dot_longrunning_dot_operations__pb2.Operation.FromString, options, channel_credentials, insecure, call_credentials, compression, wait_for_ready, timeout, metadata)