/* * Copyright (C) 2017 The Android Open Source Project * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #include "chre_host/socket_client.h" #include #include #include #include #include #include #include "chre_host/log.h" namespace android { namespace chre { SocketClient::SocketClient() { std::atomic_init(&mSockFd, INVALID_SOCKET); } SocketClient::~SocketClient() { disconnect(); } bool SocketClient::connect(const char *socketName, const sp& callbacks) { return doConnect(socketName, callbacks, false /* connectInBackground */); } bool SocketClient::connectInBackground(const char *socketName, const sp& callbacks) { return doConnect(socketName, callbacks, true /* connectInBackground */); } void SocketClient::disconnect() { if (inReceiveThread()) { LOGE("disconnect() can't be called from a receive thread callback"); } else if (receiveThreadRunning()) { // Inform the RX thread that we're requesting a shutdown, breaking it out of // the retry wait if it's currently blocked there { std::lock_guard lock(mShutdownMutex); mGracefulShutdown = true; } mShutdownCond.notify_all(); // Invalidate the socket (will kick the RX thread out of recv if it's // currently blocked there) if (mSockFd != INVALID_SOCKET && shutdown(mSockFd, SHUT_RDWR) != 0) { LOG_ERROR("Couldn't shut down socket", errno); } if (mRxThread.joinable()) { LOGD("Waiting for RX thread to exit"); mRxThread.join(); } } } bool SocketClient::isConnected() const { return (mSockFd != INVALID_SOCKET); } bool SocketClient::sendMessage(const void *data, size_t length) { bool success = false; if (mSockFd == INVALID_SOCKET) { LOGW("Tried sending a message, but don't have a valid socket handle"); } else { ssize_t bytesSent = send(mSockFd, data, length, 0); if (bytesSent < 0) { LOGE("Failed to send %zu bytes of data: %s", length, strerror(errno)); } else if (bytesSent == 0) { LOGW("Failed to send data; remote side disconnected"); } else if (static_cast(bytesSent) != length) { LOGW("Truncated packet, tried sending %zu bytes, only %zd went through", length, bytesSent); } else { success = true; } } return success; } bool SocketClient::doConnect(const char *socketName, const sp& callbacks, bool connectInBackground) { bool success = false; if (inReceiveThread()) { LOGE("Can't attempt to connect from a receive thread callback"); } else { if (receiveThreadRunning()) { LOGW("Re-connecting socket with implicit disconnect"); disconnect(); } size_t socketNameLen = strlcpy(mSocketName, socketName, sizeof(mSocketName)); if (socketNameLen >= sizeof(mSocketName)) { LOGE("Socket name length parameter is too long (%zu, max %zu)", socketNameLen, sizeof(mSocketName)); } else if (callbacks == nullptr) { LOGE("Callbacks parameter must be provided"); } else if (connectInBackground || tryConnect()) { mGracefulShutdown = false; mCallbacks = callbacks; mRxThread = std::thread([this]() { receiveThread(); }); success = true; } } return success; } bool SocketClient::inReceiveThread() const { return (std::this_thread::get_id() == mRxThread.get_id()); } void SocketClient::receiveThread() { constexpr size_t kReceiveBufferSize = 4096; uint8_t buffer[kReceiveBufferSize]; LOGV("Receive thread started"); while (!mGracefulShutdown && (mSockFd != INVALID_SOCKET || reconnect())) { while (!mGracefulShutdown) { ssize_t bytesReceived = recv(mSockFd, buffer, sizeof(buffer), 0); if (bytesReceived < 0) { LOG_ERROR("Exiting RX thread", errno); break; } else if (bytesReceived == 0) { if (!mGracefulShutdown) { LOGI("Socket disconnected on remote end"); mCallbacks->onDisconnected(); } break; } mCallbacks->onMessageReceived(buffer, bytesReceived); } if (close(mSockFd) != 0) { LOG_ERROR("Couldn't close socket", errno); } mSockFd = INVALID_SOCKET; } if (!mGracefulShutdown) { mCallbacks->onConnectionAborted(); } mCallbacks.clear(); LOGV("Exiting receive thread"); } bool SocketClient::receiveThreadRunning() const { return mRxThread.joinable(); } bool SocketClient::reconnect() { auto delay = std::chrono::duration(500); constexpr auto kMaxDelay = std::chrono::minutes(5); int retryLimit = 40; // ~2.5 hours total while (--retryLimit > 0) { { std::unique_lock lock(mShutdownMutex); mShutdownCond.wait_for(lock, delay, [this]() { return mGracefulShutdown.load(); }); if (mGracefulShutdown) { break; } } if (!tryConnect()) { LOGW("Failed to (re)connect, next try in %" PRId32 " ms", delay.count()); delay *= 2; if (delay > kMaxDelay) { delay = kMaxDelay; } } else { LOGD("Successfully (re)connected"); mCallbacks->onConnected(); return true; } } return false; } bool SocketClient::tryConnect() { errno = 0; mSockFd = socket_local_client(mSocketName, ANDROID_SOCKET_NAMESPACE_RESERVED, SOCK_SEQPACKET); if (mSockFd == INVALID_SOCKET) { LOGE("Couldn't create/connect client socket to '%s': %s", mSocketName, strerror(errno)); } return (mSockFd != INVALID_SOCKET); } } // namespace chre } // namespace android