1 module grpc.GrpcStream;
2 
3 import grpc.Status;
4 import grpc.EvBuffer;
5 
6 import hunt.http.codec.http.stream;
7 import hunt.http.codec.http.frame;
8 import hunt.http.codec.http.model;
9 
10 import hunt.collection;
11 import hunt.util.Common;
12 
13 import hunt.logging;
14 
15 import google.protobuf;
16 
17 import core.thread;
18 import core.sync.condition;
19 import core.sync.mutex;
20 
21 import std.array;
22 import std.container : DList;
23 import std.bitmanip;
24 import std.conv;
25 import std.stdint;
26 
27 import grpc.StatusCode;
28 import std.concurrency;
29 import hunt.Exceptions;
30 import grpc.GrpcService;
31 
32 
33 alias StreamHandler = void delegate(ubyte[] data);
34 
35 class GrpcStream
36 {
37     alias void delegate(ubyte[] complete) Callback;
38 
39     private StreamHandler _dataHandler;
40 
41     const ulong DataHeadLen = 5;
42 
43     this(bool asyn = false)
44     {
45         // _stream = stream;
46        _status = Status.OK;
47        _end = false;
48        _asyn = asyn;
49        _mutex = new Mutex();
50        _condition = new Condition(_mutex);
51        _read_buffer = new EvBuffer!ubyte;
52        _dele = null;
53        _write_mutex = new Mutex();
54     }
55 
56     GrpcStream onDataReceived(StreamHandler handler) {
57         _dataHandler = handler;
58         return this;
59     }
60 
61     void attachStream(  Stream stream)
62     {
63         this._stream = stream;
64     }
65 
66     bool isClosed()
67     {
68         return this._stream.isClosed();
69     }
70 
71    /// client status.
72    void onHeaders(Stream stream, HeadersFrame frame)
73    {
74    }
75 
76 
77    private ubyte[] parse( DataFrame frame)
78    {
79        ubyte[] bodyDetail = null;
80 
81        if (frame !is null)
82        {
83            try {
84                ubyte[] bytes;
85                bytes = cast(ubyte[])BufferUtils.toString(frame.getData());
86                _read_buffer.mergeBuffer(bytes);
87 
88                ulong uBufLen = 0;
89                while ( (uBufLen = _read_buffer.getBufferLength()) >= DataHeadLen )
90                {
91                    auto head = new ubyte [DataHeadLen];
92                    if (!_read_buffer.copyOutFromHead(head ,DataHeadLen)) { break;}
93                    ulong bodyLength = bigEndianToNative!int32_t(head[1 .. 5]);
94                    if (bodyLength > 2147483647 || bodyLength < 0)
95                    {
96                        _read_buffer.reset();
97                        break;
98                    }
99                    if (uBufLen >=  bodyLength + DataHeadLen)
100                    {
101                        if (!_read_buffer.drainBufferFromHead(DataHeadLen)) { break;}
102                        if (bodyLength)
103                        {
104                            bodyDetail = new ubyte [bodyLength];
105                            if (!_read_buffer.removeBufferFromHead(bodyDetail,bodyLength))  {break;}
106                        }
107                    } else
108                    {
109                       break;
110                    }
111                }
112            } catch(Exception e){
113                _read_buffer.reset();
114                warning(e);
115                return null;
116            }
117        }
118        return bodyDetail;
119    }
120 
121    void  onDataTransitQueue(Stream stream, DataFrame frame)
122    {
123        if(frame.isEndStream())
124        {
125            _end = true;
126        }
127 
128        auto bodyDetail = parse(frame);
129        if (bodyDetail !is null)
130        {
131            push(bodyDetail);
132        }
133    }
134 
135    void onData(Stream stream, DataFrame frame) {
136        if(frame.isEndStream()) {
137            _end = true;
138        }
139 
140         ubyte[] _incomingData = parse(frame);
141         if(_incomingData.length > 0) {
142             version(HUNT_DEBUG) tracef("%(%02X %)", _incomingData);
143             
144             if(_dataHandler !is null) {
145                 _dataHandler(_incomingData);
146             }
147         } else {
148             warning("The data is not ready yet.");
149         }
150    }
151 
152 //    private ubyte[] _incomingData;
153 
154     ubyte[] onDataTransitTask(Stream stream, DataFrame frame)
155    {
156        if(frame.isEndStream())
157        {
158            _end = true;
159        }
160 
161        ubyte[] incomingData = parse(frame);
162 
163         version(HUNT_DEBUG) tracef("%(%02X %)", incomingData);
164 
165        return incomingData;
166    }
167 
168 
169     void write(IN)(IN obj , bool option = false)
170    {
171         ubyte compress = 0;
172         ubyte[] data = obj.toProtobuf.array;
173 
174         if (data.length > 2147483647 || data.length < 0 )
175         {
176             return;
177         }
178         ubyte[4] len = nativeToBigEndian(cast(int)data.length);
179         ubyte[] grpc_data;
180         grpc_data ~= compress;
181         grpc_data ~= len;
182         grpc_data ~= data;
183         try {
184             synchronized(this)
185             {
186                 auto dataFrame = new DataFrame( _stream.getId(),BufferUtils.toBuffer( cast(byte[])grpc_data), option);
187                 if (!_stream.isClosed())
188                 {
189                     _stream.data( dataFrame , new NoopCallback());
190                 }
191             }
192         }
193 
194         catch (IndexOutOfBoundsException e)
195         {
196             _status.setStatusCode(StatusCode.OUT_OF_RANGE);
197         } catch (Exception e)
198         {
199             _status.setStatusCode(StatusCode.INTERNAL);
200         }
201    }
202 
203    void writesdone()
204    {
205        auto dataFrame = new DataFrame(_stream.getId() ,
206        BufferUtils.toBuffer(cast(byte[])[]) , true);
207        _stream.data(dataFrame , new NoopCallback());
208    }
209 
210 
211    bool read(OUT)(ref OUT obj)
212    {
213        bool isTimeout = false;
214        while(true)
215        {
216            auto bytes = pop();
217            if(bytes == null || bytes.length == 0)
218            {
219                _condition.mutex().lock();
220                scope (exit)
221                     _condition.mutex().unlock();
222                if (!_condition.wait(dur!"seconds"(5)))
223                {
224                     import grpc.GrpcException : GrpcTimeoutException;
225                     throw new GrpcTimeoutException("Timedout after 5 seconds.");
226                }
227            }
228            else
229            {
230                if (obj is null)
231                     obj = new OUT();
232                bytes.fromProtobuf!OUT(obj);
233                return false;
234            }
235        }
236    }
237 
238    Status finish()
239    {
240        return _status;
241    }
242 
243     void push( ubyte[] packet = null)
244     {
245         if(packet !is null)
246         {
247             _condition.mutex().lock();
248 
249             _queue.insertBack( packet);
250             _condition.notify();
251 
252             _condition.mutex().unlock();
253         }
254     }
255 
256    ubyte[] pop()
257    {
258        _condition.mutex().lock();
259        if (_queue.empty())
260            {
261                _condition.mutex().unlock();
262                return null;
263            }
264        auto packet = _queue.front();
265        _queue.removeFront();
266        _condition.mutex().unlock();
267        return packet;
268        //}
269     }
270 
271     void setCallBack(Callback dele)
272     {
273         if (_dele is null && dele !is null)
274         {
275             _dele = dele;
276             _asyn = true;
277         }
278     }
279 
280     void onCallBack(ubyte[] complete)
281     {
282         if (_dele !is null)
283         {
284             _dele(complete);
285         }
286     }
287 
288     void reSet()
289     {
290         _read_buffer.reset();
291     }
292 
293     bool isAsyn() {
294         return _asyn;
295     }
296 
297     bool end() {
298         return _end;
299     }
300 
301     bool                _end;
302     bool                _asyn;
303     Condition           _condition;
304     Mutex               _mutex;
305     DList!(ubyte[])     _queue;
306     EvBuffer!ubyte      _read_buffer;
307     Stream              _stream;
308     Status              _status;
309     Callback            _dele;
310     Mutex               _write_mutex;
311 }