net/server_epoll.lua
changeset 7640 cccea9136b2d
parent 7633 abe2697b5e92
child 7660 252823632401
equal deleted inserted replaced
7639:7674cb520557 7640:cccea9136b2d
    36 	handshake_timeout = 60;
    36 	handshake_timeout = 60;
    37 	max_wait = 86400;
    37 	max_wait = 86400;
    38 };
    38 };
    39 
    39 
    40 local fds = createtable(10, 0); -- FD -> conn
    40 local fds = createtable(10, 0); -- FD -> conn
       
    41 
       
    42 -- Timer and scheduling --
       
    43 
    41 local timers = {};
    44 local timers = {};
    42 
    45 
    43 local function noop() end
    46 local function noop() end
    44 local function closetimer(t)
    47 local function closetimer(t)
    45 	t[1] = 0;
    48 	t[1] = 0;
    90 			end
    93 			end
    91 			break;
    94 			break;
    92 		end
    95 		end
    93 		local new_timeout = f(now);
    96 		local new_timeout = f(now);
    94 		if new_timeout then
    97 		if new_timeout then
    95 			-- Schedlue for 'delay' from the time actually sheduled,
    98 			-- Schedule for 'delay' from the time actually scheduled,
    96 			-- not from now, in order to prevent timer drift.
    99 			-- not from now, in order to prevent timer drift.
    97 			timer[1] = t + new_timeout;
   100 			timer[1] = t + new_timeout;
    98 			resort_timers = true;
   101 			resort_timers = true;
    99 		else
   102 		else
   100 			t_remove(timers, i);
   103 			t_remove(timers, i);
   109 		next_delay = 1e-6;
   112 		next_delay = 1e-6;
   110 	end
   113 	end
   111 	return next_delay;
   114 	return next_delay;
   112 end
   115 end
   113 
   116 
       
   117 -- Socket handler interface
       
   118 
   114 local interface = {};
   119 local interface = {};
   115 local interface_mt = { __index = interface };
   120 local interface_mt = { __index = interface };
   116 
   121 
   117 function interface_mt:__tostring()
   122 function interface_mt:__tostring()
   118 	if self.sockname and self.peername then
   123 	if self.sockname and self.peername then
   121 		return ("FD %d (%s, %d)"):format(self:getfd(), self.sockname or self.peername, self.sockport or self.peerport);
   126 		return ("FD %d (%s, %d)"):format(self:getfd(), self.sockname or self.peername, self.sockport or self.peerport);
   122 	end
   127 	end
   123 	return ("%s FD %d"):format(tostring(self.conn), self:getfd());
   128 	return ("%s FD %d"):format(tostring(self.conn), self:getfd());
   124 end
   129 end
   125 
   130 
       
   131 -- Replace the listener and tell the old one
   126 function interface:setlistener(listeners)
   132 function interface:setlistener(listeners)
   127 	self:on("detach");
   133 	self:on("detach");
   128 	self.listeners = listeners;
   134 	self.listeners = listeners;
   129 end
   135 end
   130 
   136 
   131 -- Call callback
   137 -- Call a listener callback
   132 function interface:on(what, ...)
   138 function interface:on(what, ...)
   133 	local listener = self.listeners["on"..what];
   139 	local listener = self.listeners["on"..what];
   134 	if not listener then
   140 	if not listener then
   135 		-- log("debug", "Missing listener 'on%s'", what); -- uncomment for development and debugging
   141 		-- log("debug", "Missing listener 'on%s'", what); -- uncomment for development and debugging
   136 		return;
   142 		return;
   140 		log("error", "Error calling on%s: %s", what, err);
   146 		log("error", "Error calling on%s: %s", what, err);
   141 	end
   147 	end
   142 	return err;
   148 	return err;
   143 end
   149 end
   144 
   150 
       
   151 -- Return the file descriptor number
   145 function interface:getfd()
   152 function interface:getfd()
   146 	if self.conn then
   153 	if self.conn then
   147 		return self.conn:getfd();
   154 		return self.conn:getfd();
   148 	end
   155 	end
   149 	return -1;
   156 	return -1;
   150 end
   157 end
   151 
   158 
       
   159 -- Get IP address
   152 function interface:ip()
   160 function interface:ip()
   153 	return self.peername or self.sockname;
   161 	return self.peername or self.sockname;
   154 end
   162 end
   155 
   163 
       
   164 -- Get a port number, doesn't matter which
   156 function interface:port()
   165 function interface:port()
   157 	return self.sockport or self.peerport;
   166 	return self.sockport or self.peerport;
   158 end
   167 end
   159 
   168 
       
   169 -- Get local port number
   160 function interface:clientport()
   170 function interface:clientport()
   161 	return self.sockport;
   171 	return self.sockport;
   162 end
   172 end
   163 
   173 
       
   174 -- Get remote port
   164 function interface:serverport()
   175 function interface:serverport()
   165 	if self.sockport then
   176 	if self.sockport then
   166 		return self.sockport;
   177 		return self.sockport;
   167 	elseif self.server then
   178 	elseif self.server then
   168 		self.server:port();
   179 		self.server:port();
   169 	end
   180 	end
   170 end
   181 end
   171 
   182 
       
   183 -- Return underlying socket
   172 function interface:socket()
   184 function interface:socket()
   173 	return self.conn;
   185 	return self.conn;
   174 end
   186 end
   175 
   187 
   176 function interface:setoption(k, v)
   188 function interface:setoption(k, v)
   178 	if self.conn.setoption then
   190 	if self.conn.setoption then
   179 		self.conn:setoption(k, v);
   191 		self.conn:setoption(k, v);
   180 	end
   192 	end
   181 end
   193 end
   182 
   194 
       
   195 -- Timeout for detecting dead or idle sockets
   183 function interface:setreadtimeout(t)
   196 function interface:setreadtimeout(t)
   184 	if t == false then
   197 	if t == false then
   185 		if self._readtimeout then
   198 		if self._readtimeout then
   186 			self._readtimeout:close();
   199 			self._readtimeout:close();
   187 			self._readtimeout = nil;
   200 			self._readtimeout = nil;
   202 			end
   215 			end
   203 		end);
   216 		end);
   204 	end
   217 	end
   205 end
   218 end
   206 
   219 
       
   220 -- Timeout for detecting dead sockets
   207 function interface:setwritetimeout(t)
   221 function interface:setwritetimeout(t)
   208 	if t == false then
   222 	if t == false then
   209 		if self._writetimeout then
   223 		if self._writetimeout then
   210 			self._writetimeout:close();
   224 			self._writetimeout:close();
   211 			self._writetimeout = nil;
   225 			self._writetimeout = nil;
   222 			self:destroy();
   236 			self:destroy();
   223 		end);
   237 		end);
   224 	end
   238 	end
   225 end
   239 end
   226 
   240 
       
   241 -- lua-epoll flag for currently requested poll state
   227 function interface:flags()
   242 function interface:flags()
   228 	if self._wantread then
   243 	if self._wantread then
   229 		if self._wantwrite then
   244 		if self._wantwrite then
   230 			return "rw";
   245 			return "rw";
   231 		end
   246 		end
   233 	elseif self._wantwrite then
   248 	elseif self._wantwrite then
   234 		return "w";
   249 		return "w";
   235 	end
   250 	end
   236 end
   251 end
   237 
   252 
       
   253 -- Add or remove sockets or modify epoll flags
   238 function interface:setflags(r, w)
   254 function interface:setflags(r, w)
   239 	if r ~= nil then self._wantread = r; end
   255 	if r ~= nil then self._wantread = r; end
   240 	if w ~= nil then self._wantwrite = w; end
   256 	if w ~= nil then self._wantwrite = w; end
   241 	local flags = self:flags();
   257 	local flags = self:flags();
   242 	local currentflags = self._flags;
   258 	local currentflags = self._flags;
   318 		self:on("disconnect", err);
   334 		self:on("disconnect", err);
   319 		self:destroy();
   335 		self:destroy();
   320 	end
   336 	end
   321 end
   337 end
   322 
   338 
       
   339 -- The write buffer has been successfully emptied
   323 function interface:ondrain()
   340 function interface:ondrain()
   324 	if self._toclose then
   341 	if self._toclose then
   325 		return self:close();
   342 		return self:close();
   326 	elseif self._starttls then
   343 	elseif self._starttls then
   327 		return self:starttls();
   344 		return self:starttls();
   328 	else
   345 	else
   329 		return self:on("drain");
   346 		return self:on("drain");
   330 	end
   347 	end
   331 end
   348 end
   332 
   349 
       
   350 -- Add data to write buffer and set flag for wanting to write
   333 function interface:write(data)
   351 function interface:write(data)
   334 	local buffer = self.writebuffer;
   352 	local buffer = self.writebuffer;
   335 	if buffer then
   353 	if buffer then
   336 		t_insert(buffer, data);
   354 		t_insert(buffer, data);
   337 	else
   355 	else
   341 	self:setflags(nil, true);
   359 	self:setflags(nil, true);
   342 	return #data;
   360 	return #data;
   343 end
   361 end
   344 interface.send = interface.write;
   362 interface.send = interface.write;
   345 
   363 
       
   364 -- Close, possibly after writing is done
   346 function interface:close()
   365 function interface:close()
   347 	if self._wantwrite then
   366 	if self._wantwrite then
   348 		self:setflags(false, true); -- Flush final buffer contents
   367 		self:setflags(false, true); -- Flush final buffer contents
   349 		self.write, self.send = noop, noop; -- No more writing
   368 		self.write, self.send = noop, noop; -- No more writing
   350 		log("debug", "Close %s after writing", tostring(self));
   369 		log("debug", "Close %s after writing", tostring(self));
   452 		conn.sockname, conn.sockport = client:getsockname();
   471 		conn.sockname, conn.sockport = client:getsockname();
   453 	end
   472 	end
   454 	return conn;
   473 	return conn;
   455 end
   474 end
   456 
   475 
       
   476 -- A server interface has new incoming connections waiting
       
   477 -- This replaces the onreadable callback
   457 function interface:onacceptable()
   478 function interface:onacceptable()
   458 	local conn, err = self.conn:accept();
   479 	local conn, err = self.conn:accept();
   459 	if not conn then
   480 	if not conn then
   460 		log("debug", "Error accepting new client: %s, server will be paused for %ds", err, cfg.accept_retry_interval);
   481 		log("debug", "Error accepting new client: %s, server will be paused for %ds", err, cfg.accept_retry_interval);
   461 		self:pausefor(cfg.accept_retry_interval);
   482 		self:pausefor(cfg.accept_retry_interval);
   464 	local client = wrapsocket(conn, self, nil, self.listeners, self.tls);
   485 	local client = wrapsocket(conn, self, nil, self.listeners, self.tls);
   465 	log("debug", "New connection %s", tostring(client));
   486 	log("debug", "New connection %s", tostring(client));
   466 	client:init();
   487 	client:init();
   467 end
   488 end
   468 
   489 
       
   490 -- Initialization
   469 function interface:init()
   491 function interface:init()
   470 	if self.tls and not self._tls then
   492 	if self.tls and not self._tls then
   471 		self._tls = false; -- This means we should call onconnect when TLS is up
   493 		self._tls = false; -- This means we should call onconnect when TLS is up
   472 		return self:starttls();
   494 		return self:starttls();
   473 	else
   495 	else
   499 		end
   521 		end
   500 		self:setflags(true);
   522 		self:setflags(true);
   501 	end);
   523 	end);
   502 end
   524 end
   503 
   525 
       
   526 -- Connected!
   504 function interface:onconnect()
   527 function interface:onconnect()
   505 	self.onwriteable = nil;
   528 	self.onwriteable = nil;
   506 	self:on("connect");
   529 	self:on("connect");
   507 	self:setflags(true);
   530 	self:setflags(true);
   508 	return self:onwriteable();
   531 	return self:onwriteable();
   568 	self.send = new_send;
   591 	self.send = new_send;
   569 end
   592 end
   570 
   593 
   571 local quitting = nil;
   594 local quitting = nil;
   572 
   595 
       
   596 -- Signal main loop about shutdown via above upvalue
   573 local function setquitting()
   597 local function setquitting()
   574 	quitting = "quitting";
   598 	quitting = "quitting";
   575 end
   599 end
   576 
   600 
       
   601 -- Main loop
   577 local function loop()
   602 local function loop()
   578 	repeat
   603 	repeat
   579 		local t = runtimers(cfg.max_wait);
   604 		local t = runtimers(cfg.max_wait);
   580 		local fd, r, w = epoll.wait(t);
   605 		local fd, r, w = epoll.wait(t);
   581 		if fd then
   606 		if fd then