# HG changeset patch # User Kim Alvefur # Date 1343830272 -7200 # Node ID d11d91ee81edd88f7a418db20de8d7a6409ae79d # Parent bcf0c9fff512b5a237dd4203933345eb6c25935f mod_pubsub_hub: New module that implements the Hub part of PubSubHubbub diff -r bcf0c9fff512 -r d11d91ee81ed mod_pubsub_hub/mod_pubsub_hub.lua --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/mod_pubsub_hub/mod_pubsub_hub.lua Wed Aug 01 16:11:12 2012 +0200 @@ -0,0 +1,241 @@ +-- Copyright (C) 2011 - 2012 Kim Alvefur +-- +-- This file is MIT/X11 licensed. + +local http = require "net.http"; +local formdecode = http.formdecode; +local formencode = http.formencode; +local uuid = require "util.uuid".generate; +local hmac_sha1 = require "util.hmac".sha1; +local json_encode = require "util.json".encode; +local time = os.time; +local m_min, m_max = math.min, math.max; +local tostring = tostring; +local xmlns_pubsub = "http://jabber.org/protocol/pubsub"; +local xmlns_pubsub_event = xmlns_pubsub .. "#event"; +local subs_by_topic = module:shared"subscriptions"; + +local max_lease, min_lease, default_lease = 86400, 600, 3600; + +module:depends"pubsub"; + +local valid_modes = { ["subscribe"] = true, ["unsubscribe"] = true, } + +local function do_subscribe(subscription) + -- FIXME handle other states + if subscription.state == "subscribed" then + local ok, err = hosts[module.host].modules.pubsub.service:add_subscription(subscription.topic, true, module.host); + module:log(ok and "debug" or "error", "add_subscription() => %s, %s", tostring(ok), tostring(err)); + end +end + +local function handle_request(event) + local request, response = event.request, event.response; + local method, body = request.method, request.body; + + local query = request.url.query or {}; + if query and type(query) == "string" then + query = formdecode(query); + end + if body and request.headers.content_type == "application/x-www-form-urlencoded" then + body = formdecode(body); + end + + if method == "POST" then + -- Subscription request + if body["hub.callback"] and body["hub.mode"] and valid_modes[body["hub.mode"]] + and body["hub.topic"] and body["hub.verify"] then + + -- http://pubsubhubbub.googlecode.com/svn/trunk/pubsubhubbub-core-0.3.html#anchor5 + local callback = body["hub.callback"]; + local mode = body["hub.mode"]; + local topic = body["hub.topic"]; + local lease_seconds = m_min(tonumber(body["hub.lease_seconds"]) or default_lease, max_lease); + local secret = body["hub.secret"]; + local verify_token = body["hub.verify_token"]; + + module:log("debug", "topic is "..(type(topic)=="string" and "%q" or "%s"), topic); + + if not subs_by_topic[topic] then + subs_by_topic[topic] = {}; + end + local subscription = subs_by_topic[topic][callback]; + + local verify_modes = {}; + for i=1,#body do + if body[i].name == "hub.verify" then + verify_modes[body[i].value] = true; + end + end + + subscription = subscription or { + id = uuid(), + callback = callback, + topic = topic, + state = "unsubscribed", + secret = secret, + want_state = mode, + lease_seconds = lease_seconds, + expires = time() + lease_seconds, + }; + subs_by_topic[topic][callback] = subscription; + local challenge = uuid(); + + local callback_url = callback .. (callback:match("%?") and "&" or "?") .. formencode{ + ["hub.mode"] = mode, + ["hub.topic"] = topic, + ["hub.challenge"] = challenge, + ["hub.lease_seconds"] = tostring(lease_seconds), + ["hub.verify_token"] = verify_token, + } + module:log("debug", require"util.serialization".serialize(verify_modes)); + if verify_modes["async"] then + module:log("debug", "Sending async verification request to %s for %s", tostring(callback_url), tostring(subscription)); + http.request(callback_url, nil, function(body, code) + if body == challenge and code > 199 and code < 300 then + if not subscription.want_state then + module:log("warn", "Verification of already verified request, probably"); + return; + end + subscription.state = subscription.want_state .. "d"; + subscription.want_state = nil; + module:log("debug", "calling do_subscribe()"); + do_subscribe(subscription); + subs_by_topic[topic][callback] = subscription; + else + module:log("warn", "status %d and body was %q", tostring(code), tostring(body)); + subs_by_topic[topic][callback] = subscription; + end + end) + return 202; + elseif verify_modes["sync"] then + http.request(callback_url, nil, function(body, code) + if body == challenge and code > 199 and code < 300 then + if not subscription.want_state then + module:log("warn", "Verification of already verified request, probably"); + return; + end + if mode == "unsubscribe" then + subs_by_topic[topic][callback] = nil; + else + subscription.state = subscription.want_state .. "d"; + subscription.want_state = nil; + module:log("debug", "calling do_subscribe()"); + do_subscribe(subscription); + subs_by_topic[topic][callback] = subscription; + end + else + subs_by_topic[topic][callback] = subscription; + end + response.status = 204; + response:send(); + end) + return true; + end + return 400; + else + response.status = 400; + response.headers.content_type = "text/html"; + return "

