1 module redis.connection;
2 
import std.array : appender, back, popBack;
import std.socket : TcpSocket, wouldHaveBlocked;
import std.string : format;

public import redis;
import redis.response;
9 
10 debug(redis) {
11 	import std.stdio : writeln;
12 	import redis.encoder : escape;
13 }
14 
15 public:
16 
/**
 * Sends a pre-encoded command string over the connection.
 *
 * The socket may accept only part of the data on a single call, so the
 * remainder is re-sent until the whole command has been written.
 *
 * Params:
 *   conn        = Connection to redis server.
 *   encoded_cmd = The already encoded command to be sent.
 *
 * Throws: $(D ConnectionException) if the socket reports an error or makes
 *         no progress before the whole command has been written.
 */
void send(TcpSocket conn, string encoded_cmd)
{
	debug(redis) { writeln("Request : '", escape(encoded_cmd) ~ "'"); }

	// Work on the raw bytes so the slice arithmetic below is in bytes.
	auto remaining = cast(const(ubyte)[]) encoded_cmd;

	while (remaining.length > 0)
	{
		immutable sent = conn.send(remaining);

		// Socket.ERROR signals failure; 0 bytes for a non-empty buffer means
		// no progress was made, so bail out instead of spinning forever.
		if (sent == TcpSocket.ERROR || sent == 0)
			throw new ConnectionException("Error while sending request");

		remaining = remaining[sent .. $];
	}
}
34 
/**
 * Receive responses from redis server.
 *
 * Reads from the socket and parses the accumulated bytes into responses.
 * Elements of a multibulk reply are collected into that reply's $(D values),
 * with nesting tracked on an explicit stack. The function returns once the
 * buffer has been fully consumed, no multibulk is still waiting for elements
 * and at least $(D minResponses) top-level responses have been collected.
 *
 * Params:
 *   conn         = Connection to redis server.
 *   minResponses = The minimum number of top-level responses to wait for.
 *
 * Returns: The top-level responses that were parsed.
 *
 * Throws: $(D ConnectionException) if there is a socket error or server closes the connection.
 */
Response[] receiveResponses(TcpSocket conn, size_t minResponses = 0)
{
	byte[] buffer;
	Response[] responses;
	Response*[] MultiBulks;            // Stack of pointers to multibulks still being filled
	Response[]* stackPtr = &responses; // Array that receives the next parsed response

	while(true)
	{
		receive(conn, buffer);

		debug(redis) { writeln("BUFFER : ", escape(cast(string)buffer)); }

		while(buffer.length > 0)
		{
			auto r = parseResponse(buffer);
			debug(redis) { writeln(__FUNCTION__,"\t",r.type,"\t",typeid(r));}

			// An Invalid result stops parsing; the outer loop then reads more
			// data from the socket before trying again.
			if(r.type == ResponseType.Invalid)
				break;

			*stackPtr ~= r;

			// NOTE(review): presumably a cluster redirection reply; parsing of
			// the current buffer stops here, exactly as before - confirm intent.
			if(r.isMoved)
				break;

			auto appended = &((*stackPtr)[$-1]);

			if(r.type == ResponseType.MultiBulk && appended.count > 0)
			{
				// A multibulk that expects elements: descend into it so the
				// following responses are collected as its values.
				MultiBulks ~= appended;
				stackPtr = &((*appended).values);
				continue;
			}

			// Anything else - including a multibulk that expects no elements -
			// may have been the last missing element of the enclosing
			// multibulk(s), so unwind every level that is now complete.
			// Previously this check was skipped for element-less multibulks,
			// which left the parent on the stack forever.
			while(MultiBulks.length > 0)
			{
				auto mb = *(MultiBulks.back);

				if(mb.count != mb.values.length)
					break;

				MultiBulks.popBack();

				if(MultiBulks.length > 0)
					stackPtr = &((*MultiBulks.back).values);
				else
					stackPtr = &responses;
			}
		}

		if(buffer.length == 0 && MultiBulks.length == 0) //Make sure all the multi bulks got their data
		{
			debug(redis) {
				if(minResponses > 1 && responses.length < minResponses)
					writeln("WAITING FOR MORE RESPONSES ... ");
			}

			if(responses.length < minResponses)
				continue;

			break;
		}

	}

	debug(redis) { writeln(__FUNCTION__,"\tlen:",responses.length,"\t",responses);}
	return responses;
}
115 
116 /* -------- EXCEPTIONS ------------- */
117 
/**
 * Thrown when sending to or receiving from the redis server fails.
 *
 * The $(D file), $(D line) and $(D next) parameters follow the usual
 * $(D Exception) constructor convention, so the exception records the
 * location of the $(D throw) site and can wrap an underlying cause.
 */
class ConnectionException : Exception {
	this(string msg, string file = __FILE__, size_t line = __LINE__, Throwable next = null) @safe pure nothrow
	{
		super(msg, file, line, next);
	}
}
121 
122 
123 private :
124 
/**
 * Reads one chunk of data from the socket and appends it to buffer.
 *
 * On a non-blocking socket with no data available, nothing is appended and
 * the function returns normally.
 *
 * Params:
 *   conn   = Connection to redis server.
 *   buffer = Accumulated, not yet parsed bytes; the new data is appended.
 *
 * Throws: $(D ConnectionException) if the server closed the connection or a
 *         socket error occurred.
 */
void receive(TcpSocket conn, ref byte[] buffer)
{
	byte[1024 * 16] buff;

	// Socket.receive returns a signed count; keep it signed so that the
	// error value is not silently turned into a huge length.
	ptrdiff_t len = conn.receive(buff);
	immutable isBlocking = conn.blocking;

	// With a non-empty receive buffer, a 0-byte read means the peer closed
	// the connection, whether the socket is blocking or not.
	if (len == 0)
		throw new ConnectionException("Server closed the connection!");

	if (len == TcpSocket.ERROR)
	{
		if (isBlocking)
			throw new ConnectionException("A socket error occurred!");

		import core.stdc.errno : errno;

		// Non-blocking socket with no data ready is not an error.
		if (wouldHaveBlocked())
		{
			len = 0;
			errno = 0;
		}
		else
			throw new ConnectionException(format("A socket error occurred! errno: %s", errno));
	}

	buffer ~= buff[0 .. len];
	debug(redis) { writeln("Response : ", "'" ~ escape(cast(string)buffer) ~ "'", " Length : ", len); }
}