module grpc.GrpcClient;

import hunt.concurrency.Promise;

import hunt.logging.ConsoleLogger;

import std.stdio;
import std.string;
import std.datetime;
import std.conv;
import std.format;

import hunt.http.client.ClientHttp2SessionListener;
import hunt.http.client.HttpClient;
import hunt.http.client.HttpClientOptions;
import hunt.http.client.Http2ClientConnection;
import hunt.http.client.HttpClientConnection;

import hunt.http.codec.http.frame;
import hunt.http.codec.http.model;
import hunt.http.codec.http.stream;

import hunt.http.HttpFields;
import hunt.http.HttpRequest;
import hunt.http.HttpScheme;
import hunt.http.client.HttpClientRequest;

import hunt.util.Common;
import hunt.concurrency.FuturePromise;

import hunt.collection;
//import hunt.net;

import grpc.GrpcException;
import grpc.GrpcStream;
import grpc.GrpcService;
import core.thread;
import hunt.collection.HashMap;
import std.parallelism;

import hunt.http.HttpVersion;

alias Channel = GrpcClient;

/**
 * gRPC client ("channel") built on hunt's HttpClient.
 *
 * The client is configured for HTTP/2 with secure connections disabled and
 * keeps one GrpcStream per request path in an internal map, so repeated
 * createStream() calls with the same path return the same stream object.
 */
class GrpcClient {

    /// Retry count passed to HttpClientOptions.setRetryTimes().
    enum int DEFAULT_RETRY_TIMES = 100;
    /// Retry interval passed to HttpClientOptions.setRetryInterval().
    enum Duration DEFAULT_RETRY_INTERVAL = 5.seconds;

    /**
     * Creates a client with the default options and immediately starts
     * connecting to the given endpoint.
     *
     * Params:
     *   host = remote host name or address
     *   port = remote port
     */
    this(string host, ushort port) {
        this();
        connect(host, port);
    }

    /**
     * Creates an unconnected client: builds the HTTP options (HTTP/2,
     * secure connection disabled, "simple" flow control, default retry
     * settings), the connection promise, the HttpClient and the empty
     * path-to-stream map. Call connect() afterwards.
     */
    this() {
        _HttpConfiguration = new HttpClientOptions();
        _HttpConfiguration.setSecureConnectionEnabled(false);
        _HttpConfiguration.setFlowControlStrategy("simple");
        //_HttpConfiguration.getTcpConfiguration().setStreamIdleTimeout(60 * 1000);
        _HttpConfiguration.setProtocol(HttpVersion.HTTP_2.toString());
        _HttpConfiguration.setRetryTimes(DEFAULT_RETRY_TIMES);
        _HttpConfiguration.setRetryInterval(DEFAULT_RETRY_INTERVAL);
        _promise = new FuturePromise!(HttpClientConnection)();
        _client = new HttpClient(_HttpConfiguration);
        _streamHash = new HashMap!(string,GrpcStream);
    }

    /**
     * Remembers the endpoint, registers onClose() as the client's closed
     * handler and asks the HttpClient to connect, handing it the connection
     * promise that createStream() later reads.
     *
     * Params:
     *   host = remote host name or address
     *   port = remote port
     */
    void connect(string host, ushort port) {
        _host = host;
        _port = port;
        logInfo("host : ", host, " port :", port);
        _client.setOnClosed(&onClose);
        _client.connect(host, port, _promise, new ClientHttp2SessionListenerEx(_HttpConfiguration));
    }

