/*
 * Copyright (C) 2017 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include "chre_host/socket_client.h"

// NOTE(review): the system header names below and the template arguments used
// throughout this file (sp<ICallbacks>, std::lock_guard<std::mutex>, ...) were
// reconstructed from how the code uses them; confirm against
// chre_host/socket_client.h and the build before submitting.
#include <inttypes.h>

#include <string.h>
#include <unistd.h>

#include <chrono>

#include <cutils/sockets.h>
#include <sys/socket.h>
#include <utils/RefBase.h>
#include <utils/StrongPointer.h>

#include "chre_host/log.h"

namespace android {
namespace chre {

/**
 * Constructs a client that starts out with no socket (mSockFd is initialized
 * to INVALID_SOCKET).
 */
SocketClient::SocketClient() {
  std::atomic_init(&mSockFd, INVALID_SOCKET);
}

/**
 * Tears down any active connection via disconnect().
 */
SocketClient::~SocketClient() {
  disconnect();
}

/**
 * Connects to the named socket synchronously and, on success, starts the
 * receive thread.
 *
 * @param socketName Name of the socket to connect to
 * @param callbacks Handler for received messages and connection events; must
 *        not be null
 *
 * @return true if the connection was established and the receive thread was
 *         started
 */
bool SocketClient::connect(const char *socketName,
                           const sp<ICallbacks> &callbacks) {
  return doConnect(socketName, callbacks, false /* connectInBackground */);
}

/**
 * Starts the receive thread without attempting an initial connection; the
 * receive thread performs the connection attempts (see reconnect()).
 *
 * @param socketName Name of the socket to connect to
 * @param callbacks Handler for received messages and connection events; must
 *        not be null
 *
 * @return true if the parameters were accepted and the receive thread was
 *         started
 */
bool SocketClient::connectInBackground(const char *socketName,
                                       const sp<ICallbacks> &callbacks) {
  return doConnect(socketName, callbacks, true /* connectInBackground */);
}

/**
 * Requests a graceful shutdown of the receive thread and blocks until it has
 * exited. Has no effect (other than logging an error) when invoked from the
 * receive thread itself, and does nothing if no receive thread exists.
 */
void SocketClient::disconnect() {
  if (inReceiveThread()) {
    LOGE("disconnect() can't be called from a receive thread callback");
  } else if (receiveThreadRunning()) {
    // Inform the RX thread that we're requesting a shutdown, breaking it out of
    // the retry wait if it's currently blocked there
    {
      std::lock_guard<std::mutex> lock(mShutdownMutex);
      mGracefulShutdown = true;
    }
    mShutdownCond.notify_all();

    // Invalidate the socket (will kick the RX thread out of recv if it's
    // currently blocked there). The descriptor is read once so the value that
    // was checked is the value handed to shutdown().
    const auto sockFd = mSockFd.load();
    if (sockFd != INVALID_SOCKET && shutdown(sockFd, SHUT_RDWR) != 0) {
      LOG_ERROR("Couldn't shut down socket", errno);
    }

    if (mRxThread.joinable()) {
      LOGD("Waiting for RX thread to exit");
      mRxThread.join();
    }
  }
}

/**
 * @return true if the client currently holds a valid socket descriptor
 */
bool SocketClient::isConnected() const {
  return (mSockFd != INVALID_SOCKET);
}

/**
 * Sends a single message over the socket.
 *
 * @param data Pointer to the bytes to send
 * @param length Number of bytes to send
 *
 * @return true only if exactly length bytes were accepted by send()
 */
bool SocketClient::sendMessage(const void *data, size_t length) {
  bool success = false;

  // Snapshot the descriptor so the validity check and the send() call operate
  // on the same value even if the RX thread updates mSockFd concurrently.
  const auto sockFd = mSockFd.load();
  if (sockFd == INVALID_SOCKET) {
    LOGW("Tried sending a message, but don't have a valid socket handle");
  } else {
    ssize_t bytesSent = send(sockFd, data, length, 0);
    if (bytesSent < 0) {
      LOGE("Failed to send %zu bytes of data: %s", length, strerror(errno));
    } else if (bytesSent == 0) {
      LOGW("Failed to send data; remote side disconnected");
    } else if (static_cast<size_t>(bytesSent) != length) {
      LOGW("Truncated packet, tried sending %zu bytes, only %zd went through",
           length, bytesSent);
    } else {
      success = true;
    }
  }

  return success;
}

