module redis.connection;

import std.socket : TcpSocket;

import std.array : appender, back, popBack;
import std.string : format;
public import redis;
import redis.response;

debug(redis) {
    import std.stdio : writeln;
    import redis.encoder : escape;
}

public:

/**
 * Sends a pre-encoded string
 *
 * A single socket write may accept fewer bytes than requested, so the
 * unsent remainder is written again until the whole command has gone out.
 *
 * Params:
 *   conn = Connection to redis server.
 *   encoded_cmd = The command to be sent.
 *
 * Throws: $(D ConnectionException) if sending fails.
 */
void send(TcpSocket conn, string encoded_cmd)
{
    debug(redis) { writeln("Request : '", escape(encoded_cmd) ~ "'"); }

    string pending = encoded_cmd;
    while (pending.length > 0)
    {
        auto sent = conn.send(pending);

        // A negative value is a socket error; zero would mean no progress
        // and would otherwise spin forever.
        if (sent <= 0)
            throw new ConnectionException("Error while sending request");

        pending = pending[cast(size_t) sent .. $];
    }
}

/**
 * Receive responses from redis server
 *
 * Reads from the connection until every multibulk that has been opened has
 * received all of its elements, the receive buffer has been fully parsed,
 * and at least $(D minResponses) top-level responses have been collected.
 *
 * Params:
 *   conn = Connection to redis server.
 *   minResponses = The minimum number of top-level responses to wait for.
 *
 * Returns: The top-level responses; multibulk elements are stored in the
 *          $(D values) of their parent response.
 *
 * Throws: $(D ConnectionException) if there is a socket error or server closes the connection.
 */
Response[] receiveResponses(TcpSocket conn, size_t minResponses = 0)
{
    byte[] buffer;
    Response[] responses;
    Response*[] MultiBulks; //Stack of pointers to multibulks that still expect elements
    Response[]* stackPtr = &responses; //Array the next parsed response is appended to

    while(true)
    {
        receive(conn, buffer);

        debug(redis) { writeln("BUFFER : ", escape(cast(string)buffer)); }

        while(buffer.length > 0)
        {
            auto r = parseResponse(buffer);
            debug(redis) { writeln(__FUNCTION__,"\t",r.type,"\t",typeid(r));}

            // Stop parsing and go back to the socket for more data.
            if(r.type == ResponseType.Invalid)
                break;

            *stackPtr ~= r;
            if(r.isMoved)
                break;

            if(r.type == ResponseType.MultiBulk)
            {
                auto mb = &((*stackPtr)[$-1]);
                if(mb.count > 0)
                {
                    // Open a new nesting level; its elements follow.
                    MultiBulks ~= mb;
                    stackPtr = &((*mb).values);
                    continue;
                }
            }

            // The response just appended (a plain reply, or a multibulk with
            // no elements) may have completed one or more enclosing
            // multibulks: close every level that is now full.
            while(MultiBulks.length > 0)
            {
                auto top = MultiBulks.back;

                if(top.count != top.values.length)
                    break;

                MultiBulks.popBack();

                if(MultiBulks.length > 0)
                    stackPtr = &((*MultiBulks.back).values);
                else
                    stackPtr = &responses;
            }
        }

        if(buffer.length == 0 && MultiBulks.length == 0) //Make sure all the multi bulks got their data
        {
            debug(redis) {
                if(minResponses > 1 && responses.length < minResponses)
                    writeln("WAITING FOR MORE RESPONSES ... ");
            }

            if(responses.length < minResponses)
                continue;

            break;
        }

    }

    debug(redis) { writeln(__FUNCTION__,"\tlen:",responses.length,"\t",responses);}
    return responses;
}

/* -------- EXCEPTIONS ------------- */

/// Thrown when sending to or receiving from the redis server fails.
class ConnectionException : Exception {
    this(string msg) { super(msg); }
}


private :

/*
 * Performs one read from the socket and appends the received bytes to buffer.
 *
 * On a non-blocking socket with no data pending, nothing is appended and the
 * call returns normally.
 *
 * Throws: ConnectionException if the server closed the connection or a
 *         socket error occurred.
 */
void receive(TcpSocket conn, ref byte[] buffer)
{
    byte[1024 * 16] buff;

    // Kept signed so the error return can be tested directly.
    ptrdiff_t len = conn.receive(buff);

    // A zero-length read means the peer closed the connection, whether or
    // not the socket is blocking.
    if (len == 0)
        throw new ConnectionException("Server closed the connection!");

    if (len == TcpSocket.ERROR)
    {
        if (conn.blocking)
            throw new ConnectionException("A socket error occurred!");

        import core.stdc.errno;

        if (errno == EWOULDBLOCK || errno == EAGAIN)
        {
            // Nothing to read right now on a non-blocking socket.
            len = 0;
            errno = 0;
        }
        else
            throw new ConnectionException(format("A socket error occurred! errno: %s", errno));
    }

    buffer ~= buff[0 .. cast(size_t) len];
    debug(redis) { writeln("Response : ", "'" ~ escape(cast(string)buffer) ~ "'", " Length : ", len); }
}