1 module redis.connection;
2 
3 import std.socket : TcpSocket;
4 
5 import std.array : appender, back, popBack;
6 import std.string : format;
7 public import redis;
8 import redis.response;
9 
10 debug(redis) {
11 	import std.stdio : writeln;
12 	import redis.encoder : escape;
13 }
14 version(Windows)
15 {
16 	import std.socket;
17 }
18 
19 public:
20 
/**
 * Sends a pre-encoded command string to the server.
 *
 * The whole string is written: if the socket accepts only part of the
 * payload in one call, the remainder is sent in further calls.
 *
 * Params:
 *   conn        = Connection to redis server.
 *   encoded_cmd = The already encoded command to be sent.
 *
 * Throws: $(D ConnectionException) if the socket reports an error or makes
 *         no progress while sending.
 */
void send(TcpSocket conn, string encoded_cmd)
{
	debug(redis) { writeln("Request : '", escape(encoded_cmd) ~ "'"); }

	// View the command as raw bytes so that slicing below is by byte offset.
	auto pending = cast(const(ubyte)[]) encoded_cmd;

	while (pending.length > 0)
	{
		// Socket.send returns a signed count: bytes written, or a negative
		// value on failure. A stream socket may accept fewer bytes than offered.
		immutable ptrdiff_t sent = conn.send(pending);

		// A zero-byte write on a non-empty payload would loop forever, so it
		// is treated as a failure as well.
		if (sent <= 0)
			throw new ConnectionException("Error while sending request");

		pending = pending[cast(size_t) sent .. $];
	}
}
38 
/**
 * Receives and parses responses from the redis server.
 *
 * Data is read from the socket until the receive buffer has been fully
 * parsed, every multibulk that was opened has received all of its elements,
 * and at least `minResponses` top-level responses have been collected.
 * Nested multibulks are assembled with an explicit stack of pointers to the
 * multibulks that are still being filled.
 *
 * Params:
 *   conn         = Connection to redis server.
 *   minResponses = The minimum number of top-level responses to wait for.
 *
 * Returns: The top-level responses; multibulk responses carry their
 *          elements in `values`.
 *
 * Throws: $(D ConnectionException) if there is a socket error or server closes the connection.
 */
Response[] receiveResponses(TcpSocket conn, size_t minResponses = 0)
{
	byte[] buffer;
	Response[] responses;
	Response*[] MultiBulks; //Stack of pointers to multibulks still waiting for elements
	Response[]* stackPtr = &responses; //Array the next parsed response is appended to

	while(true)
	{
		receive(conn, buffer);

		debug(redis) { writeln("BUFFER : ", escape(cast(string)buffer)); }

		while(buffer.length > 0)
		{
			auto r = parseResponse(buffer);
			debug(redis) { writeln(__FUNCTION__,"\t",r.type,"\t",typeid(r));}

			// Nothing usable could be parsed from what is buffered so far;
			// leave the inner loop so the outer loop reads more data.
			if(r.type == ResponseType.Invalid)
				break;

			*stackPtr ~= r;

			// NOTE(review): isMoved presumably flags a cluster MOVED redirect - confirm in redis.response.
			// Parsing of the current buffer stops here, as in the original flow.
			if(r.isMoved)
			{
				break;
			}

			// A multibulk announcing at least one element opens a new nesting
			// level: the following responses belong to its `values`.
			bool openedLevel = false;
			if(r.type == ResponseType.MultiBulk)
			{
				auto mb = &((*stackPtr)[$-1]);
				if(mb.count > 0)
				{
					MultiBulks ~= mb;
					stackPtr = &((*mb).values);
					openedLevel = true;
				}
			}

			// Any response that did not open a level (including an empty
			// multibulk) may have been the last element of the enclosing
			// multibulk(s), so close every level that is now complete.
			// Previously an empty multibulk skipped this step and its parent
			// was never closed when it was the parent's final element.
			if(!openedLevel)
			{
				while(MultiBulks.length > 0)
				{
					auto top = MultiBulks.back;

					if(top.count != top.values.length)
						break;

					MultiBulks.popBack();

					if(MultiBulks.length > 0)
						stackPtr = &((*MultiBulks.back).values);
					else
						stackPtr = &responses;
				}
			}
		}

		if(buffer.length == 0 && MultiBulks.length == 0) //Make sure all the multi bulks got their data
		{
			debug(redis) {
				if(minResponses > 1 && responses.length < minResponses)
					writeln("WAITING FOR MORE RESPONSES ... ");
			}

			if(responses.length < minResponses)
				continue;

			break;
		}

	}

	debug(redis) { writeln(__FUNCTION__,"\tlen:",responses.length,"\t",responses);}
	return responses;
}
119 
120 /* -------- EXCEPTIONS ------------- */
121 
/**
 * Thrown when talking to the redis server over the socket fails.
 *
 * The constructor takes the conventional defaulted `file`, `line` and `next`
 * arguments so that the exception reports the location of the `throw`
 * site instead of the location of this constructor.
 */
class ConnectionException : Exception {
	this(string msg, string file = __FILE__, size_t line = __LINE__, Throwable next = null) pure nothrow @safe
	{
		super(msg, file, line, next);
	}
}
125 
126 
127 private :
128 
/**
 * Performs one read on the socket and appends the received bytes to `buffer`.
 *
 * On a non-blocking socket with no data available, nothing is appended and
 * the function returns normally.
 *
 * Params:
 *   conn   = Connection to redis server.
 *   buffer = Buffer the received bytes are appended to.
 *
 * Throws: $(D ConnectionException) if the server closed the connection or
 *         the socket reports an error.
 */
void receive(TcpSocket conn, ref byte[] buffer)
{
	import std.socket : Socket, wouldHaveBlocked;

	byte[1024 * 16] chunk;

	// Socket.receive returns a signed count; keep it signed so that the
	// error value compares without relying on unsigned wrap-around.
	ptrdiff_t received = conn.receive(chunk);

	if (received == Socket.ERROR)
	{
		if (conn.blocking)
			throw new ConnectionException("A socket error occurred!");

		import core.stdc.errno;

		// wouldHaveBlocked() checks the platform's own "no data yet"
		// indication (Winsock does not report it through errno).
		if (wouldHaveBlocked())
		{
			received = 0;
			errno = 0;
		}
		else
			throw new ConnectionException(format("A socket error occurred! errno: %s", errno));
	}
	else if (received == 0)
	{
		// A successful zero-byte read means the peer closed the connection,
		// in blocking and non-blocking mode alike. Ignoring it in
		// non-blocking mode would make the caller poll a dead socket forever.
		throw new ConnectionException("Server closed the connection!");
	}

	buffer ~= chunk[0 .. cast(size_t) received];
	debug(redis) { writeln("Response : ", "'" ~ escape(cast(string)buffer) ~ "'", " Length : ", received); }
}