module grpc.GrpcServer;

import std.stdio;
import std.string;

import hunt.http.codec.http.frame;
import hunt.http.codec.http.model;
import hunt.http.codec.http.stream;
import hunt.http.server;
import hunt.http.server.HttpServer;
import hunt.http.server.ServerHttpHandler;
import hunt.http.server.ServerSessionListener;

import hunt.util.Common;
import hunt.collection;
import hunt.logging;

import grpc.GrpcService;
import grpc.GrpcStream;
import grpc.Status;
import grpc.StatusCode;
import core.thread;
import std.concurrency;
import core.stdc.stdlib : exit;

version(Posix) {
    import core.sys.posix.signal : bsd_signal;
}

/**
 * Signal handler that deliberately does nothing.
 *
 * GrpcServer.start() installs it for signal 13 so that the signal is
 * handled (and ignored) instead of triggering its default action.
 */
extern(C) void handleTermination(int)
{

}

alias Server = GrpcServer;

/**
 * HTTP/2 server that routes incoming streams to registered GrpcService
 * instances.
 *
 * Typical usage: construct, register() one or more services, call listen()
 * to create the underlying HttpServer, then start().
 */
class GrpcServer
{
    /**
     * Creates a server with default options: secure connections disabled,
     * the "simple" flow-control strategy and HTTP/2 as protocol.
     */
    this()
    {
        _HttpConfiguration = new HttpServerOptions();
        _HttpConfiguration.setSecureConnectionEnabled(false);
        _HttpConfiguration.setFlowControlStrategy("simple");
        //_HttpConfiguration.getTcpConfiguration().setTimeout(60 * 1000);
        _HttpConfiguration.setProtocol(HttpVersion.HTTP_2.asString());

        initSettings();
    }

    /**
     * Creates a server that uses caller-supplied options.
     *
     * Params:
     *   options = HTTP server options used as-is; must not be null.
     */
    this(HttpServerOptions options) {
        _HttpConfiguration = options;
        initSettings();
    }

