1// Copyright 2019 The Pigweed Authors 2// 3// Licensed under the Apache License, Version 2.0 (the "License"); you may not 4// use this file except in compliance with the License. You may obtain a copy of 5// the License at 6// 7// https://www.apache.org/licenses/LICENSE-2.0 8// 9// Unless required by applicable law or agreed to in writing, software 10// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 11// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 12// License for the specific language governing permissions and limitations under 13// the License. 14 15// Package pw_target_runner implements a target runner gRPC server which queues 16// and distributes executables among a group of worker routines. 17package pw_target_runner 18 19import ( 20 "context" 21 "errors" 22 "fmt" 23 "log" 24 "net" 25 "time" 26 27 "google.golang.org/grpc" 28 "google.golang.org/grpc/codes" 29 "google.golang.org/grpc/reflection" 30 "google.golang.org/grpc/status" 31 32 pb "pigweed/proto/pw_target_runner/target_runner_pb" 33) 34 35var ( 36 errServerNotBound = errors.New("Server not bound to a port") 37 errServerNotRunning = errors.New("Server is not running") 38) 39 40// Server is a gRPC server that runs a TargetRunner service. 41type Server struct { 42 grpcServer *grpc.Server 43 listener net.Listener 44 tasksPassed uint32 45 tasksFailed uint32 46 startTime time.Time 47 active bool 48 workerPool *WorkerPool 49} 50 51// NewServer creates a gRPC server with a registered TargetRunner service. 52func NewServer() *Server { 53 s := &Server{ 54 grpcServer: grpc.NewServer(), 55 workerPool: newWorkerPool("ServerWorkerPool"), 56 } 57 58 reflection.Register(s.grpcServer) 59 pb.RegisterTargetRunnerServer(s.grpcServer, &pwTargetRunnerService{s}) 60 61 return s 62} 63 64// Bind starts a TCP listener on a specified port. 65func (s *Server) Bind(port int) error { 66 lis, err := net.Listen("tcp", fmt.Sprintf(":%d", port)) 67 if err != nil { 68 return err 69 } 70 s.listener = lis 71 return nil 72} 73 74// RegisterWorker adds a worker to the server's worker pool. 75func (s *Server) RegisterWorker(worker DeviceRunner) { 76 s.workerPool.RegisterWorker(worker) 77} 78 79// RunBinary runs an executable through a worker in the server, returning 80// the worker's response. The function blocks until the executable has been 81// processed. 82func (s *Server) RunBinary(path string) (*RunResponse, error) { 83 if !s.active { 84 return nil, errServerNotRunning 85 } 86 87 resChan := make(chan *RunResponse, 1) 88 defer close(resChan) 89 90 s.workerPool.QueueExecutable(&RunRequest{ 91 Path: path, 92 ResponseChannel: resChan, 93 }) 94 95 res := <-resChan 96 97 if res.Err != nil { 98 return nil, res.Err 99 } 100 101 if res.Status == pb.RunStatus_SUCCESS { 102 s.tasksPassed++ 103 } else { 104 s.tasksFailed++ 105 } 106 107 return res, nil 108} 109 110// Serve starts the gRPC server on its configured port. Bind must have been 111// called before this; an error is returned if it is not. This function blocks 112// until the server is terminated. 113func (s *Server) Serve() error { 114 if s.listener == nil { 115 return errServerNotBound 116 } 117 118 log.Printf("Starting gRPC server on %v\n", s.listener.Addr()) 119 120 s.startTime = time.Now() 121 s.active = true 122 s.workerPool.Start() 123 124 return s.grpcServer.Serve(s.listener) 125} 126 127// pwTargetRunnerService implements the pw.target_runner.TargetRunner gRPC 128// service. 129type pwTargetRunnerService struct { 130 server *Server 131} 132 133// RunBinary runs a single executable on-device and returns its result. 134func (s *pwTargetRunnerService) RunBinary( 135 ctx context.Context, 136 desc *pb.RunBinaryRequest, 137) (*pb.RunBinaryResponse, error) { 138 runRes, err := s.server.RunBinary(desc.FilePath) 139 if err != nil { 140 return nil, status.Error(codes.Internal, "Internal server error") 141 } 142 143 res := &pb.RunBinaryResponse{ 144 Result: runRes.Status, 145 QueueTimeNs: uint64(runRes.QueueTime), 146 RunTimeNs: uint64(runRes.RunTime), 147 Output: runRes.Output, 148 } 149 return res, nil 150} 151 152// Status returns information about the server. 153func (s *pwTargetRunnerService) Status( 154 ctx context.Context, 155 _ *pb.Empty, 156) (*pb.ServerStatus, error) { 157 resp := &pb.ServerStatus{ 158 UptimeNs: uint64(time.Since(s.server.startTime)), 159 TasksPassed: s.server.tasksPassed, 160 TasksFailed: s.server.tasksFailed, 161 } 162 163 return resp, nil 164} 165