module grpc.GrpcServer;

import std.stdio;
import std.string;

import hunt.http.codec.http.frame;
import hunt.http.codec.http.model;
import hunt.http.codec.http.stream;
import hunt.http.server;
import hunt.http.server.HttpServer;
import hunt.http.server.ServerHttpHandler;
import hunt.http.server.ServerSessionListener;

import hunt.util.Common;
import hunt.collection;
import hunt.logging;
import hunt.system.Memory : totalCPUs;

import grpc.GrpcService;
import grpc.GrpcStream;
import grpc.Status;
import grpc.StatusCode;

import std.concurrency;

import core.thread;
import core.stdc.stdlib : exit;

version(Posix) {
    import core.sys.posix.signal : bsd_signal, SIGPIPE;
}

/**
 * Signal handler that intentionally does nothing.
 *
 * GrpcServer.start() installs it for SIGPIPE on Posix builds, replacing
 * whatever disposition that signal had before.
 */
extern(C) void handleTermination(int)
{

}

/**
 * gRPC server on top of the hunt HTTP server.
 *
 * Services are added with register() and looked up by the first segment of
 * the request path ("/<module>/<method>"); the second segment is handed to
 * GrpcService.process() as the method name.
 *
 * Typical call order: construct, register() services, listen(), start().
 */
class GrpcServer
{
    /**
     * Creates a server with built-in options: secure connections disabled,
     * "simple" flow control, totalCPUs * 2 worker threads, HTTP/2 protocol.
     */
    this()
    {
        _HttpConfiguration = new HttpServerOptions();
        _HttpConfiguration.setSecureConnectionEnabled(false);
        _HttpConfiguration.setFlowControlStrategy("simple");
        _HttpConfiguration.getTcpConfiguration().workerThreadSize = totalCPUs * 2;
        //_HttpConfiguration.getTcpConfiguration().setTimeout(60 * 1000);
        _HttpConfiguration.setProtocol(HttpVersion.HTTP_2.asString());

        initSettings();
    }

    /**
     * Creates a server that uses caller-supplied options.
     *
     * Params:
     *   options = HTTP server options; must not be null.
     *
     * Throws: Exception when options is null.
     */
    this(HttpServerOptions options) {
        if (options is null)
            throw new Exception("GrpcServer: HttpServerOptions must not be null");

        _HttpConfiguration = options;
        initSettings();
    }

