feat: add streams
All checks were successful
Linux arm64 / Build (push) Successful in 16s

This commit is contained in:
2025-06-26 19:17:52 +02:00
parent 59bedd6482
commit 0d26879152
25 changed files with 385 additions and 778 deletions

View File

@@ -1,33 +0,0 @@
#pragma once
#include <fstream>
#include <sp/io/IOInterface.h>
namespace sp {
namespace io {
struct FileTag {
enum OpenMode {
In = 1,
Out = 1 << 1,
};
};
template <>
class IOInterface<FileTag> {
private:
std::unique_ptr<std::ifstream> m_FileInput;
std::unique_ptr<std::ofstream> m_FileOutput;
public:
IOInterface(const std::string& a_FilePath, unsigned int a_OpenMode);
IOInterface(IOInterface&& other);
DataBuffer Read(std::size_t a_Amount);
void Write(const sp::DataBuffer& a_Data);
};
using File = IOInterface<FileTag>;
} // namespace io
} // namespace sp

View File

@@ -1,58 +0,0 @@
#pragma once
#include <memory>
#include <sp/common/DataBuffer.h>
namespace sp {
namespace io {
template <typename IOTag>
class IOInterface {
public:
DataBuffer Read(std::size_t a_Amount);
void Write(const DataBuffer& a_Data);
};
template <typename TOption>
class MessageEncapsulator {
public:
static DataBuffer Encapsulate(const DataBuffer& a_Data, const TOption& a_Option);
static DataBuffer Decapsulate(DataBuffer& a_Data, const TOption& a_Option);
};
template <typename IOTag, typename MessageDispatcher, typename MessageFactory, typename... TOptions>
class Stream {
protected:
MessageDispatcher m_Dispatcher;
IOInterface<IOTag> m_Interface;
std::tuple<TOptions...> m_Options;
using MessageBase = typename MessageDispatcher::MessageBaseType;
public:
Stream() {}
Stream(IOInterface<IOTag>&& a_Interface, TOptions&&... a_Options);
Stream(Stream&& a_Stream);
void RecieveMessages();
void SendMessage(const MessageBase& a_Message);
template <typename TOption>
TOption& GetOption() {
return std::get<TOption>(m_Options);
}
MessageDispatcher& GetDispatcher() {
return m_Dispatcher;
}
private:
static DataBuffer Encapsulate(const DataBuffer& a_Data, const TOptions&... a_Options);
static DataBuffer Decapsulate(DataBuffer& a_Data, const TOptions&... a_Options);
};
} // namespace io
} // namespace sp
#include <sp/io/IOInterfaceImpl.inl>

View File

