Make streams and vfs and http shared_ptr

This commit is contained in:
2025-09-29 02:22:27 -05:00
parent 71d0e36a5c
commit d785508571
61 changed files with 541 additions and 951 deletions

View File

@@ -29,8 +29,8 @@ namespace Tesses::Framework::Http
public:
Threading::Mutex mtx;
ServerContext* ctx;
WebSocketConnection* conn;
Stream* strm;
std::shared_ptr<WebSocketConnection> conn;
std::shared_ptr<Stream> strm;
std::atomic_bool hasInit;
std::atomic<bool> closed;
void close()
@@ -42,7 +42,6 @@ namespace Tesses::Framework::Http
strm->WriteByte(firstByte);
strm->WriteByte(0);
delete strm;
this->strm = nullptr;
mtx.Unlock();
this->conn->OnClose(true);
@@ -180,11 +179,11 @@ namespace Tesses::Framework::Http
return true;
}
WSServer(ServerContext* ctx,WebSocketConnection* conn)
WSServer(ServerContext* ctx,std::shared_ptr<WebSocketConnection> conn)
{
this->ctx = ctx;
this->conn = conn;
this->strm = &this->ctx->GetStream();
this->strm = this->ctx->GetStream();
this->hasInit=false;
@@ -352,27 +351,23 @@ namespace Tesses::Framework::Http
if(this->queryParams.kvp.empty()) return this->originalPath;
return this->originalPath + "?" + HttpUtils::QueryParamsEncode(this->queryParams);
}
void ServerContext::ReadStream(Stream* strm)
void ServerContext::ReadStream(std::shared_ptr<Stream> strm)
{
if(strm == nullptr) return;
auto strm2 = this->OpenRequestStream();
if(strm2 != nullptr)
{
strm2->CopyTo(strm);
delete strm2;
}
}
void ServerContext::ReadStream(Stream& strm)
{
ReadStream(&strm);
}
std::string ServerContext::ReadString()
{
if(strm == nullptr) return {};
auto strm2 = this->OpenRequestStream();
if(strm2 != nullptr)
{
TextStreams::StreamReader reader(strm2,true);
TextStreams::StreamReader reader(strm2);
return reader.ReadToEnd();
}
return {};
@@ -389,7 +384,7 @@ namespace Tesses::Framework::Http
}
return false;
}
static bool parseUntillBoundaryEnd(Tesses::Framework::Streams::Stream* src, Tesses::Framework::Streams::Stream* dest, std::string boundary)
static bool parseUntillBoundaryEnd(std::shared_ptr<Tesses::Framework::Streams::Stream> src, std::shared_ptr<Tesses::Framework::Streams::Stream> dest, std::string boundary)
{
bool hasMore=true;
#if defined(_WIN32)
@@ -466,7 +461,7 @@ namespace Tesses::Framework::Http
return hasMore;
}
static bool parseSection(ServerContext* ctx, std::string boundary, std::function<Tesses::Framework::Streams::Stream*(std::string mime, std::string filename, std::string name)> cb)
static bool parseSection(ServerContext* ctx, std::string boundary, std::function<std::shared_ptr<Tesses::Framework::Streams::Stream>(std::string mime, std::string filename, std::string name)> cb)
{
HttpDictionary req;
StreamReader reader(ctx->GetStream());
@@ -489,10 +484,10 @@ namespace Tesses::Framework::Http
{
if(cd1.filename.empty())
{
MemoryStream ms(true);
std::shared_ptr<MemoryStream> ms=std::make_shared<MemoryStream>(true);
bool retVal = parseUntillBoundaryEnd(&ctx->GetStream(),&ms,boundary);
auto& buff = ms.GetBuffer();
bool retVal = parseUntillBoundaryEnd(ctx->GetStream(),ms,boundary);
auto& buff = ms->GetBuffer();
ctx->queryParams.AddValue(cd1.fieldName, std::string(buff.begin(),buff.end()));
@@ -502,16 +497,16 @@ namespace Tesses::Framework::Http
else
{
auto strm = cb(ct, cd1.filename, cd1.fieldName);
if(strm == nullptr) strm = new Stream();
bool retVal = parseUntillBoundaryEnd(&ctx->GetStream(),strm,boundary);
delete strm;
if(strm == nullptr) strm = std::make_shared<Stream>();
bool retVal = parseUntillBoundaryEnd(ctx->GetStream(),strm,boundary);
return retVal;
}
}
return false;
}
void ServerContext::ParseFormData(std::function<Tesses::Framework::Streams::Stream*(std::string mime, std::string filename, std::string name)> cb)
void ServerContext::ParseFormData(std::function<std::shared_ptr<Tesses::Framework::Streams::Stream>(std::string mime, std::string filename, std::string name)> cb)
{
std::string ct;
if(this->requestHeaders.TryGetFirst("Content-Type",ct))
@@ -525,51 +520,33 @@ namespace Tesses::Framework::Http
if(res == std::string::npos) return;
ct = "--" + ct.substr(res+9);
}
Stream nullStrm;
parseUntillBoundaryEnd(this->strm,&nullStrm, ct);
std::shared_ptr<Stream> nullStrm=std::make_shared<Stream>();
parseUntillBoundaryEnd(this->strm,nullStrm, ct);
while(parseSection(this, ct, cb));
}
HttpServer::HttpServer(Tesses::Framework::Streams::TcpServer& tcpServer, IHttpServer& http, bool showIPs) : HttpServer(&tcpServer,false,&http,false,showIPs)
{
}
HttpServer::HttpServer(Tesses::Framework::Streams::TcpServer* tcpServer, bool ownsTCP, IHttpServer& http, bool showIPs) : HttpServer(tcpServer,ownsTCP,&http,false,showIPs)
{
}
HttpServer::HttpServer(Tesses::Framework::Streams::TcpServer& tcpServer, IHttpServer* http, bool ownsHttpServer, bool showIPs) : HttpServer(&tcpServer,false,http,ownsHttpServer,showIPs)
{
}
HttpServer::HttpServer(Tesses::Framework::Streams::TcpServer* tcpServer, bool ownsTCP, IHttpServer* http, bool ownsHttpServer, bool showIPs)
HttpServer::HttpServer(std::shared_ptr<Tesses::Framework::Streams::TcpServer> tcpServer, std::shared_ptr<IHttpServer> http, bool showIPs)
{
this->server = tcpServer;
this->ownsTCP = ownsTCP;
this->http = http;
this->ownsHttp = ownsHttpServer;
this->showIPs = showIPs;
this->thrd=nullptr;
this->showARTL = showIPs;
}
HttpServer::HttpServer(uint16_t port, IHttpServer* http, bool owns, bool showIPs) : HttpServer(new TcpServer(port,10),true,http,owns,showIPs)
HttpServer::HttpServer(uint16_t port, std::shared_ptr<IHttpServer> http, bool showIPs) : HttpServer(std::make_shared<TcpServer>(port,10),http,showIPs)
{
}
HttpServer::HttpServer(uint16_t port, IHttpServer& http,bool showIPs) : HttpServer(port,&http,false,showIPs)
{
}
HttpServer::HttpServer(std::string unixPath, IHttpServer* http, bool owns) : HttpServer(new TcpServer(unixPath,10),true,http,owns,false)
HttpServer::HttpServer(std::string unixPath, std::shared_ptr<IHttpServer> http) : HttpServer(std::make_shared<TcpServer>(unixPath,10),http,false)
{
this->showARTL=true;
}
HttpServer::HttpServer(std::string unixPath, IHttpServer& http) : HttpServer(unixPath,&http,false)
{
}
uint16_t HttpServer::GetPort()
{
@@ -577,7 +554,7 @@ namespace Tesses::Framework::Http
return server->GetPort();
return 0;
}
Stream* ServerContext::OpenResponseStream()
std::shared_ptr<Stream> ServerContext::OpenResponseStream()
{
if(sent) return nullptr;
int64_t length = -1;
@@ -588,14 +565,14 @@ namespace Tesses::Framework::Http
this->responseHeaders.SetValue("Transfer-Encoding","chunked");
this->WriteHeaders();
return new HttpStream(this->strm,false,length,false,version == "HTTP/1.1");
return std::make_shared<HttpStream>(this->strm,length,false,version == "HTTP/1.1");
}
Stream* ServerContext::OpenRequestStream()
std::shared_ptr<Stream> ServerContext::OpenRequestStream()
{
int64_t length = -1;
if(!this->requestHeaders.TryGetFirstInt("Content-Length",length))
length = -1;
return new HttpStream(this->strm,false,length,true,version == "HTTP/1.1");
return std::make_shared<HttpStream>(this->strm,length,true,version == "HTTP/1.1");
}
void HttpServer::StartAccepting()
{
@@ -622,9 +599,9 @@ namespace Tesses::Framework::Http
TF_LOG("Before entering socket thread");
Threading::Thread thrd2([sock,http,ip,port]()->void {
TF_LOG("In thread to process");
HttpServer::Process(*sock,*http,ip,port,false);
HttpServer::Process(sock,http,ip,port,false);
TF_LOG("In thread after process");
delete sock;
});
TF_LOG("Before attach");
thrd2.Detach();
@@ -664,16 +641,13 @@ namespace Tesses::Framework::Http
this->thrd->Join();
delete this->thrd;
}
if(this->ownsHttp)
delete http;
if(this->ownsTCP)
delete this->server;
}
IHttpServer::~IHttpServer()
{
}
ServerContext::ServerContext(Stream* strm)
ServerContext::ServerContext(std::shared_ptr<Stream> strm)
{
this->statusCode = OK;
this->strm = strm;
@@ -681,14 +655,14 @@ namespace Tesses::Framework::Http
this->queryParams.SetCaseSensitive(true);
this->responseHeaders.AddValue("Server","TessesFrameworkWebServer");
}
Stream& ServerContext::GetStream()
std::shared_ptr<Stream> ServerContext::GetStream()
{
return *this->strm;
return this->strm;
}
void ServerContext::SendBytes(std::vector<uint8_t> buff)
{
MemoryStream strm(false);
strm.GetBuffer() = buff;
std::shared_ptr<MemoryStream> strm=std::make_shared<MemoryStream>(false);
strm->GetBuffer() = buff;
SendStream(strm);
}
ServerContext& ServerContext::WithLastModified(Date::DateTime dt)
@@ -699,8 +673,9 @@ namespace Tesses::Framework::Http
void ServerContext::SendText(std::string text)
{
MemoryStream strm(false);
auto& buff= strm.GetBuffer();
std::shared_ptr<MemoryStream> strm=std::make_shared<MemoryStream>(false);
auto& buff= strm->GetBuffer();
buff.insert(buff.end(),text.begin(),text.end());
SendStream(strm);
}
@@ -711,11 +686,7 @@ namespace Tesses::Framework::Http
WithMimeType("text/html").SendText(errorHtml);
}
void ServerContext::SendStream(Stream* strm)
{
if(strm == nullptr) return;
SendStream(*strm);
}
ServerContext::~ServerContext()
{
for(auto item : this->data)
@@ -727,14 +698,14 @@ namespace Tesses::Framework::Http
{
}
void ServerContext::SendStream(Stream& strm)
void ServerContext::SendStream(std::shared_ptr<Stream> strm)
{
if(sent) return;
if(!strm.CanRead()) throw std::runtime_error("Cannot read from stream");
if(strm.EndOfStream()) throw std::runtime_error("End of stream");
if(strm.CanSeek())
if(!strm->CanRead()) throw std::runtime_error("Cannot read from stream");
if(strm->EndOfStream()) throw std::runtime_error("End of stream");
if(strm->CanSeek())
{
int64_t len=strm.GetLength();
int64_t len=strm->GetLength();
std::string range={};
if(this->requestHeaders.TryGetFirst("Range",range))
{
@@ -815,7 +786,7 @@ namespace Tesses::Framework::Http
this->WithSingleHeader("Content-Range","bytes " + std::to_string(begin) + "-" + std::to_string(end) + "/" + std::to_string(len));
this->statusCode = PartialContent;
this->WriteHeaders();
strm.Seek(begin,SeekOrigin::Begin);
strm->Seek(begin,SeekOrigin::Begin);
uint8_t buffer[1024];
@@ -825,7 +796,7 @@ namespace Tesses::Framework::Http
myLen = (end - begin)+1;
if(myLen < read) read = (size_t)myLen;
if(read == 0) break;
read = strm.Read(buffer,read);
read = strm->Read(buffer,read);
if(read == 0) break;
this->strm->WriteBlock(buffer,read);
@@ -848,7 +819,7 @@ namespace Tesses::Framework::Http
this->WithSingleHeader("Accept-Range","bytes");
this->WithSingleHeader("Content-Length",std::to_string(len));
this->WriteHeaders();
strm.CopyTo(*this->strm);
strm->CopyTo(this->strm);
}
}
@@ -857,7 +828,7 @@ namespace Tesses::Framework::Http
{
auto chunkedStream = this->OpenResponseStream();
this->strm->CopyTo(chunkedStream);
delete chunkedStream;
}
}
@@ -920,7 +891,7 @@ namespace Tesses::Framework::Http
if(this->sent) return *this;
this->sent = true;
StreamWriter writer(this->strm,false);
StreamWriter writer(this->strm);
writer.newline = "\r\n";
writer.WriteLine("HTTP/1.1 " + std::to_string((int)statusCode) + " " + HttpUtils::StatusCodeString(statusCode));
for(auto& hdr : responseHeaders.kvp)
@@ -935,14 +906,14 @@ namespace Tesses::Framework::Http
return *this;
}
void HttpServer::Process(Stream& strm, IHttpServer& server, std::string ip, uint16_t port, bool encrypted)
void HttpServer::Process(std::shared_ptr<Stream> strm, std::shared_ptr<IHttpServer> server, std::string ip, uint16_t port, bool encrypted)
{
TF_LOG("In process");
while(true)
{
BufferedStream bStrm(strm);
std::shared_ptr<BufferedStream> bStrm = std::make_shared<BufferedStream>(strm);
StreamReader reader(bStrm);
ServerContext ctx(&bStrm);
ServerContext ctx(bStrm);
ctx.ip = ip;
ctx.port = port;
ctx.encrypted = encrypted;
@@ -1002,13 +973,13 @@ namespace Tesses::Framework::Http
{
size_t len = (size_t)length;
uint8_t* buffer = new uint8_t[len];
len = bStrm.ReadBlock(buffer,len);
len = bStrm->ReadBlock(buffer,len);
std::string query((const char*)buffer,len);
delete[] buffer;
HttpUtils::QueryParamsDecode(ctx.queryParams, query);
}
if(!server.Handle(ctx))
if(!server->Handle(ctx))
{
ctx.SendNotFound();
}
@@ -1047,7 +1018,7 @@ namespace Tesses::Framework::Http
if(HttpUtils::ToLower(connection) != "keep-alive") return;
}
if(bStrm.EndOfStream()) {
if(bStrm->EndOfStream()) {
return;
}
}
@@ -1059,16 +1030,16 @@ namespace Tesses::Framework::Http
}
void ServerContext::StartWebSocketSession(std::function<void(std::function<void(WebSocketMessage&)>,std::function<void()>,std::function<void()>)> onOpen, std::function<void(WebSocketMessage&)> onReceive, std::function<void(bool)> onClose)
{
CallbackWebSocketConnection wsc(onOpen,onReceive,onClose);
std::shared_ptr<CallbackWebSocketConnection> wsc = std::make_shared<CallbackWebSocketConnection>(onOpen,onReceive,onClose);
StartWebSocketSession(wsc);
}
void ServerContext::StartWebSocketSession(WebSocketConnection& connection)
void ServerContext::StartWebSocketSession(std::shared_ptr<WebSocketConnection> connection)
{
WSServer svr(this,&connection);
WSServer svr(this,connection);
Threading::Thread thrd([&svr,&connection]()->void{
try {
connection.OnOpen([&svr](WebSocketMessage& msg)->void {
connection->OnOpen([&svr](WebSocketMessage& msg)->void {
svr.send_msg(&msg);
},[&svr]()->void {
std::vector<uint8_t> p = {(uint8_t)'p',(uint8_t)'i',(uint8_t)'n',(uint8_t)'g'};