1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
mod connection;
mod handler;

use bstr::BString;
use num_traits::FromPrimitive;
use std::{
    net::{SocketAddr, SocketAddrV4},
    time::Instant,
};
use tokio::net::UdpSocket;
use tracing::{debug, error, info, warn};

use crate::{RakPeerConfig, ID};
pub use connection::RemoteSystem;
pub use handler::PacketHandler;

/// A UDP peer that owns the listening socket and dispatches traffic from
/// known remote systems to a user-supplied handler of type `H`.
pub struct RakPeer<H> {
    /// Handler passed (mutably) to each connection's `on_packet` call in `run`.
    handler: H,
    /// The bound UDP socket used for both receiving and sending datagrams.
    socket: UdpSocket,
    /// The IPv4 address the socket was bound to in `new`.
    local: SocketAddrV4,
}

impl<H: PacketHandler> RakPeer<H> {
    pub async fn new(local: SocketAddrV4, handler: H) -> Result<Self, tokio::io::Error> {
        let socket = UdpSocket::bind(local).await?;
        info!("Server started on {:?}", local);
        Ok(Self {
            socket,
            local,
            handler,
        })
    }

    pub async fn run(&mut self, password: String) -> Result<(), tokio::io::Error> {
        let start = Instant::now();
        let mut buf = vec![0; 2048];
        let mut connections: Vec<RemoteSystem> = Vec::new();
        let peer_config = RakPeerConfig {
            max_incoming_connections: 10,
            incoming_password: BString::from(password),
        };

        loop {
            for conn in &mut connections {
                conn.update(&self.socket).await?;
            }

            let (length, remote) = self.socket.recv_from(&mut buf).await?;
            let origin = match remote {
                SocketAddr::V4(v4) => v4,
                SocketAddr::V6(_v6) => {
                    eprintln!("IPv6 not supported");
                    continue;
                }
            };
            let bytes = &buf[..length];

            let conn = connections.iter_mut().find(|x| x.addr == origin);

            debug!("{} bytes from {}", bytes.len(), origin);
            if let Some(connection) = conn {
                connection.on_packet(
                    bytes,
                    self.local,
                    Instant::now().duration_since(start),
                    &peer_config,
                    &mut self.handler,
                );
                connection.update(&self.socket).await?;
                if connection.queue.is_empty() && connection.pending_disconnect() {
                    if let Some(index) = connections.iter_mut().position(|x| x.addr == origin) {
                        connections.remove(index);
                    } else {
                        warn!("Failed to find connection {} to remove!", origin);
                    }
                }
            } else {
                let id_byte = match ID::of_packet(bytes) {
                    Ok(b) => b,
                    Err(e) => {
                        error!("{}", e);
                        continue;
                    }
                };
                match ID::from_u8(id_byte) {
                    Some(id) => {
                        debug!("raw: {:?}", id);
                        match id {
                            ID::OpenConnectionRequest => {
                                let reply = if true {
                                    connections.push(RemoteSystem::new(origin));
                                    ID::OpenConnectionReply
                                } else {
                                    ID::NoFreeIncomingConnections
                                };
                                self.socket.send_to(&[reply as u8, 0], origin).await?;
                            }
                            _ => debug!("bytes: {:?}", &bytes[1..]),
                        }
                    }
                    None => error!("Missing or invalid first byte from {}: {:?}", origin, bytes),
                }
            }
        }
    }
}