@@ -1,129 +0,0 @@
#pragma once
#include <sp/extensions/Compress.h>
#include <stdexcept>
namespace sp {
namespace details {
template <typename... TOptions>
struct MessageEncapsulatorPack {};
template <>
struct MessageEncapsulatorPack<> {
static DataBuffer Encapsulate(const DataBuffer& a_Data) {
return a_Data;
}
static DataBuffer Decapsulate(DataBuffer& a_Data) {
return a_Data;
}
};
template <typename TOption, typename... TOptions>
struct MessageEncapsulatorPack<TOption, TOptions...> {
static DataBuffer Encapsulate(const DataBuffer& a_Data, const TOption& a_Option, const TOptions&... a_Options) {
DataBuffer data = io::MessageEncapsulator<TOption>::Encapsulate(a_Data, a_Option);
return MessageEncapsulatorPack<TOptions...>::Encapsulate(data, a_Options...);
}
static DataBuffer Decapsulate(DataBuffer& a_Data, const TOption& a_Option, const TOptions&... a_Options) {
DataBuffer data = io::MessageEncapsulator<TOption>::Decapsulate(a_Data, a_Option);
return MessageEncapsulatorPack<TOptions...>::Decapsulate(data, a_Options...);
}
};
} // namespace details
namespace io {
template <typename IOTag, typename MessageDispatcher, typename MessageFactory, typename... TOptions>
Stream<IOTag, MessageDispatcher, MessageFactory, TOptions...>::Stream(IOInterface<IOTag>&& a_Interface, TOptions&&... a_Options) :
m_Interface(std::move(a_Interface)), m_Options(std::make_tuple<TOptions...>(std::move(a_Options)...)) {}
template <typename IOTag, typename MessageDispatcher, typename MessageFactory, typename... TOptions>
Stream<IOTag, MessageDispatcher, MessageFactory, TOptions...>::Stream(
Stream<IOTag, MessageDispatcher, MessageFactory, TOptions...>&& a_Stream) :
m_Dispatcher(std::move(a_Stream.m_Dispatcher)), m_Interface(std::move(a_Stream.m_Interface)) {}
template <typename IOTag, typename MessageDispatcher, typename MessageFactory, typename... TOptions>
void Stream<IOTag, MessageDispatcher, MessageFactory, TOptions...>::SendMessage(const MessageBase& a_Message) {
DataBuffer data = a_Message.Write();
TupleForEach([&data](const auto&... a_Options) {
data = Encapsulate(data, a_Options...);
}, m_Options);
DataBuffer finalData;
finalData.Reserve(data.GetSize() + sizeof(VarInt::MAX_VALUE));
finalData << VarInt{data.GetSize()} << data;
m_Interface.Write(finalData);
}
template <typename IOTag, typename MessageDispatcher, typename MessageFactory, typename... TOptions>
void Stream<IOTag, MessageDispatcher, MessageFactory, TOptions...>::RecieveMessages() {
while (true) {
// reading the first VarInt part byte by byte
std::uint64_t lenghtValue = 0;
unsigned int readPos = 0;
while (true) {
static constexpr int SEGMENT_BITS = (1 << 7) - 1;
static constexpr int CONTINUE_BIT = 1 << 7;
DataBuffer buffer = m_Interface.Read(sizeof(std::uint8_t));
// eof
if (buffer.GetSize() == 0)
return;
std::uint8_t part;
buffer >> part;
lenghtValue |= static_cast<std::uint64_t>(part & SEGMENT_BITS) << readPos;
if ((part & CONTINUE_BIT) == 0)
break;
readPos += 7;
if (readPos >= 8 * sizeof(lenghtValue))
throw std::runtime_error("VarInt is too big");
}
// nothing to read
if (lenghtValue == 0)
return;
DataBuffer buffer;
buffer = m_Interface.Read(lenghtValue);
TupleForEach([&buffer, lenghtValue](const auto&... a_Options) {
buffer = Decapsulate(buffer, a_Options...);
}, m_Options);
VarInt packetType;
buffer >> packetType;
static const MessageFactory messageFactory;
std::unique_ptr<MessageBase> message = messageFactory.CreateMessage(packetType.GetValue());
assert(message != nullptr);
message->Read(buffer);
GetDispatcher().Dispatch(*message);
}
}
template <typename IOTag, typename MessageDispatcher, typename MessageFactory, typename... TOptions>
DataBuffer Stream<IOTag, MessageDispatcher, MessageFactory, TOptions...>::Encapsulate(
const DataBuffer& a_Data, const TOptions&... a_Options) {
return details::MessageEncapsulatorPack<TOptions...>::Encapsulate(a_Data, a_Options...);
}
template <typename IOTag, typename MessageDispatcher, typename MessageFactory, typename... TOptions>
DataBuffer Stream<IOTag, MessageDispatcher, MessageFactory, TOptions...>::Decapsulate(
DataBuffer& a_Data, const TOptions&... a_Options) {
return details::MessageEncapsulatorPack<TOptions...>::Decapsulate(a_Data, a_Options...);
}
} // namespace io
} // namespace sp

View File

@@ -0,0 +1,13 @@
#pragma once
#include <sp/common/DataBuffer.h>
namespace sp {
class IoInterface {
public:
virtual DataBuffer Read(std::size_t a_Amount) = 0;
virtual void Write(const DataBuffer& a_Data) = 0;
};
} // namespace sp

View File

@@ -1,23 +0,0 @@
#pragma once
#include <sp/io/IOInterface.h>
namespace sp {
namespace io {
struct MemoryTag {};
template <>
class IOInterface<MemoryTag> {
private:
sp::DataBuffer m_VirtualIO;
public:
sp::DataBuffer Read(std::size_t a_Amount);
void Write(const sp::DataBuffer& a_Data);
};
using Memory = IOInterface<MemoryTag>;
} // namespace io
} // namespace sp

View File

@@ -0,0 +1,16 @@
#pragma once
#include <sp/common/DataBuffer.h>
namespace sp {
class MessageEncapsulator {
public:
MessageEncapsulator() {}
virtual ~MessageEncapsulator() {}
virtual DataBuffer Encapsulate(const DataBuffer& a_Data) = 0;
virtual DataBuffer Decapsulate(DataBuffer& a_Data) = 0;
};
} // namespace sp

36
include/sp/io/MessageIO.h Normal file
View File

@@ -0,0 +1,36 @@
#pragma once
#include <boost/pfr.hpp>
#include <sp/common/ByteSwapping.h>
#include <sp/common/DataBuffer.h>
namespace sp {
namespace details {
template <typename T>
void WriteField(DataBuffer& a_Buffer, const T& a_Data) {
T swapped = a_Data;
ToNetwork(swapped);
a_Buffer << swapped;
}
template <typename T>
void ReadField(DataBuffer& a_Buffer, T& a_Data) {
a_Buffer >> a_Data;
FromNetwork(a_Data);
}
template <typename TData>
DataBuffer WriteMessage(const TData& a_MessageData) {
DataBuffer buffer;
boost::pfr::for_each_field(a_MessageData, [&buffer](const auto& a_Field) { WriteField(buffer, a_Field); });
return buffer;
}
template <typename TData>
void ReadMessage(DataBuffer& a_Buffer, TData& a_MessageData) {
boost::pfr::for_each_field(a_MessageData, [&a_Buffer](auto& a_Field) { ReadField(a_Buffer, a_Field); });
}
} // namespace details
} // namespace sp

