1 module grpc.GrpcServer;
2 
3 import std.stdio;
4 import std.string;
5 
6 import hunt.http.codec.http.frame;
7 import hunt.http.codec.http.model;
8 import hunt.http.codec.http.stream;
9 import hunt.http.server;
10 import hunt.http.server.HttpServer;
11 import hunt.http.server.ServerHttpHandler;
12 import hunt.http.server.ServerSessionListener;
13 
14 import hunt.util.Common;
15 import hunt.collection;
16 import hunt.logging;
17 
18 import grpc.GrpcService;
19 import grpc.GrpcStream;
20 import grpc.Status;
21 import grpc.StatusCode;
22 import core.thread;
23 import std.concurrency;
24 import core.stdc.stdlib : exit;
25 
26 version(Posix) {
27     import core.sys.posix.signal : bsd_signal;
28 }
29 
30 extern(C) void handleTermination(int)
31 {
32 
33 }
34 
35 alias Server = GrpcServer;
36 class GrpcServer
37 {
38     this()
39     {
40         _HttpConfiguration = new HttpServerOptions();
41         _HttpConfiguration.setSecureConnectionEnabled(false);
42         _HttpConfiguration.setFlowControlStrategy("simple");
43         //_HttpConfiguration.getTcpConfiguration().setTimeout(60 * 1000);
44         _HttpConfiguration.setProtocol(HttpVersion.HTTP_2.asString());
45 
46         _settings = new HashMap!(int, int)();
47         _settings.put(SettingsFrame.HEADER_TABLE_SIZE, _HttpConfiguration.getMaxDynamicTableSize());
48         _settings.put(SettingsFrame.INITIAL_WINDOW_SIZE, _HttpConfiguration.getInitialStreamSendWindow());
49     }
50 
51     this(HttpServerOptions options) {
52         _HttpConfiguration = options;
53         _settings = new HashMap!(int, int)();
54         _settings.put(SettingsFrame.HEADER_TABLE_SIZE, _HttpConfiguration.getMaxDynamicTableSize());
55         _settings.put(SettingsFrame.INITIAL_WINDOW_SIZE, _HttpConfiguration.getInitialStreamSendWindow());
56     }
57 
58     void listen(string host , ushort port)
59     {
60         _HttpConfiguration.setHost(host);
61         _HttpConfiguration.setPort(port);
62         _server = new HttpServer( _HttpConfiguration, new class ServerSessionListener {
63 
64             override
65             Map!(int, int) onPreface(Session session) {
66                 version(HUNT_GRPC_DEBUG) {
67                     infof("server received preface: %s", session);
68                 }
69                 return _settings;
70             }
71             override
72             StreamListener onNewStream(Stream stream, HeadersFrame frame) {
73                 infof("server created new stream: %d", stream.getId());
74                 infof("server created new stream headers: %s", frame.getMetaData().toString());
75                 auto request = cast(HttpRequest)frame.getMetaData();
76 
77                 string path = request.getURI().getPath();
78 
79                 auto arr = path.split("/");
80                 auto mod = arr[1];
81                 auto method = arr[2];
82                 auto service =  mod in _router ;
83             
84 
85                 HttpFields fields = new HttpFields();
86                 //fields.put("content-type" ,"application/grpc+proto");
87                 //fields.put("grpc-accept-encoding" , "identity");
88                 //fields.put("accept-encoding" , "identity");
89 
90                 auto response = new HttpResponse(HttpVersion.HTTP_2 , 200 , fields);
91                 auto res_header = new HeadersFrame(stream.getId(),response , null , false);
92                 stream.headers(res_header , Callback.NOOP);
93 
94                 if(service == null)
95                 {
96                     Status status = new Status(StatusCode.NOT_FOUND , "not found this module:" ~ mod);
97                     stream.headers(endHeaderFrame(status ,stream.getId()), Callback.NOOP);
98                     return null;
99                 }
100 
101                 GrpcStream grpcstream = new  GrpcStream();
102                 grpcstream.attachStream(stream);
103 
104                 grpcstream.onDataReceived((ubyte[] data) {
105                     service.process(method, grpcstream, data);
106                 });       
107 
108 
109                 auto listener =  new class StreamListener {
110 
111                     override
112                     void onHeaders(Stream stream, HeadersFrame frame) {
113                         grpcstream.onHeaders(stream , frame);
114                     }
115 
116                     override
117                     StreamListener onPush(Stream stream, PushPromiseFrame frame) {
118                         return null;
119                     }
120 
121                     override
122                     void onData(Stream stream, DataFrame frame, Callback callback) {
123                         try {
124                             grpcstream.onData(stream , frame);                            
125                             callback.succeeded();
126                         } catch (Exception x) {
127                             callback.failed(x);
128                         }
129 
130                         // ubyte[] complete = grpcstream.onDataTransitTask(stream , frame);
131                         // callback.succeeded();
132                         // if (complete !is null)
133                         // {
134                         //     import std.parallelism;
135                         //     auto t = task!(serviceTask , string , GrpcService , GrpcStream , 
136                         //             ubyte [])(method , *service , grpcstream , complete);
137                         //     taskPool.put(t);
138                         //     //service.process(method , grpcstream ,complete);
139                         // }
140                     }
141 
142                     void onReset(Stream stream, ResetFrame frame, Callback callback) {
143                         try {
144                             grpcstream.reSet();
145                             callback.succeeded();
146                         } catch (Exception x) {
147                             callback.failed(x);
148                         }
149                     }
150 
151                     override
152                     void onReset(Stream stream, ResetFrame frame) {
153                     }
154 
155                     override
156                     bool onIdleTimeout(Stream stream, Exception x) {
157                         return true;
158                     }
159 
160                     override string toString() {
161                         return super.toString();
162                     }
163 
164                 };
165 
166                 return listener;
167 
168             }
169 
170             override
171             void onSettings(Session session, SettingsFrame frame) {
172                 
173             }
174 
175             override
176             void onPing(Session session, PingFrame frame) {
177             }
178 
179             override
180             void onReset(Session session, ResetFrame frame) {
181                 
182             }
183 
184             override
185             void onClose(Session session, GoAwayFrame frame) {
186             }
187 
188             override
189             void onFailure(Session session, Exception failure) {
190             }
191 
192             void onClose(Session session, GoAwayFrame frame, Callback callback)
193             {
194                 try
195                 {
196                     onClose(session, frame);
197                     callback.succeeded();
198                 }
199                 catch (Exception x)
200                 {
201                     callback.failed(x);
202                 }
203             }
204 
205             void onFailure(Session session, Exception failure, Callback callback)
206             {
207                 try
208                 {
209                     onFailure(session, failure);
210                     callback.succeeded();
211                 }
212                 catch (Exception x)
213                 {
214                     callback.failed(x);
215                 }
216             }
217 
218             override
219             void onAccept(Session session) {
220             }
221 
222             override
223             bool onIdleTimeout(Session session) {
224                 return false;
225             }
226         }, new ServerHttpHandlerAdapter(_HttpConfiguration), null);
227     }
228 
229     void register(GrpcService service)
230     {
231         _router[service.getModule()] = service;
232     }
233 
234 
235 
236     void start()
237     {
238         version(Posix) bsd_signal(13, &handleTermination);
239         _server.start();
240     }
241 
242     void stop()
243     {
244         _server.stop();
245     }
246 
247     protected
248     {
249         HttpServerOptions      _HttpConfiguration;
250         Map!(int, int)          _settings;
251         HttpServer             _server;
252         GrpcService[string]     _router;
253     }
254 
255 }