1 module grpc.GrpcClient; 2 3 import hunt.concurrency.Promise; 4 import hunt.concurrency.CompletableFuture; 5 6 import hunt.logging.ConsoleLogger; 7 8 import std.stdio; 9 import std.string; 10 import std.datetime; 11 import std.conv; 12 import std.format; 13 14 import hunt.http.client.ClientHttp2SessionListener; 15 import hunt.http.client.HttpClient; 16 import hunt.http.client.HttpClientOptions; 17 import hunt.http.client.Http2ClientConnection; 18 import hunt.http.client.HttpClientConnection; 19 20 import hunt.http.codec.http.frame; 21 import hunt.http.codec.http.model; 22 import hunt.http.codec.http.stream; 23 24 import hunt.http.HttpFields; 25 import hunt.http.HttpRequest; 26 import hunt.http.HttpScheme; 27 import hunt.http.client.HttpClientRequest; 28 29 import hunt.util.Common; 30 import hunt.concurrency.FuturePromise; 31 32 import hunt.collection; 33 //import hunt.net; 34 35 import grpc.GrpcException; 36 import grpc.GrpcStream; 37 import grpc.GrpcService; 38 import core.thread; 39 import hunt.collection.HashMap; 40 import std.parallelism; 41 42 import hunt.http.HttpVersion; 43 44 alias Channel = GrpcClient; 45 46 /** 47 * 48 */ 49 class GrpcClient { 50 51 enum int DEFAULT_RETRY_TIMES = 100; 52 enum Duration DEFAULT_RETRY_INTERVAL = 5.seconds; 53 54 this(string host, ushort port) { 55 this(); 56 connect(host, port); 57 } 58 59 this() { 60 _HttpConfiguration = new HttpClientOptions(); 61 _HttpConfiguration.setSecureConnectionEnabled(false); 62 _HttpConfiguration.setFlowControlStrategy("simple"); 63 //_HttpConfiguration.getTcpConfiguration().setStreamIdleTimeout(60 * 1000); 64 _HttpConfiguration.setProtocol(HttpVersion.HTTP_2.toString()); 65 _HttpConfiguration.setRetryTimes(DEFAULT_RETRY_TIMES); 66 _HttpConfiguration.setRetryInterval(DEFAULT_RETRY_INTERVAL); 67 _promise = new FuturePromise!(HttpClientConnection)(); 68 _client = new HttpClient(_HttpConfiguration); 69 _streamHash = new HashMap!(string,GrpcStream); 70 } 71 72 void connect(string host, ushort port) { 73 _host = host; 74 _port = port; 75 logInfo("host : ", host, " port :", port); 76 _client.setOnClosed(&onClose); 77 _client.connect(host, port, _promise, new ClientHttp2SessionListenerEx(_HttpConfiguration)); 78 } 79 80 GrpcStream createStream(string path) { 81 if (_streamHash.containsKey(path)) 82 { 83 return _streamHash.get(path); 84 } 85 else 86 { 87 HttpFields fields = new HttpFields(); 88 //fields.put( "te", "trailers"); 89 //fields.put( "content-type", "application/grpc+proto"); 90 //fields.put( "grpc-accept-encoding", "identity"); 91 //fields.put( "accept-encoding", "identity"); 92 93 HttpRequest metaData = new HttpRequest( "POST", HttpScheme.HTTP, 94 // new HostPortHttpField( format( "%s:%d", _host, _port)), 95 _host, _port, 96 path, HttpVersion.HTTP_2, fields); 97 98 auto conn = _promise.get(); 99 auto client = cast(Http2ClientConnection) conn; 100 auto streampromise = new FuturePromise!(Stream)(); 101 auto http2session = client.getHttp2Session(); 102 auto grpcstream = new GrpcStream(); 103 104 grpcstream.onDataReceived((ubyte[] data) { 105 grpcstream.onCallBack(data); 106 // service.process(method, grpcstream, data); 107 }); 108 109 // dfmt off 110 http2session.newStream( new HeadersFrame( metaData , null , false), streampromise , 111 new class StreamListener { 112 StreamListener onPush(Stream stream, 113 PushPromiseFrame frame) { 114 logInfo( "onPush"); 115 return null; 116 } 117 /// unused 118 override 119 void onReset(Stream stream, ResetFrame frame, Callback callback) { 120 logInfo( "onReset"); 121 try { 122 onReset( stream, frame); 123 callback.succeeded(); 124 } catch (Exception x) { 125 callback.failed( x); 126 } 127 } 128 /// unused 129 override 130 void onReset(Stream stream, ResetFrame frame) { 131 logInfo( "onReset2"); 132 } 133 /// unused 134 override 135 bool onIdleTimeout(Stream stream, Exception x) { 136 logInfo( "timeout"); 137 return true; 138 } 139 /// unused 140 override string toString() 141 { 142 return super.toString(); 143 } 144 145 override void onHeaders(Stream stream, HeadersFrame frame) { 146 grpcstream.onHeaders( stream , frame); 147 } 148 149 override void onData(Stream stream, DataFrame frame, Callback callback) { 150 // try { 151 // grpcstream.onData(stream , frame); 152 // callback.succeeded(); 153 // } catch (Exception x) { 154 // callback.failed(x); 155 // } 156 if (grpcstream.isAsyn()) 157 { 158 ubyte[] complete = grpcstream.onDataTransitTask(stream , frame); 159 callback.succeeded(); 160 if (complete !is null && !stream.isClosed()) 161 { 162 grpcstream.onCallBack(complete); 163 } 164 } else 165 { 166 grpcstream.onDataTransitQueue(stream , frame); 167 callback.succeeded(); 168 } 169 170 } 171 } 172 ); 173 174 // dfmt on 175 grpcstream.attachStream( streampromise.get()); 176 _streamHash.put(path,grpcstream); 177 return grpcstream; 178 } 179 } 180 181 GrpcStream getStream(string path) 182 { 183 return _streamHash[path]; 184 } 185 186 void onClose() 187 { 188 _streamHash.clear(); 189 } 190 191 192 void destroy() { 193 _streamHash.clear(); 194 _client.destroy(); 195 _promise.cancel(true); 196 } 197 198 protected { 199 string _host; 200 ushort _port; 201 HttpClient _client; 202 FuturePromise!(HttpClientConnection) _promise; 203 HttpClientOptions _HttpConfiguration; 204 HashMap!(string,GrpcStream) _streamHash; 205 } 206 } 207 208 class ClientHttp2SessionListenerEx : ClientHttp2SessionListener { 209 210 //alias hunt.http.codec.http.stream.Session = Session; 211 HttpClientOptions _HttpConfiguration; 212 this(HttpClientOptions config) { 213 this._HttpConfiguration = config; 214 } 215 216 override Map!(int, int) onPreface(Session session) { 217 Map!(int, int) settings = new HashMap!(int, int)(); 218 219 settings.put(SettingsFrame.HEADER_TABLE_SIZE, _HttpConfiguration.getMaxDynamicTableSize()); 220 settings.put(SettingsFrame.INITIAL_WINDOW_SIZE, 221 _HttpConfiguration.getInitialStreamSendWindow()); 222 return settings; 223 } 224 225 override StreamListener onNewStream(Stream stream, HeadersFrame frame) { 226 return null; 227 } 228 229 override void onSettings(Session session, SettingsFrame frame) { 230 } 231 232 override void onPing(Session session, PingFrame frame) { 233 } 234 235 override void onReset(Session session, ResetFrame frame) { 236 logInfo("onReset"); 237 } 238 239 override void onClose(Session session, GoAwayFrame frame) { 240 logInfo("onClose"); 241 } 242 243 override void onFailure(Session session, Exception failure) { 244 warning("onFailure"); 245 } 246 247 override bool onIdleTimeout(Session session) { 248 return false; 249 } 250 }