package network; import java.io.IOException; import java.net.InetSocketAddress; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.Objects; import java.util.Stack; import network.protocol.Packet; import network.protocol.packets.AcknowlegdementPacket; import network.protocol.packets.DisconnectPacket; public class PacketPool { private final Stack packetQueue; private final Map addressContexts; private final Socket socket; private static int MAX_SEND_TRY = 50; private static long SEND_DELAY = 10; private static long RETRY_INTERVAL = SEND_DELAY * 2; private static float PACKET_LOSS_PROBABILITY = 0.1f; private static record ReliablePacketAddress(ReliablePacket packet, InetSocketAddress address) { @Override public final int hashCode() { return Objects.hash(packet, address); } @Override public final boolean equals(Object arg0) { if (arg0 instanceof ReliablePacketAddress packetAddress) return packetAddress.address().equals(this.address()) && packetAddress.packet().getSeq() == this.packet().getSeq(); return false; } } /** * Data specific to one connexion */ private static class AdressContext { public int currentSeqRecv = 0; public int currentSeqSend = 0; public final List sendThreads = new ArrayList<>(); public final List packetRecvBuffer = new ArrayList<>(); public final Map packetsSentTries = new HashMap<>(); } /** * @param address the address to send to * @return The next send sequence */ private int getNextSeqSend(InetSocketAddress address) { return this.addressContexts.get(address).currentSeqSend++; } /** * @param address the address which a packet was recieved * @return the current recieve sequence number */ private int getCurrentSeqRecv(InetSocketAddress address) { return this.addressContexts.get(address).currentSeqRecv; } /** * Set the recieve sequence number * @param address * @param newValue */ private void setSeqRecv(InetSocketAddress address, int newValue) { this.addressContexts.get(address).currentSeqRecv = newValue; } /** * Try to add the address into memory * @param address */ private void tryAddContext(InetSocketAddress address) { this.addressContexts.putIfAbsent(address, new AdressContext()); } /** * Construct a PacketPool * @param socket */ public PacketPool(Socket socket) { this.socket = socket; this.packetQueue = new Stack<>(); this.addressContexts = new HashMap<>(); } private void debugPrint(String msg) { // System.out.println(msg); } private void debugSend(ReliablePacket packet, InetSocketAddress address) { boolean client = address.getPort() == 6665; debugPrint((client ? "[Client]" : "[Server]") + " Sent " + packet.getPacket().getClass().getName() + " with seq : " + packet.getSeq()); } private void debugRecv(ReliablePacket packet, InetSocketAddress address) { boolean client = address.getPort() == 6665; debugPrint((client ? "[Client]" : "[Server]") + " Received " + packet.getPacket().getClass().getName() + " with seq : " + packet.getSeq()); } /** * Send a packet to the socket * @param packet * @param address * @throws IOException */ private void sendPacketToSocket(ReliablePacket packet, InetSocketAddress address) throws IOException { var packetsSentTries = this.addressContexts.get(address).packetsSentTries; new Thread(() -> { try { Thread.sleep(SEND_DELAY); if (Math.random() > PACKET_LOSS_PROBABILITY) this.socket.sendPacket(packet, address); } catch (InterruptedException | IOException e) { // e.printStackTrace(); } }).start(); if (packet.getPacket() instanceof AcknowlegdementPacket) return; Integer count = packetsSentTries.get(packet); if (count == null) { packetsSentTries.put(packet, 1); } else { packetsSentTries.put(packet, count + 1); } } /** * Send a packet to socket and try to resend if an acknowlegment was not recieved * @param packet * @param address * @throws IOException */ private synchronized void sendPacket(ReliablePacket packet, InetSocketAddress address) throws IOException { sendPacketToSocket(packet, address); debugSend(packet, address); ReliablePacketAddress reliablePacketAddress = new ReliablePacketAddress(packet, address); if (!(packet.getPacket() instanceof AcknowlegdementPacket)) { Thread newThread = new Thread(() -> tryResend(reliablePacketAddress)); this.addressContexts.get(address).sendThreads.add(newThread); newThread.start(); } } /** * Send a packet (and encapsulate it) * @param packet * @param address * @throws IOException */ public synchronized void sendPacket(Packet packet, InetSocketAddress address) throws IOException { tryAddContext(address); ReliablePacket reliablePacket = new ReliablePacket(packet, getNextSeqSend(address)); sendPacket(reliablePacket, address); } /** * Try to resend a packet * @param reliablePacketAddress */ private void tryResend(ReliablePacketAddress reliablePacketAddress) { try { while (!Thread.interrupted()) { Thread.sleep(RETRY_INTERVAL); var packetsSentTries = this.addressContexts.get(reliablePacketAddress.address()).packetsSentTries; // the packet has been received if (!packetsSentTries.containsKey(reliablePacketAddress.packet())) break; Integer sendCount = packetsSentTries.get(reliablePacketAddress.packet()); if (sendCount > MAX_SEND_TRY) { close(reliablePacketAddress.address()); debugPrint( "Packet" + reliablePacketAddress.packet() + " not send after " + MAX_SEND_TRY + " tries"); // simulating a fake disconnect packet this.socket.handlePacket(new DisconnectPacket("Timed out"), reliablePacketAddress.address()); break; } boolean client = reliablePacketAddress.address().getPort() == 6665; debugPrint((client ? "[Client]" : "[Server]") + " Trying to resend the packet " + reliablePacketAddress.packet().getSeq() + " ..."); sendPacketToSocket(reliablePacketAddress.packet(), reliablePacketAddress.address()); } } catch (InterruptedException e) { // e.printStackTrace(); } catch (IOException e) { e.printStackTrace(); } AdressContext ctx = this.addressContexts.get(reliablePacketAddress.address()); if (ctx != null) ctx.sendThreads.remove(Thread.currentThread()); } /** * @param address * @return The smallest sequence number recieved */ private ReliablePacket getMinimumSeqReceived(InetSocketAddress address) { List packetRecvBuffer = this.addressContexts.get(address).packetRecvBuffer; if (packetRecvBuffer.isEmpty()) return null; return Collections.min(packetRecvBuffer, (rel1, rel2) -> { return Integer.compare(rel1.getSeq(), rel2.getSeq()); }); } /** * Move packets in buffer to the packet recieved queue to be processed by the app * @param address * @return the sequence number of the last packet that was added to the queue */ private int fillPacketQueue(InetSocketAddress address) { List packetRecvBuffer = this.addressContexts.get(address).packetRecvBuffer; ReliablePacket minimum = getMinimumSeqReceived(address); int lastSeqProcessed = -1; while (true) { this.packetQueue.add(new ReliablePacketAddress(minimum, address)); packetRecvBuffer.remove(minimum); lastSeqProcessed = minimum.getSeq(); ReliablePacket nextMinimum = getMinimumSeqReceived(address); if (nextMinimum == null || nextMinimum.getSeq() != minimum.getSeq() + 1) break; minimum = nextMinimum; } Collections.reverse(this.packetQueue); return lastSeqProcessed; } /** * Process packet when recieved * @param packet * @param address * @throws IOException */ public void onPacketReceived(ReliablePacket packet, InetSocketAddress address) throws IOException { tryAddContext(address); var packetsSentTries = this.addressContexts.get(address).packetsSentTries; if (packet.getPacket() instanceof AcknowlegdementPacket ackPacket) { assert (ackPacket.getAck() != -1); for (var entry : packetsSentTries.entrySet()) { ReliablePacket reliablePacket = entry.getKey(); if (entry.getKey().getSeq() == ackPacket.getAck()) { packetsSentTries.remove(reliablePacket); break; } } return; } if (this.addressContexts.get(address).packetRecvBuffer.contains(packet)) { debugPrint("The packet has already been received !"); sendPacketToSocket( new ReliablePacket(new AcknowlegdementPacket(packet.getSeq()), -1), address); return; } if (packet.getSeq() < getCurrentSeqRecv(address)) { debugPrint("Packet too old, current : " + getCurrentSeqRecv(address)); sendPacketToSocket( new ReliablePacket(new AcknowlegdementPacket(packet.getSeq()), -1), address); return; } this.addressContexts.get(address).packetRecvBuffer.add(packet); debugRecv(packet, address); sendPacketToSocket( new ReliablePacket(new AcknowlegdementPacket(packet.getSeq()), -1), address); // got the packet in the right order if (packet.getSeq() == getCurrentSeqRecv(address)) { setSeqRecv(address, fillPacketQueue(address) + 1); } } /** * @return The next packet in the queue */ public Entry getNextPacket() { if (this.packetQueue.isEmpty()) return null; ReliablePacketAddress last = this.packetQueue.pop(); var entry = Map.entry(last.packet().getPacket(), last.address); return entry; } /** * Closes the connexion * @param adress */ private void close(InetSocketAddress adress) { var ctx = this.addressContexts.get(adress); if (ctx != null) close(ctx); } /** * Stop the threads of the connexion * @param adressContext */ private void close(AdressContext adressContext) { for (Thread thread : adressContext.sendThreads) { thread.interrupt(); } } /** * Stop the PacketPool */ public void close() { for (AdressContext adressContext : this.addressContexts.values()) { close(adressContext); } } }