use anyhow::{anyhow, Result}; use chrono::Local; use clap::Parser; use socket2::SockRef; use std::collections::HashMap; use std::convert::TryInto; use std::io::{Read, Write}; use std::net::TcpStream; use std::thread::sleep; use std::time::Duration; /// Validate that the provided port string can be parsed into a u16 and is nonzero. fn validate_port(port: &str) -> Result { match port.parse::() { Ok(p) if p != 0 => Ok(p), _ => Err(format!("Invalid port number: {}. Port must be greater than 0.", port)), } } /// Validate that the provided channel string can be parsed into a u8. fn validate_channel(channel: &str) -> Result { channel .parse::() .map_err(|_| format!("Invalid channel: {}. Channel must be between 0 and 255.", channel)) } /// Command-line arguments using Clap. #[derive(Parser, Debug)] #[command(author, version, about = "A tool for monitoring AGWPE frames from a Direwolf TNC.")] struct Cli { /// AGWPE server IP address (e.g. 127.0.0.1) #[arg(short = 'i', long)] ip: std::net::IpAddr, /// AGWPE server TCP port (e.g. 8000) #[arg(short = 'p', long, value_parser = validate_port)] port: u16, /// AGWPE channel to monitor (0-255) #[arg(short = 'c', long, value_parser = validate_channel, default_value_t = 0)] channel: u8, /// Enable debug mode #[arg(short = 'd', long, default_value_t = false)] debug: bool, /// Only monitor UI frames #[arg(short = 'u', long, default_value_t = false)] ui_only: bool, } /// Convert a byte slice into a hex-dump string for debugging purposes. /// The bytes are printed in chunks (here using chunks of 26 bytes). fn hex_dump(data: &[u8]) -> String { data.chunks(26) .map(|chunk| { chunk .iter() .map(|b| format!("{:02x}", b)) .collect::>() .join(" ") }) .collect::>() .join("\n") } /// Representation of an AGWPE frame header. #[derive(Debug, Copy, Clone)] struct AgwHeader { port: i32, data_kind: u8, _filler2: u8, _pid: u8, _filler3: u8, callfrom: [u8; 10], callto: [u8; 10], data_length: i32, _reserved: i32, } impl AgwHeader { /// Convert the null-terminated call-from field into a Rust String. fn callfrom_str(&self) -> String { let pos = self.callfrom.iter().position(|&b| b == 0).unwrap_or(self.callfrom.len()); String::from_utf8_lossy(&self.callfrom[..pos]).into_owned() } /// Convert the null-terminated call-to field into a Rust String. fn callto_str(&self) -> String { let pos = self.callto.iter().position(|&b| b == 0).unwrap_or(self.callto.len()); String::from_utf8_lossy(&self.callto[..pos]).into_owned() } } /// Representation of a complete AGWPE frame. #[derive(Debug)] struct AgwFrame { header: AgwHeader, payload: Vec, } /// Outputs a detailed debug log for a given AGW frame. /// It prints a timestamp, hex dumps for the full read, the header, and the payload, /// as well as an ASCII dump of the payload. fn debug_log_frame(raw_header: &[u8], header: &AgwHeader, payload: &[u8], full_read: &[u8]) { let timestamp = Local::now().format("%H:%M:%S").to_string(); println!("----------"); println!("[{}]", timestamp); println!("----------"); println!("Full Read (hex):"); println!("{}", hex_dump(full_read)); println!("Raw Header:"); println!("{}", hex_dump(raw_header)); println!("Parsed Header:"); println!(" Port: {}", header.port); println!(" Data Kind: '{}' (as char)", header.data_kind as char); println!(" Call From: {}", header.callfrom_str()); println!(" Call To: {}", header.callto_str()); println!(" Data Length: {}", header.data_length); println!("Payload (hex):"); println!("{}", hex_dump(payload)); println!("Payload (ascii):"); println!("{}", filter_text(payload)); } /// Parse an AGWPE header from the beginning of the provided input slice. /// Returns an error if there aren’t enough bytes. fn parse_header(input: &[u8]) -> Result { if input.len() < 36 { return Err(anyhow!("Not enough bytes for header")); } let port = i32::from_le_bytes(input[0..4].try_into()?); let data_kind = input[4]; let _filler2 = input[5]; let _pid = input[6]; let _filler3 = input[7]; let mut callfrom = [0u8; 10]; callfrom.copy_from_slice(&input[8..18]); let mut callto = [0u8; 10]; callto.copy_from_slice(&input[18..28]); let data_length = i32::from_le_bytes(input[28..32].try_into()?); let _reserved = i32::from_le_bytes(input[32..36].try_into()?); Ok(AgwHeader { port, data_kind, _filler2, _pid, _filler3, callfrom, callto, data_length, _reserved, }) } /// Parse an entire AGWPE frame from the buffer, including header and payload. /// Returns the total number of bytes consumed and the parsed frame. /// If the buffer doesn’t have enough data, an "Incomplete" error is returned. fn parse_frame(buffer: &[u8], debug: bool) -> Result<(usize, AgwFrame)> { const HEADER_SIZE: usize = 36; if buffer.len() < HEADER_SIZE { return Err(anyhow!("Incomplete: not enough bytes for header")); } let header = parse_header(&buffer[..HEADER_SIZE])?; let payload_len = header.data_length as usize; let total_len = HEADER_SIZE + payload_len; if buffer.len() < total_len { return Err(anyhow!( "Incomplete: need {} bytes total, have {}", total_len, buffer.len() )); } let payload = buffer[HEADER_SIZE..total_len].to_vec(); if debug { // Log the complete frame if debug mode is enabled. debug_log_frame(&buffer[..HEADER_SIZE], &header, &payload, &buffer[..total_len]); } Ok((total_len, AgwFrame { header, payload })) } /// Filter the provided data to only include printable ASCII characters (plus common whitespace). fn filter_text(data: &[u8]) -> String { data.iter() .filter(|&&b| (32..=126).contains(&b) || b == b'\r' || b == b'\n' || b == b'\t') .map(|&b| b as char) .collect() } /// Search for the "Via " marker in the payload and extract the associated chain value. /// Returns None if the marker is not found. fn extract_chain_via(payload: &str) -> Option { if let Some(pos) = payload.find("Via ") { let remaining = &payload[pos + 4..]; if let Some(end) = remaining.find(|c: char| c.is_whitespace() || c == '<') { Some(remaining[..end].to_string()) } else { Some(remaining.to_string()) } } else { None } } /// Generate a canonical session key from the port and two call signs. /// The call signs are sorted in uppercase order to ensure consistency. fn canonical_session_key(port: i32, c1: &str, c2: &str) -> String { let (low, high) = if c1.to_uppercase() <= c2.to_uppercase() { (c1, c2) } else { (c2, c1) }; format!("p{}:{}+{}", port, low, high) } /// Extracts and formats a summary of the inner command from the payload. /// This function looks for the last "<...>" block and processes it. /// Depending on the command token (such as SABME, UA, UI, etc.), it formats a summary. /// For I and RR frames, it may filter out tokens containing '='. fn format_payload_summary(payload: &str) -> String { // Look for the last occurrence of '<' if let Some(start_idx) = payload.rfind('<') { // Find the matching '>' after the '<' if let Some(end_idx) = payload[start_idx..].find('>') { let inner = payload[start_idx + 1..start_idx + end_idx].trim(); let tokens: Vec<&str> = inner.split_whitespace().collect(); if tokens.is_empty() { return "".to_string(); } // Match on the first token to decide the summary format. match tokens[0] { "SABME" => "SABME".to_string(), "SABM" => "SABM".to_string(), "UA" => "UA".to_string(), "UI" => "UI".to_string(), "DISC" => "DISC".to_string(), "XID" => "XID".to_string(), "DM" => "DM".to_string(), "I" => { if inner.contains("pid=") { let filtered: Vec<&str> = tokens.iter() .filter(|t| !t.contains('=')) .cloned() .collect(); if filtered.len() >= 5 { format!("I {} {} {} {}", filtered[1], filtered[2], filtered[3], filtered[4]) } else { filtered.join(" ") } } else if tokens.len() >= 5 { format!("I {} {} {} {}", tokens[1], tokens[2], tokens[3], tokens[4]) } else { inner.to_string() } }, "RR" => { if inner.contains("=") { let filtered: Vec<&str> = tokens.iter() .filter(|t| !t.contains('=')) .cloned() .collect(); if filtered.len() >= 2 { format!("RR {}", filtered[1]) } else { filtered.join(" ") } } else if tokens.len() >= 2 { format!("RR {}", tokens[1]) } else { inner.to_string() } }, _ => inner.to_string(), } } else { // No closing '>' found; return the trimmed payload. payload.trim().to_string() } } else { // No '<' found; return the trimmed payload. payload.trim().to_string() } } /// Prints a single session line with a timestamp, source, destination, and summary. fn print_session_line(timestamp: &str, source: &str, destination: &str, summary: &str) { let line = format!("{} {}>{} <{}>", timestamp, source, destination, summary); println!("{}", line); } /// Holds any partial lines of text that have not yet been processed for a session. struct SessionBuffer { partial: String, } /// Manages buffers for multiple sessions using a HashMap keyed by session. struct BufferManager { sessions: HashMap, } impl BufferManager { /// Create a new empty BufferManager. fn new() -> Self { BufferManager { sessions: HashMap::new(), } } /// Append new text to the session’s buffer and extract complete lines. /// Lines are split on any occurrence of '\r' or '\n'. fn append_and_extract_lines(&mut self, key: &str, text: &str) -> Vec { // Get or create the buffer for this session. let session = self .sessions .entry(key.to_string()) .or_insert(SessionBuffer { partial: String::new(), }); session.partial.push_str(text); let mut lines = Vec::new(); // Look for any newline or carriage return. while let Some(index) = session.partial.find(&['\r', '\n'][..]) { // Drain up to and including the newline. let line: String = session.partial.drain(..=index).collect(); let trimmed = line.trim_end_matches(&['\r', '\n'][..]).to_string(); if !trimmed.is_empty() { lines.push(trimmed); } } lines } } /// Process a single AGW frame. /// This function: /// - Verifies the channel matches the CLI-specified channel. /// - Extracts source and destination call signs. /// - Filters out frames destined for "NODES" and frames with an XID payload. /// - Optionally filters to only UI frames if requested. /// - Buffers multi-line frames and prints a formatted session line. fn handle_frame(frame: &AgwFrame, cli: &Cli, buffers: &mut BufferManager) { let hdr = &frame.header; // Process only frames on the specified channel. if hdr.port != cli.channel as i32 { return; } let source = hdr.callfrom_str(); let basic_destination = hdr.callto_str(); let timestamp = Local::now().format("%H:%M:%S").to_string(); // Filter and compute the text from the payload only once. let text = filter_text(&frame.payload); // Extract any VIA chain information if present. let chain = extract_chain_via(&text); // Combine the basic destination with the chain, if available. let final_destination = if let Some(chain_str) = chain { format!("{},{}", basic_destination, chain_str) } else { basic_destination.clone() }; // Ignore frames where the basic destination contains "NODES" (case‑insensitive). if basic_destination.to_uppercase().contains("NODES") { return; } // Generate a canonical key for the session. let key = canonical_session_key(hdr.port, &source, &final_destination); // Append the text to the session buffer and extract complete lines. let lines = buffers.append_and_extract_lines(&key, &text); // Use the first complete line to generate a payload summary. let summary = if !lines.is_empty() { format_payload_summary(&lines[0]) } else { "".to_string() }; // Ignore frames with a payload summary of "XID". if summary == "XID" { return; } // If the CLI requests only UI frames, skip non-UI frames. if cli.ui_only && summary != "UI" { return; } // In non-debug mode, print the session line and any additional lines. if !cli.debug { print_session_line(×tamp, &source, &final_destination, &summary); if lines.len() > 1 { for line in &lines[1..] { println!("{}", line); } } } } /// Main entry point: /// - Parses CLI options. /// - Connects to the AGWPE server and sends the monitor command. /// - Enters a loop to read frames, parse them, and process each frame. /// - On disconnection, waits a few seconds before attempting to reconnect. fn main() -> Result<()> { let cli = Cli::parse(); let addr = format!("{}:{}", cli.ip, cli.port); let reconnect_delay_ms = 5000; loop { println!("Connecting to AGWPE server at {addr}"); match TcpStream::connect(&addr) { Ok(mut stream) => { // Enable TCP keepalive on the connection. SockRef::from(&stream).set_keepalive(true)?; // Prepare the monitor command: // Set the first byte to the channel and the fifth byte to 'm' let mut cmd_buf = [0u8; 36]; cmd_buf[0] = cli.channel; cmd_buf[4] = b'm'; stream.write_all(&cmd_buf) .map_err(|e| anyhow!("Failed to send monitor command: {e}"))?; println!("Sent monitor command on channel {}. Waiting for frames...", cli.channel); let mut buffer = Vec::new(); let mut temp_buf = [0u8; 1024]; let mut buffers = BufferManager::new(); // Main read loop for the established connection. loop { match stream.read(&mut temp_buf) { Ok(0) => { println!("Connection closed by server."); break; } Ok(n) => { buffer.extend_from_slice(&temp_buf[..n]); // Attempt to parse complete frames from the buffer. while buffer.len() >= 36 { match parse_frame(&buffer, cli.debug) { Ok((consumed, frame)) => { handle_frame(&frame, &cli, &mut buffers); // Remove the processed frame from the buffer. buffer.drain(0..consumed); } Err(e) => { // If the error indicates an incomplete frame, break and wait for more data. if e.to_string().contains("Incomplete") { break; } else { eprintln!("Parsing error: {e}"); // Skip one byte and try again to avoid a dead loop. buffer.drain(0..1); } } } } } Err(e) => { eprintln!("Read error: {e}"); break; } } } } Err(e) => { eprintln!("Failed to connect: {e}"); } } println!("Disconnected. Reconnecting in {} ms...", reconnect_delay_ms); sleep(Duration::from_millis(reconnect_delay_ms)); } }