• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (C) 2025 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #define FAILURE_DEBUG_PREFIX "AtChannel"
18 
19 #include <cstring>
20 #include <unistd.h>
21 
22 #include "AtChannel.h"
23 #include "debug.h"
24 
25 namespace aidl {
26 namespace android {
27 namespace hardware {
28 namespace radio {
29 namespace implementation {
30 namespace {
sendRequestImpl(const int fd,const char * data,size_t size)31 int sendRequestImpl(const int fd, const char* data, size_t size) {
32     while (size > 0) {
33         const ssize_t written = ::write(fd, data, size);
34         if (written >= 0) {
35             data += written;
36             size -= written;
37         } else if (errno == EINTR) {
38             continue;
39         } else {
40             return FAILURE(errno);
41         }
42     }
43 
44     return 0;
45 }
46 }  // namespace
47 
48 #undef FAILURE_DEBUG_PREFIX
49 #define FAILURE_DEBUG_PREFIX "AtChannel::RequestPipe"
50 
operator ()(const std::string_view request) const51 bool AtChannel::RequestPipe::operator()(const std::string_view request) const {
52     int err = sendRequestImpl(mFd, request.data(), request.size());
53     if (err == 0) {
54         const char kCR = 0x0D;
55         err = sendRequestImpl(mFd, &kCR, 1);
56     }
57 
58     return err == 0;
59 }
60 
61 #undef FAILURE_DEBUG_PREFIX
62 #define FAILURE_DEBUG_PREFIX "AtChannel"
63 
AtChannel(HostChannelFactory hostChannelFactory,InitSequence initSequence)64 AtChannel::AtChannel(HostChannelFactory hostChannelFactory,
65                      InitSequence initSequence)
66         : mHostChannelFactory(std::move(hostChannelFactory))
67         , mInitSequence(std::move(initSequence)) {
68     mRequestThread = std::thread(&AtChannel::requestLoop, this);
69 }
70 
~AtChannel()71 AtChannel::~AtChannel() {
72     queueRequester({});
73     mRequestThread.join();
74     mReaderThread.join();
75 }
76 
queueRequester(Requester requester)77 void AtChannel::queueRequester(Requester requester) {
78     std::lock_guard<std::mutex> lock(mRequestQueueMtx);
79     mRequesterQueue.push_back(std::move(requester));
80     mRequesterAvailable.notify_one();
81 }
82 
addResponseSink(ResponseSink responseSink)83 void AtChannel::addResponseSink(ResponseSink responseSink) {
84     std::lock_guard<std::mutex> lock(mResponseSinksMtx);
85     mResponseSinks.push_back(std::move(responseSink));
86 }
87 
requestLoop()88 void AtChannel::requestLoop() {
89     while (true) {
90         const Requester requester = getRequester();
91         if (requester) {
92             if (!requester(getHostChannelPipe())) {
93                 mHostChannel.reset();
94             }
95         } else {
96             break;
97         }
98     }
99 
100     mHostChannel.reset();
101 }
102 
readingLoop(const int hostChannelFd)103 void AtChannel::readingLoop(const int hostChannelFd) {
104     std::vector<char> unconsumed;
105     while (receiveResponses(hostChannelFd, &unconsumed)) {}
106     LOG_ALWAYS_FATAL("We could not parse the modem response");
107 }
108 
getRequester()109 AtChannel::Requester AtChannel::getRequester() {
110     std::unique_lock<std::mutex> lock(mRequestQueueMtx);
111     while (true) {
112         if (!mRequesterQueue.empty()) {
113             Requester requester(std::move(mRequesterQueue.front()));
114             mRequesterQueue.pop_front();
115             return requester;
116         } else {
117             mRequesterAvailable.wait(lock);
118         }
119     }
120 }
121 
broadcastResponse(const AtResponsePtr & response)122 void AtChannel::broadcastResponse(const AtResponsePtr& response) {
123     mConversation.send(response);
124 
125     std::lock_guard<std::mutex> lock(mResponseSinksMtx);
126 
127     const auto newEnd = std::remove_if(mResponseSinks.begin(), mResponseSinks.end(),
128         [&response](const ResponseSink& responseSink) -> bool {
129             return !responseSink(response);
130         });
131 
132     mResponseSinks.erase(newEnd, mResponseSinks.end());
133 }
134 
getHostChannelPipe()135 AtChannel::RequestPipe AtChannel::getHostChannelPipe() {
136     if (!mHostChannel.ok()) {
137         if (mReaderThread.joinable()) {
138             mReaderThread.join();
139         }
140 
141         mHostChannel = mHostChannelFactory();
142         LOG_ALWAYS_FATAL_IF(!mHostChannel.ok(),
143                             "%s:%d: Can't open the host channel", __func__, __LINE__);
144 
145         const int hostChannelFd = mHostChannel.get();
146         mReaderThread = std::thread([this, hostChannelFd](){
147             readingLoop(hostChannelFd);
148         });
149 
150         LOG_ALWAYS_FATAL_IF(!mInitSequence(RequestPipe(hostChannelFd), mConversation),
151                             "%s:%d: Can't init the host channel", __func__, __LINE__);
152     }
153 
154     return RequestPipe(mHostChannel.get());
155 }
156 
receiveResponses(const int hostChannelFd,std::vector<char> * unconsumed)157 bool AtChannel::receiveResponses(const int hostChannelFd,
158                                  std::vector<char>* unconsumed) {
159     const size_t unconsumedSize = unconsumed->size();
160     if (unconsumedSize == 0) {
161         char buf[128];
162         const int len = ::read(hostChannelFd, buf, sizeof(buf));
163         if (len > 0) {
164             return receiveResponsesImpl(buf, buf + len, unconsumed);
165         } else if (len < 0) {
166             const int err = errno;
167             if (err == EINTR) {
168                 return true;
169             } else {
170                 return FAILURE_V(false, "fd=%d, err=%s (%d)",
171                                  hostChannelFd, ::strerror(err), err);
172             }
173         }
174     } else {
175         const size_t newSize = std::max(unconsumedSize + 1024, unconsumed->capacity());
176         unconsumed->resize(newSize);
177         const int len = ::read(hostChannelFd, &(*unconsumed)[unconsumedSize],
178                                newSize - unconsumedSize);
179         if (len > 0) {
180             unconsumed->resize(unconsumedSize + len);
181             char* begin = unconsumed->data();
182             char* end = begin + unconsumedSize + len;
183             return receiveResponsesImpl(begin, end, unconsumed);
184         } else if (len < 0) {
185             const int err = errno;
186             if (err == EINTR) {
187                 return true;
188             } else {
189                 return FAILURE_V(false, "fd=%d, err=%s (%d)",
190                                  hostChannelFd, ::strerror(err), err);
191             }
192         }
193     }
194 
195     return true;
196 }
197 
198 // NOTE: [begin, end) could contain one or more requests,
199 // the last one might be incomplete
receiveResponsesImpl(const char * begin,const char * const end,std::vector<char> * unconsumed)200 bool AtChannel::receiveResponsesImpl(const char* begin, const char* const end,
201                                      std::vector<char>* unconsumed) {
202     while (begin < end) {
203         const char* next = receiveOneResponse(begin, end);
204         if (next == begin) {
205             unconsumed->assign(begin, end);
206             return true;
207         } else if (next == nullptr) {
208             return false;
209         } else {
210             begin = next;
211         }
212     }
213 
214     unconsumed->clear();
215     return true;
216 }
217 
receiveOneResponse(const char * const begin,const char * const end)218 const char* AtChannel::receiveOneResponse(const char* const begin, const char* const end) {
219     switch (*begin) {
220     case '\r':
221     case '\n':
222         return begin + 1;
223     }
224 
225     auto [consumed, response] = AtResponse::parse(std::string_view(begin, end - begin));
226     if (response) {
227         broadcastResponse(response);
228     }
229 
230     return (consumed >= 0) ? (begin + consumed) : nullptr;
231 }
232 
operator ()(const RequestPipe requestPipe,const std::string_view request,const AtChannel::Conversation::FilterFunc & filter,const AtChannel::Conversation::Duration timeout)233 AtResponsePtr AtChannel::Conversation::operator()(
234         const RequestPipe requestPipe,
235         const std::string_view request,
236         const AtChannel::Conversation::FilterFunc& filter,
237         const AtChannel::Conversation::Duration timeout) {
238     std::future<AtResponsePtr> futureResponse;
239 
240     {
241         std::lock_guard<std::mutex> lock(mMtx);
242         mFilter = &filter;
243         mSink = decltype(mSink)();
244         futureResponse = mSink.get_future();
245     }
246 
247     if (!requestPipe(request)) {
248         std::lock_guard<std::mutex> lock(mMtx);
249         mFilter = nullptr;
250         return nullptr;
251     } else if (futureResponse.wait_for(timeout) == std::future_status::ready) {
252         return futureResponse.get();
253     } else {
254         {
255             std::lock_guard<std::mutex> lock(mMtx);
256             mFilter = nullptr;
257         }
258 
259         const int requestLen = request.size();
260         return FAILURE_V(nullptr, "Timeout for '%*.*s'",
261                          requestLen, requestLen, request.data());
262     }
263 }
264 
operator ()(const RequestPipe requestPipe,const std::string_view request,const FilterFunc & filter)265 AtResponsePtr AtChannel::Conversation::operator()(
266         const RequestPipe requestPipe,
267         const std::string_view request,
268         const FilterFunc& filter) {
269     using namespace std::chrono_literals;
270     return (*this)(requestPipe, request, filter, 3s);
271 }
272 
send(const AtResponsePtr & response)273 bool AtChannel::Conversation::send(const AtResponsePtr& response) {
274     std::lock_guard<std::mutex> lock(mMtx);
275     if (mFilter && (*mFilter)(*response)) {
276         mFilter = nullptr;
277         mSink.set_value(response);
278         return true;
279     } else {
280         return false;
281     }
282 }
283 
284 }  // namespace implementation
285 }  // namespace radio
286 }  // namespace hardware
287 }  // namespace android
288 }  // namespace aidl
289