• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright (c) 2023 Huawei Device Co., Ltd.
2 // Licensed under the Apache License, Version 2.0 (the "License");
3 // you may not use this file except in compliance with the License.
4 // You may obtain a copy of the License at
5 //
6 //     http://www.apache.org/licenses/LICENSE-2.0
7 //
8 // Unless required by applicable law or agreed to in writing, software
9 // distributed under the License is distributed on an "AS IS" BASIS,
10 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11 // See the License for the specific language governing permissions and
12 // limitations under the License.
13 
14 use std::io;
15 use std::ops::Deref;
16 
17 use ylong_io::{Interest, Source};
18 
19 use crate::io::poll_ready;
20 use crate::net::ScheduleIO;
21 use crate::util::slab::Ref;
22 
23 cfg_not_ffrt!(
24     use std::sync::Arc;
25     use crate::executor::driver::Handle;
26 );
27 
28 cfg_net!(
29     use std::task::{Context, Poll};
30     use std::mem::MaybeUninit;
31     use crate::io::ReadBuf;
32     use crate::net::ReadyEvent;
33     use std::io::{Read, Write};
34 );
35 
36 /// Wrapper that turns a sync `Source` io into an async one. This struct
37 /// interacts with the reactor of the runtime.
38 pub(crate) struct AsyncSource<E: Source> {
39     /// Sync io that implements `Source` trait.
40     io: Option<E>,
41 
42     /// Entry list of the runtime's reactor, `AsyncSource` object will be
43     /// registered into it when created.
44     pub(crate) entry: Ref<ScheduleIO>,
45 
46     #[cfg(not(feature = "ffrt"))]
47     /// Handle to the IO Driver, used for deregistration
48     pub(crate) handle: Arc<Handle>,
49 }
50 
51 impl<E: Source> AsyncSource<E> {
52     /// Wraps a `Source` object into an `AsyncSource`. When the `AsyncSource`
53     /// object is created, it's fd will be registered into runtime's
54     /// reactor.
55     ///
56     /// If `interest` passed in is None, the interested event for fd
57     /// registration will be both readable and writable.
58     ///
59     /// # Error
60     ///
61     /// If no reactor is found or fd registration fails, an error will be
62     /// returned.
63     #[cfg(not(feature = "ffrt"))]
new(mut io: E, interest: Option<Interest>) -> io::Result<AsyncSource<E>>64     pub fn new(mut io: E, interest: Option<Interest>) -> io::Result<AsyncSource<E>> {
65         let inner = Handle::get_handle()?;
66 
67         let interest = interest.unwrap_or_else(|| Interest::READABLE | Interest::WRITABLE);
68         let entry = inner.io_register(&mut io, interest)?;
69         Ok(AsyncSource {
70             io: Some(io),
71             entry,
72             handle: inner,
73         })
74     }
75 
76     /// Wraps a `Source` object into an `AsyncSource`. When the `AsyncSource`
77     /// object is created, it's fd will be registered into runtime's
78     /// reactor.
79     ///
80     /// If `interest` passed in is None, the interested event for fd
81     /// registration will be both readable and writable.
82     ///
83     /// # Error
84     ///
85     /// If no reactor is found or fd registration fails, an error will be
86     /// returned.
87     #[cfg(feature = "ffrt")]
new(mut io: E, interest: Option<Interest>) -> io::Result<AsyncSource<E>>88     pub fn new(mut io: E, interest: Option<Interest>) -> io::Result<AsyncSource<E>> {
89         let inner = crate::net::IoHandle::get_ref();
90 
91         let interest = interest.unwrap_or_else(|| Interest::READABLE | Interest::WRITABLE);
92         let entry = inner.register_source(&mut io, interest)?;
93         Ok(AsyncSource {
94             io: Some(io),
95             entry,
96         })
97     }
98 
99     /// Asynchronously waits for events to happen. If the io returns
100     /// `EWOULDBLOCK`, the readiness of the io will be reset. Otherwise, the
101     /// corresponding event will be returned.
async_process<F, R>(&self, interest: Interest, mut op: F) -> io::Result<R> where F: FnMut() -> io::Result<R>,102     pub(crate) async fn async_process<F, R>(&self, interest: Interest, mut op: F) -> io::Result<R>
103     where
104         F: FnMut() -> io::Result<R>,
105     {
106         loop {
107             let ready = self.entry.readiness(interest).await?;
108             match op() {
109                 Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
110                     self.entry.clear_readiness(ready);
111                 }
112                 x => return x,
113             }
114         }
115     }
116 
117     cfg_net! {
118         pub(crate) fn poll_ready(
119             &self,
120             cx: &mut Context<'_>,
121             interest: Interest,
122         ) -> Poll<io::Result<ReadyEvent>> {
123             let ready = self.entry.poll_readiness(cx, interest);
124             let x = match ready {
125                 Poll::Ready(x) => x,
126                 Poll::Pending => return Poll::Pending,
127             };
128 
129             Poll::Ready(Ok(x))
130         }
131 
132         pub(crate) fn poll_io<R>(
133             &self,
134             cx: &mut Context<'_>,
135             interest: Interest,
136             mut f: impl FnMut() -> io::Result<R>,
137         ) -> Poll<io::Result<R>> {
138             loop {
139                 let ready = poll_ready!(self.poll_ready(cx, interest))?;
140 
141                 match f() {
142                     Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
143                         self.entry.clear_readiness(ready);
144                     }
145                     x => return Poll::Ready(x),
146                 }
147             }
148         }
149 
150         pub(crate) fn try_io<R> (
151             &self,
152             interest: Interest,
153             mut f: impl FnMut() -> io::Result<R>,
154         ) -> io::Result<R> {
155             let event = self.entry.get_readiness(interest);
156 
157             if event.ready.is_empty() {
158                 return Err(io::ErrorKind::WouldBlock.into());
159             }
160 
161             match f() {
162                 Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
163                     self.entry.clear_readiness(event);
164                     Err(io::ErrorKind::WouldBlock.into())
165                 }
166                 res => res,
167             }
168         }
169 
170         #[inline]
171         pub(crate) fn poll_read_io<R>(
172             &self,
173             cx: &mut Context<'_>,
174             f: impl FnMut() -> io::Result<R>,
175         ) -> Poll<io::Result<R>> {
176             self.poll_io(cx, Interest::READABLE, f)
177         }
178 
179         #[inline]
180         pub(crate) fn poll_write_io<R>(
181             &self,
182             cx: &mut Context<'_>,
183             f: impl FnMut() -> io::Result<R>,
184         ) -> Poll<io::Result<R>> {
185             self.poll_io(cx, Interest::WRITABLE, f)
186         }
187 
188         pub(crate) fn poll_read<'a>(
189             &'a self,
190             cx: &mut Context<'_>,
191             buf: &mut ReadBuf<'_>,
192         ) -> Poll<io::Result<()>>
193         where
194             &'a E: io::Read + 'a,
195         {
196             let ret = self.poll_read_io(cx, || unsafe {
197                 let slice = &mut *(buf.unfilled_mut() as *mut [MaybeUninit<u8>] as *mut [u8]);
198                 self.io.as_ref().unwrap().read(slice)
199             });
200             let r_len = match ret {
201                 Poll::Ready(Ok(x)) => x,
202                 Poll::Ready(Err(e)) => return Poll::Ready(Err(e)),
203                 Poll::Pending => return Poll::Pending,
204             };
205             buf.assume_init(r_len);
206             buf.advance(r_len);
207 
208             Poll::Ready(Ok(()))
209         }
210 
211         pub(crate) fn poll_write<'a>(
212             &'a self,
213             cx: &mut Context<'_>,
214             buf: &[u8],
215         ) -> Poll<io::Result<usize>>
216         where
217             &'a E: io::Write + 'a,
218         {
219             self.poll_write_io(cx, || {
220                 self.io.as_ref().unwrap().write(buf)
221             })
222         }
223 
224         pub(crate) fn poll_write_vectored<'a>(
225             &'a self,
226             cx: &mut Context<'_>,
227             bufs: &[io::IoSlice<'_>],
228         ) -> Poll<io::Result<usize>>
229         where
230             &'a E: io::Write + 'a,
231         {
232             self.poll_write_io(cx, || {
233                 self.io.as_ref().unwrap().write_vectored(bufs)
234             })
235         }
236     }
237 }
238 
239 impl<E: Source> Deref for AsyncSource<E> {
240     type Target = E;
241 
deref(&self) -> &Self::Target242     fn deref(&self) -> &Self::Target {
243         self.io.as_ref().unwrap()
244     }
245 }
246 
247 // Deregisters fd when the `AsyncSource` object get dropped.
248 #[cfg(not(feature = "ffrt"))]
249 impl<E: Source> Drop for AsyncSource<E> {
drop(&mut self)250     fn drop(&mut self) {
251         if let Some(mut io) = self.io.take() {
252             let _ = self.handle.io_deregister(&mut io);
253         }
254     }
255 }
256 
// Deregisters the fd from the ffrt poller when the `AsyncSource` object gets
// dropped.
#[cfg(feature = "ffrt")]
impl<E: Source> Drop for AsyncSource<E> {
    fn drop(&mut self) {
        let taken = self.io.take();
        if let Some(source) = taken {
            let fd = source.as_raw_fd() as libc::c_int;
            // NOTE(review): the call is `unsafe` because it crosses into the
            // ffrt bindings; presumably it only needs an fd that is still
            // open, which holds here as `source` is alive -- confirm.
            unsafe {
                ylong_ffrt::ffrt_poller_deregister(fd);
            }
        }
    }
}
268