This commit is contained in:
2025-03-12 12:56:45 +01:00
parent 07272732d4
commit 0900113bb8
7 changed files with 147 additions and 10 deletions

View File

@@ -42,6 +42,9 @@ public class PacketPool {
}
}
/**
* Data specific to one connexion
*/
private static class AdressContext {
public int currentSeqRecv = 0;
public int currentSeqSend = 0;
@@ -50,22 +53,43 @@ public class PacketPool {
public final Map<ReliablePacket, Integer> packetsSentTries = new HashMap<>();
}
/**
* @param address the address to send to
* @return The next send sequence
*/
private int getNextSeqSend(InetSocketAddress address) {
return this.addressContexts.get(address).currentSeqSend++;
}
/**
* @param address the address which a packet was recieved
* @return the current recieve sequence number
*/
private int getCurrentSeqRecv(InetSocketAddress address) {
return this.addressContexts.get(address).currentSeqRecv;
}
/**
* Set the recieve sequence number
* @param address
* @param newValue
*/
private void setSeqRecv(InetSocketAddress address, int newValue) {
this.addressContexts.get(address).currentSeqRecv = newValue;
}
/**
* Try to add the address into memory
* @param address
*/
private void tryAddContext(InetSocketAddress address) {
this.addressContexts.putIfAbsent(address, new AdressContext());
}
/**
* Construct a PacketPool
* @param socket
*/
public PacketPool(Socket socket) {
this.socket = socket;
this.packetQueue = new Stack<>();
@@ -88,6 +112,12 @@ public class PacketPool {
+ " with seq : " + packet.getSeq());
}
/**
* Send a packet to the socket
* @param packet
* @param address
* @throws IOException
*/
private void sendPacketToSocket(ReliablePacket packet, InetSocketAddress address) throws IOException {
var packetsSentTries = this.addressContexts.get(address).packetsSentTries;
@@ -112,6 +142,12 @@ public class PacketPool {
}
}
/**
* Send a packet to socket and try to resend if an acknowlegment was not recieved
* @param packet
* @param address
* @throws IOException
*/
private synchronized void sendPacket(ReliablePacket packet, InetSocketAddress address) throws IOException {
sendPacketToSocket(packet, address);
debugSend(packet, address);
@@ -125,12 +161,22 @@ public class PacketPool {
}
}
/**
* Send a packet (and encapsulate it)
* @param packet
* @param address
* @throws IOException
*/
public synchronized void sendPacket(Packet packet, InetSocketAddress address) throws IOException {
tryAddContext(address);
ReliablePacket reliablePacket = new ReliablePacket(packet, getNextSeqSend(address));
sendPacket(reliablePacket, address);
}
/**
* Try to resend a packet
* @param reliablePacketAddress
*/
private void tryResend(ReliablePacketAddress reliablePacketAddress) {
try {
while (!Thread.interrupted()) {
@@ -166,6 +212,11 @@ public class PacketPool {
ctx.sendThreads.remove(Thread.currentThread());
}
/**
* @param address
* @return The smallest sequence number recieved
*/
private ReliablePacket getMinimumSeqReceived(InetSocketAddress address) {
List<ReliablePacket> packetRecvBuffer = this.addressContexts.get(address).packetRecvBuffer;
if (packetRecvBuffer.isEmpty())
@@ -175,6 +226,11 @@ public class PacketPool {
});
}
/**
* Move packets in buffer to the packet recieved queue to be processed by the app
* @param address
* @return the sequence number of the last packet that was added to the queue
*/
private int fillPacketQueue(InetSocketAddress address) {
List<ReliablePacket> packetRecvBuffer = this.addressContexts.get(address).packetRecvBuffer;
ReliablePacket minimum = getMinimumSeqReceived(address);
@@ -193,6 +249,12 @@ public class PacketPool {
return lastSeqProcessed;
}
/**
* Process packet when recieved
* @param packet
* @param address
* @throws IOException
*/
public void onPacketReceived(ReliablePacket packet, InetSocketAddress address) throws IOException {
tryAddContext(address);
@@ -236,6 +298,9 @@ public class PacketPool {
}
/**
* @return The next packet in the queue
*/
public Entry<Packet, InetSocketAddress> getNextPacket() {
if (this.packetQueue.isEmpty())
return null;
@@ -244,18 +309,29 @@ public class PacketPool {
return entry;
}
/**
* Closes the connexion
* @param adress
*/
private void close(InetSocketAddress adress) {
var ctx = this.addressContexts.get(adress);
if (ctx != null)
close(ctx);
}
/**
* Stop the threads of the connexion
* @param adressContext
*/
private void close(AdressContext adressContext) {
for (Thread thread : adressContext.sendThreads) {
thread.interrupt();
}
}
/**
* Stop the PacketPool
*/
public void close() {
for (AdressContext adressContext : this.addressContexts.values()) {
close(adressContext);