    /**
     * Returns the GrpcStream registered for `path`, creating it first if
     * there is none yet.
     *
     * A new stream is opened by sending a HEADERS frame carrying a POST
     * request for `path` on the HTTP/2 session obtained from the connection
     * promise; the resulting stream is attached to the GrpcStream and the
     * GrpcStream is stored in the map under `path`.
     *
     * NOTE(review): `_promise.get()` and `streampromise.get()` presumably
     * block until the connection / stream is available - confirm against
     * FuturePromise.
     *
     * Params:
     *   path = request path used both as the HTTP request path and as the
     *          cache key
     * Returns: the cached or newly created stream
     */
    GrpcStream createStream(string path) {
        if (_streamHash.containsKey(path))
        {
            return _streamHash.get(path);
        }
        else
        {
            HttpFields fields = new HttpFields();
            //fields.put( "te", "trailers");
            //fields.put( "content-type", "application/grpc+proto");
            //fields.put( "grpc-accept-encoding", "identity");
            //fields.put( "accept-encoding", "identity");

            HttpRequest metaData = new HttpRequest( "POST", HttpScheme.HTTP,
                    // new HostPortHttpField( format( "%s:%d", _host, _port)),
                    _host, _port,
                    path, HttpVersion.HTTP_2, fields);

            auto conn = _promise.get();
            auto client = cast(Http2ClientConnection) conn;
            auto streampromise = new FuturePromise!(Stream)();
            auto http2session = client.getHttp2Session();
            auto grpcstream = new GrpcStream();

            // Data handed to the stream's receive hook is forwarded to its callback.
            grpcstream.onDataReceived((ubyte[] data) {
                grpcstream.onCallBack(data);
                // service.process(method, grpcstream, data);
            });

            // dfmt off
            http2session.newStream( new HeadersFrame( metaData , null , false), streampromise ,
                new class StreamListener {
                    StreamListener onPush(Stream stream,
                        PushPromiseFrame frame) {
                        logInfo( "onPush");
                        return null;
                    }
                    /// unused
                    override
                    void onReset(Stream stream, ResetFrame frame, Callback callback) {
                        logInfo( "onReset");
                        try {
                            onReset( stream, frame);
                            callback.succeeded();
                        } catch (Exception x) {
                            callback.failed( x);
                        }
                    }
                    /// unused
                    override
                    void onReset(Stream stream, ResetFrame frame) {
                        logInfo( "onReset2");
                    }
                    /// unused
                    override
                    bool onIdleTimeout(Stream stream, Exception x) {
                        logInfo( "timeout");
                        return true;
                    }
                    /// unused
                    override string toString()
                    {
                        return super.toString();
                    }

                    /// Response headers are passed straight to the GrpcStream.
                    override void onHeaders(Stream stream, HeadersFrame frame) {
                        grpcstream.onHeaders( stream , frame);
                    }

                    /// Dispatches a DATA frame according to the stream's mode.
                    override void onData(Stream stream, DataFrame frame, Callback callback) {
                        // try {
                        //     grpcstream.onData(stream , frame);
                        //     callback.succeeded();
                        // } catch (Exception x) {
                        //     callback.failed(x);
                        // }
                        if (grpcstream.isAsyn())
                        {
                            // The frame is acknowledged before the callback runs;
                            // the callback fires only when onDataTransitTask returned
                            // a non-null buffer and the stream is still open.
                            ubyte[] complete = grpcstream.onDataTransitTask(stream , frame);
                            callback.succeeded();
                            if (complete !is null && !stream.isClosed())
                            {
                                grpcstream.onCallBack(complete);
                            }
                        } else
                        {
                            grpcstream.onDataTransitQueue(stream , frame);
                            callback.succeeded();
                        }

                    }
                }
            );

            // dfmt on
            grpcstream.attachStream( streampromise.get());
            _streamHash.put(path,grpcstream);
            return grpcstream;
        }
    }

    /**
     * Looks up the stream registered for `path` by indexing the stream map.
     *
     * Params:
     *   path = request path used as the cache key
     * Returns: whatever the map's index operator yields for `path`
     */
    GrpcStream getStream(string path)
    {
        return _streamHash[path];
    }

    /// Closed handler registered in connect(): forgets every cached stream.
    void onClose()
    {
        _streamHash.clear();
    }


    /// Clears the stream map, destroys the HttpClient and cancels the connection promise.
    void destroy() {
        _streamHash.clear();
        _client.destroy();
        _promise.cancel(true);
    }

    protected {
        string _host;
        ushort _port;
        HttpClient _client;
        FuturePromise!(HttpClientConnection) _promise;
        HttpClientOptions _HttpConfiguration;
        HashMap!(string,GrpcStream) _streamHash;
    }
}

/**
 * HTTP/2 session listener passed to HttpClient.connect() by GrpcClient.
 *
 * It supplies the initial SETTINGS values from the client options and logs
 * reset / close / failure events; every other callback is a no-op.
 */
class ClientHttp2SessionListenerEx : ClientHttp2SessionListener {

    //alias hunt.http.codec.http.stream.Session = Session;
    HttpClientOptions _HttpConfiguration;

    /**
     * Params:
     *   config = client options the SETTINGS values are read from
     */
    this(HttpClientOptions config) {
        this._HttpConfiguration = config;
    }

    /**
     * Builds the settings map returned for the connection preface:
     * HEADER_TABLE_SIZE from getMaxDynamicTableSize() and
     * INITIAL_WINDOW_SIZE from getInitialStreamSendWindow().
     */
    override Map!(int, int) onPreface(Session session) {
        Map!(int, int) settings = new HashMap!(int, int)();

        settings.put(SettingsFrame.HEADER_TABLE_SIZE, _HttpConfiguration.getMaxDynamicTableSize());
        settings.put(SettingsFrame.INITIAL_WINDOW_SIZE,
                _HttpConfiguration.getInitialStreamSendWindow());
        return settings;
    }

    /// Returns null: no listener is provided for streams reported here.
    override StreamListener onNewStream(Stream stream, HeadersFrame frame) {
        return null;
    }

    /// Intentionally empty.
    override void onSettings(Session session, SettingsFrame frame) {
    }

    /// Intentionally empty.
    override void onPing(Session session, PingFrame frame) {
    }

    /// Logs the event only.
    override void onReset(Session session, ResetFrame frame) {
        logInfo("onReset");
    }

    /// Logs the event only.
    override void onClose(Session session, GoAwayFrame frame) {
        logInfo("onClose");
    }

    /**
     * Logs a warning that includes the exception message, so the cause of
     * the session failure is visible in the log instead of being dropped.
     */
    override void onFailure(Session session, Exception failure) {
        string detail = failure is null ? "(no exception supplied)" : failure.msg;
        warning("onFailure: " ~ detail);
    }

    /// Always returns false.
    override bool onIdleTimeout(Session session) {
        return false;
    }
}