1 /*
2 * Copyright (c) 2011-2015, Intel Corporation
3 * All rights reserved.
4 *
5 * Redistribution and use in source and binary forms, with or without modification,
6 * are permitted provided that the following conditions are met:
7 *
8 * 1. Redistributions of source code must retain the above copyright notice, this
9 * list of conditions and the following disclaimer.
10 *
11 * 2. Redistributions in binary form must reproduce the above copyright notice,
12 * this list of conditions and the following disclaimer in the documentation and/or
13 * other materials provided with the distribution.
14 *
15 * 3. Neither the name of the copyright holder nor the names of its contributors
16 * may be used to endorse or promote products derived from this software without
17 * specific prior written permission.
18 *
19 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
20 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
21 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
22 * DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR
23 * ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
24 * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
25 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
26 * ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
27 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
28 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
29 */
30 #include "RemoteProcessorServer.h"
31 #include <iostream>
32 #include <memory>
33 #include <assert.h>
34 #include <string.h>
35 #include <unistd.h>
36 #include "RequestMessage.h"
37 #include "AnswerMessage.h"
38 #include "RemoteCommandHandler.h"
39 #include "Socket.h"
40 #include "convert.hpp"
41
42 using std::string;
43
CRemoteProcessorServer(std::string bindAddress)44 CRemoteProcessorServer::CRemoteProcessorServer(std::string bindAddress)
45 : _bindAddress(bindAddress), _io_service(), _acceptor(_io_service), _socket(_io_service)
46 {
47 }
48
~CRemoteProcessorServer()49 CRemoteProcessorServer::~CRemoteProcessorServer()
50 {
51 stop();
52 }
53
54 // State
start(string & error)55 bool CRemoteProcessorServer::start(string &error)
56 {
57 using namespace asio;
58
59 try {
60 generic::stream_protocol::endpoint endpoint;
61 uint16_t port;
62 std::string endpointName;
63 bool isInet;
64 const std::string expectedForm{"Required: <hostname port|tcp://[host]:port|unix://path>"};
65
66 // For backward compatibility, tcp port referred by its value only
67 if (convertTo(_bindAddress, port)) {
68 isInet = true;
69 } else {
70 const std::string tcpProtocol{"tcp"};
71 const std::string unixProtocol{"unix"};
72 const std::vector<std::string> supportedProtocols{tcpProtocol, unixProtocol};
73 const std::string protocolDel{"://"};
74
75 size_t protocolDelPos = _bindAddress.find(protocolDel);
76 if (protocolDelPos == std::string::npos) {
77 error = "bindaddress " + _bindAddress + " invalid, " + expectedForm;
78 return false;
79 }
80 std::string protocol = _bindAddress.substr(0, protocolDelPos);
81
82 if (std::find(begin(supportedProtocols), end(supportedProtocols), protocol) ==
83 end(supportedProtocols)) {
84 error = "bindaddress " + _bindAddress + " invalid, " + expectedForm;
85 return false;
86 }
87 isInet = (_bindAddress.find(tcpProtocol) != std::string::npos);
88 if (isInet) {
89 size_t portDelPos = _bindAddress.rfind(':');
90 if (portDelPos == std::string::npos) {
91 error = "bindaddress " + _bindAddress + " invalid, " + expectedForm;
92 return false;
93 }
94 std::string portLiteral{_bindAddress.substr(portDelPos + 1)};
95 if (!convertTo(portLiteral, port)) {
96 error = "bindaddress " + _bindAddress + " invalid" + expectedForm;
97 return false;
98 }
99 } else {
100 endpointName = _bindAddress.substr(protocolDelPos + protocolDel.size());
101 }
102 }
103
104 if (isInet) {
105 endpoint = ip::tcp::endpoint(ip::tcp::v6(), port);
106 } else {
107 endpoint = local::stream_protocol::endpoint(endpointName);
108 }
109
110 _acceptor.open(endpoint.protocol());
111
112 if (endpoint.protocol().protocol() == ASIO_OS_DEF(IPPROTO_TCP)) {
113 _acceptor.set_option(ip::tcp::acceptor::reuse_address(true));
114 } else if (endpoint.protocol().protocol() == AF_UNSPEC) {
115 // In case of reuse, remote it first
116 unlink(endpointName.c_str());
117 }
118 _acceptor.set_option(socket_base::linger(true, 0));
119 _acceptor.set_option(socket_base::enable_connection_aborted(true));
120
121 _acceptor.bind(endpoint);
122 _acceptor.listen();
123 } catch (std::exception &e) {
124 error = "Unable to listen on " + _bindAddress + ": " + e.what();
125 return false;
126 }
127
128 return true;
129 }
130
stop()131 bool CRemoteProcessorServer::stop()
132 {
133 _io_service.stop();
134
135 return true;
136 }
137
acceptRegister(IRemoteCommandHandler & commandHandler)138 void CRemoteProcessorServer::acceptRegister(IRemoteCommandHandler &commandHandler)
139 {
140 auto peerHandler = [this, &commandHandler](asio::error_code ec) {
141 if (ec) {
142 std::cerr << "Accept failed: " << ec.message() << std::endl;
143 return;
144 }
145
146 const auto &endpoint = _socket.local_endpoint();
147 if (endpoint.protocol().protocol() == ASIO_OS_DEF(IPPROTO_TCP)) {
148 _socket.set_option(asio::ip::tcp::no_delay(true));
149 }
150 handleNewConnection(commandHandler);
151
152 _socket.close();
153
154 acceptRegister(commandHandler);
155 };
156
157 _acceptor.async_accept(_socket, peerHandler);
158 }
159
process(IRemoteCommandHandler & commandHandler)160 bool CRemoteProcessorServer::process(IRemoteCommandHandler &commandHandler)
161 {
162 acceptRegister(commandHandler);
163
164 asio::error_code ec;
165
166 _io_service.run(ec);
167
168 if (ec) {
169 std::cerr << "Server failed: " << ec.message() << std::endl;
170 }
171
172 return ec.value() == 0;
173 }
174
175 // New connection
handleNewConnection(IRemoteCommandHandler & commandHandler)176 void CRemoteProcessorServer::handleNewConnection(IRemoteCommandHandler &commandHandler)
177 {
178 // Process all incoming requests from the client
179 while (true) {
180
181 // Process requests
182 // Create command message
183 CRequestMessage requestMessage;
184
185 string strError;
186 ///// Receive command
187 CRequestMessage::Result res;
188 res = requestMessage.serialize(Socket(_socket), false, strError);
189
190 switch (res) {
191 case CRequestMessage::error:
192 std::cout << "Error while receiving message: " << strError << std::endl;
193 // fall through
194 case CRequestMessage::peerDisconnected:
195 // Consider peer disconnection as normal, no log
196 return; // Bail out
197 case CRequestMessage::success:
198 break; // No error, continue
199 }
200
201 // Actually process the request
202 bool bSuccess;
203
204 string strResult;
205
206 bSuccess = commandHandler.remoteCommandProcess(requestMessage, strResult);
207
208 // Send back answer
209 // Create answer message
210 CAnswerMessage answerMessage(strResult, bSuccess);
211
212 ///// Send answer
213 res = answerMessage.serialize(_socket, true, strError);
214
215 switch (res) {
216 case CRequestMessage::peerDisconnected:
217 // Peer should not disconnect while waiting for an answer
218 // Fall through to log the error and bail out
219 case CRequestMessage::error:
220 std::cout << "Error while receiving message: " << strError << std::endl;
221 return; // Bail out
222 case CRequestMessage::success:
223 break; // No error, continue
224 }
225 }
226 }
227