uggly reliable

This commit is contained in:
2025-03-06 11:46:45 +01:00
parent 6950810b95
commit c8a748fe71
12 changed files with 401 additions and 92 deletions

View File

@@ -0,0 +1,253 @@
package network;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.Stack;
import network.protocol.Packet;
import network.protocol.packets.AcknowlegdementPacket;
import network.protocol.packets.DisconnectPacket;
public class PacketPool {
private final Stack<ReliablePacketAddress> packetQueue;
private final Map<InetSocketAddress, AdressContext> addressContexts;
private final Socket socket;
private static int MAX_SEND_TRY = 50;
private static long RETRY_INTERVAL = 100;
private static float PACKET_LOSS_PROBABILITY = 0.75f;
private static record ReliablePacketAddress(ReliablePacket packet, InetSocketAddress address) {
@Override
public final int hashCode() {
return Objects.hash(packet, address);
}
@Override
public final boolean equals(Object arg0) {
if (arg0 instanceof ReliablePacketAddress packetAddress)
return packetAddress.address().equals(this.address())
&& packetAddress.packet().getSeq() == this.packet().getSeq();
return false;
}
}
private static class AdressContext {
public int currentSeqRecv = 0;
public int currentSeqSend = 0;
public final List<Thread> sendThreads = new ArrayList<>();
public final List<ReliablePacket> packetRecvBuffer = new ArrayList<>();
public final Map<ReliablePacket, Integer> packetsSentTries = new HashMap<>();
}
private int getNextSeqSend(InetSocketAddress address) {
return this.addressContexts.get(address).currentSeqSend++;
}
private int getCurrentSeqRecv(InetSocketAddress address) {
return this.addressContexts.get(address).currentSeqRecv;
}
private void setSeqRecv(InetSocketAddress address, int newValue) {
this.addressContexts.get(address).currentSeqRecv = newValue;
}
private void tryAddContext(InetSocketAddress address) {
this.addressContexts.putIfAbsent(address, new AdressContext());
}
public PacketPool(Socket socket) {
this.socket = socket;
this.packetQueue = new Stack<>();
this.addressContexts = new HashMap<>();
}
private void debugPrint(String msg) {
// System.out.println(msg);
}
private void debugSend(ReliablePacket packet, InetSocketAddress address) {
boolean client = address.getPort() == 6665;
debugPrint((client ? "[Client]" : "[Server]") + " Sent " + packet.getPacket().getClass().getName()
+ " with seq : " + packet.getSeq() + " and ack " + packet.getAck());
}
private void debugRecv(ReliablePacket packet, InetSocketAddress address) {
boolean client = address.getPort() == 6665;
debugPrint((client ? "[Client]" : "[Server]") + " Received " + packet.getPacket().getClass().getName()
+ " with seq : " + packet.getSeq() + " and ack " + packet.getAck());
}
private void sendPacketToSocket(ReliablePacket packet, InetSocketAddress address) throws IOException {
var packetsSentTries = this.addressContexts.get(address).packetsSentTries;
if (Math.random() > PACKET_LOSS_PROBABILITY)
this.socket.sendPacket(packet, address);
if (packet.getPacket() instanceof AcknowlegdementPacket)
return;
Integer count = packetsSentTries.get(packet);
if (count == null) {
packetsSentTries.put(packet, 1);
} else {
packetsSentTries.put(packet, count + 1);
}
}
private synchronized void sendPacket(ReliablePacket packet, InetSocketAddress address) throws IOException {
sendPacketToSocket(packet, address);
debugSend(packet, address);
ReliablePacketAddress reliablePacketAddress = new ReliablePacketAddress(packet, address);
if (!(packet.getPacket() instanceof AcknowlegdementPacket)) {
Thread newThread = new Thread(() -> tryResend(reliablePacketAddress));
this.addressContexts.get(address).sendThreads.add(newThread);
newThread.start();
}
}
public synchronized void sendPacket(Packet packet, InetSocketAddress address) throws IOException {
tryAddContext(address);
ReliablePacket reliablePacket = new ReliablePacket(packet, getNextSeqSend(address), -1);
sendPacket(reliablePacket, address);
}
private void tryResend(ReliablePacketAddress reliablePacketAddress) {
try {
while (!Thread.interrupted()) {
Thread.sleep(RETRY_INTERVAL);
var packetsSentTries = this.addressContexts.get(reliablePacketAddress.address()).packetsSentTries;
// the packet has been received
if (!packetsSentTries.containsKey(reliablePacketAddress.packet()))
break;
Integer sendCount = packetsSentTries.get(reliablePacketAddress.packet());
if (sendCount > MAX_SEND_TRY) {
close(reliablePacketAddress.address());
debugPrint("Packet" + reliablePacketAddress.packet() + " not send after " + MAX_SEND_TRY + " tries");
// simulating a fake disconnect packet
this.socket.handlePacket(new DisconnectPacket("Timed out"), reliablePacketAddress.address());
break;
}
boolean client = reliablePacketAddress.address().getPort() == 6665;
debugPrint((client ? "[Client]" : "[Server]") + " Trying to resend the packet "
+ reliablePacketAddress.packet().getSeq() + " ...");
sendPacketToSocket(reliablePacketAddress.packet(), reliablePacketAddress.address());
}
} catch (InterruptedException e) {
// e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}
AdressContext ctx = this.addressContexts.get(reliablePacketAddress.address());
if (ctx != null)
ctx.sendThreads.remove(Thread.currentThread());
}
private ReliablePacket getMinimumSeqReceived(InetSocketAddress address) {
List<ReliablePacket> packetRecvBuffer = this.addressContexts.get(address).packetRecvBuffer;
if (packetRecvBuffer.isEmpty())
return null;
return Collections.min(packetRecvBuffer, (rel1, rel2) -> {return Integer.compare(rel1.getSeq(), rel2.getSeq());});
}
private int fillPacketQueue(InetSocketAddress address) {
List<ReliablePacket> packetRecvBuffer = this.addressContexts.get(address).packetRecvBuffer;
ReliablePacket minimum = getMinimumSeqReceived(address);
int lastSeqProcessed = -1;
while (true) {
this.packetQueue.add(new ReliablePacketAddress(minimum, address));
packetRecvBuffer.remove(minimum);
lastSeqProcessed = minimum.getSeq();
ReliablePacket nextMinimum = getMinimumSeqReceived(address);
if (nextMinimum == null || nextMinimum.getSeq() != minimum.getSeq() + 1)
break;
minimum = nextMinimum;
}
Collections.reverse(this.packetQueue);
return lastSeqProcessed;
}
public void onPacketReceived(ReliablePacket packet, InetSocketAddress address) throws IOException {
tryAddContext(address);
var packetsSentTries = this.addressContexts.get(address).packetsSentTries;
if (packet.getPacket() instanceof AcknowlegdementPacket) {
assert (packet.getAck() != -1);
for (var entry : packetsSentTries.entrySet()) {
ReliablePacket reliablePacket = entry.getKey();
if (entry.getKey().getSeq() == packet.getAck()) {
packetsSentTries.remove(reliablePacket);
break;
}
}
return;
}
if (this.addressContexts.get(address).packetRecvBuffer.contains(packet)) {
debugPrint("The packet has already been received !");
sendPacketToSocket(
new ReliablePacket(new AcknowlegdementPacket(), -1, packet.getSeq()), address);
return;
}
if (packet.getSeq() < getCurrentSeqRecv(address)) {
debugPrint("Packet too old, current : " + getCurrentSeqRecv(address));
sendPacketToSocket(
new ReliablePacket(new AcknowlegdementPacket(), -1, packet.getSeq()), address);
return;
}
this.addressContexts.get(address).packetRecvBuffer.add(packet);
debugRecv(packet, address);
sendPacketToSocket(
new ReliablePacket(new AcknowlegdementPacket(), -1, packet.getSeq()), address);
// got the packet in the right order
if (packet.getSeq() == getCurrentSeqRecv(address)) {
setSeqRecv(address, fillPacketQueue(address) + 1);
}
}
public Entry<Packet, InetSocketAddress> getNextPacket() {
if (this.packetQueue.isEmpty())
return null;
ReliablePacketAddress last = this.packetQueue.pop();
var entry = Map.entry(last.packet().getPacket(), last.address);
return entry;
}
private void close(InetSocketAddress adress) {
var ctx = this.addressContexts.get(adress);
if (ctx != null)
close(ctx);
}
private void close(AdressContext adressContext) {
for (Thread thread : adressContext.sendThreads) {
thread.interrupt();
}
}
public void close() {
for (AdressContext adressContext : this.addressContexts.values()) {
close(adressContext);
}
}
}