1
//! Support for asynchronous packet iteration.
2
//!
3
//! See [`Capture::stream`](super::Capture::stream).
4
use std::io;
5
use std::marker::Unpin;
6
use std::pin::Pin;
7
use std::task::{self, Poll};
8

            
9
use futures::ready;
10
use tokio::io::unix::AsyncFd;
11

            
12
use crate::{
13
    capture::{selectable::SelectableCapture, Activated, Capture},
14
    codec::PacketCodec,
15
    Error,
16
};
17

            
18
/// Implement Stream for async use of pcap
19
pub struct PacketStream<T: Activated + ?Sized, C> {
20
    inner: AsyncFd<SelectableCapture<T>>,
21
    codec: C,
22
}
23

            
24
impl<T: Activated + ?Sized, C> PacketStream<T, C> {
25
    pub(crate) fn new(capture: Capture<T>, codec: C) -> Result<Self, Error> {
26
        let capture = SelectableCapture::new(capture)?;
27
        Ok(PacketStream {
28
            inner: AsyncFd::with_interest(capture, tokio::io::Interest::READABLE)?,
29
            codec,
30
        })
31
    }
32

            
33
    /// Returns a mutable reference to the inner [`Capture`].
34
    ///
35
    /// The caller must ensure the capture will not be set to be blocking.
36
    pub fn capture_mut(&mut self) -> &mut Capture<T> {
37
        self.inner.get_mut().get_inner_mut()
38
    }
39
}
40

            
41
// `poll_next` takes the stream out of its `Pin` with `Pin::into_inner`, which
// requires `Self: Unpin`; this impl provides that for every codec type `C`.
impl<T, C> Unpin for PacketStream<T, C> where T: Activated + ?Sized {}
42

            
43
impl<T: Activated + ?Sized, C: PacketCodec> futures::Stream for PacketStream<T, C> {
44
    type Item = Result<C::Item, Error>;
45

            
46
    fn poll_next(self: Pin<&mut Self>, cx: &mut task::Context) -> Poll<Option<Self::Item>> {
47
        let stream = Pin::into_inner(self);
48
        let codec = &mut stream.codec;
49

            
50
        loop {
51
            let mut guard = ready!(stream.inner.poll_read_ready_mut(cx))?;
52
            match guard.try_io(
53
                |inner| match inner.get_mut().get_inner_mut().next_packet() {
54
                    Ok(p) => Ok(Ok(codec.decode(p))),
55
                    Err(e @ Error::TimeoutExpired) => {
56
                        Err(io::Error::new(io::ErrorKind::WouldBlock, e))
57
                    }
58
                    Err(e) => Ok(Err(e)),
59
                },
60
            ) {
61
                Ok(result) => {
62
                    return Poll::Ready(Some(result?));
63
                }
64
                Err(_would_block) => continue,
65
            }
66
        }
67
    }
68
}