    /**
     * Stores the bind address in the options and builds the underlying
     * HttpServer together with its session listener. Nothing is started
     * here; call start() afterwards.
     *
     * Params:
     *   host = host passed to the options' setHost().
     *   port = port passed to the options' setPort().
     */
    void listen(string host , ushort port)
    {
        _HttpConfiguration.setHost(host);
        _HttpConfiguration.setPort(port);
        _server = new HttpServer( _HttpConfiguration, new class ServerSessionListener {

            // Replies to the client preface with the settings prepared in the constructor.
            override
            Map!(int, int) onPreface(Session session) {
                version(HUNT_GRPC_DEBUG) {
                    infof("server received preface: %s", session);
                }
                return _settings;
            }

            // Routes a new stream to the registered service named in its path.
            // Returns null (after sending a NOT_FOUND trailer) when the path is
            // malformed or no service is registered for the module.
            override
            StreamListener onNewStream(Stream stream, HeadersFrame frame) {
                infof("server created new stream: %d", stream.getId());
                infof("server created new stream headers: %s", frame.getMetaData().toString());

                // The cast yields null when the metadata is not a request;
                // treat that like an empty path so it is rejected below.
                auto request = cast(HttpRequest)frame.getMetaData();
                string path = request is null ? "" : request.getURI().getPath();

                // "/<module>/<method>" splits into ["", module, method].
                auto arr = path.split("/");

                HttpFields fields = new HttpFields();
                //fields.put("content-type" ,"application/grpc+proto");
                //fields.put("grpc-accept-encoding" , "identity");
                //fields.put("accept-encoding" , "identity");

                auto response = new HttpResponse(HttpVersion.HTTP_2 , 200 , fields);
                auto res_header = new HeadersFrame(stream.getId(),response , null , false);
                stream.headers(res_header , Callback.NOOP);

                // The path comes from the client: never index it unchecked.
                if (arr.length < 3)
                    return reject(stream, "invalid grpc path:" ~ path);

                auto mod = arr[1];
                auto method = arr[2];

                auto entry = mod in _router;
                if (entry is null)
                    return reject(stream, "not found this module:" ~ mod);

                // Copy the class reference so the closures below do not keep
                // a pointer into the router table.
                GrpcService service = *entry;

                GrpcStream grpcstream = new GrpcStream();
                grpcstream.attachStream(stream);

                grpcstream.onDataReceived((ubyte[] data) {
                    service.process(method, grpcstream, data);
                });

                auto listener = new class StreamListener {

                    override
                    void onHeaders(Stream stream, HeadersFrame frame) {
                        grpcstream.onHeaders(stream , frame);
                    }

                    override
                    StreamListener onPush(Stream stream, PushPromiseFrame frame) {
                        return null;
                    }

                    // Forwards the frame to the GrpcStream and reports the
                    // outcome through the callback.
                    override
                    void onData(Stream stream, DataFrame frame, Callback callback) {
                        try {
                            grpcstream.onData(stream , frame);
                            callback.succeeded();
                        } catch (Exception x) {
                            callback.failed(x);
                        }

                        // ubyte[] complete = grpcstream.onDataTransitTask(stream , frame);
                        // callback.succeeded();
                        // if (complete !is null)
                        // {
                        //     import std.parallelism;
                        //     auto t = task!(serviceTask , string , GrpcService , GrpcStream ,
                        //         ubyte [])(method , service , grpcstream , complete);
                        //     taskPool.put(t);
                        //     //service.process(method , grpcstream ,complete);
                        // }
                    }

                    // Resets the GrpcStream and reports the outcome through the callback.
                    void onReset(Stream stream, ResetFrame frame, Callback callback) {
                        try {
                            grpcstream.reSet();
                            callback.succeeded();
                        } catch (Exception x) {
                            callback.failed(x);
                        }
                    }

                    override
                    void onReset(Stream stream, ResetFrame frame) {
                    }

                    override
                    bool onIdleTimeout(Stream stream, Exception x) {
                        return true;
                    }

                    override string toString() {
                        return super.toString();
                    }

                };

                return listener;

            }

            // Sends a trailer carrying a NOT_FOUND status with the given
            // message and returns null, i.e. no listener for the stream.
            private StreamListener reject(Stream stream, string message) {
                Status status = new Status(StatusCode.NOT_FOUND , message);
                stream.headers(endHeaderFrame(status ,stream.getId()), Callback.NOOP);
                return null;
            }

            override
            void onSettings(Session session, SettingsFrame frame) {

            }

            override
            void onPing(Session session, PingFrame frame) {
            }

            override
            void onReset(Session session, ResetFrame frame) {

            }

            override
            void onClose(Session session, GoAwayFrame frame) {
            }

            override
            void onFailure(Session session, Exception failure) {
            }

            // Callback variant: delegates to onClose(session, frame).
            void onClose(Session session, GoAwayFrame frame, Callback callback)
            {
                try
                {
                    onClose(session, frame);
                    callback.succeeded();
                }
                catch (Exception x)
                {
                    callback.failed(x);
                }
            }

            // Callback variant: delegates to onFailure(session, failure).
            void onFailure(Session session, Exception failure, Callback callback)
            {
                try
                {
                    onFailure(session, failure);
                    callback.succeeded();
                }
                catch (Exception x)
                {
                    callback.failed(x);
                }
            }

            override
            void onAccept(Session session) {
            }

            override
            bool onIdleTimeout(Session session) {
                return false;
            }
        }, new ServerHttpHandlerAdapter(_HttpConfiguration), null);
    }

    /**
     * Adds a service to the routing table under service.getModule().
     * A later registration with the same module name replaces the earlier one.
     */
    void register(GrpcService service)
    {
        _router[service.getModule()] = service;
    }

    /**
     * Starts the HTTP server created by listen().
     *
     * On Posix builds the no-op handleTermination handler is installed for
     * SIGPIPE first.
     *
     * Throws: Exception when listen() has not been called yet.
     */
    void start()
    {
        if (_server is null)
            throw new Exception("GrpcServer.start() called before listen()");

        version(Posix) bsd_signal(SIGPIPE, &handleTermination);
        _server.start();
    }

    /**
     * Stops the HTTP server. Does nothing when listen() was never called.
     */
    void stop()
    {
        if (_server !is null)
            _server.stop();
    }

    /// Builds the settings map returned from onPreface out of the current options.
    private void initSettings()
    {
        _settings = new HashMap!(int, int)();
        _settings.put(SettingsFrame.HEADER_TABLE_SIZE, _HttpConfiguration.getMaxDynamicTableSize());
        _settings.put(SettingsFrame.INITIAL_WINDOW_SIZE, _HttpConfiguration.getInitialStreamSendWindow());
    }

    protected
    {
        HttpServerOptions   _HttpConfiguration; // options handed to HttpServer
        Map!(int, int)      _settings;          // settings returned from onPreface
        HttpServer          _server;            // null until listen() is called
        GrpcService[string] _router;            // module name -> service
    }

}