/**
 * Common implementation of connect() and connectInBackground(). Any existing
 * receive thread is shut down first via disconnect().
 *
 * @param socketName Name of the socket to connect to; must be non-null and
 *        fit in mSocketName
 * @param callbacks Handler for received messages and connection events; must
 *        not be null
 * @param connectInBackground If true, skip the synchronous connection attempt
 *        and leave connecting to the receive thread
 *
 * @return true if the receive thread was started
 */
bool SocketClient::doConnect(const char *socketName,
                             const sp<ICallbacks> &callbacks,
                             bool connectInBackground) {
  bool success = false;
  if (inReceiveThread()) {
    LOGE("Can't attempt to connect from a receive thread callback");
  } else {
    if (receiveThreadRunning()) {
      LOGW("Re-connecting socket with implicit disconnect");
      disconnect();
    }

    // strlcpy() dereferences its source unconditionally, so only call it once
    // the name is known to be non-null.
    const size_t socketNameLen =
        (socketName != nullptr)
            ? strlcpy(mSocketName, socketName, sizeof(mSocketName))
            : 0;
    if (socketName == nullptr) {
      LOGE("Socket name parameter must be provided");
    } else if (socketNameLen >= sizeof(mSocketName)) {
      LOGE("Socket name length parameter is too long (%zu, max %zu)",
           socketNameLen, sizeof(mSocketName));
    } else if (callbacks == nullptr) {
      LOGE("Callbacks parameter must be provided");
    } else if (connectInBackground || tryConnect()) {
      mGracefulShutdown = false;
      mCallbacks = callbacks;
      mRxThread = std::thread([this]() { receiveThread(); });
      success = true;
    }
  }

  return success;
}

/**
 * @return true if the calling thread is the receive thread
 */
bool SocketClient::inReceiveThread() const {
  return (std::this_thread::get_id() == mRxThread.get_id());
}

/**
 * Entry point of the receive thread. Reads from the socket and dispatches
 * each received buffer to the callbacks until a graceful shutdown is
 * requested or reconnect() gives up. When recv() fails or the remote end
 * closes the connection, the socket is closed and a reconnect is attempted.
 */
void SocketClient::receiveThread() {
  constexpr size_t kReceiveBufferSize = 4096;
  uint8_t buffer[kReceiveBufferSize];

  LOGV("Receive thread started");
  // Outer loop: one iteration per established connection. reconnect() is only
  // invoked when there is no valid socket on entry.
  while (!mGracefulShutdown && (mSockFd != INVALID_SOCKET || reconnect())) {
    // Inner loop: receive until error, remote disconnect, or shutdown request
    while (!mGracefulShutdown) {
      ssize_t bytesReceived = recv(mSockFd, buffer, sizeof(buffer), 0);
      if (bytesReceived < 0) {
        LOG_ERROR("Exiting RX thread", errno);
        break;
      } else if (bytesReceived == 0) {
        // A zero-length read also results from our own shutdown() call in
        // disconnect(), so only report it when no shutdown was requested
        if (!mGracefulShutdown) {
          LOGI("Socket disconnected on remote end");
          mCallbacks->onDisconnected();
        }
        break;
      }

      mCallbacks->onMessageReceived(buffer, bytesReceived);
    }

    if (close(mSockFd) != 0) {
      LOG_ERROR("Couldn't close socket", errno);
    }
    mSockFd = INVALID_SOCKET;
  }

  // Reaching here without a shutdown request means reconnect() gave up
  if (!mGracefulShutdown) {
    mCallbacks->onConnectionAborted();
  }

  mCallbacks.clear();
  LOGV("Exiting receive thread");
}

/**
 * @return true if a receive thread object exists that has not been joined
 *         yet (it may already have finished executing)
 */
bool SocketClient::receiveThreadRunning() const {
  return mRxThread.joinable();
}

