1 module grpc.GrpcServer;
2 
import std.stdio;
import std.string;
5 
6 import hunt.http.codec.http.frame;
7 import hunt.http.codec.http.model;
8 import hunt.http.codec.http.stream;
9 import hunt.http.server;
10 import hunt.http.server.HttpServer;
11 import hunt.http.server.ServerHttpHandler;
12 import hunt.http.server.ServerSessionListener;
13 
14 import hunt.util.Common;
15 import hunt.collection;
16 import hunt.logging;
17 import hunt.system.Memory : totalCPUs;
18 
19 import grpc.GrpcService;
20 import grpc.GrpcStream;
21 import grpc.Status;
22 import grpc.StatusCode;
23 
24 import std.concurrency;
25 
26 import core.thread;
27 import core.stdc.stdlib : exit;
28 
29 version(Posix) {
30     import core.sys.posix.signal : bsd_signal;
31 }
32 
/**
 * Signal handler with C linkage that deliberately does nothing.
 *
 * Params:
 *   signum = number of the delivered signal; it is not inspected.
 */
extern(C) void handleTermination(int signum)
{
    // Intentionally empty: the handler has no work to do.
}
37 
/**
 * HTTP/2 server front-end that dispatches incoming streams to registered
 * `GrpcService` instances.
 *
 * The request path is split on "/" and interpreted as `/<module>/<method>`:
 * the module selects the service from the router and the method name is
 * forwarded to `GrpcService.process`.
 *
 * Call order as used by this class: `register` the services, `listen` to
 * build the underlying `HttpServer`, then `start`; `stop` shuts it down.
 */
class GrpcServer
{
    /**
     * Creates a server with built-in options: secure connections disabled,
     * the "simple" flow-control strategy, `totalCPUs * 2` worker threads and
     * HTTP/2 as the protocol.
     */
    this()
    {
        _HttpConfiguration = new HttpServerOptions();
        _HttpConfiguration.setSecureConnectionEnabled(false);
        _HttpConfiguration.setFlowControlStrategy("simple");
        _HttpConfiguration.getTcpConfiguration().workerThreadSize = totalCPUs * 2;
        //_HttpConfiguration.getTcpConfiguration().setTimeout(60 * 1000);
        _HttpConfiguration.setProtocol(HttpVersion.HTTP_2.asString());

        _settings = buildSettings(_HttpConfiguration);
    }

    /**
     * Creates a server that uses caller-supplied options unchanged.
     *
     * Params:
     *   options = HTTP server options; also the source of the values placed
     *             in the settings map returned from `onPreface`.
     */
    this(HttpServerOptions options) {
        _HttpConfiguration = options;
        _settings = buildSettings(options);
    }

