1 module grpc.GrpcClient;
2 
3 import hunt.concurrency.Promise;
4 import hunt.concurrency.CompletableFuture;
5 
6 import hunt.logging.ConsoleLogger;
7 
8 import std.stdio;
9 import std.string;
10 import std.datetime;
11 import std.conv;
12 import std.format;
13 
14 import hunt.http.client.ClientHttp2SessionListener;
15 import hunt.http.client.HttpClient;
16 import hunt.http.client.HttpClientOptions;
17 import hunt.http.client.Http2ClientConnection;
18 import hunt.http.client.HttpClientConnection;
19 
20 import hunt.http.codec.http.frame;
21 import hunt.http.codec.http.model;
22 import hunt.http.codec.http.stream;
23 
24 import hunt.http.HttpFields;
25 import hunt.http.HttpRequest;
26 import hunt.http.HttpScheme;
27 import hunt.http.client.HttpClientRequest;
28 
29 import hunt.util.Common;
30 import hunt.concurrency.FuturePromise;
31 
32 import hunt.collection;
33 //import hunt.net;
34 
35 import grpc.GrpcException;
36 import grpc.GrpcStream;
37 import grpc.GrpcService;
38 import core.thread;
39 import hunt.collection.HashMap;
40 import std.parallelism;
41 
42 import hunt.http.HttpVersion;
43 
/// `Channel` is an alternate name for `GrpcClient`.
alias Channel = GrpcClient;
45 
/**
 * gRPC client channel built on the hunt HTTP client.
 *
 * The client configures an `HttpClient` for non-secure HTTP/2, connects to a
 * single host/port, and hands out `GrpcStream` objects keyed by request path.
 * A stream created for a path is cached in `_streamHash` and returned again
 * on later `createStream` calls with the same path; the cache is emptied by
 * `onClose` and `destroy`.
 */
class GrpcClient {

    /// Retry count passed to `HttpClientOptions.setRetryTimes` by the constructor.
    enum int DEFAULT_RETRY_TIMES = 100;
    /// Retry interval passed to `HttpClientOptions.setRetryInterval` by the constructor.
    enum Duration DEFAULT_RETRY_INTERVAL = 5.seconds;

    /**
     * Builds the client and immediately starts connecting.
     *
     * Params:
     *   host = remote host handed to `connect`.
     *   port = remote port handed to `connect`.
     */
    this(string host, ushort port) {
        this();
        connect(host, port);
    }

    /**
     * Builds an unconnected client: HTTP/2 protocol, secure connection
     * disabled, "simple" flow-control strategy and the default retry settings.
     * Call `connect` before `createStream`.
     */
    this() {
        _HttpConfiguration = new HttpClientOptions();
        _HttpConfiguration.setSecureConnectionEnabled(false);
        _HttpConfiguration.setFlowControlStrategy("simple");
        //_HttpConfiguration.getTcpConfiguration().setStreamIdleTimeout(60 * 1000);
        _HttpConfiguration.setProtocol(HttpVersion.HTTP_2.toString());
        _HttpConfiguration.setRetryTimes(DEFAULT_RETRY_TIMES);
        _HttpConfiguration.setRetryInterval(DEFAULT_RETRY_INTERVAL);
        _promise = new FuturePromise!(HttpClientConnection)();
        _client = new HttpClient(_HttpConfiguration);
        _streamHash = new HashMap!(string,GrpcStream);
    }

    /**
     * Remembers the target address, registers `onClose` as the client's
     * close handler and asks the HTTP client to connect. The resulting
     * connection is delivered through `_promise`.
     *
     * Params:
     *   host = remote host.
     *   port = remote port.
     */
    void connect(string host, ushort port) {
        _host = host;
        _port = port;
        logInfo("host : ", host, " port :", port);
        _client.setOnClosed(&onClose);
        _client.connect(host, port, _promise, new ClientHttp2SessionListenerEx(_HttpConfiguration));
    }

