• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2011-2015, Intel Corporation
3  * All rights reserved.
4  *
5  * Redistribution and use in source and binary forms, with or without modification,
6  * are permitted provided that the following conditions are met:
7  *
8  * 1. Redistributions of source code must retain the above copyright notice, this
9  * list of conditions and the following disclaimer.
10  *
11  * 2. Redistributions in binary form must reproduce the above copyright notice,
12  * this list of conditions and the following disclaimer in the documentation and/or
13  * other materials provided with the distribution.
14  *
15  * 3. Neither the name of the copyright holder nor the names of its contributors
16  * may be used to endorse or promote products derived from this software without
17  * specific prior written permission.
18  *
19  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
20  * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
21  * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
22  * DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR
23  * ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
24  * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
25  * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
26  * ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
27  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
28  * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
29  */
30 #include "RemoteProcessorServer.h"
31 #include <iostream>
32 #include <memory>
33 #include <assert.h>
34 #include <string.h>
35 #include <unistd.h>
36 #include "RequestMessage.h"
37 #include "AnswerMessage.h"
38 #include "RemoteCommandHandler.h"
39 #include "Socket.h"
40 #include "convert.hpp"
41 
42 using std::string;
43 
CRemoteProcessorServer(std::string bindAddress)44 CRemoteProcessorServer::CRemoteProcessorServer(std::string bindAddress)
45     : _bindAddress(bindAddress), _io_service(), _acceptor(_io_service), _socket(_io_service)
46 {
47 }
48 
~CRemoteProcessorServer()49 CRemoteProcessorServer::~CRemoteProcessorServer()
50 {
51     stop();
52 }
53 
54 // State
start(string & error)55 bool CRemoteProcessorServer::start(string &error)
56 {
57     using namespace asio;
58 
59     try {
60         generic::stream_protocol::endpoint endpoint;
61         uint16_t port;
62         std::string endpointName;
63         bool isInet;
64         const std::string expectedForm{"Required: <hostname port|tcp://[host]:port|unix://path>"};
65 
66         // For backward compatibility, tcp port referred by its value only
67         if (convertTo(_bindAddress, port)) {
68             isInet = true;
69         } else {
70             const std::string tcpProtocol{"tcp"};
71             const std::string unixProtocol{"unix"};
72             const std::vector<std::string> supportedProtocols{tcpProtocol, unixProtocol};
73             const std::string protocolDel{"://"};
74 
75             size_t protocolDelPos = _bindAddress.find(protocolDel);
76             if (protocolDelPos == std::string::npos) {
77                 error = "bindaddress " + _bindAddress + " invalid, " + expectedForm;
78                 return false;
79             }
80             std::string protocol = _bindAddress.substr(0, protocolDelPos);
81 
82             if (std::find(begin(supportedProtocols), end(supportedProtocols), protocol) ==
83                 end(supportedProtocols)) {
84                 error = "bindaddress " + _bindAddress + " invalid, " + expectedForm;
85                 return false;
86             }
87             isInet = (_bindAddress.find(tcpProtocol) != std::string::npos);
88             if (isInet) {
89                 size_t portDelPos = _bindAddress.rfind(':');
90                 if (portDelPos == std::string::npos) {
91                     error = "bindaddress " + _bindAddress + " invalid, " + expectedForm;
92                     return false;
93                 }
94                 std::string portLiteral{_bindAddress.substr(portDelPos + 1)};
95                 if (!convertTo(portLiteral, port)) {
96                     error = "bindaddress " + _bindAddress + " invalid" + expectedForm;
97                     return false;
98                 }
99             } else {
100                 endpointName = _bindAddress.substr(protocolDelPos + protocolDel.size());
101             }
102         }
103 
104         if (isInet) {
105             endpoint = ip::tcp::endpoint(ip::tcp::v6(), port);
106         } else {
107             endpoint = local::stream_protocol::endpoint(endpointName);
108         }
109 
110         _acceptor.open(endpoint.protocol());
111 
112         if (endpoint.protocol().protocol() == ASIO_OS_DEF(IPPROTO_TCP)) {
113             _acceptor.set_option(ip::tcp::acceptor::reuse_address(true));
114         } else if (endpoint.protocol().protocol() == AF_UNSPEC) {
115             // In case of reuse, remote it first
116             unlink(endpointName.c_str());
117         }
118         _acceptor.set_option(socket_base::linger(true, 0));
119         _acceptor.set_option(socket_base::enable_connection_aborted(true));
120 
121         _acceptor.bind(endpoint);
122         _acceptor.listen();
123     } catch (std::exception &e) {
124         error = "Unable to listen on " + _bindAddress + ": " + e.what();
125         return false;
126     }
127 
128     return true;
129 }
130 
stop()131 bool CRemoteProcessorServer::stop()
132 {
133     _io_service.stop();
134 
135     return true;
136 }
137 
acceptRegister(IRemoteCommandHandler & commandHandler)138 void CRemoteProcessorServer::acceptRegister(IRemoteCommandHandler &commandHandler)
139 {
140     auto peerHandler = [this, &commandHandler](asio::error_code ec) {
141         if (ec) {
142             std::cerr << "Accept failed: " << ec.message() << std::endl;
143             return;
144         }
145 
146         const auto &endpoint = _socket.local_endpoint();
147         if (endpoint.protocol().protocol() == ASIO_OS_DEF(IPPROTO_TCP)) {
148             _socket.set_option(asio::ip::tcp::no_delay(true));
149         }
150         handleNewConnection(commandHandler);
151 
152         _socket.close();
153 
154         acceptRegister(commandHandler);
155     };
156 
157     _acceptor.async_accept(_socket, peerHandler);
158 }
159 
process(IRemoteCommandHandler & commandHandler)160 bool CRemoteProcessorServer::process(IRemoteCommandHandler &commandHandler)
161 {
162     acceptRegister(commandHandler);
163 
164     asio::error_code ec;
165 
166     _io_service.run(ec);
167 
168     if (ec) {
169         std::cerr << "Server failed: " << ec.message() << std::endl;
170     }
171 
172     return ec.value() == 0;
173 }
174 
175 // New connection
handleNewConnection(IRemoteCommandHandler & commandHandler)176 void CRemoteProcessorServer::handleNewConnection(IRemoteCommandHandler &commandHandler)
177 {
178     // Process all incoming requests from the client
179     while (true) {
180 
181         // Process requests
182         // Create command message
183         CRequestMessage requestMessage;
184 
185         string strError;
186         ///// Receive command
187         CRequestMessage::Result res;
188         res = requestMessage.serialize(Socket(_socket), false, strError);
189 
190         switch (res) {
191         case CRequestMessage::error:
192             std::cout << "Error while receiving message: " << strError << std::endl;
193         // fall through
194         case CRequestMessage::peerDisconnected:
195             // Consider peer disconnection as normal, no log
196             return; // Bail out
197         case CRequestMessage::success:
198             break; // No error, continue
199         }
200 
201         // Actually process the request
202         bool bSuccess;
203 
204         string strResult;
205 
206         bSuccess = commandHandler.remoteCommandProcess(requestMessage, strResult);
207 
208         // Send back answer
209         // Create answer message
210         CAnswerMessage answerMessage(strResult, bSuccess);
211 
212         ///// Send answer
213         res = answerMessage.serialize(_socket, true, strError);
214 
215         switch (res) {
216         case CRequestMessage::peerDisconnected:
217         // Peer should not disconnect while waiting for an answer
218         // Fall through to log the error and bail out
219         case CRequestMessage::error:
220             std::cout << "Error while receiving message: " << strError << std::endl;
221             return; // Bail out
222         case CRequestMessage::success:
223             break; // No error, continue
224         }
225     }
226 }
227