--- a/mod_smacks/mod_smacks.lua Thu Oct 09 15:08:05 2014 +0200
+++ b/mod_smacks/mod_smacks.lua Sun Oct 12 13:24:50 2014 +0200
@@ -68,8 +68,47 @@
end
end);
-local function wrap_session(session, resume, xmlns_sm)
- local sm_attr = { xmlns = xmlns_sm };
+local function outgoing_stanza_filter(stanza, session)
+ local is_stanza = stanza.attr and not stanza.attr.xmlns;
+ if is_stanza and not stanza._cached then -- Stanza in default stream namespace
+ local queue = session.outgoing_stanza_queue;
+ module:log("debug", "st.clone( %s ) -- %s a stanza", tostring(stanza), is_stanza and "is" or "is not");
+ local cached_stanza = st.clone(stanza);
+ cached_stanza._cached = true;
+
+ if cached_stanza and cached_stanza:get_child("delay", xmlns_delay) == nil then
+ cached_stanza = cached_stanza:tag("delay", { xmlns = xmlns_delay, from = session.host, stamp = datetime.datetime()});
+ end
+
+ queue[#queue+1] = cached_stanza;
+ session.log("debug", "#queue = %d", #queue);
+ if #queue > max_unacked_stanzas then
+ module:add_timer(0, function ()
+ if not session.awaiting_ack then
+ session.awaiting_ack = true;
+ session.send(st.stanza("r", { xmlns = session.smacks }));
+ end
+ end);
+ end
+ end
+ if session.hibernating then
+ session.log("debug", "hibernating, stanza queued")
+ -- The session is hibernating, no point in sending the stanza
+ -- over a dead connection. It will be delivered upon resumption.
+ return nil; -- or empty string?
+ end
+ return stanza;
+end
+
+local function count_incoming_stanzas(stanza, session)
+ if not stanza.attr.xmlns then
+ session.handled_stanza_count = session.handled_stanza_count + 1;
+ session.log("debug", "Handled %d incoming stanzas", session.handled_stanza_count);
+ end
+ return stanza;
+end
+
+local function wrap_session(session, resume)
-- Overwrite process_stanza() and send()
local queue;
if not resume then
@@ -80,39 +119,7 @@
queue = session.outgoing_stanza_queue;
end
- local _send = session.sends2s or session.send;
- local function new_send(stanza)
- local is_stanza = stanza.attr and not stanza.attr.xmlns;
- if is_stanza then -- Stanza in default stream namespace
- module:log("debug", "st.clone( %s ) -- %s a stanza", tostring(stanza), is_stanza and "is" or "is not");
- local cached_stanza = st.clone(stanza);
-
- if cached_stanza and cached_stanza:get_child("delay", xmlns_delay) == nil then
- cached_stanza = cached_stanza:tag("delay", { xmlns = xmlns_delay, from = session.host, stamp = datetime.datetime()});
- end
-
- queue[#queue+1] = cached_stanza;
- session.log("debug", "#queue = %d", #queue);
- end
- if session.hibernating then
- session.log("debug", "hibernating, stanza queued")
- -- The session is hibernating, no point in sending the stanza
- -- over a dead connection. It will be delivered upon resumption.
- return true;
- end
- local ok, err = _send(stanza);
- if ok and #queue > max_unacked_stanzas and not session.awaiting_ack and is_stanza then
- session.awaiting_ack = true;
- return _send(st.stanza("r", sm_attr));
- end
- return ok, err;
- end
-
- if session.sends2s then
- session.sends2s = new_send;
- else
- session.send = new_send;
- end
+ add_filter(session, "stanzas/out", outgoing_stanza_filter, -1000);
local session_close = session.close;
function session.close(...)
@@ -125,13 +132,7 @@
if not resume then
session.handled_stanza_count = 0;
- add_filter(session, "stanzas/in", function (stanza)
- if not stanza.attr.xmlns then
- session.handled_stanza_count = session.handled_stanza_count + 1;
- session.log("debug", "Handled %d incoming stanzas", session.handled_stanza_count);
- end
- return stanza;
- end);
+ add_filter(session, "stanzas/in", count_incoming_stanzas, 1000);
end
return session;
@@ -146,9 +147,9 @@
end
module:log("debug", "Enabling stream management");
- session.smacks = true;
+ session.smacks = xmlns_sm;
- wrap_session(session, false, xmlns_sm);
+ wrap_session(session, false);
local resume_token;
local resume = stanza.attr.resume;
@@ -165,9 +166,9 @@
function handle_enabled(session, stanza, xmlns_sm)
module:log("debug", "Enabling stream management");
- session.smacks = true;
+ session.smacks = xmlns_sm;
- wrap_session(session, false, xmlns_sm);
+ wrap_session(session, false);
-- FIXME Resume?
@@ -310,24 +311,13 @@
original_session.ip = session.ip;
original_session.conn = session.conn;
original_session.send = session.send;
+ original_session.send.filter = original_session.filter;
original_session.stream = session.stream;
original_session.secure = session.secure;
original_session.hibernating = nil;
- local filter = original_session.filter;
- local stream = session.stream;
- local log = session.log;
- function original_session.data(data)
- data = filter("bytes/in", data);
- if data then
- local ok, err = stream:feed(data);
- if ok then return; end
- log("debug", "Received invalid XML (%s) %d bytes: %s", tostring(err), #data, data:sub(1, 300):gsub("[\r\n]+", " "):gsub("[%z\1-\31]", "_"));
- original_session:close("xml-not-well-formed");
- end
- end
- wrap_session(original_session, true, xmlns_sm);
+ wrap_session(original_session, true);
-- Inform xmppstream of the new session (passed to its callbacks)
- stream:set_session(original_session);
+ original_session.stream:set_session(original_session);
-- Similar for connlisteners
c2s_sessions[session.conn] = original_session;