    /**
     * Returns the stream cached for `path`, or opens a new HTTP/2 stream for
     * it, caches it and returns it.
     *
     * A new stream is opened with a `POST` request for `path` against the
     * host/port given to `connect`. The call waits on `_promise.get()` for
     * the connection and on the stream promise for the stream itself.
     *
     * Params:
     *   path = request path; also the cache key.
     *
     * Returns: the `GrpcStream` bound to `path`.
     *
     * Throws: `Exception` when the established connection is not an
     *         `Http2ClientConnection`; whatever the two promise `get()`
     *         calls throw is passed through unchanged.
     */
    GrpcStream createStream(string path) {
        if (_streamHash.containsKey(path))
        {
            return _streamHash.get(path);
        }
        else
        {
            HttpFields fields = new HttpFields();
            //fields.put( "te", "trailers");
            //fields.put( "content-type", "application/grpc+proto");
            //fields.put( "grpc-accept-encoding", "identity");
            //fields.put( "accept-encoding", "identity");

            HttpRequest metaData = new HttpRequest( "POST", HttpScheme.HTTP, 
                // new HostPortHttpField( format( "%s:%d", _host, _port)), 
                _host, _port,
                path, HttpVersion.HTTP_2, fields);

            auto conn = _promise.get();
            auto client = cast(Http2ClientConnection) conn;
            // A class downcast yields null on mismatch; fail with a clear
            // error instead of dereferencing null on the next line.
            if (client is null)
            {
                throw new Exception(format(
                    "GrpcClient: connection to %s:%d is not an HTTP/2 connection",
                    _host, _port));
            }
            auto streampromise = new FuturePromise!(Stream)();
            auto http2session = client.getHttp2Session();
            auto grpcstream = new GrpcStream();

            // Forward every received payload to the stream's own callback.
            grpcstream.onDataReceived((ubyte[] data) {
                grpcstream.onCallBack(data);
                // service.process(method, grpcstream, data);
            });    

            // dfmt off
            http2session.newStream( new HeadersFrame( metaData , null , false), streampromise , 
                new class StreamListener {
                    /// Push promises are only logged; no listener is returned for them.
                    StreamListener onPush(Stream stream,
                    PushPromiseFrame frame) {
                        logInfo( "onPush");
                        return null;
                    }
                    /// Logs the reset, delegates to the two-argument overload
                    /// and reports the outcome through `callback`.
                    override
                    void onReset(Stream stream, ResetFrame frame, Callback callback) {
                        logInfo( "onReset");
                        try {
                            onReset( stream, frame);
                            callback.succeeded();
                        } catch (Exception x) {
                            callback.failed( x);
                        }
                    }
                    /// Only logs the reset.
                    override
                    void onReset(Stream stream, ResetFrame frame) {
                        logInfo( "onReset2");
                    }
                    /// Logs the idle timeout and returns true.
                    override
                    bool onIdleTimeout(Stream stream, Exception x) {
                        logInfo( "timeout");
                        return true;
                    }
                    /// Defers to the inherited implementation.
                    override string toString()
                    {
                        return super.toString();
                    }

                    /// Response headers are handed straight to the gRPC stream.
                    override void onHeaders(Stream stream, HeadersFrame frame) {
                        grpcstream.onHeaders( stream , frame);
                    }

                    /// Routes a DATA frame according to the stream's mode and
                    /// acknowledges it through `callback`.
                    override void onData(Stream stream, DataFrame frame, Callback callback) {
                        // NOTE(review): if either transit helper throws, `callback`
                        // is never completed — confirm whether that is intended.
                        if (grpcstream.isAsyn())
                        {
                            ubyte[] complete = grpcstream.onDataTransitTask(stream , frame);
                            callback.succeeded();
                            // Invoke the user callback only for a non-null result
                            // on a stream that is still open.
                            if (complete !is null && !stream.isClosed())
                            {
                                grpcstream.onCallBack(complete);
                            }
                        } else
                        {
                            grpcstream.onDataTransitQueue(stream , frame);
                            callback.succeeded();
                        }

                    }
                }
            );

            // dfmt on
            grpcstream.attachStream( streampromise.get());
            _streamHash.put(path,grpcstream);
            return grpcstream;
        }
    }

    /**
     * Looks up the stream cached for `path` via the map's index operator.
     *
     * Params:
     *   path = request path used as the cache key.
     *
     * Returns: the map's lookup result for `path`.
     */
    GrpcStream getStream(string path)
    {
        return _streamHash[path];
    }

    /// Close handler registered in `connect`; drops all cached streams.
    void onClose()
    {
        _streamHash.clear();
    }


    /// Drops all cached streams, destroys the HTTP client and cancels the
    /// connection promise.
    void destroy() {
        _streamHash.clear();
        _client.destroy();
        _promise.cancel(true);
    }

    protected {
        /// Host passed to the last `connect` call.
        string _host;
        /// Port passed to the last `connect` call.
        ushort _port;
        /// Underlying HTTP client.
        HttpClient _client;
        /// Receives the connection produced by `_client.connect`.
        FuturePromise!(HttpClientConnection) _promise;
        /// Options the HTTP client and session listener are built from.
        HttpClientOptions _HttpConfiguration;
        /// Streams created so far, keyed by request path.
        HashMap!(string,GrpcStream) _streamHash;
    }
}
207 
/**
 * HTTP/2 session listener used by `GrpcClient.connect`.
 *
 * It supplies the initial SETTINGS values from the client options and logs
 * session-level reset, close and failure events. All other events are ignored.
 */
class ClientHttp2SessionListenerEx : ClientHttp2SessionListener {

    //alias hunt.http.codec.http.stream.Session = Session;
    /// Options the preface settings are read from.
    HttpClientOptions _HttpConfiguration;

    /**
     * Params:
     *   config = client options queried in `onPreface`.
     */
    this(HttpClientOptions config) {
        this._HttpConfiguration = config;
    }

    /**
     * Builds the settings map for the connection preface.
     *
     * Returns: a map holding `HEADER_TABLE_SIZE` (from
     *          `getMaxDynamicTableSize`) and `INITIAL_WINDOW_SIZE` (from
     *          `getInitialStreamSendWindow`).
     */
    override Map!(int, int) onPreface(Session session) {
        Map!(int, int) settings = new HashMap!(int, int)();

        settings.put(SettingsFrame.HEADER_TABLE_SIZE, _HttpConfiguration.getMaxDynamicTableSize());
        settings.put(SettingsFrame.INITIAL_WINDOW_SIZE,
                _HttpConfiguration.getInitialStreamSendWindow());
        return settings;
    }

    /// No listener is provided for streams reported through this hook.
    override StreamListener onNewStream(Stream stream, HeadersFrame frame) {
        return null;
    }

    /// SETTINGS frames are ignored.
    override void onSettings(Session session, SettingsFrame frame) {
    }

    /// PING frames are ignored.
    override void onPing(Session session, PingFrame frame) {
    }

    /// Logs the session reset.
    override void onReset(Session session, ResetFrame frame) {
        logInfo("onReset");
    }

    /// Logs the session close.
    override void onClose(Session session, GoAwayFrame frame) {
        logInfo("onClose");
    }

    /// Logs the session failure, including the exception message when an
    /// exception object is supplied.
    override void onFailure(Session session, Exception failure) {
        if (failure is null) {
            warning("onFailure");
        } else {
            // Keep the cause visible in the log instead of discarding it.
            warning("onFailure: ", failure.msg);
        }
    }

    /// Always returns false on a session idle timeout.
    override bool onIdleTimeout(Session session) {
        return false;
    }
}