/**
 * Repeatedly attempts to connect, waiting between attempts. The wait starts
 * at kMinDelay and, after kExponentialBackoffDelay attempts, doubles on each
 * failure up to kMaxDelay. The wait is cut short when a graceful shutdown is
 * requested. Invokes the onConnected() callback on success.
 *
 * @return true if a connection was established; false if shutdown was
 *         requested or the retry limit was reached
 */
bool SocketClient::reconnect() {
  constexpr auto kMinDelay = std::chrono::duration<int32_t, std::milli>(250);
  constexpr auto kMaxDelay = std::chrono::minutes(5);
  // Try reconnecting at initial delay this many times before backing off
  constexpr unsigned int kExponentialBackoffDelay =
      std::chrono::seconds(10) / kMinDelay;
  // Give up after this many tries (~2.5 hours)
  constexpr unsigned int kRetryLimit = kExponentialBackoffDelay + 40;
  auto delay = kMinDelay;
  unsigned int retryCount = 0;

  while (retryCount++ < kRetryLimit) {
    {
      // Sleep for the current delay, waking early if disconnect() signals a
      // shutdown through mShutdownCond
      std::unique_lock<std::mutex> lock(mShutdownMutex);
      mShutdownCond.wait_for(lock, delay,
                             [this]() { return mGracefulShutdown.load(); });
      if (mGracefulShutdown) {
        break;
      }
    }

    // Stay quiet while still retrying at the minimum delay
    bool suppressErrorLogs = (delay == kMinDelay);
    if (!tryConnect(suppressErrorLogs)) {
      if (!suppressErrorLogs) {
        LOGW("Failed to (re)connect, next try in %" PRId32 " ms",
             delay.count());
      }
      if (retryCount > kExponentialBackoffDelay) {
        delay *= 2;
      }
      if (delay > kMaxDelay) {
        delay = kMaxDelay;
      }
    } else {
      LOGD("Successfully (re)connected");
      mCallbacks->onConnected();
      return true;
    }
  }

  return false;
}

/**
 * Makes a single attempt to create, configure, and connect the socket named
 * by mSocketName. On success the connected descriptor is stored in mSockFd;
 * on failure the newly created descriptor is closed.
 *
 * @param suppressErrorLogs If true, failures are not logged
 *
 * @return true if the socket was connected
 */
bool SocketClient::tryConnect(bool suppressErrorLogs) {
  bool success = false;

  errno = 0;
  int sockFd = socket(AF_LOCAL, SOCK_SEQPACKET, 0);
  if (sockFd >= 0) {
    // Set the send buffer size to 2MB to allow plenty of room for nanoapp
    // loading
    int sndbuf = 2 * 1024 * 1024;
    // Normally, send() should effectively return immediately, but in the event
    // that we get blocked due to flow control, don't stay blocked for more than
    // 3 seconds
    struct timeval timeout = {
        .tv_sec = 3,
        .tv_usec = 0,
    };

    if (setsockopt(sockFd, SOL_SOCKET, SO_SNDBUF, &sndbuf, sizeof(sndbuf)) !=
        0) {
      if (!suppressErrorLogs) {
        LOGE("Failed to set SO_SNDBUF to %d: %s", sndbuf, strerror(errno));
      }
    } else if (setsockopt(sockFd, SOL_SOCKET, SO_SNDTIMEO, &timeout,
                          sizeof(timeout)) != 0) {
      if (!suppressErrorLogs) {
        LOGE("Failed to set SO_SNDTIMEO: %s", strerror(errno));
      }
    } else {
      mSockFd = socket_local_client_connect(sockFd, mSocketName,
                                            ANDROID_SOCKET_NAMESPACE_RESERVED,
                                            SOCK_SEQPACKET);
      if (mSockFd != INVALID_SOCKET) {
        success = true;
      } else if (!suppressErrorLogs) {
        LOGE("Couldn't connect client socket to '%s': %s", mSocketName,
             strerror(errno));
      }
    }

    if (!success) {
      close(sockFd);
    }
  } else if (!suppressErrorLogs) {
    LOGE("Couldn't create local socket: %s", strerror(errno));
  }

  return success;
}

}  // namespace chre
}  // namespace android