Bad Request

\nMissing required parameter(s)\n" + end + end +end + +local function periodic() + local now = time(); + local next_check = now + max_lease; + local purge = false + for topic, callbacks in pairs(subs_by_topic) do + for callback, subscription in pairs(callbacks) do + if subscription.mode == "subscribed" then + if subscription.expires < now then + -- Subscription has expired, drop it. + purge = true + end + if subscription.expires < now + min_lease then + -- Subscription set to expire soon, re-confirm it. + local challenge = uuid(); + local callback_url = callback .. (callback:match("%?") and "&" or "?") .. formencode{ + ["hub.mode"] = subscription.state, + ["hub.topic"] = topic, + ["hub.challenge"] = challenge, + ["hub.lease_seconds"] = subscription.lease_seconds, + ["hub.verify_token"] = subscription.verify_token, + } + http.request(callback_url, nil, function(body, code) + if body == challenge and code > 199 and code < 300 then + subscription.expires = now + subscription.lease_seconds; + end + end); + else + next_check = m_min(next_check, now - subscription.expires) + end + end + end + if purge then + local new_callbacks = {}; + for callback, subscription in pairs(callbacks) do + if (subscription.state == "subscribed" and subscription.expires < now) + and subscription.want_state ~= "remove" then + new_callbacks[callback] = subscription; + end + end + subs_by_topic[topic] = new_callbacks + end + end + return m_max(next_check - min_lease, min_lease); +end + +local function on_notify(subscription, content) + local body = tostring(content); + local headers = { + ["Content-Type"] = "application/xml", + }; + if subscription.secret then + headers["X-Hub-Signature"] = "sha1="..hmac_sha1(subscription.secret, body, true); + end + http.request(subscription.callback, { method = "POST", body = body, headers = headers }, function(body, code) + if code >= 200 and code <= 299 then + module:log("debug", "Delivered"); + else + module:log("warn", "Got status code %d on delivery to %s", tonumber(code) or -1, tostring(subscription.callback)); + -- TODO Retry + -- ... but the spec says that you should not retry, wtf? + end + end); +end + +module:hook("message/host", function(event) + local stanza = event.stanza; + if stanza.attr.from ~= module.host then return end; + + for pubsub_event in stanza:childtags("event", xmlns_pubsub_event) do + local items = pubsub_event:get_child("items"); + local node = items.attr.node; + if items and node and subs_by_topic[node] then + for item in items:childtags("item") do + local content = item.tags[1]; + for callback, subscription in pairs(subs_by_topic[node]) do + on_notify(subscription, content) + end + end + end + end + return true; +end, 10); + +module:depends"http"; +module:provides("http", { + default_path = "/hub"; + route = { + POST = handle_request; + GET = function() + return json_encode(subs_by_topic); + end; + ["GET /topic/*"] = function(event, path) + return json_encode(subs_by_topic[path]) + end; + }; +}); + +module:add_timer(1, periodic);