    /**
     * Builds the underlying HttpServer bound to the given address.
     * The server is created here but not started; call start() afterwards.
     *
     * Params:
     *   host = host name or address to bind to.
     *   port = TCP port to bind to.
     */
    void listen(string host , ushort port)
    {
        _HttpConfiguration.setHost(host);
        _HttpConfiguration.setPort(port);
        _server = new HttpServer( _HttpConfiguration, new class ServerSessionListener {

            /// Returns the settings map prepared by the constructor.
            override
            Map!(int, int) onPreface(Session session) {
                version(HUNT_GRPC_DEBUG) {
                    infof("server received preface: %s", session);
                }
                return _settings;
            }

            /**
             * Handles a newly opened stream: sends the response headers,
             * resolves the target service from the request path and wires
             * a StreamListener that feeds incoming frames to a GrpcStream.
             *
             * The path is expected to look like "/<module>/<method>".
             * Malformed paths and unknown modules are answered with a
             * NOT_FOUND status in a trailing headers frame, and null is
             * returned instead of a listener.
             */
            override
            StreamListener onNewStream(Stream stream, HeadersFrame frame) {
                infof("server created new stream: %d", stream.getId());
                infof("server created new stream headers: %s", frame.getMetaData().toString());
                auto request = cast(HttpRequest)frame.getMetaData();

                HttpFields fields = new HttpFields();
                //fields.put("content-type" ,"application/grpc+proto");
                //fields.put("grpc-accept-encoding" , "identity");
                //fields.put("accept-encoding" , "identity");

                auto response = new HttpResponse(HttpVersion.HTTP_2 , 200 , fields);
                auto res_header = new HeadersFrame(stream.getId(),response , null , false);
                stream.headers(res_header , Callback.NOOP);

                // A failed cast yields null; treat it like an empty path so it
                // is rejected below instead of being dereferenced.
                string path = request is null ? "" : request.getURI().getPath();

                // "/<module>/<method>" splits into ["", module, method].
                // The path comes from the client, so its shape must be
                // validated before indexing.
                auto arr = path.split("/");
                if (arr.length < 3)
                {
                    Status status = new Status(StatusCode.NOT_FOUND , "invalid request path:" ~ path);
                    stream.headers(endHeaderFrame(status ,stream.getId()), Callback.NOOP);
                    return null;
                }

                auto mod = arr[1];
                auto method = arr[2];
                auto service = mod in _router ;

                if(service is null)
                {
                    Status status = new Status(StatusCode.NOT_FOUND , "not found this module:" ~ mod);
                    stream.headers(endHeaderFrame(status ,stream.getId()), Callback.NOOP);
                    return null;
                }

                // Capture the service itself rather than the pointer into the
                // router table, so the callback does not depend on the table.
                GrpcService handler = *service;

                GrpcStream grpcstream = new GrpcStream();
                grpcstream.attachStream(stream);

                grpcstream.onDataReceived((ubyte[] data) {
                    handler.process(method, grpcstream, data);
                });


                auto listener = new class StreamListener {

                    /// Forwards header frames to the GrpcStream.
                    override
                    void onHeaders(Stream stream, HeadersFrame frame) {
                        grpcstream.onHeaders(stream , frame);
                    }

                    /// Push promises are not handled; no listener is returned.
                    override
                    StreamListener onPush(Stream stream, PushPromiseFrame frame) {
                        return null;
                    }

                    /// Forwards data frames to the GrpcStream and reports the
                    /// outcome through the callback.
                    override
                    void onData(Stream stream, DataFrame frame, Callback callback) {
                        try {
                            grpcstream.onData(stream , frame);
                            callback.succeeded();
                        } catch (Exception x) {
                            callback.failed(x);
                        }

                        // ubyte[] complete = grpcstream.onDataTransitTask(stream , frame);
                        // callback.succeeded();
                        // if (complete !is null)
                        // {
                        //     import std.parallelism;
                        //     auto t = task!(serviceTask , string , GrpcService , GrpcStream ,
                        //         ubyte [])(method , *service , grpcstream , complete);
                        //     taskPool.put(t);
                        //     //service.process(method , grpcstream ,complete);
                        // }
                    }

                    /// Resets the GrpcStream and reports the outcome through
                    /// the callback.
                    void onReset(Stream stream, ResetFrame frame, Callback callback) {
                        try {
                            grpcstream.reSet();
                            callback.succeeded();
                        } catch (Exception x) {
                            callback.failed(x);
                        }
                    }

                    /// Callback-less reset notification; intentionally a no-op.
                    override
                    void onReset(Stream stream, ResetFrame frame) {
                    }

                    /// Always answers true to a stream idle timeout.
                    override
                    bool onIdleTimeout(Stream stream, Exception x) {
                        return true;
                    }

                    override string toString() {
                        return super.toString();
                    }

                };

                return listener;

            }

            /// Session settings frames are ignored.
            override
            void onSettings(Session session, SettingsFrame frame) {

            }

            /// Ping frames are ignored.
            override
            void onPing(Session session, PingFrame frame) {
            }

            /// Session-level reset frames are ignored.
            override
            void onReset(Session session, ResetFrame frame) {

            }

            /// Session close notifications are ignored.
            override
            void onClose(Session session, GoAwayFrame frame) {
            }

            /// Session failures are ignored.
            override
            void onFailure(Session session, Exception failure) {
            }

            /// Callback variant: delegates to onClose(session, frame) and
            /// reports the outcome through the callback.
            void onClose(Session session, GoAwayFrame frame, Callback callback)
            {
                try
                {
                    onClose(session, frame);
                    callback.succeeded();
                }
                catch (Exception x)
                {
                    callback.failed(x);
                }
            }

            /// Callback variant: delegates to onFailure(session, failure) and
            /// reports the outcome through the callback.
            void onFailure(Session session, Exception failure, Callback callback)
            {
                try
                {
                    onFailure(session, failure);
                    callback.succeeded();
                }
                catch (Exception x)
                {
                    callback.failed(x);
                }
            }

            /// Nothing to do when a session is accepted.
            override
            void onAccept(Session session) {
            }

            /// Always answers false to a session idle timeout.
            override
            bool onIdleTimeout(Session session) {
                return false;
            }
        }, new ServerHttpHandlerAdapter(_HttpConfiguration), null);
    }

    /**
     * Registers a service under the name returned by service.getModule().
     * A later registration with the same module name replaces the earlier one.
     */
    void register(GrpcService service)
    {
        _router[service.getModule()] = service;
    }

    /**
     * Starts the server created by listen().
     *
     * On POSIX an empty handler is installed for signal 13 first.
     * NOTE(review): 13 is SIGPIPE on Linux and the common BSD-derived
     * systems; confirm for other targets.
     *
     * Throws: Exception if listen() has not been called yet.
     */
    void start()
    {
        version(Posix) bsd_signal(13, &handleTermination);

        if (_server is null)
            throw new Exception("GrpcServer.start() called before listen()");

        _server.start();
    }

    /**
     * Stops the server. Does nothing if listen() has not been called.
     */
    void stop()
    {
        if (_server is null)
            return;

        _server.stop();
    }

    /// Fills _settings with the header table size and initial window size
    /// taken from the current HTTP configuration.
    private void initSettings()
    {
        _settings = new HashMap!(int, int)();
        _settings.put(SettingsFrame.HEADER_TABLE_SIZE, _HttpConfiguration.getMaxDynamicTableSize());
        _settings.put(SettingsFrame.INITIAL_WINDOW_SIZE, _HttpConfiguration.getInitialStreamSendWindow());
    }

    protected
    {
        HttpServerOptions _HttpConfiguration;   /// options handed to HttpServer
        Map!(int, int)    _settings;            /// settings returned from onPreface
        HttpServer        _server;              /// null until listen() is called
        GrpcService[string] _router;            /// module name -> service
    }

}