1 module grpc.GrpcStream; 2 3 import grpc.Status; 4 import grpc.EvBuffer; 5 6 import hunt.http.codec.http.stream; 7 import hunt.http.codec.http.frame; 8 import hunt.http.codec.http.model; 9 10 import hunt.collection; 11 import hunt.util.Common; 12 13 import hunt.logging; 14 15 import google.protobuf; 16 17 import core.thread; 18 import core.sync.condition; 19 import core.sync.mutex; 20 21 import std.array; 22 import std.container : DList; 23 import std.bitmanip; 24 import std.conv; 25 import std.stdint; 26 27 import grpc.StatusCode; 28 import std.concurrency; 29 import hunt.Exceptions; 30 import grpc.GrpcService; 31 32 33 alias StreamHandler = void delegate(ubyte[] data); 34 35 class GrpcStream 36 { 37 alias void delegate(ubyte[] complete) Callback; 38 39 private StreamHandler _dataHandler; 40 41 const ulong DataHeadLen = 5; 42 43 this(bool asyn = false) 44 { 45 // _stream = stream; 46 _status = Status.OK; 47 _end = false; 48 _asyn = asyn; 49 _mutex = new Mutex(); 50 _condition = new Condition(_mutex); 51 _read_buffer = new EvBuffer!ubyte; 52 _dele = null; 53 _write_mutex = new Mutex(); 54 } 55 56 GrpcStream onDataReceived(StreamHandler handler) { 57 _dataHandler = handler; 58 return this; 59 } 60 61 void attachStream( Stream stream) 62 { 63 this._stream = stream; 64 } 65 66 bool isClosed() 67 { 68 return this._stream.isClosed(); 69 } 70 71 /// client status. 72 void onHeaders(Stream stream, HeadersFrame frame) 73 { 74 } 75 76 77 private ubyte[] parse( DataFrame frame) 78 { 79 ubyte[] bodyDetail = null; 80 81 if (frame !is null) 82 { 83 try { 84 ubyte[] bytes; 85 bytes = cast(ubyte[])BufferUtils.toString(frame.getData()); 86 _read_buffer.mergeBuffer(bytes); 87 88 ulong uBufLen = 0; 89 while ( (uBufLen = _read_buffer.getBufferLength()) >= DataHeadLen ) 90 { 91 auto head = new ubyte [DataHeadLen]; 92 if (!_read_buffer.copyOutFromHead(head ,DataHeadLen)) { break;} 93 ulong bodyLength = bigEndianToNative!int32_t(head[1 .. 5]); 94 if (bodyLength > 2147483647 || bodyLength < 0) 95 { 96 _read_buffer.reset(); 97 break; 98 } 99 if (uBufLen >= bodyLength + DataHeadLen) 100 { 101 if (!_read_buffer.drainBufferFromHead(DataHeadLen)) { break;} 102 if (bodyLength) 103 { 104 bodyDetail = new ubyte [bodyLength]; 105 if (!_read_buffer.removeBufferFromHead(bodyDetail,bodyLength)) {break;} 106 } 107 } else 108 { 109 break; 110 } 111 } 112 } catch(Exception e){ 113 _read_buffer.reset(); 114 warning(e); 115 return null; 116 } 117 } 118 return bodyDetail; 119 } 120 121 void onDataTransitQueue(Stream stream, DataFrame frame) 122 { 123 if(frame.isEndStream()) 124 { 125 _end = true; 126 } 127 128 auto bodyDetail = parse(frame); 129 if (bodyDetail !is null) 130 { 131 push(bodyDetail); 132 } 133 } 134 135 void onData(Stream stream, DataFrame frame) { 136 if(frame.isEndStream()) { 137 _end = true; 138 } 139 140 ubyte[] _incomingData = parse(frame); 141 if(_incomingData.length > 0) { 142 version(HUNT_DEBUG) tracef("%(%02X %)", _incomingData); 143 144 if(_dataHandler !is null) { 145 _dataHandler(_incomingData); 146 } 147 } else { 148 warning("The data is not ready yet."); 149 } 150 } 151 152 // private ubyte[] _incomingData; 153 154 ubyte[] onDataTransitTask(Stream stream, DataFrame frame) 155 { 156 if(frame.isEndStream()) 157 { 158 _end = true; 159 } 160 161 ubyte[] incomingData = parse(frame); 162 163 version(HUNT_DEBUG) tracef("%(%02X %)", incomingData); 164 165 return incomingData; 166 } 167 168 169 void write(IN)(IN obj , bool option = false) 170 { 171 ubyte compress = 0; 172 ubyte[] data = obj.toProtobuf.array; 173 174 if (data.length > 2147483647 || data.length < 0 ) 175 { 176 return; 177 } 178 ubyte[4] len = nativeToBigEndian(cast(int)data.length); 179 ubyte[] grpc_data; 180 grpc_data ~= compress; 181 grpc_data ~= len; 182 grpc_data ~= data; 183 try { 184 synchronized(this) 185 { 186 auto dataFrame = new DataFrame( _stream.getId(),BufferUtils.toBuffer( cast(byte[])grpc_data), option); 187 if (!_stream.isClosed()) 188 { 189 _stream.data( dataFrame , new NoopCallback()); 190 } 191 } 192 } 193 194 catch (IndexOutOfBoundsException e) 195 { 196 _status.setStatusCode(StatusCode.OUT_OF_RANGE); 197 } catch (Exception e) 198 { 199 _status.setStatusCode(StatusCode.INTERNAL); 200 } 201 } 202 203 void writesdone() 204 { 205 auto dataFrame = new DataFrame(_stream.getId() , 206 BufferUtils.toBuffer(cast(byte[])[]) , true); 207 _stream.data(dataFrame , new NoopCallback()); 208 } 209 210 211 bool read(OUT)(ref OUT obj) 212 { 213 bool isTimeout = false; 214 while(true) 215 { 216 auto bytes = pop(); 217 if(bytes == null || bytes.length == 0) 218 { 219 _condition.mutex().lock(); 220 scope (exit) 221 _condition.mutex().unlock(); 222 if (!_condition.wait(dur!"seconds"(5))) 223 { 224 import grpc.GrpcException : GrpcTimeoutException; 225 throw new GrpcTimeoutException("Timedout after 5 seconds."); 226 } 227 } 228 else 229 { 230 if (obj is null) 231 obj = new OUT(); 232 bytes.fromProtobuf!OUT(obj); 233 return false; 234 } 235 } 236 } 237 238 Status finish() 239 { 240 return _status; 241 } 242 243 void push( ubyte[] packet = null) 244 { 245 if(packet !is null) 246 { 247 _condition.mutex().lock(); 248 249 _queue.insertBack( packet); 250 _condition.notify(); 251 252 _condition.mutex().unlock(); 253 } 254 } 255 256 ubyte[] pop() 257 { 258 _condition.mutex().lock(); 259 if (_queue.empty()) 260 { 261 _condition.mutex().unlock(); 262 return null; 263 } 264 auto packet = _queue.front(); 265 _queue.removeFront(); 266 _condition.mutex().unlock(); 267 return packet; 268 //} 269 } 270 271 void setCallBack(Callback dele) 272 { 273 if (_dele is null && dele !is null) 274 { 275 _dele = dele; 276 _asyn = true; 277 } 278 } 279 280 void onCallBack(ubyte[] complete) 281 { 282 if (_dele !is null) 283 { 284 _dele(complete); 285 } 286 } 287 288 void reSet() 289 { 290 _read_buffer.reset(); 291 } 292 293 bool isAsyn() { 294 return _asyn; 295 } 296 297 bool end() { 298 return _end; 299 } 300 301 bool _end; 302 bool _asyn; 303 Condition _condition; 304 Mutex _mutex; 305 DList!(ubyte[]) _queue; 306 EvBuffer!ubyte _read_buffer; 307 Stream _stream; 308 Status _status; 309 Callback _dele; 310 Mutex _write_mutex; 311 }