    /**
     * Stores the bind address in the options and constructs the underlying
     * `HttpServer` together with its session listener. The server is not
     * started here; call `start` afterwards.
     *
     * Params:
     *   host = host name or address handed to the options.
     *   port = port handed to the options.
     */
    void listen(string host , ushort port)
    {
        _HttpConfiguration.setHost(host);
        _HttpConfiguration.setPort(port);
        _server = new HttpServer( _HttpConfiguration, new class ServerSessionListener {

            /// Answers the connection preface with the prepared settings map.
            override
            Map!(int, int) onPreface(Session session) {
                version(HUNT_GRPC_DEBUG) {
                    infof("server received preface: %s", session);
                }
                return _settings;
            }

            /**
             * Handles a newly opened stream: resolves the target service from
             * the request path, sends the response headers and returns the
             * listener that feeds stream events into a `GrpcStream`.
             *
             * Returns: the per-stream listener, or `null` when the request is
             *          rejected (malformed path or unknown module).
             */
            override
            StreamListener onNewStream(Stream stream, HeadersFrame frame) {
                infof("server created new stream: %d", stream.getId());
                infof("server created new stream headers: %s", frame.getMetaData().toString());
                auto request = cast(HttpRequest)frame.getMetaData();

                // The cast yields null when the metadata is not a request;
                // treat that like an empty (and therefore malformed) path.
                string path = request is null ? "" : request.getURI().getPath();

                // "/module/method" splits into ["", "module", "method"], so at
                // least three segments are needed before indexing [1] and [2].
                auto segments = path.split("/");
                immutable bool wellFormed = segments.length >= 3;
                string mod = wellFormed ? segments[1] : "";
                string method = wellFormed ? segments[2] : "";
                GrpcService* entry = wellFormed ? (mod in _router) : null;

                // Ends the stream with a NOT_FOUND status trailer.
                StreamListener reject(string message) {
                    Status status = new Status(StatusCode.NOT_FOUND , message);
                    stream.headers(endHeaderFrame(status ,stream.getId()), Callback.NOOP);
                    return null;
                }

                HttpFields fields = new HttpFields();
                //fields.put("content-type" ,"application/grpc+proto");
                //fields.put("grpc-accept-encoding" , "identity");
                //fields.put("accept-encoding" , "identity");

                // The 200 response headers are always sent first; a failure is
                // reported afterwards through the status trailer.
                auto response = new HttpResponse(HttpVersion.HTTP_2 , 200 , fields);
                auto res_header = new HeadersFrame(stream.getId(),response , null , false);
                stream.headers(res_header , Callback.NOOP);

                if (!wellFormed)
                {
                    return reject("invalid request path:" ~ path);
                }

                if (entry is null)
                {
                    return reject("not found this module:" ~ mod);
                }

                // Capture the service reference itself rather than a pointer
                // into the router's associative array.
                GrpcService service = *entry;

                GrpcStream grpcstream = new  GrpcStream();
                grpcstream.attachStream(stream);

                grpcstream.onDataReceived((ubyte[] data) {
                    service.process(method, grpcstream, data);
                });


                auto listener =  new class StreamListener {

                    /// Forwards header frames to the gRPC stream.
                    override
                    void onHeaders(Stream stream, HeadersFrame frame) {
                        grpcstream.onHeaders(stream , frame);
                    }

                    /// Push promises are not handled; no listener is returned.
                    override
                    StreamListener onPush(Stream stream, PushPromiseFrame frame) {
                        return null;
                    }

                    /// Forwards data frames to the gRPC stream and reports the
                    /// outcome through `callback`.
                    override
                    void onData(Stream stream, DataFrame frame, Callback callback) {
                        try {
                            grpcstream.onData(stream , frame);
                            callback.succeeded();
                        } catch (Exception x) {
                            callback.failed(x);
                        }

                        // ubyte[] complete = grpcstream.onDataTransitTask(stream , frame);
                        // callback.succeeded();
                        // if (complete !is null)
                        // {
                        //     import std.parallelism;
                        //     auto t = task!(serviceTask , string , GrpcService , GrpcStream , 
                        //             ubyte [])(method , *service , grpcstream , complete);
                        //     taskPool.put(t);
                        //     //service.process(method , grpcstream ,complete);
                        // }
                    }

                    /// Resets the gRPC stream and reports the outcome through
                    /// `callback`.
                    void onReset(Stream stream, ResetFrame frame, Callback callback) {
                        try {
                            grpcstream.reSet();
                            callback.succeeded();
                        } catch (Exception x) {
                            callback.failed(x);
                        }
                    }

                    /// Callback-less reset notification; nothing is done here.
                    override
                    void onReset(Stream stream, ResetFrame frame) {
                    }

                    /// Always returns true.
                    /// NOTE(review): the meaning of the return value is defined
                    /// by `StreamListener` - confirm against hunt-http.
                    override
                    bool onIdleTimeout(Stream stream, Exception x) {
                        return true;
                    }

                    override string toString() {
                        return super.toString();
                    }

                };

                return listener;

            }

            /// Settings frames are ignored.
            override
            void onSettings(Session session, SettingsFrame frame) {

            }

            /// Ping frames are ignored.
            override
            void onPing(Session session, PingFrame frame) {
            }

            /// Session-level resets are ignored.
            override
            void onReset(Session session, ResetFrame frame) {

            }

            /// Session close notifications are ignored.
            override
            void onClose(Session session, GoAwayFrame frame) {
            }

            /// Session failures are ignored.
            override
            void onFailure(Session session, Exception failure) {
            }

            /// Delegates to the callback-less `onClose` and reports the
            /// outcome through `callback`.
            void onClose(Session session, GoAwayFrame frame, Callback callback)
            {
                try
                {
                    onClose(session, frame);
                    callback.succeeded();
                }
                catch (Exception x)
                {
                    callback.failed(x);
                }
            }

            /// Delegates to the callback-less `onFailure` and reports the
            /// outcome through `callback`.
            void onFailure(Session session, Exception failure, Callback callback)
            {
                try
                {
                    onFailure(session, failure);
                    callback.succeeded();
                }
                catch (Exception x)
                {
                    callback.failed(x);
                }
            }

            /// Newly accepted sessions need no extra setup.
            override
            void onAccept(Session session) {
            }

            /// Always returns false.
            /// NOTE(review): the meaning of the return value is defined by
            /// `ServerSessionListener` - confirm against hunt-http.
            override
            bool onIdleTimeout(Session session) {
                return false;
            }
        }, new ServerHttpHandlerAdapter(_HttpConfiguration), null);
    }

    /**
     * Adds a service to the router under the name returned by its
     * `getModule()`. A service registered earlier under the same name is
     * replaced.
     */
    void register(GrpcService service)
    {
        _router[service.getModule()] = service;
    }

    /**
     * Installs `handleTermination` as the SIGPIPE handler (POSIX only) and
     * starts the underlying HTTP server.
     *
     * Throws: Exception if `listen` has not been called yet.
     */
    void start()
    {
        if (_server is null)
            throw new Exception("GrpcServer.start() called before listen()");

        version(Posix) {
            import core.sys.posix.signal : SIGPIPE;
            bsd_signal(SIGPIPE, &handleTermination);
        }
        _server.start();
    }

    /**
     * Stops the underlying HTTP server. Does nothing when `listen` was never
     * called.
     */
    void stop()
    {
        if (_server !is null)
            _server.stop();
    }

    /// Builds the settings map returned from `onPreface`, using the header
    /// table size and initial stream send window taken from `options`.
    private static Map!(int, int) buildSettings(HttpServerOptions options)
    {
        auto settings = new HashMap!(int, int)();
        settings.put(SettingsFrame.HEADER_TABLE_SIZE, options.getMaxDynamicTableSize());
        settings.put(SettingsFrame.INITIAL_WINDOW_SIZE, options.getInitialStreamSendWindow());
        return settings;
    }

    protected
    {
        HttpServerOptions      _HttpConfiguration; // options handed to HttpServer
        Map!(int, int)          _settings;          // settings returned from onPreface
        HttpServer             _server;            // created by listen(); null before that
        GrpcService[string]     _router;            // module name -> service
    }

}