View File

@@ -0,0 +1,34 @@
#pragma once
#include <sp/common/DataBuffer.h>
#include <sp/io/IoInterface.h>
#include <sp/io/MessageEncapsulator.h>
#include <vector>
namespace sp {
template <typename TMessageFactory>
class MessageStream {
private:
std::vector<MessageEncapsulator> m_Encapsulators;
std::shared_ptr<IoInterface> m_Stream;
using MessageBaseType = typename TMessageFactory::MessageBaseType;
using MessageIdType = typename MessageBaseType::MessageIdType;
public:
MessageStream(std::shared_ptr<IoInterface>&& a_Stream) : m_Stream(std::move(a_Stream)) {}
std::unique_ptr<MessageBaseType> ReadMessage();
std::unique_ptr<MessageBaseType> ReadMessage(MessageIdType a_Id);
void WriteMessage(const MessageBaseType& a_Message, bool a_WriteId = true);
private:
DataBuffer ReadAndDecapsulate();
std::unique_ptr<MessageBaseType> MakeMessage(DataBuffer& buffer, MessageIdType a_Id);
};
} // namespace sp
#include <sp/io/MessageStream.inl>

View File

@@ -0,0 +1,63 @@
#pragma once
#include <sp/common/VarInt.h>
#include <sp/protocol/MessageFactory.h>
namespace sp {
template <typename TMessageFactory>
DataBuffer MessageStream<TMessageFactory>::ReadAndDecapsulate() {
VarInt messageLength;
messageLength.Read([this](std::uint8_t& data) { m_Stream->Read(1) >> data; });
std::size_t amount = messageLength.GetValue();
DataBuffer buffer = m_Stream->Read(amount);
for (MessageEncapsulator& enc : m_Encapsulators) {
buffer = enc.Decapsulate(buffer);
}
return buffer;
}
template <typename TMessageFactory>
std::unique_ptr<typename TMessageFactory::MessageBaseType> MessageStream<TMessageFactory>::MakeMessage(DataBuffer& buffer, MessageIdType a_Id) {
static const TMessageFactory FACTORY;
auto message = FACTORY.CreateMessage(a_Id);
message->Read(buffer);
return message;
}
template <typename TMessageFactory>
std::unique_ptr<typename TMessageFactory::MessageBaseType> MessageStream<TMessageFactory>::ReadMessage(MessageIdType a_Id) {
DataBuffer buffer = ReadAndDecapsulate();
return MakeMessage(buffer, a_Id);
}
template <typename TMessageFactory>
std::unique_ptr<typename TMessageFactory::MessageBaseType> MessageStream<TMessageFactory>::ReadMessage() {
DataBuffer buffer = ReadAndDecapsulate();
VarInt messageId;
buffer >> messageId;
return MakeMessage(buffer, MessageIdType(messageId.GetValue()));
}
template <typename TMessageFactory>
void MessageStream<TMessageFactory>::WriteMessage(const MessageBaseType& a_Message, bool a_WriteId) {
DataBuffer buffer;
if (a_WriteId)
buffer << VarInt{static_cast<std::uint64_t>(a_Message.GetId())};
buffer << a_Message.Write();
for (MessageEncapsulator& enc : m_Encapsulators) {
buffer = enc.Encapsulate(buffer);
}
DataBuffer header;
header << VarInt{buffer.GetSize()};
m_Stream->Write(header);
m_Stream->Write(buffer);
}
} // namespace sp

40
include/sp/io/StdIo.h Normal file
View File

@@ -0,0 +1,40 @@
#pragma once
#include <sp/io/IoInterface.h>
#include <iostream>
namespace sp {
class StdInput : public IoInterface {
private:
std::istream& m_Io;
public:
StdInput(std::istream& a_Io) : m_Io(a_Io) {}
virtual DataBuffer Read(std::size_t a_Amount) override {
DataBuffer buffer(a_Amount);
m_Io.read(reinterpret_cast<char*>(buffer.data()), a_Amount);
return buffer;
}
virtual void Write(const DataBuffer& a_Data) override {}
};
class StdOuput : public IoInterface {
private:
std::ostream& m_Io;
public:
StdOuput(std::ostream& a_Io) : m_Io(a_Io) {}
virtual DataBuffer Read(std::size_t a_Amount) override {
return {};
}
virtual void Write(const DataBuffer& a_Data) override {
m_Io.write(reinterpret_cast<const char*>(a_Data.data()), a_Data.GetSize());
}
};
} // namespace sp