• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (C) 2020 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include <chrono>
18 #include <ctime>
19 #include <fstream>
20 #include <iostream>
21 #include <memory>
22 #include <string>
23 
24 #include <grpcpp.h>
25 
26 #include "gnss_grpc_proxy.grpc.pb.h"
27 
28 #include <signal.h>
29 
30 #include <chrono>
31 #include <deque>
32 #include <mutex>
33 #include <sstream>
34 #include <thread>
35 #include <vector>
36 
37 #include <android-base/logging.h>
38 #include <android-base/strings.h>
39 #include <gflags/gflags.h>
40 
41 #include <common/libs/fs/shared_fd.h>
42 #include <common/libs/fs/shared_buf.h>
43 #include <common/libs/fs/shared_select.h>
44 #include <host/libs/config/cuttlefish_config.h>
45 
46 using grpc::Server;
47 using grpc::ServerBuilder;
48 using grpc::ServerContext;
49 using grpc::Status;
50 using gnss_grpc_proxy::SendNmeaRequest;
51 using gnss_grpc_proxy::SendNmeaReply;
52 using gnss_grpc_proxy::GnssGrpcProxy;
53 
54 DEFINE_int32(gnss_in_fd,
55              -1,
56              "File descriptor for the gnss's input channel");
57 DEFINE_int32(gnss_out_fd,
58              -1,
59              "File descriptor for the gnss's output channel");
60 
61 DEFINE_int32(gnss_grpc_port,
62              -1,
63              "Service port for gnss grpc");
64 
65 DEFINE_string(gnss_file_path,
66               "",
67               "NMEA file path for gnss grpc");
68 
69 constexpr char CMD_GET_LOCATION[] = "CMD_GET_LOCATION";
70 constexpr char CMD_GET_RAWMEASUREMENT[] = "CMD_GET_RAWMEASUREMENT";
71 constexpr char END_OF_MSG_MARK[] = "\n\n\n\n";
72 
73 constexpr uint32_t GNSS_SERIAL_BUFFER_SIZE = 4096;
74 // Logic and data behind the server's behavior.
75 class GnssGrpcProxyServiceImpl final : public GnssGrpcProxy::Service {
76   public:
GnssGrpcProxyServiceImpl(cuttlefish::SharedFD gnss_in,cuttlefish::SharedFD gnss_out)77     GnssGrpcProxyServiceImpl(cuttlefish::SharedFD gnss_in,
78                      cuttlefish::SharedFD gnss_out) : gnss_in_(gnss_in),
79                                                   gnss_out_(gnss_out) {}
SendNmea(ServerContext * context,const SendNmeaRequest * request,SendNmeaReply * reply)80     Status SendNmea(ServerContext* context, const SendNmeaRequest* request,
81                     SendNmeaReply* reply) override {
82       reply->set_reply("Received nmea record.");
83       auto buffer = request->nmea();
84       std::lock_guard<std::mutex> lock(cached_nmea_mutex);
85       cached_nmea = request->nmea();
86       return Status::OK;
87     }
88 
sendToSerial()89     void sendToSerial() {
90       std::lock_guard<std::mutex> lock(cached_nmea_mutex);
91       if (!isNMEA(cached_nmea)) {
92         return;
93       }
94       ssize_t bytes_written =
95           cuttlefish::WriteAll(gnss_in_, cached_nmea + END_OF_MSG_MARK);
96       if (bytes_written < 0) {
97           LOG(ERROR) << "Error writing to fd: " << gnss_in_->StrError();
98       }
99     }
100 
sendGnssRawToSerial()101     void sendGnssRawToSerial() {
102       std::lock_guard<std::mutex> lock(cached_gnss_raw_mutex);
103       if (!isGnssRawMeasurement(cached_gnss_raw)) {
104         return;
105       }
106       if (previous_cached_gnss_raw == cached_gnss_raw) {
107         // Skip for same record
108         return;
109       } else {
110         // Update cached data
111         LOG(DEBUG) << "Skip same record";
112         previous_cached_gnss_raw = cached_gnss_raw;
113       }
114       ssize_t bytes_written =
115           cuttlefish::WriteAll(gnss_in_, cached_gnss_raw + END_OF_MSG_MARK);
116       LOG(DEBUG) << "Send Gnss Raw to serial: bytes_written: " << bytes_written;
117       if (bytes_written < 0) {
118         LOG(ERROR) << "Error writing to fd: " << gnss_in_->StrError();
119       }
120     }
121 
StartServer()122     void StartServer() {
123       // Create a new thread to handle writes to the gnss and to the any client
124       // connected to the socket.
125       read_thread_ = std::thread([this]() { ReadLoop(); });
126     }
127 
StartReadNmeaFileThread()128     void StartReadNmeaFileThread() {
129       // Create a new thread to read nmea data.
130       nmea_file_read_thread_ =
131           std::thread([this]() { ReadNmeaFromLocalFile(); });
132     }
133 
StartReadGnssRawMeasurementFileThread()134     void StartReadGnssRawMeasurementFileThread() {
135       // Create a new thread to read raw measurement data.
136       measurement_file_read_thread_ =
137           std::thread([this]() { ReadGnssRawMeasurement(); });
138     }
139 
ReadNmeaFromLocalFile()140     void ReadNmeaFromLocalFile() {
141       std::ifstream file(FLAGS_gnss_file_path);
142       if (file.is_open()) {
143           std::string line;
144           std::string lastLine;
145           int count = 0;
146           while (std::getline(file, line)) {
147               count++;
148               /* Only support a lite version of NEMA format to make it simple.
149                * Records will only contains $GPGGA, $GPRMC,
150                * $GPGGA,213204.00,3725.371240,N,12205.589239,W,7,,0.38,-26.75,M,0.0,M,0.0,0000*78
151                * $GPRMC,213204.00,A,3725.371240,N,12205.589239,W,000.0,000.0,290819,,,A*49
152                * $GPGGA,....
153                * $GPRMC,....
154                * Sending at 1Hz, currently user should
155                * provide a NMEA file that has one location per second. need some extra work
156                * to make it more generic, i.e. align with the timestamp in the file.
157                */
158               if (count % 2 == 0) {
159                 {
160                   std::lock_guard<std::mutex> lock(cached_nmea_mutex);
161                   cached_nmea = lastLine + '\n' + line;
162                 }
163                 std::this_thread::sleep_for(std::chrono::milliseconds(1000));
164               }
165               lastLine = line;
166           }
167           file.close();
168       } else {
169         LOG(ERROR) << "Can not open NMEA file: " << FLAGS_gnss_file_path ;
170         return;
171       }
172     }
173 
ReadGnssRawMeasurement()174     void ReadGnssRawMeasurement() {
175       std::ifstream file(FLAGS_gnss_file_path);
176 
177       if (file.is_open()) {
178         std::string line;
179         std::string cached_line = "";
180         std::string header = "";
181 
182         while (!cached_line.empty() || std::getline(file, line)) {
183           if (!cached_line.empty()) {
184             line = cached_line;
185             cached_line = "";
186           }
187 
188           // Get data header.
189           if (header.empty() && android::base::StartsWith(line, "# Raw")) {
190             header = line;
191             LOG(DEBUG) << "Header: " << header;
192             continue;
193           }
194 
195           // Ignore not raw measurement data.
196           if (!android::base::StartsWith(line, "Raw")) {
197             continue;
198           }
199 
200           {
201             std::lock_guard<std::mutex> lock(cached_gnss_raw_mutex);
202             cached_gnss_raw = header + "\n" + line;
203 
204             std::string new_line = "";
205             while (std::getline(file, new_line)) {
206               // Group raw data by TimeNanos.
207               if (getTimeNanosFromLine(new_line) ==
208                   getTimeNanosFromLine(line)) {
209                 cached_gnss_raw += "\n" + new_line;
210               } else {
211                 cached_line = new_line;
212                 break;
213               }
214             }
215           }
216           std::this_thread::sleep_for(std::chrono::milliseconds(1000));
217         }
218         file.close();
219       } else {
220         LOG(ERROR) << "Can not open GNSS Raw file: " << FLAGS_gnss_file_path;
221         return;
222       }
223     }
224 
~GnssGrpcProxyServiceImpl()225     ~GnssGrpcProxyServiceImpl() {
226       if (nmea_file_read_thread_.joinable()) {
227         nmea_file_read_thread_.join();
228       }
229       if (measurement_file_read_thread_.joinable()) {
230         measurement_file_read_thread_.join();
231       }
232       if (read_thread_.joinable()) {
233         read_thread_.join();
234       }
235     }
236 
237   private:
ReadLoop()238     [[noreturn]] void ReadLoop() {
239       cuttlefish::SharedFDSet read_set;
240       read_set.Set(gnss_out_);
241       std::vector<char> buffer(GNSS_SERIAL_BUFFER_SIZE);
242       int total_read = 0;
243       std::string gnss_cmd_str;
244       int flags = gnss_out_->Fcntl(F_GETFL, 0);
245       gnss_out_->Fcntl(F_SETFL, flags | O_NONBLOCK);
246       while (true) {
247         auto bytes_read = gnss_out_->Read(buffer.data(), buffer.size());
248         if (bytes_read > 0) {
249           std::string s(buffer.data(), bytes_read);
250           gnss_cmd_str += s;
251           // In case random string sent though /dev/gnss0, gnss_cmd_str will auto resize,
252           // to get rid of first page.
253           if (gnss_cmd_str.size() > GNSS_SERIAL_BUFFER_SIZE * 2) {
254             gnss_cmd_str = gnss_cmd_str.substr(gnss_cmd_str.size() - GNSS_SERIAL_BUFFER_SIZE);
255           }
256           total_read += bytes_read;
257           if (gnss_cmd_str.find(CMD_GET_LOCATION) != std::string::npos) {
258             sendToSerial();
259             gnss_cmd_str = "";
260             total_read = 0;
261           }
262 
263           if (gnss_cmd_str.find(CMD_GET_RAWMEASUREMENT) != std::string::npos) {
264             sendGnssRawToSerial();
265             gnss_cmd_str = "";
266             total_read = 0;
267           }
268         } else {
269           if (gnss_out_->GetErrno() == EAGAIN|| gnss_out_->GetErrno() == EWOULDBLOCK) {
270             std::this_thread::sleep_for(std::chrono::milliseconds(100));
271           } else {
272             LOG(ERROR) << "Error reading fd " << FLAGS_gnss_out_fd << ": "
273               << " Error code: " << gnss_out_->GetErrno()
274               << " Error sg:" << gnss_out_->StrError();
275           }
276         }
277       }
278     }
279 
getTimeNanosFromLine(const std::string & line)280     std::string getTimeNanosFromLine(const std::string& line) {
281       // TimeNanos is in column #3.
282       std::vector<std::string> vals = android::base::Split(line, ",");
283       return vals.size() >= 3 ? vals[2] : "-1";
284     }
285 
isGnssRawMeasurement(const std::string & inputStr)286     bool isGnssRawMeasurement(const std::string& inputStr) {
287       // TODO: add more logic check to by pass invalid data.
288       return !inputStr.empty() && android::base::StartsWith(inputStr, "# Raw");
289     }
290 
isNMEA(const std::string & inputStr)291     bool isNMEA(const std::string& inputStr) {
292       return !inputStr.empty() &&
293              (android::base::StartsWith(inputStr, "$GPRMC") ||
294               android::base::StartsWith(inputStr, "$GPRMA"));
295     }
296 
297     cuttlefish::SharedFD gnss_in_;
298     cuttlefish::SharedFD gnss_out_;
299     std::thread read_thread_;
300     std::thread nmea_file_read_thread_;
301     std::thread measurement_file_read_thread_;
302 
303     std::string cached_nmea;
304     std::mutex cached_nmea_mutex;
305 
306     std::string cached_gnss_raw;
307     std::string previous_cached_gnss_raw;
308     std::mutex cached_gnss_raw_mutex;
309 };
310 
RunServer()311 void RunServer() {
312   auto gnss_in = cuttlefish::SharedFD::Dup(FLAGS_gnss_in_fd);
313   close(FLAGS_gnss_in_fd);
314   if (!gnss_in->IsOpen()) {
315     LOG(ERROR) << "Error dupping fd " << FLAGS_gnss_in_fd << ": "
316                << gnss_in->StrError();
317     return;
318   }
319   close(FLAGS_gnss_in_fd);
320 
321   auto gnss_out = cuttlefish::SharedFD::Dup(FLAGS_gnss_out_fd);
322   close(FLAGS_gnss_out_fd);
323   if (!gnss_out->IsOpen()) {
324     LOG(ERROR) << "Error dupping fd " << FLAGS_gnss_out_fd << ": "
325                << gnss_out->StrError();
326     return;
327   }
328   auto server_address("0.0.0.0:" + std::to_string(FLAGS_gnss_grpc_port));
329   GnssGrpcProxyServiceImpl service(gnss_in, gnss_out);
330   service.StartServer();
331   if (!FLAGS_gnss_file_path.empty()) {
332     // TODO: On-demand start the read file threads according to data type.
333     service.StartReadNmeaFileThread();
334     service.StartReadGnssRawMeasurementFileThread();
335 
336     // In the local mode, we are not start a grpc server, use a infinite loop instead
337     while(true) {
338       std::this_thread::sleep_for(std::chrono::milliseconds(2000));
339     }
340   } else {
341     ServerBuilder builder;
342     // Listen on the given address without any authentication mechanism.
343     builder.AddListeningPort(server_address, grpc::InsecureServerCredentials());
344     // Register "service" as the instance through which we'll communicate with
345     // clients. In this case it corresponds to an *synchronous* service.
346     builder.RegisterService(&service);
347     // Finally assemble the server.
348     std::unique_ptr<Server> server(builder.BuildAndStart());
349     std::cout << "Server listening on " << server_address << std::endl;
350 
351     // Wait for the server to shutdown. Note that some other thread must be
352     // responsible for shutting down the server for this call to ever return.
353     server->Wait();
354   }
355 
356 }
357 
358 
main(int argc,char ** argv)359 int main(int argc, char** argv) {
360   ::android::base::InitLogging(argv, android::base::StderrLogger);
361   ::gflags::ParseCommandLineFlags(&argc, &argv, true);
362 
363   LOG(DEBUG) << "Starting gnss grpc proxy server...";
364   RunServer();
365 
366   return 0;
367 }