From c41bf705ffb28148022fc2429104ce6d4fb1309f Mon Sep 17 00:00:00 2001 From: Eugen Wissner Date: Wed, 30 Nov 2016 06:32:15 +0100 Subject: [PATCH] Replace std.socketstream with the blocking sockets Fix #58. --- source/ddb/postgres.d | 116 +++++++++++++++++++++++++++--------------- 1 file changed, 76 insertions(+), 40 deletions(-) diff --git a/source/ddb/postgres.d b/source/ddb/postgres.d index 96476b1..7d177c4 100644 --- a/source/ddb/postgres.d +++ b/source/ddb/postgres.d @@ -153,12 +153,14 @@ OR MODIFICATIONS. */ module ddb.postgres; -version (Have_vibe_d_core) { - import vibe.core.net; - import vibe.core.stream; -} else { - import std.socket; - import std.socketstream; +version (Have_vibe_d_core) +{ + import vibe.core.net; + import vibe.core.stream; +} +else +{ + import std.socket; } import std.bitmanip; import std.exception; @@ -184,32 +186,64 @@ const PGEpochDateTime = DateTime(2000, 1, 1, 0, 0, 0); class PGStream { - private { - version (Have_vibe_d_core) TCPConnection m_socket; - else SocketStream m_socket; - } - version (Have_vibe_d_core){ - @property TCPConnection socket() { return m_socket; } - this(TCPConnection socket) - { - m_socket = socket; - } - }else{ - @property SocketStream socket() { return m_socket; } - this(SocketStream socket){ - m_socket = socket; - } - } + version (Have_vibe_d_core) + { + private TCPConnection m_socket; + @property TCPConnection socket() + { + return m_socket; + } - /* - * I'm not too sure about this function - * Should I keep the length? - */ - void write(ubyte[] x) - { - m_socket.write(x); - } + this(TCPConnection socket) + { + m_socket = socket; + } + } + else + { + private Socket m_socket; + + @property Socket socket() + { + return m_socket; + } + + this(Socket socket) + { + m_socket = socket; + } + } + + protected void read(ubyte[] buffer) + { + version(Have_vibe_d_core) + { + m_socket.read(buffer); + } + else + { + if (buffer.length > 0) + { + m_socket.receive(buffer); + } + } + } + + void write(ubyte[] x) + { + version(Have_vibe_d_core) + { + m_socket.write(x); + } + else + { + if (x.length > 0) + { + m_socket.send(x); + } + } + } void write(ubyte x) { @@ -1001,11 +1035,11 @@ class PGConnection char type; int len; ubyte[1] ub; - stream.socket.read(ub); // message type + stream.read(ub); // message type type = bigEndianToNative!char(ub); ubyte[4] ubi; - stream.socket.read(ubi); // message length, doesn't include type byte + stream.read(ubi); // message length, doesn't include type byte len = bigEndianToNative!int(ubi) - 4; @@ -1013,7 +1047,7 @@ class PGConnection if (len > 0) { msg = new ubyte[len]; - stream.socket.read(msg); + stream.read(msg); } return Message(this, type, msg); @@ -1610,13 +1644,15 @@ class PGConnection ushort port = "port" in params? parse!ushort(p["port"]) : 5432; - version(Have_vibe_d_core){ - stream = new PGStream(connectTCP(params["host"], port)); - } else { - stream = new PGStream(new SocketStream(new TcpSocket)); - stream.socket.socket.connect(new InternetAddress(params["host"], port)); - } - + version(Have_vibe_d_core) + { + stream = new PGStream(connectTCP(params["host"], port)); + } + else + { + stream = new PGStream(new TcpSocket); + stream.socket.connect(new InternetAddress(params["host"], port)); + } sendStartupMessage(params); receive: