1 module grpc.GrpcClient;
2 
3 import hunt.concurrency.Promise;
4 
5 import hunt.logging.ConsoleLogger;
6 
7 import std.stdio;
8 import std..string;
9 import std.datetime;
10 import std.conv;
11 import std.format;
12 
13 import hunt.http.client.ClientHttp2SessionListener;
14 import hunt.http.client.HttpClient;
15 import hunt.http.client.HttpClientOptions;
16 import hunt.http.client.Http2ClientConnection;
17 import hunt.http.client.HttpClientConnection;
18 
19 import hunt.http.codec.http.frame;
20 import hunt.http.codec.http.model;
21 import hunt.http.codec.http.stream;
22 
23 import hunt.http.HttpFields;
24 import hunt.http.HttpRequest;
25 import hunt.http.HttpScheme;
26 import hunt.http.client.HttpClientRequest;
27 
28 import hunt.util.Common;
29 import hunt.concurrency.FuturePromise;
30 
31 import hunt.collection;
32 //import hunt.net;
33 
34 import grpc.GrpcException;
35 import grpc.GrpcStream;
36 import grpc.GrpcService;
37 import core.thread;
38 import hunt.collection.HashMap;
39 import std.parallelism;
40 
41 import hunt.http.HttpVersion;
42 
alias Channel = GrpcClient;

/**
 * gRPC client built on top of the hunt HTTP/2 client.
 *
 * An instance owns one `HttpClient`, a promise that is handed to
 * `HttpClient.connect` to receive the connection, and a per-path cache of
 * `GrpcStream` objects created through `createStream`.
 */
class GrpcClient {

    /// Retry count passed to `HttpClientOptions.setRetryTimes` by the default constructor.
    enum int DEFAULT_RETRY_TIMES = 100;
    /// Retry interval passed to `HttpClientOptions.setRetryInterval` by the default constructor.
    enum Duration DEFAULT_RETRY_INTERVAL = 5.seconds;

    /**
     * Builds a client with the default options and immediately calls
     * `connect(host, port)`.
     *
     * Params:
     *   host = remote host to connect to
     *   port = remote port to connect to
     */
    this(string host, ushort port) {
        this();
        connect(host, port);
    }

    /**
     * Builds an unconnected client.
     *
     * The options are configured with secure connections disabled, the
     * "simple" flow-control strategy, the HTTP/2 protocol and the default
     * retry settings. Call `connect` before creating streams.
     */
    this() {
        _HttpConfiguration = new HttpClientOptions();
        _HttpConfiguration.setSecureConnectionEnabled(false);
        _HttpConfiguration.setFlowControlStrategy("simple");
        //_HttpConfiguration.getTcpConfiguration().setStreamIdleTimeout(60 * 1000);
        _HttpConfiguration.setProtocol(HttpVersion.HTTP_2.toString());
        _HttpConfiguration.setRetryTimes(DEFAULT_RETRY_TIMES);
        _HttpConfiguration.setRetryInterval(DEFAULT_RETRY_INTERVAL);
        _promise = new FuturePromise!(HttpClientConnection)();
        _client = new HttpClient(_HttpConfiguration);
        _streamHash = new HashMap!(string,GrpcStream);
    }

    /**
     * Remembers the target address, registers `onClose` as the client's
     * closed handler and starts connecting. The connection is delivered
     * through the internal promise, which `createStream` waits on.
     *
     * Params:
     *   host = remote host to connect to
     *   port = remote port to connect to
     */
    void connect(string host, ushort port) {
        _host = host;
        _port = port;
        logInfo("host : ", host, " port :", port);
        _client.setOnClosed(&onClose);
        _client.connect(host, port, _promise, new ClientHttp2SessionListenerEx(_HttpConfiguration));
    }

    /**
     * Returns the stream cached for `path`, or opens a new HTTP/2 stream
     * (a POST request to `path`) and caches it.
     *
     * Waits on the connection promise and on the stream promise via their
     * `get()` methods before returning a newly created stream.
     *
     * Params:
     *   path = request path of the stream; also used as the cache key
     *
     * Returns: the cached or newly created `GrpcStream`.
     *
     * Throws: `Exception` when the established connection is not an
     *         `Http2ClientConnection`.
     */
    GrpcStream createStream(string path) {
        // Reuse the stream already opened for this path, if any.
        if (_streamHash.containsKey(path)) {
            return _streamHash.get(path);
        }

        HttpFields fields = new HttpFields();
        //fields.put( "te", "trailers");
        //fields.put( "content-type", "application/grpc+proto");
        //fields.put( "grpc-accept-encoding", "identity");
        //fields.put( "accept-encoding", "identity");

        HttpRequest metaData = new HttpRequest( "POST", HttpScheme.HTTP,
            // new HostPortHttpField( format( "%s:%d", _host, _port)),
            _host, _port,
            path, HttpVersion.HTTP_2, fields);

        auto conn = _promise.get();
        auto client = cast(Http2ClientConnection) conn;
        // A failed class cast yields null in D; dereferencing it would crash
        // the process instead of raising a catchable error.
        if (client is null) {
            throw new Exception("Connection to " ~ _host
                ~ " is not an HTTP/2 connection; cannot create stream for " ~ path);
        }

        auto streampromise = new FuturePromise!(Stream)();
        auto http2session = client.getHttp2Session();
        auto grpcstream = new GrpcStream();

        // Forward payloads reported by the stream to its own callback.
        grpcstream.onDataReceived((ubyte[] data) {
            grpcstream.onCallBack(data);
            // service.process(method, grpcstream, data);
        });

        // dfmt off
        http2session.newStream( new HeadersFrame( metaData , null , false), streampromise ,
            new class StreamListener {
                /// Push promises are only logged; no listener is returned for them.
                StreamListener onPush(Stream stream,
                PushPromiseFrame frame) {
                    logInfo( "onPush");
                    return null;
                }
                /// Logs the reset, delegates to the two-argument overload and
                /// reports the outcome through `callback`.
                override
                void onReset(Stream stream, ResetFrame frame, Callback callback) {
                    logInfo( "onReset");
                    try {
                        onReset( stream, frame);
                        callback.succeeded();
                    } catch (Exception x) {
                        callback.failed( x);
                    }
                }
                /// Only logs the reset.
                override
                void onReset(Stream stream, ResetFrame frame) {
                    logInfo( "onReset2");
                }
                /// Logs the idle timeout and returns true.
                override
                bool onIdleTimeout(Stream stream, Exception x) {
                    logInfo( "timeout");
                    return true;
                }
                /// unused
                override string toString()
                {
                    return super.toString();
                }

                /// Hands response headers to the gRPC stream.
                override void onHeaders(Stream stream, HeadersFrame frame) {
                    grpcstream.onHeaders( stream , frame);
                }

                /// Routes a data frame to the gRPC stream. In async mode the
                /// frame goes through `onDataTransitTask` and a non-null result
                /// is delivered to the stream callback while the HTTP/2 stream
                /// is still open; otherwise the frame goes through
                /// `onDataTransitQueue`. The frame callback is completed in
                /// both cases.
                override void onData(Stream stream, DataFrame frame, Callback callback) {
                    if (grpcstream.isAsyn())
                    {
                        ubyte[] complete = grpcstream.onDataTransitTask(stream , frame);
                        callback.succeeded();
                        if (complete !is null && !stream.isClosed())
                        {
                            grpcstream.onCallBack(complete);
                        }
                    } else
                    {
                        grpcstream.onDataTransitQueue(stream , frame);
                        callback.succeeded();
                    }

                }
            }
        );

        // dfmt on
        grpcstream.attachStream( streampromise.get());
        _streamHash.put(path,grpcstream);
        return grpcstream;
    }

    /**
     * Looks up the stream cached for `path`.
     *
     * Params:
     *   path = request path used when the stream was created
     *
     * Returns: the result of indexing the stream cache with `path`.
     */
    GrpcStream getStream(string path)
    {
        return _streamHash[path];
    }

    /// Closed handler registered in `connect`; clears the stream cache.
    void onClose()
    {
        _streamHash.clear();
    }


    /// Clears the stream cache, destroys the HTTP client and cancels the
    /// connection promise.
    void destroy() {
        _streamHash.clear();
        _client.destroy();
        _promise.cancel(true);
    }

    protected {
        string _host;                                   /// host passed to the last `connect` call
        ushort _port;                                   /// port passed to the last `connect` call
        HttpClient _client;                             /// underlying HTTP client
        FuturePromise!(HttpClientConnection) _promise;  /// promise handed to `HttpClient.connect`
        HttpClientOptions _HttpConfiguration;           /// options shared with the session listener
        HashMap!(string,GrpcStream) _streamHash;        /// streams keyed by request path
    }
}
206 
/**
 * HTTP/2 session listener used by `GrpcClient`.
 *
 * Supplies the settings sent with the connection preface and logs
 * session-level reset, close and failure events. All other events are ignored.
 */
class ClientHttp2SessionListenerEx : ClientHttp2SessionListener {

    //alias hunt.http.codec.http.stream.Session = Session;
    /// Client options the preface settings are read from.
    HttpClientOptions _HttpConfiguration;

    /**
     * Params:
     *   config = client options used to build the preface settings
     */
    this(HttpClientOptions config) {
        this._HttpConfiguration = config;
    }

    /**
     * Builds the settings map for the connection preface: the header table
     * size comes from `getMaxDynamicTableSize()` and the initial window size
     * from `getInitialStreamSendWindow()`.
     */
    override Map!(int, int) onPreface(Session session) {
        Map!(int, int) prefaceSettings = new HashMap!(int, int)();

        prefaceSettings.put(SettingsFrame.HEADER_TABLE_SIZE,
                _HttpConfiguration.getMaxDynamicTableSize());
        prefaceSettings.put(SettingsFrame.INITIAL_WINDOW_SIZE,
                _HttpConfiguration.getInitialStreamSendWindow());
        return prefaceSettings;
    }

    /// No listener is provided for streams reported through this callback.
    override StreamListener onNewStream(Stream stream, HeadersFrame frame) {
        return null;
    }

    /// Settings frames are ignored.
    override void onSettings(Session session, SettingsFrame frame) {
    }

    /// Ping frames are ignored.
    override void onPing(Session session, PingFrame frame) {
    }

    /// Logs the session reset.
    override void onReset(Session session, ResetFrame frame) {
        logInfo("onReset");
    }

    /// Logs the session close.
    override void onClose(Session session, GoAwayFrame frame) {
        logInfo("onClose");
    }

    /// Logs the session failure together with the exception message so the
    /// cause is not lost.
    override void onFailure(Session session, Exception failure) {
        string reason = failure is null ? "unknown" : failure.msg;
        warning("onFailure: " ~ reason);
    }

    /// Always returns false.
    /// NOTE(review): presumably this keeps an idle session open; confirm
    /// against the hunt-http listener contract.
    override bool onIdleTimeout(Session session) {
        return false;
    }
}