36 handshake_timeout = 60; |
36 handshake_timeout = 60; |
37 max_wait = 86400; |
37 max_wait = 86400; |
38 }; |
38 }; |
39 |
39 |
40 local fds = createtable(10, 0); -- FD -> conn |
40 local fds = createtable(10, 0); -- FD -> conn |
|
41 |
|
42 -- Timer and scheduling -- |
|
43 |
41 local timers = {}; |
44 local timers = {}; |
42 |
45 |
43 local function noop() end |
46 local function noop() end |
44 local function closetimer(t) |
47 local function closetimer(t) |
45 t[1] = 0; |
48 t[1] = 0; |
90 end |
93 end |
91 break; |
94 break; |
92 end |
95 end |
93 local new_timeout = f(now); |
96 local new_timeout = f(now); |
94 if new_timeout then |
97 if new_timeout then |
95 -- Schedlue for 'delay' from the time actually sheduled, |
98 -- Schedule for 'delay' from the time actually scheduled, |
96 -- not from now, in order to prevent timer drift. |
99 -- not from now, in order to prevent timer drift. |
97 timer[1] = t + new_timeout; |
100 timer[1] = t + new_timeout; |
98 resort_timers = true; |
101 resort_timers = true; |
99 else |
102 else |
100 t_remove(timers, i); |
103 t_remove(timers, i); |
121 return ("FD %d (%s, %d)"):format(self:getfd(), self.sockname or self.peername, self.sockport or self.peerport); |
126 return ("FD %d (%s, %d)"):format(self:getfd(), self.sockname or self.peername, self.sockport or self.peerport); |
122 end |
127 end |
123 return ("%s FD %d"):format(tostring(self.conn), self:getfd()); |
128 return ("%s FD %d"):format(tostring(self.conn), self:getfd()); |
124 end |
129 end |
125 |
130 |
|
131 -- Replace the listener and tell the old one |
126 function interface:setlistener(listeners) |
132 function interface:setlistener(listeners) |
127 self:on("detach"); |
133 self:on("detach"); |
128 self.listeners = listeners; |
134 self.listeners = listeners; |
129 end |
135 end |
130 |
136 |
131 -- Call callback |
137 -- Call a listener callback |
132 function interface:on(what, ...) |
138 function interface:on(what, ...) |
133 local listener = self.listeners["on"..what]; |
139 local listener = self.listeners["on"..what]; |
134 if not listener then |
140 if not listener then |
135 -- log("debug", "Missing listener 'on%s'", what); -- uncomment for development and debugging |
141 -- log("debug", "Missing listener 'on%s'", what); -- uncomment for development and debugging |
136 return; |
142 return; |
140 log("error", "Error calling on%s: %s", what, err); |
146 log("error", "Error calling on%s: %s", what, err); |
141 end |
147 end |
142 return err; |
148 return err; |
143 end |
149 end |
144 |
150 |
|
151 -- Return the file descriptor number |
145 function interface:getfd() |
152 function interface:getfd() |
146 if self.conn then |
153 if self.conn then |
147 return self.conn:getfd(); |
154 return self.conn:getfd(); |
148 end |
155 end |
149 return -1; |
156 return -1; |
150 end |
157 end |
151 |
158 |
|
159 -- Get IP address |
152 function interface:ip() |
160 function interface:ip() |
153 return self.peername or self.sockname; |
161 return self.peername or self.sockname; |
154 end |
162 end |
155 |
163 |
|
164 -- Get a port number, doesn't matter which |
156 function interface:port() |
165 function interface:port() |
157 return self.sockport or self.peerport; |
166 return self.sockport or self.peerport; |
158 end |
167 end |
159 |
168 |
|
169 -- Get local port number |
160 function interface:clientport() |
170 function interface:clientport() |
161 return self.sockport; |
171 return self.sockport; |
162 end |
172 end |
163 |
173 |
|
174 -- Get remote port |
164 function interface:serverport() |
175 function interface:serverport() |
165 if self.sockport then |
176 if self.sockport then |
166 return self.sockport; |
177 return self.sockport; |
167 elseif self.server then |
178 elseif self.server then |
168 self.server:port(); |
179 self.server:port(); |
169 end |
180 end |
170 end |
181 end |
171 |
182 |
|
183 -- Return underlying socket |
172 function interface:socket() |
184 function interface:socket() |
173 return self.conn; |
185 return self.conn; |
174 end |
186 end |
175 |
187 |
176 function interface:setoption(k, v) |
188 function interface:setoption(k, v) |
178 if self.conn.setoption then |
190 if self.conn.setoption then |
179 self.conn:setoption(k, v); |
191 self.conn:setoption(k, v); |
180 end |
192 end |
181 end |
193 end |
182 |
194 |
|
195 -- Timeout for detecting dead or idle sockets |
183 function interface:setreadtimeout(t) |
196 function interface:setreadtimeout(t) |
184 if t == false then |
197 if t == false then |
185 if self._readtimeout then |
198 if self._readtimeout then |
186 self._readtimeout:close(); |
199 self._readtimeout:close(); |
187 self._readtimeout = nil; |
200 self._readtimeout = nil; |
233 elseif self._wantwrite then |
248 elseif self._wantwrite then |
234 return "w"; |
249 return "w"; |
235 end |
250 end |
236 end |
251 end |
237 |
252 |
|
253 -- Add or remove sockets or modify epoll flags |
238 function interface:setflags(r, w) |
254 function interface:setflags(r, w) |
239 if r ~= nil then self._wantread = r; end |
255 if r ~= nil then self._wantread = r; end |
240 if w ~= nil then self._wantwrite = w; end |
256 if w ~= nil then self._wantwrite = w; end |
241 local flags = self:flags(); |
257 local flags = self:flags(); |
242 local currentflags = self._flags; |
258 local currentflags = self._flags; |
318 self:on("disconnect", err); |
334 self:on("disconnect", err); |
319 self:destroy(); |
335 self:destroy(); |
320 end |
336 end |
321 end |
337 end |
322 |
338 |
|
339 -- The write buffer has been successfully emptied |
323 function interface:ondrain() |
340 function interface:ondrain() |
324 if self._toclose then |
341 if self._toclose then |
325 return self:close(); |
342 return self:close(); |
326 elseif self._starttls then |
343 elseif self._starttls then |
327 return self:starttls(); |
344 return self:starttls(); |
328 else |
345 else |
329 return self:on("drain"); |
346 return self:on("drain"); |
330 end |
347 end |
331 end |
348 end |
332 |
349 |
|
350 -- Add data to write buffer and set flag for wanting to write |
333 function interface:write(data) |
351 function interface:write(data) |
334 local buffer = self.writebuffer; |
352 local buffer = self.writebuffer; |
335 if buffer then |
353 if buffer then |
336 t_insert(buffer, data); |
354 t_insert(buffer, data); |
337 else |
355 else |
341 self:setflags(nil, true); |
359 self:setflags(nil, true); |
342 return #data; |
360 return #data; |
343 end |
361 end |
344 interface.send = interface.write; |
362 interface.send = interface.write; |
345 |
363 |
|
364 -- Close, possibly after writing is done |
346 function interface:close() |
365 function interface:close() |
347 if self._wantwrite then |
366 if self._wantwrite then |
348 self:setflags(false, true); -- Flush final buffer contents |
367 self:setflags(false, true); -- Flush final buffer contents |
349 self.write, self.send = noop, noop; -- No more writing |
368 self.write, self.send = noop, noop; -- No more writing |
350 log("debug", "Close %s after writing", tostring(self)); |
369 log("debug", "Close %s after writing", tostring(self)); |
452 conn.sockname, conn.sockport = client:getsockname(); |
471 conn.sockname, conn.sockport = client:getsockname(); |
453 end |
472 end |
454 return conn; |
473 return conn; |
455 end |
474 end |
456 |
475 |
|
476 -- A server interface has new incoming connections waiting |
|
477 -- This replaces the onreadable callback |
457 function interface:onacceptable() |
478 function interface:onacceptable() |
458 local conn, err = self.conn:accept(); |
479 local conn, err = self.conn:accept(); |
459 if not conn then |
480 if not conn then |
460 log("debug", "Error accepting new client: %s, server will be paused for %ds", err, cfg.accept_retry_interval); |
481 log("debug", "Error accepting new client: %s, server will be paused for %ds", err, cfg.accept_retry_interval); |
461 self:pausefor(cfg.accept_retry_interval); |
482 self:pausefor(cfg.accept_retry_interval); |
464 local client = wrapsocket(conn, self, nil, self.listeners, self.tls); |
485 local client = wrapsocket(conn, self, nil, self.listeners, self.tls); |
465 log("debug", "New connection %s", tostring(client)); |
486 log("debug", "New connection %s", tostring(client)); |
466 client:init(); |
487 client:init(); |
467 end |
488 end |
468 |
489 |
|
490 -- Initialization |
469 function interface:init() |
491 function interface:init() |
470 if self.tls and not self._tls then |
492 if self.tls and not self._tls then |
471 self._tls = false; -- This means we should call onconnect when TLS is up |
493 self._tls = false; -- This means we should call onconnect when TLS is up |
472 return self:starttls(); |
494 return self:starttls(); |
473 else |
495 else |
568 self.send = new_send; |
591 self.send = new_send; |
569 end |
592 end |
570 |
593 |
571 local quitting = nil; |
594 local quitting = nil; |
572 |
595 |
|
596 -- Signal main loop about shutdown via above upvalue |
573 local function setquitting() |
597 local function setquitting() |
574 quitting = "quitting"; |
598 quitting = "quitting"; |
575 end |
599 end |
576 |
600 |
|
601 -- Main loop |
577 local function loop() |
602 local function loop() |
578 repeat |
603 repeat |
579 local t = runtimers(cfg.max_wait); |
604 local t = runtimers(cfg.max_wait); |
580 local fd, r, w = epoll.wait(t); |
605 local fd, r, w = epoll.wait(t); |
581 if fd then |
606 if fd then |