module redis.connection;

import std.socket : TcpSocket;

import std.array : appender, back, popBack;
import std.string : format;
public import redis;
import redis.response;

debug(redis) {
    import std.stdio : writeln;
    import redis.encoder : escape;
}
version(Windows)
{
    import std.socket;
}

public:

/**
 * Sends a pre-encoded command string to the server.
 *
 * The socket may accept fewer bytes than requested in a single call, so the
 * command is written in a loop until every byte has been handed over.
 *
 * Params:
 *   conn = Connection to redis server.
 *   encoded_cmd = The already encoded command to be sent.
 *
 * Throws: $(D ConnectionException) if the socket reports an error or makes
 *         no progress before the whole command has been written.
 */
void send(TcpSocket conn, string encoded_cmd)
{
    debug(redis) { writeln("Request : '", escape(encoded_cmd) ~ "'"); }

    string remaining = encoded_cmd;
    while (remaining.length > 0)
    {
        immutable sent = conn.send(remaining);

        // ERROR is negative; a zero-byte write would make this loop spin forever.
        if (sent == TcpSocket.ERROR || sent <= 0)
            throw new ConnectionException("Error while sending request");

        // Slicing a string is byte based, so a short write is resumed exactly
        // where the socket stopped.
        remaining = remaining[cast(size_t) sent .. $];
    }
}

/**
 * Receives and parses responses from the redis server.
 *
 * Data is read from the socket until the receive buffer has been fully
 * parsed, every multibulk that was opened has collected `count` values, and
 * at least `minResponses` top-level responses are available.
 *
 * Params:
 *   conn = Connection to redis server.
 *   minResponses = Minimum number of top-level responses to wait for.
 *
 * Returns: The parsed top-level responses; multibulk elements are stored in
 *          the `values` of their parent response.
 *
 * Throws: $(D ConnectionException) if there is a socket error or server closes the connection.
 */
Response[] receiveResponses(TcpSocket conn, size_t minResponses = 0)
{
    byte[] buffer;
    Response[] responses;
    Response*[] MultiBulks; // Stack of pointers to multibulks that are still being filled
    Response[]* stackPtr = &responses; // Array the next parsed response is appended to

    while(true)
    {
        receive(conn, buffer);

        debug(redis) { writeln("BUFFER : ", escape(cast(string)buffer)); }

        while(buffer.length > 0)
        {
            auto r = parseResponse(buffer);
            debug(redis) { writeln(__FUNCTION__,"\t",r.type,"\t",typeid(r));}

            // Nothing usable could be parsed from what is buffered so far:
            // go back to the socket for more data.
            if(r.type == ResponseType.Invalid)
                break;

            *stackPtr ~= r;
            if(r.isMoved)
                break;

            if(r.type == ResponseType.MultiBulk)
            {
                auto mb = &((*stackPtr)[$-1]);
                if(mb.count > 0)
                {
                    // A new nesting level: the following responses belong to
                    // this multibulk until it holds `count` values.
                    MultiBulks ~= mb;
                    stackPtr = &((*mb).values);
                    continue;
                }
            }

            // The response just stored did not open a new level, so it may
            // have completed one or more enclosing multibulks. This must also
            // run for an empty multibulk, otherwise a parent whose last
            // element is an empty multibulk would never be closed.
            while(MultiBulks.length > 0)
            {
                auto top = MultiBulks.back;

                if(top.count == top.values.length)
                {
                    MultiBulks.popBack();

                    if(MultiBulks.length > 0)
                        stackPtr = &((*MultiBulks.back).values);
                    else
                        stackPtr = &responses;
                }
                else
                    break;
            }
        }

        if(buffer.length == 0 && MultiBulks.length == 0) //Make sure all the multi bulks got their data
        {
            debug(redis) {
                if(minResponses > 1 && responses.length < minResponses)
                    writeln("WAITING FOR MORE RESPONSES ... ");
            }

            if(responses.length < minResponses)
                continue;

            break;
        }

    }

    debug(redis) { writeln(__FUNCTION__,"\tlen:",responses.length,"\t",responses);}
    return responses;
}

/* -------- EXCEPTIONS ------------- */

/// Thrown when sending to or receiving from the redis server fails.
class ConnectionException : Exception {
    this(string msg) { super(msg); }
}


private :

/**
 * Reads at most 16 KiB from the socket and appends it to `buffer`.
 *
 * On a non-blocking socket that has no data available nothing is appended
 * and the function returns normally.
 *
 * Params:
 *   conn = Connection to redis server.
 *   buffer = Receive buffer the newly read bytes are appended to.
 *
 * Throws: $(D ConnectionException) if the server closed the connection or a
 *         socket error occurred.
 */
void receive(TcpSocket conn, ref byte[] buffer)
{
    import core.stdc.errno;

    byte[1024 * 16] buff;

    // Keep the signed result: ERROR is negative and must not be stored in a size_t.
    immutable ptrdiff_t received = conn.receive(buff);
    // Capture errno right away so that later calls cannot overwrite it.
    immutable err = errno;

    size_t len = 0;

    if (received == TcpSocket.ERROR)
    {
        if (conn.blocking)
            throw new ConnectionException("A socket error occurred!");

        // On a non-blocking socket "no data yet" is reported as an error;
        // both EWOULDBLOCK and EAGAIN are used for it.
        if (err == EWOULDBLOCK || err == EAGAIN)
            errno = 0;
        else
            throw new ConnectionException(format("A socket error occurred! errno: %s", err));
    }
    else if (received == 0)
    {
        // The receive buffer is never empty, so a zero-length read means the
        // peer closed the connection. This holds for blocking and
        // non-blocking sockets alike.
        throw new ConnectionException("Server closed the connection!");
    }
    else
    {
        len = cast(size_t) received;
    }

    buffer ~= buff[0 .. len];
    debug(redis) { writeln("Response : ", "'" ~ escape(cast(string)buffer) ~ "'", " Length : ", len); }
}