1 /*
2 * Copyright (C) 2025 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 #define FAILURE_DEBUG_PREFIX "AtChannel"
18
19 #include <cstring>
20 #include <unistd.h>
21
22 #include "AtChannel.h"
23 #include "debug.h"
24
25 namespace aidl {
26 namespace android {
27 namespace hardware {
28 namespace radio {
29 namespace implementation {
30 namespace {
sendRequestImpl(const int fd,const char * data,size_t size)31 int sendRequestImpl(const int fd, const char* data, size_t size) {
32 while (size > 0) {
33 const ssize_t written = ::write(fd, data, size);
34 if (written >= 0) {
35 data += written;
36 size -= written;
37 } else if (errno == EINTR) {
38 continue;
39 } else {
40 return FAILURE(errno);
41 }
42 }
43
44 return 0;
45 }
46 } // namespace
47
48 #undef FAILURE_DEBUG_PREFIX
49 #define FAILURE_DEBUG_PREFIX "AtChannel::RequestPipe"
50
operator ()(const std::string_view request) const51 bool AtChannel::RequestPipe::operator()(const std::string_view request) const {
52 int err = sendRequestImpl(mFd, request.data(), request.size());
53 if (err == 0) {
54 const char kCR = 0x0D;
55 err = sendRequestImpl(mFd, &kCR, 1);
56 }
57
58 return err == 0;
59 }
60
61 #undef FAILURE_DEBUG_PREFIX
62 #define FAILURE_DEBUG_PREFIX "AtChannel"
63
AtChannel(HostChannelFactory hostChannelFactory,InitSequence initSequence)64 AtChannel::AtChannel(HostChannelFactory hostChannelFactory,
65 InitSequence initSequence)
66 : mHostChannelFactory(std::move(hostChannelFactory))
67 , mInitSequence(std::move(initSequence)) {
68 mRequestThread = std::thread(&AtChannel::requestLoop, this);
69 }
70
~AtChannel()71 AtChannel::~AtChannel() {
72 queueRequester({});
73 mRequestThread.join();
74 mReaderThread.join();
75 }
76
queueRequester(Requester requester)77 void AtChannel::queueRequester(Requester requester) {
78 std::lock_guard<std::mutex> lock(mRequestQueueMtx);
79 mRequesterQueue.push_back(std::move(requester));
80 mRequesterAvailable.notify_one();
81 }
82
addResponseSink(ResponseSink responseSink)83 void AtChannel::addResponseSink(ResponseSink responseSink) {
84 std::lock_guard<std::mutex> lock(mResponseSinksMtx);
85 mResponseSinks.push_back(std::move(responseSink));
86 }
87
requestLoop()88 void AtChannel::requestLoop() {
89 while (true) {
90 const Requester requester = getRequester();
91 if (requester) {
92 if (!requester(getHostChannelPipe())) {
93 mHostChannel.reset();
94 }
95 } else {
96 break;
97 }
98 }
99
100 mHostChannel.reset();
101 }
102
readingLoop(const int hostChannelFd)103 void AtChannel::readingLoop(const int hostChannelFd) {
104 std::vector<char> unconsumed;
105 while (receiveResponses(hostChannelFd, &unconsumed)) {}
106 LOG_ALWAYS_FATAL("We could not parse the modem response");
107 }
108
getRequester()109 AtChannel::Requester AtChannel::getRequester() {
110 std::unique_lock<std::mutex> lock(mRequestQueueMtx);
111 while (true) {
112 if (!mRequesterQueue.empty()) {
113 Requester requester(std::move(mRequesterQueue.front()));
114 mRequesterQueue.pop_front();
115 return requester;
116 } else {
117 mRequesterAvailable.wait(lock);
118 }
119 }
120 }
121
broadcastResponse(const AtResponsePtr & response)122 void AtChannel::broadcastResponse(const AtResponsePtr& response) {
123 mConversation.send(response);
124
125 std::lock_guard<std::mutex> lock(mResponseSinksMtx);
126
127 const auto newEnd = std::remove_if(mResponseSinks.begin(), mResponseSinks.end(),
128 [&response](const ResponseSink& responseSink) -> bool {
129 return !responseSink(response);
130 });
131
132 mResponseSinks.erase(newEnd, mResponseSinks.end());
133 }
134
getHostChannelPipe()135 AtChannel::RequestPipe AtChannel::getHostChannelPipe() {
136 if (!mHostChannel.ok()) {
137 if (mReaderThread.joinable()) {
138 mReaderThread.join();
139 }
140
141 mHostChannel = mHostChannelFactory();
142 LOG_ALWAYS_FATAL_IF(!mHostChannel.ok(),
143 "%s:%d: Can't open the host channel", __func__, __LINE__);
144
145 const int hostChannelFd = mHostChannel.get();
146 mReaderThread = std::thread([this, hostChannelFd](){
147 readingLoop(hostChannelFd);
148 });
149
150 LOG_ALWAYS_FATAL_IF(!mInitSequence(RequestPipe(hostChannelFd), mConversation),
151 "%s:%d: Can't init the host channel", __func__, __LINE__);
152 }
153
154 return RequestPipe(mHostChannel.get());
155 }
156
receiveResponses(const int hostChannelFd,std::vector<char> * unconsumed)157 bool AtChannel::receiveResponses(const int hostChannelFd,
158 std::vector<char>* unconsumed) {
159 const size_t unconsumedSize = unconsumed->size();
160 if (unconsumedSize == 0) {
161 char buf[128];
162 const int len = ::read(hostChannelFd, buf, sizeof(buf));
163 if (len > 0) {
164 return receiveResponsesImpl(buf, buf + len, unconsumed);
165 } else if (len < 0) {
166 const int err = errno;
167 if (err == EINTR) {
168 return true;
169 } else {
170 return FAILURE_V(false, "fd=%d, err=%s (%d)",
171 hostChannelFd, ::strerror(err), err);
172 }
173 }
174 } else {
175 const size_t newSize = std::max(unconsumedSize + 1024, unconsumed->capacity());
176 unconsumed->resize(newSize);
177 const int len = ::read(hostChannelFd, &(*unconsumed)[unconsumedSize],
178 newSize - unconsumedSize);
179 if (len > 0) {
180 unconsumed->resize(unconsumedSize + len);
181 char* begin = unconsumed->data();
182 char* end = begin + unconsumedSize + len;
183 return receiveResponsesImpl(begin, end, unconsumed);
184 } else if (len < 0) {
185 const int err = errno;
186 if (err == EINTR) {
187 return true;
188 } else {
189 return FAILURE_V(false, "fd=%d, err=%s (%d)",
190 hostChannelFd, ::strerror(err), err);
191 }
192 }
193 }
194
195 return true;
196 }
197
198 // NOTE: [begin, end) could contain one or more requests,
199 // the last one might be incomplete
receiveResponsesImpl(const char * begin,const char * const end,std::vector<char> * unconsumed)200 bool AtChannel::receiveResponsesImpl(const char* begin, const char* const end,
201 std::vector<char>* unconsumed) {
202 while (begin < end) {
203 const char* next = receiveOneResponse(begin, end);
204 if (next == begin) {
205 unconsumed->assign(begin, end);
206 return true;
207 } else if (next == nullptr) {
208 return false;
209 } else {
210 begin = next;
211 }
212 }
213
214 unconsumed->clear();
215 return true;
216 }
217
receiveOneResponse(const char * const begin,const char * const end)218 const char* AtChannel::receiveOneResponse(const char* const begin, const char* const end) {
219 switch (*begin) {
220 case '\r':
221 case '\n':
222 return begin + 1;
223 }
224
225 auto [consumed, response] = AtResponse::parse(std::string_view(begin, end - begin));
226 if (response) {
227 broadcastResponse(response);
228 }
229
230 return (consumed >= 0) ? (begin + consumed) : nullptr;
231 }
232
operator ()(const RequestPipe requestPipe,const std::string_view request,const AtChannel::Conversation::FilterFunc & filter,const AtChannel::Conversation::Duration timeout)233 AtResponsePtr AtChannel::Conversation::operator()(
234 const RequestPipe requestPipe,
235 const std::string_view request,
236 const AtChannel::Conversation::FilterFunc& filter,
237 const AtChannel::Conversation::Duration timeout) {
238 std::future<AtResponsePtr> futureResponse;
239
240 {
241 std::lock_guard<std::mutex> lock(mMtx);
242 mFilter = &filter;
243 mSink = decltype(mSink)();
244 futureResponse = mSink.get_future();
245 }
246
247 if (!requestPipe(request)) {
248 std::lock_guard<std::mutex> lock(mMtx);
249 mFilter = nullptr;
250 return nullptr;
251 } else if (futureResponse.wait_for(timeout) == std::future_status::ready) {
252 return futureResponse.get();
253 } else {
254 {
255 std::lock_guard<std::mutex> lock(mMtx);
256 mFilter = nullptr;
257 }
258
259 const int requestLen = request.size();
260 return FAILURE_V(nullptr, "Timeout for '%*.*s'",
261 requestLen, requestLen, request.data());
262 }
263 }
264
operator ()(const RequestPipe requestPipe,const std::string_view request,const FilterFunc & filter)265 AtResponsePtr AtChannel::Conversation::operator()(
266 const RequestPipe requestPipe,
267 const std::string_view request,
268 const FilterFunc& filter) {
269 using namespace std::chrono_literals;
270 return (*this)(requestPipe, request, filter, 3s);
271 }
272
send(const AtResponsePtr & response)273 bool AtChannel::Conversation::send(const AtResponsePtr& response) {
274 std::lock_guard<std::mutex> lock(mMtx);
275 if (mFilter && (*mFilter)(*response)) {
276 mFilter = nullptr;
277 mSink.set_value(response);
278 return true;
279 } else {
280 return false;
281 }
282 }
283
284 } // namespace implementation
285 } // namespace radio
286 } // namespace hardware
287 } // namespace android
288 } // namespace aidl
289