1 // Copyright (c) 2023 Huawei Device Co., Ltd. 2 // Licensed under the Apache License, Version 2.0 (the "License"); 3 // you may not use this file except in compliance with the License. 4 // You may obtain a copy of the License at 5 // 6 // http://www.apache.org/licenses/LICENSE-2.0 7 // 8 // Unless required by applicable law or agreed to in writing, software 9 // distributed under the License is distributed on an "AS IS" BASIS, 10 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 11 // See the License for the specific language governing permissions and 12 // limitations under the License. 13 14 use std::io; 15 use std::ops::Deref; 16 17 use ylong_io::{Interest, Source}; 18 19 use crate::io::poll_ready; 20 use crate::net::ScheduleIO; 21 use crate::util::slab::Ref; 22 23 cfg_not_ffrt!( 24 use std::sync::Arc; 25 use crate::executor::driver::Handle; 26 ); 27 28 cfg_net!( 29 use std::task::{Context, Poll}; 30 use std::mem::MaybeUninit; 31 use crate::io::ReadBuf; 32 use crate::net::ReadyEvent; 33 use std::io::{Read, Write}; 34 ); 35 36 /// Wrapper that turns a sync `Source` io into an async one. This struct 37 /// interacts with the reactor of the runtime. 38 pub(crate) struct AsyncSource<E: Source> { 39 /// Sync io that implements `Source` trait. 40 io: Option<E>, 41 42 /// Entry list of the runtime's reactor, `AsyncSource` object will be 43 /// registered into it when created. 44 pub(crate) entry: Ref<ScheduleIO>, 45 46 #[cfg(not(feature = "ffrt"))] 47 /// Handle to the IO Driver, used for deregistration 48 pub(crate) handle: Arc<Handle>, 49 } 50 51 impl<E: Source> AsyncSource<E> { 52 /// Wraps a `Source` object into an `AsyncSource`. When the `AsyncSource` 53 /// object is created, it's fd will be registered into runtime's 54 /// reactor. 55 /// 56 /// If `interest` passed in is None, the interested event for fd 57 /// registration will be both readable and writable. 
58 /// 59 /// # Error 60 /// 61 /// If no reactor is found or fd registration fails, an error will be 62 /// returned. 63 #[cfg(not(feature = "ffrt"))] new(mut io: E, interest: Option<Interest>) -> io::Result<AsyncSource<E>>64 pub fn new(mut io: E, interest: Option<Interest>) -> io::Result<AsyncSource<E>> { 65 let inner = Handle::get_handle()?; 66 67 let interest = interest.unwrap_or_else(|| Interest::READABLE | Interest::WRITABLE); 68 let entry = inner.io_register(&mut io, interest)?; 69 Ok(AsyncSource { 70 io: Some(io), 71 entry, 72 handle: inner, 73 }) 74 } 75 76 /// Wraps a `Source` object into an `AsyncSource`. When the `AsyncSource` 77 /// object is created, it's fd will be registered into runtime's 78 /// reactor. 79 /// 80 /// If `interest` passed in is None, the interested event for fd 81 /// registration will be both readable and writable. 82 /// 83 /// # Error 84 /// 85 /// If no reactor is found or fd registration fails, an error will be 86 /// returned. 87 #[cfg(feature = "ffrt")] new(mut io: E, interest: Option<Interest>) -> io::Result<AsyncSource<E>>88 pub fn new(mut io: E, interest: Option<Interest>) -> io::Result<AsyncSource<E>> { 89 let inner = crate::net::IoHandle::get_ref(); 90 91 let interest = interest.unwrap_or_else(|| Interest::READABLE | Interest::WRITABLE); 92 let entry = inner.register_source(&mut io, interest)?; 93 Ok(AsyncSource { 94 io: Some(io), 95 entry, 96 }) 97 } 98 99 /// Asynchronously waits for events to happen. If the io returns 100 /// `EWOULDBLOCK`, the readiness of the io will be reset. Otherwise, the 101 /// corresponding event will be returned. 
async_process<F, R>(&self, interest: Interest, mut op: F) -> io::Result<R> where F: FnMut() -> io::Result<R>,102 pub(crate) async fn async_process<F, R>(&self, interest: Interest, mut op: F) -> io::Result<R> 103 where 104 F: FnMut() -> io::Result<R>, 105 { 106 loop { 107 let ready = self.entry.readiness(interest).await?; 108 match op() { 109 Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { 110 self.entry.clear_readiness(ready); 111 } 112 x => return x, 113 } 114 } 115 } 116 117 cfg_net! { 118 pub(crate) fn poll_ready( 119 &self, 120 cx: &mut Context<'_>, 121 interest: Interest, 122 ) -> Poll<io::Result<ReadyEvent>> { 123 let ready = self.entry.poll_readiness(cx, interest); 124 let x = match ready { 125 Poll::Ready(x) => x, 126 Poll::Pending => return Poll::Pending, 127 }; 128 129 Poll::Ready(Ok(x)) 130 } 131 132 pub(crate) fn poll_io<R>( 133 &self, 134 cx: &mut Context<'_>, 135 interest: Interest, 136 mut f: impl FnMut() -> io::Result<R>, 137 ) -> Poll<io::Result<R>> { 138 loop { 139 let ready = poll_ready!(self.poll_ready(cx, interest))?; 140 141 match f() { 142 Err(e) if e.kind() == io::ErrorKind::WouldBlock => { 143 self.entry.clear_readiness(ready); 144 } 145 x => return Poll::Ready(x), 146 } 147 } 148 } 149 150 pub(crate) fn try_io<R> ( 151 &self, 152 interest: Interest, 153 mut f: impl FnMut() -> io::Result<R>, 154 ) -> io::Result<R> { 155 let event = self.entry.get_readiness(interest); 156 157 if event.ready.is_empty() { 158 return Err(io::ErrorKind::WouldBlock.into()); 159 } 160 161 match f() { 162 Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { 163 self.entry.clear_readiness(event); 164 Err(io::ErrorKind::WouldBlock.into()) 165 } 166 res => res, 167 } 168 } 169 170 #[inline] 171 pub(crate) fn poll_read_io<R>( 172 &self, 173 cx: &mut Context<'_>, 174 f: impl FnMut() -> io::Result<R>, 175 ) -> Poll<io::Result<R>> { 176 self.poll_io(cx, Interest::READABLE, f) 177 } 178 179 #[inline] 180 pub(crate) fn poll_write_io<R>( 181 &self, 182 cx: &mut 
Context<'_>, 183 f: impl FnMut() -> io::Result<R>, 184 ) -> Poll<io::Result<R>> { 185 self.poll_io(cx, Interest::WRITABLE, f) 186 } 187 188 pub(crate) fn poll_read<'a>( 189 &'a self, 190 cx: &mut Context<'_>, 191 buf: &mut ReadBuf<'_>, 192 ) -> Poll<io::Result<()>> 193 where 194 &'a E: io::Read + 'a, 195 { 196 let ret = self.poll_read_io(cx, || unsafe { 197 let slice = &mut *(buf.unfilled_mut() as *mut [MaybeUninit<u8>] as *mut [u8]); 198 self.io.as_ref().unwrap().read(slice) 199 }); 200 let r_len = match ret { 201 Poll::Ready(Ok(x)) => x, 202 Poll::Ready(Err(e)) => return Poll::Ready(Err(e)), 203 Poll::Pending => return Poll::Pending, 204 }; 205 buf.assume_init(r_len); 206 buf.advance(r_len); 207 208 Poll::Ready(Ok(())) 209 } 210 211 pub(crate) fn poll_write<'a>( 212 &'a self, 213 cx: &mut Context<'_>, 214 buf: &[u8], 215 ) -> Poll<io::Result<usize>> 216 where 217 &'a E: io::Write + 'a, 218 { 219 self.poll_write_io(cx, || { 220 self.io.as_ref().unwrap().write(buf) 221 }) 222 } 223 224 pub(crate) fn poll_write_vectored<'a>( 225 &'a self, 226 cx: &mut Context<'_>, 227 bufs: &[io::IoSlice<'_>], 228 ) -> Poll<io::Result<usize>> 229 where 230 &'a E: io::Write + 'a, 231 { 232 self.poll_write_io(cx, || { 233 self.io.as_ref().unwrap().write_vectored(bufs) 234 }) 235 } 236 } 237 } 238 239 impl<E: Source> Deref for AsyncSource<E> { 240 type Target = E; 241 deref(&self) -> &Self::Target242 fn deref(&self) -> &Self::Target { 243 self.io.as_ref().unwrap() 244 } 245 } 246 247 // Deregisters fd when the `AsyncSource` object get dropped. 248 #[cfg(not(feature = "ffrt"))] 249 impl<E: Source> Drop for AsyncSource<E> { drop(&mut self)250 fn drop(&mut self) { 251 if let Some(mut io) = self.io.take() { 252 let _ = self.handle.io_deregister(&mut io); 253 } 254 } 255 } 256 257 // Deregisters fd when the `AsyncSource` object get dropped. 
#[cfg(feature = "ffrt")]
impl<E: Source> Drop for AsyncSource<E> {
    /// Takes the wrapped io out of `self`, if it is still present, and
    /// deregisters its raw fd from the ffrt poller.
    fn drop(&mut self) {
        let io = match self.io.take() {
            Some(io) => io,
            None => return,
        };
        let fd = io.as_raw_fd() as libc::c_int;

        // `io` lives until the end of this function, so it is dropped only
        // after the deregistration call has returned.
        // NOTE(review): presumably the FFI only requires a valid fd — confirm
        // against the `ylong_ffrt` bindings.
        unsafe {
            ylong_ffrt::ffrt_poller_deregister(fd);
        }
    }
}