• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1// Copyright 2019 The Pigweed Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License"); you may not
4// use this file except in compliance with the License. You may obtain a copy of
5// the License at
6//
7//     https://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12// License for the specific language governing permissions and limitations under
13// the License.
14
15// Package pw_target_runner implements a target runner gRPC server which queues
16// and distributes executables among a group of worker routines.
17package pw_target_runner
18
19import (
20	"context"
21	"errors"
22	"fmt"
23	"log"
24	"net"
25	"time"
26
27	"google.golang.org/grpc"
28	"google.golang.org/grpc/codes"
29	"google.golang.org/grpc/reflection"
30	"google.golang.org/grpc/status"
31
32	pb "pigweed/proto/pw_target_runner/target_runner_pb"
33)
34
35var (
36	errServerNotBound   = errors.New("Server not bound to a port")
37	errServerNotRunning = errors.New("Server is not running")
38)
39
40// Server is a gRPC server that runs a TargetRunner service.
41type Server struct {
42	grpcServer  *grpc.Server
43	listener    net.Listener
44	tasksPassed uint32
45	tasksFailed uint32
46	startTime   time.Time
47	active      bool
48	workerPool  *WorkerPool
49}
50
51// NewServer creates a gRPC server with a registered TargetRunner service.
52func NewServer() *Server {
53	s := &Server{
54		grpcServer: grpc.NewServer(),
55		workerPool: newWorkerPool("ServerWorkerPool"),
56	}
57
58	reflection.Register(s.grpcServer)
59	pb.RegisterTargetRunnerServer(s.grpcServer, &pwTargetRunnerService{s})
60
61	return s
62}
63
64// Bind starts a TCP listener on a specified port.
65func (s *Server) Bind(port int) error {
66	lis, err := net.Listen("tcp", fmt.Sprintf(":%d", port))
67	if err != nil {
68		return err
69	}
70	s.listener = lis
71	return nil
72}
73
74// RegisterWorker adds a worker to the server's worker pool.
75func (s *Server) RegisterWorker(worker DeviceRunner) {
76	s.workerPool.RegisterWorker(worker)
77}
78
79// RunBinary runs an executable through a worker in the server, returning
80// the worker's response. The function blocks until the executable has been
81// processed.
82func (s *Server) RunBinary(path string) (*RunResponse, error) {
83	if !s.active {
84		return nil, errServerNotRunning
85	}
86
87	resChan := make(chan *RunResponse, 1)
88	defer close(resChan)
89
90	s.workerPool.QueueExecutable(&RunRequest{
91		Path:            path,
92		ResponseChannel: resChan,
93	})
94
95	res := <-resChan
96
97	if res.Err != nil {
98		return nil, res.Err
99	}
100
101	if res.Status == pb.RunStatus_SUCCESS {
102		s.tasksPassed++
103	} else {
104		s.tasksFailed++
105	}
106
107	return res, nil
108}
109
110// Serve starts the gRPC server on its configured port. Bind must have been
111// called before this; an error is returned if it is not. This function blocks
112// until the server is terminated.
113func (s *Server) Serve() error {
114	if s.listener == nil {
115		return errServerNotBound
116	}
117
118	log.Printf("Starting gRPC server on %v\n", s.listener.Addr())
119
120	s.startTime = time.Now()
121	s.active = true
122	s.workerPool.Start()
123
124	return s.grpcServer.Serve(s.listener)
125}
126
127// pwTargetRunnerService implements the pw.target_runner.TargetRunner gRPC
128// service.
129type pwTargetRunnerService struct {
130	server *Server
131}
132
133// RunBinary runs a single executable on-device and returns its result.
134func (s *pwTargetRunnerService) RunBinary(
135	ctx context.Context,
136	desc *pb.RunBinaryRequest,
137) (*pb.RunBinaryResponse, error) {
138	runRes, err := s.server.RunBinary(desc.FilePath)
139	if err != nil {
140		return nil, status.Error(codes.Internal, "Internal server error")
141	}
142
143	res := &pb.RunBinaryResponse{
144		Result:      runRes.Status,
145		QueueTimeNs: uint64(runRes.QueueTime),
146		RunTimeNs:   uint64(runRes.RunTime),
147		Output:      runRes.Output,
148	}
149	return res, nil
150}
151
152// Status returns information about the server.
153func (s *pwTargetRunnerService) Status(
154	ctx context.Context,
155	_ *pb.Empty,
156) (*pb.ServerStatus, error) {
157	resp := &pb.ServerStatus{
158		UptimeNs:    uint64(time.Since(s.server.startTime)),
159		TasksPassed: s.server.tasksPassed,
160		TasksFailed: s.server.tasksFailed,
161	}
162
163	return resp, nil
164}
165