Imported LmConnection refactor from old repository.
Socket code splitted out into LmSocket and the message queue now lives in LmMessageQueue.
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/.gitignore Fri Feb 02 15:16:48 2007 +0100
@@ -0,0 +1,24 @@
+Makefile
+Makefile.in
+*.la
+*.lo
+*.o
+.deps
+.libs
+ltmain.sh
+missing
+stamp-h1
+configure
+config.h*
+config.log
+config.status
+config.sub
+depcomp
+aclocal.m4
+autom4te*
+config.guess
+gtk-doc.make
+install-sh
+libtool
+*.pc
+*.spec
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/examples/.gitignore Fri Feb 02 15:16:48 2007 +0100
@@ -0,0 +1,7 @@
+lm-change-password
+lm-register
+lm-send-async
+lm-send-sync
+test-http-proxy
+test-lm
+test-tunnel
--- a/loudmouth/Makefile.am Fri Feb 02 15:04:37 2007 +0100
+++ b/loudmouth/Makefile.am Fri Feb 02 15:16:48 2007 +0100
@@ -31,6 +31,10 @@
lm-message.c \
lm-message-handler.c \
lm-message-node.c \
+ lm-message-queue.c \
+ lm-message-queue.h \
+ lm-misc.c \
+ lm-misc.h \
lm-parser.c \
lm-parser.h \
lm-internals.h \
@@ -45,6 +49,8 @@
lm-proxy.c \
lm-sock.h \
lm-sock.c \
+ lm-socket.c \
+ lm-socket.h \
$(NULL)
libloudmouthinclude_HEADERS = \
--- a/loudmouth/lm-connection.c Fri Feb 02 15:04:37 2007 +0100
+++ b/loudmouth/lm-connection.c Fri Feb 02 15:16:48 2007 +0100
@@ -31,24 +31,20 @@
#include "lm-debug.h"
#include "lm-error.h"
#include "lm-internals.h"
+#include "lm-message-queue.h"
+#include "lm-misc.h"
#include "lm-ssl-internals.h"
#include "lm-parser.h"
#include "lm-sha.h"
#include "lm-connection.h"
#include "lm-utils.h"
-
-#define IN_BUFFER_SIZE 1024
+#include "lm-socket.h"
typedef struct {
LmHandlerPriority priority;
LmMessageHandler *handler;
} HandlerData;
-typedef struct {
- GSource source;
- LmConnection *connection;
-} LmIncomingSource;
-
struct _LmConnection {
/* Parameters */
GMainContext *context;
@@ -56,24 +52,17 @@
gchar *jid;
guint port;
+ LmSocket *socket;
LmSSL *ssl;
-
LmProxy *proxy;
-
LmParser *parser;
+
gchar *stream_id;
GHashTable *id_handlers;
GSList *handlers[LM_MESSAGE_TYPE_UNKNOWN];
/* Communication */
- GIOChannel *io_channel;
- guint io_watch_in;
- guint io_watch_err;
- guint io_watch_hup;
- LmSocket fd;
- guint io_watch_connect;
-
guint open_id;
LmCallback *open_cb;
@@ -81,24 +70,16 @@
gboolean blocking;
gboolean cancel_open;
- LmCallback *close_cb; /* unused */
LmCallback *auth_cb;
- LmCallback *register_cb; /* unused */
LmCallback *disconnect_cb;
- GQueue *incoming_messages;
- GSource *incoming_source;
+ LmMessageQueue *queue;
LmConnectionState state;
guint keep_alive_rate;
- guint keep_alive_id;
-
- guint io_watch_out;
- GString *out_buf;
-
- LmConnectData *connect_data;
+ GSource *keep_alive_source;
gint ref_count;
};
@@ -121,24 +102,7 @@
static gboolean connection_do_open (LmConnection *connection,
GError **error);
-static void connection_do_close (LmConnection *connection);
-static gint connection_do_write (LmConnection *connection,
- const gchar *buf,
- gint len);
-static gboolean connection_in_event (GIOChannel *source,
- GIOCondition condition,
- LmConnection *connection);
-static gboolean connection_error_event (GIOChannel *source,
- GIOCondition condition,
- LmConnection *connection);
-static gboolean connection_hup_event (GIOChannel *source,
- GIOCondition condition,
- LmConnection *connection);
-static gboolean connection_send (LmConnection *connection,
- const gchar *str,
- gint len,
- GError **error);
static LmMessage * connection_create_auth_req_msg (const gchar *username);
static LmMessage * connection_create_auth_msg (LmConnection *connection,
const gchar *username,
@@ -149,59 +113,36 @@
LmConnection *connection,
LmMessage *m,
gpointer user_data);
-static int connection_check_auth_type (LmMessage *auth_req_rpl);
+static int connection_check_auth_type (LmMessage *auth_req_rpl);
-static LmHandlerResult connection_auth_reply (LmMessageHandler *handler,
- LmConnection *connection,
- LmMessage *m,
- gpointer user_data);
-
-static void connection_stream_received (LmConnection *connection,
- LmMessage *m);
+static LmHandlerResult
+connection_auth_reply (LmMessageHandler *handler,
+ LmConnection *connection,
+ LmMessage *m,
+ gpointer user_data);
-static gint connection_handler_compare_func (HandlerData *a,
- HandlerData *b);
-static gboolean connection_incoming_prepare (GSource *source,
- gint *timeout);
-static gboolean connection_incoming_check (GSource *source);
-static gboolean connection_incoming_dispatch (GSource *source,
- GSourceFunc callback,
- gpointer user_data);
-static GSource * connection_create_source (LmConnection *connection);
-static void connection_signal_disconnect (LmConnection *connection,
- LmDisconnectReason reason);
+static void connection_stream_received (LmConnection *connection,
+ LmMessage *m);
-static void connection_do_connect (LmConnectData *connect_data);
-static guint connection_add_watch (LmConnection *connection,
- GIOChannel *channel,
- GIOCondition condition,
- GIOFunc func,
- gpointer user_data);
-static gboolean connection_send_keep_alive (LmConnection *connection);
-static void connection_start_keep_alive (LmConnection *connection);
-static void connection_stop_keep_alive (LmConnection *connection);
-static gboolean connection_buffered_write_cb (GIOChannel *source,
- GIOCondition condition,
- LmConnection *connection);
-static gboolean connection_output_is_buffered (LmConnection *connection,
- const gchar *buffer,
- gint len);
-static void connection_setup_output_buffer (LmConnection *connection,
- const gchar *buffer,
- gint len);
-
-static GSourceFuncs incoming_funcs = {
- connection_incoming_prepare,
- connection_incoming_check,
- connection_incoming_dispatch,
- NULL
-};
+static gint connection_handler_compare_func (HandlerData *a,
+ HandlerData *b);
+static gboolean connection_send_keep_alive (LmConnection *connection);
+static void connection_start_keep_alive (LmConnection *connection);
+static void connection_stop_keep_alive (LmConnection *connection);
+static gboolean connection_send (LmConnection *connection,
+ const gchar *str,
+ gint len,
+ GError **error);
+static void connection_message_queue_cb (LmMessageQueue *queue,
+ LmConnection *connection);
+static void connection_incoming_data (LmSocket *socket,
+ const gchar *buf,
+ LmConnection *connection);
static void
connection_free (LmConnection *connection)
{
int i;
- LmMessage *m;
g_free (connection->server);
g_free (connection->jid);
@@ -226,7 +167,7 @@
g_hash_table_destroy (connection->id_handlers);
if (connection->state >= LM_CONNECTION_STATE_OPENING) {
- connection_do_close (connection);
+ _lm_connection_do_close (connection);
}
if (connection->open_cb) {
@@ -239,28 +180,16 @@
lm_connection_set_disconnect_function (connection, NULL, NULL, NULL);
- while ((m = g_queue_pop_head (connection->incoming_messages)) != NULL) {
- lm_message_unref (m);
- }
+ lm_message_queue_unref (connection->queue);
- if (connection->ssl) {
- lm_ssl_unref (connection->ssl);
- }
-
- if (connection->proxy) {
- lm_proxy_unref (connection->proxy);
- }
-
- g_queue_free (connection->incoming_messages);
-
if (connection->context) {
g_main_context_unref (connection->context);
}
- if (connection->out_buf) {
- g_string_free (connection->out_buf, TRUE);
+ if (connection->socket) {
+ lm_socket_unref (connection->socket);
}
-
+
g_free (connection);
}
@@ -329,401 +258,7 @@
_lm_message_type_to_string (lm_message_get_type (m)),
from);
- g_queue_push_tail (connection->incoming_messages, m);
-}
-
-void
-_lm_connection_succeeded (LmConnectData *connect_data)
-{
- LmConnection *connection;
- LmMessage *m;
- gchar *server_from_jid;
- gchar *ch;
-
- connection = connect_data->connection;
-
- if (connection->io_watch_connect != 0) {
- GSource *source;
-
- source = g_main_context_find_source_by_id (connection->context,
- connection->io_watch_connect);
- if (source) {
- g_source_destroy (source);
- }
- connection->io_watch_connect = 0;
- }
-
- /* Need some way to report error/success */
- if (connection->cancel_open) {
- lm_verbose ("Cancelling connection...\n");
- return;
- }
-
- connection->fd = connect_data->fd;
- connection->io_channel = connect_data->io_channel;
-
- freeaddrinfo (connect_data->resolved_addrs);
- connection->connect_data = NULL;
- g_free (connect_data);
-
- if (connection->ssl) {
- GError *error = NULL;
-
- lm_verbose ("Setting up SSL...\n");
-
-#ifdef HAVE_GNUTLS
- /* GNU TLS requires the socket to be blocking */
- _lm_sock_set_blocking (connection->fd, TRUE);
-#endif
-
- if (!_lm_ssl_begin (connection->ssl, connection->fd,
- connection->server,
- &error)) {
- lm_verbose ("Could not begin SSL\n");
-
- if (error) {
- g_log (LM_LOG_DOMAIN, LM_LOG_LEVEL_NET,
- "%s\n", error->message);
- g_error_free (error);
- }
-
- _lm_sock_shutdown (connection->fd);
- _lm_sock_close (connection->fd);
-
- connection_do_close (connection);
-
- return;
- }
-
-#ifdef HAVE_GNUTLS
- _lm_sock_set_blocking (connection->fd, FALSE);
-#endif
- }
-
- connection->io_watch_in =
- connection_add_watch (connection,
- connection->io_channel,
- G_IO_IN,
- (GIOFunc) connection_in_event,
- connection);
-
- /* FIXME: if we add these, we don't get ANY
- * response from the server, this is to do with the way that
- * windows handles watches, see bug #331214.
- */
-#ifndef G_OS_WIN32
- connection->io_watch_err =
- connection_add_watch (connection,
- connection->io_channel,
- G_IO_ERR,
- (GIOFunc) connection_error_event,
- connection);
-
- connection->io_watch_hup =
- connection_add_watch (connection,
- connection->io_channel,
- G_IO_HUP,
- (GIOFunc) connection_hup_event,
- connection);
-#endif
-
- /* FIXME: Set up according to XMPP 1.0 specification */
- /* StartTLS and the like */
- if (!connection_send (connection,
- "<?xml version='1.0' encoding='UTF-8'?>", -1,
- NULL)) {
- lm_verbose ("Failed to send xml version and encoding\n");
- connection_do_close (connection);
-
- return;
- }
-
- if (connection->jid != NULL && (ch = strchr (connection->jid, '@')) != NULL) {
- server_from_jid = ch + 1;
- } else {
- server_from_jid = connection->server;
- }
-
- m = lm_message_new (server_from_jid, LM_MESSAGE_TYPE_STREAM);
- lm_message_node_set_attributes (m->node,
- "xmlns:stream",
- "http://etherx.jabber.org/streams",
- "xmlns", "jabber:client",
- NULL);
-
- lm_verbose ("Opening stream...");
-
- if (!lm_connection_send (connection, m, NULL)) {
- lm_verbose ("Failed to send stream information\n");
- connection_do_close (connection);
- }
-
- lm_message_unref (m);
-}
-
-void
-_lm_connection_failed_with_error (LmConnectData *connect_data, int error)
-{
- LmConnection *connection;
-
- g_log (LM_LOG_DOMAIN, LM_LOG_LEVEL_NET,
- "Connection failed: %s (error %d)\n",
- _lm_sock_get_error_str (error), error);
-
- connection = connect_data->connection;
-
- connect_data->current_addr = connect_data->current_addr->ai_next;
-
- if (connection->io_watch_connect != 0) {
- GSource *source;
-
- source = g_main_context_find_source_by_id (connection->context,
- connection->io_watch_connect);
- if (source) {
- g_source_destroy (source);
- }
-
- connection->io_watch_connect = 0;
- }
-
- if (connect_data->io_channel != NULL) {
- g_io_channel_unref (connect_data->io_channel);
- /* FIXME: need to check for last unref and close the socket */
- }
-
- if (connect_data->current_addr == NULL) {
- connection_do_close (connection);
- if (connection->open_cb) {
- LmCallback *cb = connection->open_cb;
-
- connection->open_cb = NULL;
-
- (* ((LmResultFunction) cb->func)) (connection, FALSE,
- cb->user_data);
- _lm_utils_free_callback (cb);
- }
-
- freeaddrinfo (connect_data->resolved_addrs);
- connection->connect_data = NULL;
- g_free (connect_data);
- } else {
- /* try to connect to the next host */
- connection_do_connect (connect_data);
- }
-}
-
-void
-_lm_connection_failed (LmConnectData *connect_data)
-{
- _lm_connection_failed_with_error (connect_data,
- _lm_sock_get_last_error());
-}
-
-static gboolean
-connection_connect_cb (GIOChannel *source,
- GIOCondition condition,
- LmConnectData *connect_data)
-{
- LmConnection *connection;
- struct addrinfo *addr;
- int err;
- socklen_t len;
- LmSocket fd;
-
- connection = connect_data->connection;
- addr = connect_data->current_addr;
- fd = g_io_channel_unix_get_fd (source);
-
- if (condition == G_IO_ERR) {
- len = sizeof (err);
- _lm_sock_get_error (fd, &err, &len);
- if (!_lm_sock_is_blocking_error (err)) {
- g_log (LM_LOG_DOMAIN, LM_LOG_LEVEL_NET,
- "Connection failed.\n");
-
- _lm_connection_failed_with_error (connect_data, err);
-
- connection->io_watch_connect = 0;
- return FALSE;
- }
- }
-
- if (connection->async_connect_waiting) {
- gint res;
-
- fd = g_io_channel_unix_get_fd (source);
-
- res = _lm_sock_connect (fd, addr->ai_addr, (int)addr->ai_addrlen);
- if (res < 0) {
- err = _lm_sock_get_last_error ();
- if (_lm_sock_is_blocking_success (err)) {
- connection->async_connect_waiting = FALSE;
-
- g_log (LM_LOG_DOMAIN, LM_LOG_LEVEL_NET,
- "Connection success.\n");
-
- _lm_connection_succeeded (connect_data);
- }
-
- if (connection->async_connect_waiting &&
- !_lm_sock_is_blocking_error (err)) {
- g_log (LM_LOG_DOMAIN, LM_LOG_LEVEL_NET,
- "Connection failed.\n");
-
- _lm_sock_close (connect_data->fd);
- _lm_connection_failed_with_error (connect_data, err);
-
- connection->io_watch_connect = 0;
- return FALSE;
- }
- }
- } else {
- /* for blocking sockets, G_IO_OUT means we are connected */
- g_log (LM_LOG_DOMAIN, LM_LOG_LEVEL_NET,
- "Connection success.\n");
-
- _lm_connection_succeeded (connect_data);
- }
-
- return TRUE;
-}
-
-static const char *
-connection_condition_to_str (GIOCondition condition)
-{
- static char buf[256];
-
- buf[0] = '\0';
-
- if(condition & G_IO_ERR)
- strcat(buf, "G_IO_ERR ");
- if(condition & G_IO_HUP)
- strcat(buf, "G_IO_HUP ");
- if(condition & G_IO_NVAL)
- strcat(buf, "G_IO_NVAL ");
- if(condition & G_IO_IN)
- strcat(buf, "G_IO_IN ");
- if(condition & G_IO_OUT)
- strcat(buf, "G_IO_OUT ");
-
- return buf;
-}
-
-static void
-connection_do_connect (LmConnectData *connect_data)
-{
- LmConnection *connection;
- LmSocket fd;
- int res, err;
- int port;
- char name[NI_MAXHOST];
- char portname[NI_MAXSERV];
- struct addrinfo *addr;
-
- connection = connect_data->connection;
- addr = connect_data->current_addr;
-
- if (connection->proxy) {
- port = htons (lm_proxy_get_port (connection->proxy));
- } else {
- port = htons (connection->port);
- }
-
- ((struct sockaddr_in *) addr->ai_addr)->sin_port = port;
-
- res = getnameinfo (addr->ai_addr,
- (socklen_t)addr->ai_addrlen,
- name, sizeof (name),
- portname, sizeof (portname),
- NI_NUMERICHOST | NI_NUMERICSERV);
-
- if (res < 0) {
- _lm_connection_failed (connect_data);
- return;
- }
-
- g_log (LM_LOG_DOMAIN, LM_LOG_LEVEL_NET,
- "Trying %s port %s...\n", name, portname);
-
- fd = _lm_sock_makesocket (addr->ai_family,
- addr->ai_socktype,
- addr->ai_protocol);
-
- if (!_LM_SOCK_VALID (fd)) {
- g_log (LM_LOG_DOMAIN, LM_LOG_LEVEL_NET,
- "Failed making socket, error:%d...\n",
- _lm_sock_get_last_error ());
-
- _lm_connection_failed (connect_data);
-
- return;
- }
-
- /* Even though it says _unix_new(), it is supported by glib on
- * win32 because glib does some cool stuff to find out if it
- * can treat it as a FD or a windows SOCKET.
- */
- connect_data->fd = fd;
- connect_data->io_channel = g_io_channel_unix_new (fd);
-
- g_io_channel_set_encoding (connect_data->io_channel, NULL, NULL);
- g_io_channel_set_buffered (connect_data->io_channel, FALSE);
-
- _lm_sock_set_blocking (connect_data->fd,
- connection->blocking);
-
- if (connection->proxy) {
- connection->io_watch_connect =
- connection_add_watch (connection,
- connect_data->io_channel,
- G_IO_OUT|G_IO_ERR,
- (GIOFunc) _lm_proxy_connect_cb,
- connect_data);
- } else {
- connection->io_watch_connect =
- connection_add_watch (connection,
- connect_data->io_channel,
- G_IO_OUT|G_IO_ERR,
- (GIOFunc) connection_connect_cb,
- connect_data);
- }
-
- connection->async_connect_waiting = !connection->blocking;
-
- res = _lm_sock_connect (connect_data->fd,
- addr->ai_addr, (int)addr->ai_addrlen);
- if (res < 0) {
- err = _lm_sock_get_last_error ();
- if (!_lm_sock_is_blocking_error (err)) {
- _lm_sock_close (connect_data->fd);
- _lm_connection_failed_with_error (connect_data, err);
-
- return;
- }
- }
-}
-
-static guint
-connection_add_watch (LmConnection *connection,
- GIOChannel *channel,
- GIOCondition condition,
- GIOFunc func,
- gpointer user_data)
-{
- GSource *source;
- guint id;
-
- g_return_val_if_fail (channel != NULL, 0);
-
- source = g_io_create_watch (channel, condition);
-
- g_source_set_callback (source, (GSourceFunc)func, user_data, NULL);
-
- id = g_source_attach (source, connection->context);
-
- g_source_unref (source);
-
- return id;
+ lm_message_queue_push_tail (connection->queue, m);
}
static gboolean
@@ -739,434 +274,27 @@
static void
connection_start_keep_alive (LmConnection *connection)
{
- if (connection->keep_alive_id != 0) {
+ if (connection->keep_alive_source) {
connection_stop_keep_alive (connection);
}
if (connection->keep_alive_rate > 0) {
- connection->keep_alive_id =
- g_timeout_add (connection->keep_alive_rate,
- (GSourceFunc) connection_send_keep_alive,
- connection);
+ connection->keep_alive_source =
+ lm_misc_add_timeout (connection->context,
+ connection->keep_alive_rate,
+ (GSourceFunc) connection_send_keep_alive,
+ connection);
}
}
static void
connection_stop_keep_alive (LmConnection *connection)
{
- if (connection->keep_alive_id != 0) {
- g_source_remove (connection->keep_alive_id);
- }
-
- connection->keep_alive_id = 0;
-}
-
-static gboolean
-connection_buffered_write_cb (GIOChannel *source,
- GIOCondition condition,
- LmConnection *connection)
-{
- gint b_written;
- GString *out_buf;
- /* FIXME: Do the writing */
-
- out_buf = connection->out_buf;
- if (!out_buf) {
- /* Should not be possible */
- return FALSE;
- }
-
- b_written = connection_do_write (connection, out_buf->str, out_buf->len);
-
- if (b_written < 0) {
- connection_error_event (connection->io_channel,
- G_IO_HUP,
- connection);
- return FALSE;
- }
-
- g_string_erase (out_buf, 0, (gsize) b_written);
- if (out_buf->len == 0) {
- lm_verbose ("Output buffer is empty, going back to normal output\n");
-
- if (connection->io_watch_out != 0) {
- GSource *source;
-
- source = g_main_context_find_source_by_id (connection->context,
- connection->io_watch_out);
- if (source) {
- g_source_destroy (source);
- }
-
- connection->io_watch_out = 0;
- }
-
- g_string_free (out_buf, TRUE);
- connection->out_buf = NULL;
- return FALSE;
- }
-
- return TRUE;
-}
-
-static gboolean
-connection_output_is_buffered (LmConnection *connection,
- const gchar *buffer,
- gint len)
-{
- if (connection->out_buf) {
- lm_verbose ("Appending %d bytes to output buffer\n", len);
- g_string_append_len (connection->out_buf, buffer, len);
- return TRUE;
- }
-
- return FALSE;
-}
-
-static void
-connection_setup_output_buffer (LmConnection *connection,
- const gchar *buffer,
- gint len)
-{
- lm_verbose ("OUTPUT BUFFER ENABLED\n");
-
- connection->out_buf = g_string_new_len (buffer, len);
-
- connection->io_watch_out =
- connection_add_watch (connection,
- connection->io_channel,
- G_IO_OUT,
- (GIOFunc) connection_buffered_write_cb,
- connection);
-}
-
-/* Returns directly */
-/* Setups all data needed to start the connection attempts */
-static gboolean
-connection_do_open (LmConnection *connection, GError **error)
-{
- struct addrinfo req;
- struct addrinfo *ans;
- LmConnectData *data;
-
- if (lm_connection_is_open (connection)) {
- g_set_error (error,
- LM_ERROR,
- LM_ERROR_CONNECTION_NOT_OPEN,
- "Connection is already open, call lm_connection_close() first");
- return FALSE;
- }
-
- if (!connection->server) {
- g_set_error (error,
- LM_ERROR,
- LM_ERROR_CONNECTION_FAILED,
- "You need to set the server hostname in the call to lm_connection_new()");
- return FALSE;
- }
-
- /* source thingie for messages and stuff */
- connection->incoming_source = connection_create_source (connection);
- g_source_attach (connection->incoming_source, connection->context);
-
- lm_verbose ("Connecting to: %s:%d\n",
- connection->server, connection->port);
-
- memset (&req, 0, sizeof(req));
-
- req.ai_family = AF_UNSPEC;
- req.ai_socktype = SOCK_STREAM;
- req.ai_protocol = IPPROTO_TCP;
-
- connection->cancel_open = FALSE;
- connection->state = LM_CONNECTION_STATE_OPENING;
- connection->async_connect_waiting = FALSE;
-
- if (connection->proxy) {
- int err;
- const gchar *proxy_server;
-
- proxy_server = lm_proxy_get_server (connection->proxy);
-
- /* Connect through proxy */
- g_log (LM_LOG_DOMAIN, LM_LOG_LEVEL_NET,
- "Going to connect to proxy %s\n", proxy_server);
-
- err = getaddrinfo (proxy_server, NULL, &req, &ans);
- if (err != 0) {
- const char *str;
-
- str = _lm_sock_addrinfo_get_error_str (err);
- g_set_error (error,
- LM_ERROR,
- LM_ERROR_CONNECTION_FAILED,
- str);
- return FALSE;
- }
- } else {
- int err;
-
- /* Connect directly */
- g_log (LM_LOG_DOMAIN, LM_LOG_LEVEL_NET,
- "Going to connect to %s\n",
- connection->server);
-
- err = getaddrinfo (connection->server,
- NULL, &req, &ans);
- if (err != 0) {
- const char *str;
-
- str = _lm_sock_addrinfo_get_error_str (err);
- g_set_error (error,
- LM_ERROR,
- LM_ERROR_CONNECTION_FAILED,
- str);
- return FALSE;
- }
- }
-
- if (connection->ssl) {
- _lm_ssl_initialize (connection->ssl);
+ if (connection->keep_alive_source) {
+ g_source_destroy (connection->keep_alive_source);
}
- /* Prepare and do the nonblocking connection */
- data = g_new (LmConnectData, 1);
-
- data->connection = connection;
- data->resolved_addrs = ans;
- data->current_addr = ans;
- data->io_channel = NULL;
- data->fd = -1;
-
- connection->connect_data = data;
-
- connection_do_connect (data);
- return TRUE;
-}
-
-static void
-connection_do_close (LmConnection *connection)
-{
- GSource *source;
- LmConnectData *data;
-
- connection_stop_keep_alive (connection);
-
- if (connection->io_watch_connect != 0) {
-
- source = g_main_context_find_source_by_id (connection->context,
- connection->io_watch_connect);
-
- if (source) {
- g_source_destroy (source);
- }
-
- connection->io_watch_connect = 0;
- }
-
- data = connection->connect_data;
- if (data) {
- freeaddrinfo (data->resolved_addrs);
- connection->connect_data = NULL;
- g_free (data);
- }
-
- if (connection->io_channel) {
- if (connection->io_watch_in != 0) {
- source = g_main_context_find_source_by_id (connection->context,
- connection->io_watch_in);
- if (source) {
- g_source_destroy (source);
- }
-
- connection->io_watch_in = 0;
- }
-
- if (connection->io_watch_err != 0) {
- source = g_main_context_find_source_by_id (connection->context,
- connection->io_watch_err);
- if (source) {
- g_source_destroy (source);
- }
-
- connection->io_watch_err = 0;
- }
-
- if (connection->io_watch_hup != 0) {
- source = g_main_context_find_source_by_id (connection->context,
- connection->io_watch_hup);
-
- if (source) {
- g_source_destroy (source);
- }
-
- connection->io_watch_hup = 0;
- }
-
- if (connection->io_watch_out != 0) {
- source = g_main_context_find_source_by_id (connection->context,
- connection->io_watch_out);
-
- if (source) {
- g_source_destroy (source);
- }
-
- connection->io_watch_out = 0;
- }
-
-
- g_io_channel_unref (connection->io_channel);
- connection->io_channel = NULL;
-
- connection->fd = -1;
- }
-
- if (connection->incoming_source) {
- g_source_destroy (connection->incoming_source);
- g_source_unref (connection->incoming_source);
- connection->incoming_source = NULL;
- }
-
- if (!lm_connection_is_open (connection)) {
- /* lm_connection_is_open is FALSE for state OPENING as well */
- connection->state = LM_CONNECTION_STATE_CLOSED;
- connection->async_connect_waiting = FALSE;
- return;
- }
-
- connection->state = LM_CONNECTION_STATE_CLOSED;
- connection->async_connect_waiting = FALSE;
- if (connection->ssl) {
- _lm_ssl_close (connection->ssl);
- }
-}
-
-static gint
-connection_do_write (LmConnection *connection,
- const gchar *buf,
- gint len)
-{
- gint b_written;
-
- if (connection->ssl) {
- b_written = _lm_ssl_send (connection->ssl, buf, len);
- } else {
- GIOStatus io_status = G_IO_STATUS_AGAIN;
- gsize bytes_written;
-
- while (io_status == G_IO_STATUS_AGAIN) {
- io_status = g_io_channel_write_chars (connection->io_channel,
- buf, len,
- &bytes_written,
- NULL);
- }
-
- b_written = bytes_written;
-
- if (io_status != G_IO_STATUS_NORMAL) {
- b_written = -1;
- }
- }
-
- return b_written;
-}
-
-static gboolean
-connection_in_event (GIOChannel *source,
- GIOCondition condition,
- LmConnection *connection)
-{
- gchar buf[IN_BUFFER_SIZE];
- gsize bytes_read;
- GIOStatus status;
-
- if (!connection->io_channel) {
- return FALSE;
- }
-
- if (connection->ssl) {
- status = _lm_ssl_read (connection->ssl,
- buf, IN_BUFFER_SIZE - 1, &bytes_read);
- } else {
- status = g_io_channel_read_chars (connection->io_channel,
- buf, IN_BUFFER_SIZE - 1,
- &bytes_read,
- NULL);
- }
-
- if (status != G_IO_STATUS_NORMAL || bytes_read < 0) {
- gint reason;
-
- switch (status) {
- case G_IO_STATUS_EOF:
- reason = LM_DISCONNECT_REASON_HUP;
- break;
- case G_IO_STATUS_AGAIN:
- return TRUE;
- break;
- case G_IO_STATUS_ERROR:
- reason = LM_DISCONNECT_REASON_ERROR;
- break;
- default:
- reason = LM_DISCONNECT_REASON_UNKNOWN;
- }
-
- connection_do_close (connection);
- connection_signal_disconnect (connection, reason);
-
- return FALSE;
- }
-
- buf[bytes_read] = '\0';
- g_log (LM_LOG_DOMAIN, LM_LOG_LEVEL_NET, "\nRECV [%d]:\n",
- (int)bytes_read);
- g_log (LM_LOG_DOMAIN, LM_LOG_LEVEL_NET,
- "-----------------------------------\n");
- g_log (LM_LOG_DOMAIN, LM_LOG_LEVEL_NET, "'%s'\n", buf);
- g_log (LM_LOG_DOMAIN, LM_LOG_LEVEL_NET,
- "-----------------------------------\n");
-
- lm_verbose ("Read: %d chars\n", (int)bytes_read);
-
- lm_parser_parse (connection->parser, buf);
-
- return TRUE;
-}
-
-static gboolean
-connection_error_event (GIOChannel *source,
- GIOCondition condition,
- LmConnection *connection)
-{
- lm_verbose ("Error event: %d->'%s'\n",
- condition, connection_condition_to_str (condition));
-
- if (!connection->io_channel) {
- return FALSE;
- }
-
- connection_do_close (connection);
- connection_signal_disconnect (connection, LM_DISCONNECT_REASON_ERROR);
-
- return TRUE;
-}
-
-static gboolean
-connection_hup_event (GIOChannel *source,
- GIOCondition condition,
- LmConnection *connection)
-{
- lm_verbose ("HUP event: %d->'%s'\n",
- condition, connection_condition_to_str (condition));
-
- if (!connection->io_channel) {
- return FALSE;
- }
-
- connection_do_close (connection);
- connection_signal_disconnect (connection, LM_DISCONNECT_REASON_HUP);
-
- return TRUE;
+ connection->keep_alive_source = NULL;
}
static gboolean
@@ -1176,7 +304,7 @@
GError **error)
{
gint b_written;
-
+
if (connection->state < LM_CONNECTION_STATE_OPENING) {
g_log (LM_LOG_DOMAIN,LM_LOG_LEVEL_NET,
"Connection is not open.\n");
@@ -1202,22 +330,21 @@
/* Check to see if there already is an output buffer, if so, add to the
buffer and return */
- if (connection_output_is_buffered (connection, str, len)) {
+ if (lm_socket_output_is_buffered (connection->socket, str, len)) {
return TRUE;
}
- b_written = connection_do_write (connection, str, len);
-
+ b_written = lm_socket_do_write (connection->socket, str, len);
if (b_written < 0) {
- connection_error_event (connection->io_channel,
- G_IO_HUP,
- connection);
+ _lm_connection_error_event (connection->socket,
+ G_IO_HUP,
+ connection);
return FALSE;
}
if (b_written < len) {
- connection_setup_output_buffer (connection,
+ lm_socket_setup_output_buffer (connection->socket,
str + b_written,
len - b_written);
}
@@ -1225,6 +352,107 @@
return TRUE;
}
+static void
+connection_message_queue_cb (LmMessageQueue *queue,
+ LmConnection *connection)
+{
+ LmMessage *m;
+
+ m = lm_message_queue_pop_nth (connection->queue, 0);
+
+ if (m) {
+ connection_handle_message (connection, m);
+ lm_message_unref (m);
+ }
+}
+
+/* Returns directly */
+/* Setups all data needed to start the connection attempts */
+static gboolean
+connection_do_open (LmConnection *connection, GError **error)
+{
+ if (lm_connection_is_open (connection)) {
+ g_set_error (error,
+ LM_ERROR,
+ LM_ERROR_CONNECTION_NOT_OPEN,
+ "Connection is already open, call lm_connection_close() first");
+ return FALSE;
+ }
+
+ if (!connection->server) {
+ g_set_error (error,
+ LM_ERROR,
+ LM_ERROR_CONNECTION_FAILED,
+ "You need to set the server hostname in the call to lm_connection_new()");
+ return FALSE;
+ }
+
+ lm_message_queue_attach (connection->queue, connection->context);
+
+ lm_verbose ("Connecting to: %s:%d\n",
+ connection->server, connection->port);
+
+ connection->state = LM_CONNECTION_STATE_OPENING;
+ connection->async_connect_waiting = FALSE;
+
+ connection->socket = lm_socket_create (connection->context,
+ (IncomingDataFunc) connection_incoming_data,
+ connection,
+ connection,
+ connection->blocking,
+ connection->server,
+ connection->port,
+ connection->ssl,
+ connection->proxy,
+ error);
+ if (!connection->socket) {
+ return FALSE;
+ }
+
+ return TRUE;
+}
+
+void
+_lm_connection_do_close (LmConnection *connection)
+{
+ connection_stop_keep_alive (connection);
+
+ lm_socket_close (connection->socket);
+
+ lm_message_queue_detach (connection->queue);
+
+ if (!lm_connection_is_open (connection)) {
+ /* lm_connection_is_open is FALSE for state OPENING as well */
+ connection->state = LM_CONNECTION_STATE_CLOSED;
+ connection->async_connect_waiting = FALSE;
+ return;
+ }
+
+ connection->state = LM_CONNECTION_STATE_CLOSED;
+ connection->async_connect_waiting = FALSE;
+ if (connection->ssl) {
+ _lm_ssl_close (connection->ssl);
+ }
+}
+
+gboolean
+_lm_connection_error_event (LmSocket *socket,
+ GIOCondition condition,
+ LmConnection *connection)
+{
+ lm_verbose ("Error event: %d->'%s'\n",
+ condition, lm_misc_io_condition_to_str (condition));
+
+ if (!connection->socket) {
+ return FALSE;
+ }
+
+ _lm_connection_do_close (connection);
+ _lm_connection_signal_disconnect (connection, LM_DISCONNECT_REASON_ERROR);
+
+ return TRUE;
+}
+
typedef struct {
gchar *username;
gchar *password;
@@ -1444,55 +672,8 @@
return b->priority - a->priority;
}
-static gboolean
-connection_incoming_prepare (GSource *source, gint *timeout)
-{
- LmConnection *connection;
-
- connection = ((LmIncomingSource *)source)->connection;
-
- return !g_queue_is_empty (connection->incoming_messages);
-}
-
-static gboolean
-connection_incoming_check (GSource *source)
-{
- return FALSE;
-}
-
-static gboolean
-connection_incoming_dispatch (GSource *source,
- GSourceFunc callback,
- gpointer user_data)
-{
- LmConnection *connection;
- LmMessage *m;
-
- connection = ((LmIncomingSource *) source)->connection;
-
- m = (LmMessage *) g_queue_pop_head (connection->incoming_messages);
-
- if (m) {
- connection_handle_message (connection, m);
- lm_message_unref (m);
- }
-
- return TRUE;
-}
-
-static GSource *
-connection_create_source (LmConnection *connection)
-{
- GSource *source;
-
- source = g_source_new (&incoming_funcs, sizeof (LmIncomingSource));
- ((LmIncomingSource *) source)->connection = connection;
-
- return source;
-}
-
-static void
-connection_signal_disconnect (LmConnection *connection,
+void
+_lm_connection_signal_disconnect (LmConnection *connection,
LmDisconnectReason reason)
{
if (connection->disconnect_cb && connection->disconnect_cb->func) {
@@ -1504,6 +685,84 @@
}
}
+static void
+connection_incoming_data (LmSocket *socket,
+ const gchar *buf,
+ LmConnection *connection)
+{
+ lm_parser_parse (connection->parser, buf);
+}
+
+void
+_lm_connection_socket_result (LmConnection *connection, gboolean result)
+{
+ LmMessage *m;
+ gchar *server_from_jid;
+ gchar *ch;
+
+ if (!result) {
+ _lm_connection_do_close (connection);
+
+ if (connection->open_cb) {
+ LmCallback *cb = connection->open_cb;
+
+ connection->open_cb = NULL;
+
+ (* ((LmResultFunction) cb->func)) (connection, FALSE,
+ cb->user_data);
+ _lm_utils_free_callback (cb);
+ }
+
+ return;
+ }
+
+ /* FIXME: Set up according to XMPP 1.0 specification */
+ /* StartTLS and the like */
+ if (!connection_send (connection,
+ "<?xml version='1.0' encoding='UTF-8'?>", -1,
+ NULL)) {
+ lm_verbose ("Failed to send xml version and encoding\n");
+ _lm_connection_do_close (connection);
+
+ return;
+ }
+
+ if (connection->jid != NULL && (ch = strchr (connection->jid, '@')) != NULL) {
+ server_from_jid = ch + 1;
+ } else {
+ server_from_jid = connection->server;
+ }
+
+ m = lm_message_new (server_from_jid, LM_MESSAGE_TYPE_STREAM);
+ lm_message_node_set_attributes (m->node,
+ "xmlns:stream",
+ "http://etherx.jabber.org/streams",
+ "xmlns", "jabber:client",
+ NULL);
+
+ lm_verbose ("Opening stream...");
+
+ if (!lm_connection_send (connection, m, NULL)) {
+ lm_verbose ("Failed to send stream information\n");
+ _lm_connection_do_close (connection);
+ }
+
+ lm_message_unref (m);
+}
+
+gboolean
+_lm_connection_async_connect_waiting (LmConnection *connection)
+{
+ return connection->async_connect_waiting;
+}
+
+void
+_lm_connection_set_async_connect_waiting (LmConnection *connection,
+ gboolean waiting)
+{
+ connection->async_connect_waiting = waiting;
+}
+
/**
* lm_connection_new:
* @server: The hostname to the server for the connection.
@@ -1536,13 +795,13 @@
connection->ssl = NULL;
connection->proxy = NULL;
connection->disconnect_cb = NULL;
- connection->incoming_messages = g_queue_new ();
+ connection->queue = lm_message_queue_new ((LmMessageQueueCallback) connection_message_queue_cb,
+ connection);
connection->cancel_open = FALSE;
connection->state = LM_CONNECTION_STATE_CLOSED;
- connection->keep_alive_id = 0;
+ connection->keep_alive_source = NULL;
connection->keep_alive_rate = 0;
- connection->out_buf = NULL;
- connection->connect_data = NULL;
+ connection->socket = NULL;
connection->id_handlers = g_hash_table_new_full (g_str_hash,
g_str_equal,
@@ -1710,12 +969,12 @@
if (!connection_send (connection, "</stream:stream>", -1, error)) {
no_errors = FALSE;
}
-
- g_io_channel_flush (connection->io_channel, NULL);
+
+ lm_socket_flush (connection->socket);
}
- connection_do_close (connection);
- connection_signal_disconnect (connection, LM_DISCONNECT_REASON_OK);
+ _lm_connection_do_close (connection);
+ _lm_connection_signal_disconnect (connection, LM_DISCONNECT_REASON_OK);
return no_errors;
}
@@ -1891,7 +1150,7 @@
connection_stop_keep_alive (connection);
if (rate == 0) {
- connection->keep_alive_id = 0;
+ connection->keep_alive_source = NULL;
return;
}
@@ -2151,7 +1410,7 @@
g_return_val_if_fail (connection != NULL, FALSE);
g_return_val_if_fail (message != NULL, FALSE);
-
+
xml_str = lm_message_node_to_string (message->node);
if ((ch = strstr (xml_str, "</stream:stream>"))) {
*ch = '\0';
@@ -2238,9 +1497,7 @@
lm_message_node_set_attributes (message->node, "id", id, NULL);
}
- g_source_remove (g_source_get_id (connection->incoming_source));
- g_source_unref (connection->incoming_source);
- connection->incoming_source = NULL;
+ lm_message_queue_detach (connection->queue);
lm_connection_send (connection, message, error);
@@ -2250,29 +1507,27 @@
g_main_context_iteration (connection->context, TRUE);
- if (g_queue_is_empty (connection->incoming_messages)) {
+ if (lm_message_queue_is_empty (connection->queue)) {
continue;
}
- for (n = 0; n < g_queue_get_length (connection->incoming_messages); n++) {
+ for (n = 0; n < lm_message_queue_get_length (connection->queue); n++) {
LmMessage *m;
- m = (LmMessage *) g_queue_peek_nth (connection->incoming_messages, n);
+ m = (LmMessage *) lm_message_queue_peek_nth (connection->queue, n);
m_id = lm_message_node_get_attribute (m->node, "id");
if (m_id && strcmp (m_id, id) == 0) {
reply = m;
- g_queue_pop_nth (connection->incoming_messages,
- n);
+ lm_message_queue_pop_nth (connection->queue, n);
break;
}
}
}
g_free (id);
- connection->incoming_source = connection_create_source (connection);
- g_source_attach (connection->incoming_source, connection->context);
+ lm_message_queue_attach (connection->queue, connection->context);
return reply;
}
--- a/loudmouth/lm-internals.h Fri Feb 02 15:04:37 2007 +0100
+++ b/loudmouth/lm-internals.h Fri Feb 02 15:16:48 2007 +0100
@@ -31,11 +31,12 @@
#include "lm-message-handler.h"
#include "lm-message-node.h"
#include "lm-sock.h"
+#include "lm-socket.h"
#ifndef G_OS_WIN32
-typedef int LmSocket;
+typedef int LmSocketT;
#else /* G_OS_WIN32 */
-typedef SOCKET LmSocket;
+typedef SOCKET LmSocketT;
#endif /* G_OS_WIN32 */
typedef struct {
@@ -45,19 +46,33 @@
} LmCallback;
typedef struct {
- LmConnection *connection;
+ LmConnection *connection;
+ LmSocket *socket;
/* struct to save resolved address */
struct addrinfo *resolved_addrs;
struct addrinfo *current_addr;
- LmSocket fd;
+ LmSocketT fd;
GIOChannel *io_channel;
} LmConnectData;
-void _lm_connection_failed_with_error (LmConnectData *connect_data,
+void _lm_socket_failed_with_error (LmConnectData *connect_data,
int error);
-void _lm_connection_failed (LmConnectData *connect_data);
-void _lm_connection_succeeded (LmConnectData *connect_data);
+void _lm_socket_failed (LmConnectData *connect_data);
+void _lm_socket_succeeded (LmConnectData *connect_data);
+gboolean _lm_connection_error_event (LmSocket *socket,
+ GIOCondition condition,
+ LmConnection *connection);
+void _lm_connection_do_close (LmConnection *connection);
+void _lm_connection_signal_disconnect (LmConnection *connection,
+ LmDisconnectReason reason);
+
+void _lm_connection_socket_result (LmConnection *connection,
+ gboolean result);
+gboolean _lm_connection_async_connect_waiting (LmConnection *connection);
+void _lm_connection_set_async_connect_waiting (LmConnection *connection,
+ gboolean waiting);
+
LmCallback * _lm_utils_new_callback (gpointer func,
gpointer data,
GDestroyNotify notify);
@@ -84,20 +99,20 @@
LmMessage *messag);
gboolean _lm_sock_library_init (void);
void _lm_sock_library_shutdown (void);
-void _lm_sock_set_blocking (LmSocket sock,
+void _lm_sock_set_blocking (LmSocketT sock,
gboolean block);
-void _lm_sock_shutdown (LmSocket sock);
-void _lm_sock_close (LmSocket sock);
-LmSocket _lm_sock_makesocket (int af,
+void _lm_sock_shutdown (LmSocketT sock);
+void _lm_sock_close (LmSocketT sock);
+LmSocketT _lm_sock_makesocket (int af,
int type,
int protocol);
-int _lm_sock_connect (LmSocket sock,
+int _lm_sock_connect (LmSocketT sock,
const struct sockaddr *name,
int namelen);
gboolean _lm_sock_is_blocking_error (int err);
gboolean _lm_sock_is_blocking_success (int err);
int _lm_sock_get_last_error (void);
-void _lm_sock_get_error (LmSocket sock,
+void _lm_sock_get_error (LmSocketT sock,
void *error,
socklen_t *len);
const gchar * _lm_sock_get_error_str (int err);
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/loudmouth/lm-message-queue.c Fri Feb 02 15:16:48 2007 +0100
@@ -0,0 +1,227 @@
+/* -*- Mode: C; tab-width: 8; indent-tabs-mode: t; c-basic-offset: 8 -*- */
+/*
+ * Copyright (C) 2006 Imendio AB
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2 of the
+ * License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this program; if not, write to the
+ * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
+ * Boston, MA 02111-1307, USA.
+ */
+
+#include <config.h>
+
+#include "lm-message-queue.h"
+
+struct _LmMessageQueue {
+ GQueue *messages;
+
+ GMainContext *context;
+ GSource *source;
+
+ LmMessageQueueCallback callback;
+ gpointer user_data;
+
+ gint ref_count;
+};
+
+typedef struct {
+ GSource source;
+ LmMessageQueue *queue;
+} MessageQueueSource;
+
+static void message_queue_free (LmMessageQueue *queue);
+static gboolean message_queue_prepare_func (GSource *source,
+ gint *timeout);
+static gboolean message_queue_check_func (GSource *source);
+static gboolean message_queue_dispatch_func (GSource *source,
+ GSourceFunc callback,
+ gpointer user_data);
+
+static GSourceFuncs source_funcs = {
+ message_queue_prepare_func,
+ message_queue_check_func,
+ message_queue_dispatch_func,
+ NULL
+};
+
+static void
+foreach_free_message (LmMessage *m, gpointer user_data)
+{
+ lm_message_unref (m);
+}
+
+static void
+message_queue_free (LmMessageQueue *queue)
+{
+ lm_message_queue_detach (queue);
+
+ g_queue_foreach (queue->messages, (GFunc) foreach_free_message, NULL);
+ g_queue_free (queue->messages);
+
+ g_free (queue);
+}
+
+static gboolean
+message_queue_prepare_func (GSource *source, gint *timeout)
+{
+ LmMessageQueue *queue;
+
+ queue = ((MessageQueueSource *)source)->queue;
+
+ return !g_queue_is_empty (queue->messages);
+}
+
+static gboolean
+message_queue_check_func (GSource *source)
+{
+ return FALSE;
+}
+
+static gboolean
+message_queue_dispatch_func (GSource *source,
+ GSourceFunc callback,
+ gpointer user_data)
+{
+ LmMessageQueue *queue;
+
+ queue = ((MessageQueueSource *)source)->queue;
+
+ if (queue->callback) {
+ (queue->callback) (queue, queue->user_data);
+ }
+
+ return TRUE;
+}
+
+LmMessageQueue *
+lm_message_queue_new (LmMessageQueueCallback callback,
+ gpointer user_data)
+{
+ LmMessageQueue *queue;
+
+ queue = g_new0 (LmMessageQueue, 1);
+
+ queue->messages = g_queue_new ();
+ queue->context = NULL;
+ queue->source = NULL;
+ queue->ref_count = 1;
+
+ queue->callback = callback;
+ queue->user_data = user_data;
+
+ return queue;
+}
+
+void
+lm_message_queue_attach (LmMessageQueue *queue, GMainContext *context)
+{
+ GSource *source;
+
+ if (queue->source) {
+ if (queue->context == context) {
+ /* Already attached */
+ return;
+ }
+ lm_message_queue_detach (queue);
+ }
+
+ if (context) {
+ queue->context = g_main_context_ref (context);
+ }
+
+ source = g_source_new (&source_funcs, sizeof (MessageQueueSource));
+ ((MessageQueueSource *)source)->queue = queue;
+ queue->source = source;
+
+ g_source_attach (source, queue->context);
+}
+
+void
+lm_message_queue_detach (LmMessageQueue *queue)
+{
+ if (queue->source) {
+ g_source_destroy (queue->source);
+ g_source_unref (queue->source);
+ }
+
+ if (queue->context) {
+ g_main_context_unref (queue->context);
+ }
+
+ queue->source = NULL;
+ queue->context = NULL;
+}
+
+void
+lm_message_queue_push_tail (LmMessageQueue *queue, LmMessage *m)
+{
+ g_return_if_fail (queue != NULL);
+ g_return_if_fail (m != NULL);
+
+ g_queue_push_tail (queue->messages, m);
+}
+
+LmMessage *
+lm_message_queue_peek_nth (LmMessageQueue *queue, guint n)
+{
+ g_return_val_if_fail (queue != NULL, NULL);
+
+ return (LmMessage *) g_queue_peek_nth (queue->messages, n);
+}
+
+LmMessage *
+lm_message_queue_pop_nth (LmMessageQueue *queue, guint n)
+{
+ g_return_val_if_fail (queue != NULL, NULL);
+
+ return (LmMessage *) g_queue_pop_nth (queue->messages, n);
+}
+
+guint
+lm_message_queue_get_length (LmMessageQueue *queue)
+{
+ g_return_val_if_fail (queue != NULL, 0);
+
+ return g_queue_get_length (queue->messages);
+}
+
+gboolean
+lm_message_queue_is_empty (LmMessageQueue *queue)
+{
+ g_return_val_if_fail (queue != NULL, TRUE);
+
+ return g_queue_is_empty (queue->messages);
+}
+
+LmMessageQueue *
+lm_message_queue_ref (LmMessageQueue *queue)
+{
+ g_return_val_if_fail (queue != NULL, NULL);
+
+ queue->ref_count++;
+
+ return queue;
+}
+
+void
+lm_message_queue_unref (LmMessageQueue *queue)
+{
+ g_return_if_fail (queue != NULL);
+
+ queue->ref_count--;
+
+ if (queue->ref_count <= 0) {
+ message_queue_free (queue);
+ }
+}
+
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/loudmouth/lm-message-queue.h Fri Feb 02 15:16:48 2007 +0100
@@ -0,0 +1,50 @@
+/* -*- Mode: C; tab-width: 8; indent-tabs-mode: t; c-basic-offset: 8 -*- */
+/*
+ * Copyright (C) 2006 Imendio AB
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2 of the
+ * License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this program; if not, write to the
+ * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
+ * Boston, MA 02111-1307, USA.
+ */
+
+#ifndef __LM_MESSAGE_QUEUE_H__
+#define __LM_MESSAGE_QUEUE_H__
+
+#include <glib.h>
+#include <loudmouth/lm-message.h>
+
+typedef struct _LmMessageQueue LmMessageQueue;
+
+typedef void (* LmMessageQueueCallback) (LmMessageQueue *queue,
+ gpointer user_data);
+
+LmMessageQueue * lm_message_queue_new (LmMessageQueueCallback func,
+ gpointer data);
+void lm_message_queue_attach (LmMessageQueue *queue,
+ GMainContext *context);
+
+void lm_message_queue_detach (LmMessageQueue *queue);
+void lm_message_queue_push_tail (LmMessageQueue *queue,
+ LmMessage *m);
+LmMessage * lm_message_queue_peek_nth (LmMessageQueue *queue,
+ guint n);
+LmMessage * lm_message_queue_pop_nth (LmMessageQueue *queue,
+ guint n);
+guint lm_message_queue_get_length (LmMessageQueue *queue);
+gboolean lm_message_queue_is_empty (LmMessageQueue *queue);
+
+LmMessageQueue * lm_message_queue_ref (LmMessageQueue *queue);
+void lm_message_queue_unref (LmMessageQueue *queue);
+
+#endif /* __LM_MESSAGE_QUEUE_H__ */
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/loudmouth/lm-misc.c Fri Feb 02 15:16:48 2007 +0100
@@ -0,0 +1,106 @@
+/* -*- Mode: C; tab-width: 8; indent-tabs-mode: t; c-basic-offset: 8 -*- */
+/*
+ * Copyright (C) 2006 Imendio AB
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2 of the
+ * License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this program; if not, write to the
+ * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
+ * Boston, MA 02111-1307, USA.
+ */
+
+#include <config.h>
+
+#include <string.h>
+
+#include "lm-misc.h"
+
+static void
+misc_setup_source (GMainContext *context,
+ GSource *source,
+ GSourceFunc function,
+ gpointer data)
+{
+ g_source_set_callback (source, (GSourceFunc)function, data, NULL);
+ g_source_attach (source, context);
+ g_source_unref (source);
+}
+
+GSource *
+lm_misc_add_io_watch (GMainContext *context,
+ GIOChannel *channel,
+ GIOCondition condition,
+ GIOFunc function,
+ gpointer data)
+{
+ GSource *source;
+
+ g_return_val_if_fail (channel != NULL, 0);
+
+ source = g_io_create_watch (channel, condition);
+ misc_setup_source (context, source, (GSourceFunc) function, data);
+
+ return source;
+}
+
+GSource *
+lm_misc_add_idle (GMainContext *context,
+ GSourceFunc function,
+ gpointer data)
+{
+ GSource *source;
+
+ g_return_val_if_fail (function != NULL, 0);
+
+ source = g_idle_source_new ();
+ misc_setup_source (context, source, function, data);
+
+ return source;
+}
+
+GSource *
+lm_misc_add_timeout (GMainContext *context,
+ guint interval,
+ GSourceFunc function,
+ gpointer data)
+{
+ GSource *source;
+
+ g_return_val_if_fail (function != NULL, 0);
+
+ source = g_timeout_source_new (interval);
+ misc_setup_source (context, source, function, data);
+
+ return source;
+}
+
+const char *
+lm_misc_io_condition_to_str (GIOCondition condition)
+{
+ static char buf[256];
+
+ buf[0] = '\0';
+
+ if(condition & G_IO_ERR)
+ strcat(buf, "G_IO_ERR ");
+ if(condition & G_IO_HUP)
+ strcat(buf, "G_IO_HUP ");
+ if(condition & G_IO_NVAL)
+ strcat(buf, "G_IO_NVAL ");
+ if(condition & G_IO_IN)
+ strcat(buf, "G_IO_IN ");
+ if(condition & G_IO_OUT)
+ strcat(buf, "G_IO_OUT ");
+
+ return buf;
+}
+
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/loudmouth/lm-misc.h Fri Feb 02 15:16:48 2007 +0100
@@ -0,0 +1,43 @@
+/* -*- Mode: C; tab-width: 8; indent-tabs-mode: t; c-basic-offset: 8 -*- */
+/*
+ * Copyright (C) 2006 Imendio AB
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2 of the
+ * License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this program; if not, write to the
+ * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
+ * Boston, MA 02111-1307, USA.
+ */
+
+#ifndef __LM_MISC_H__
+#define __LM_MISC_H__
+
+#include <glib.h>
+
+GSource * lm_misc_add_io_watch (GMainContext *context,
+ GIOChannel *chan,
+ GIOCondition condition,
+ GIOFunc function,
+ gpointer data);
+GSource * lm_misc_add_idle (GMainContext *context,
+ GSourceFunc function,
+ gpointer data);
+GSource * lm_misc_add_timeout (GMainContext *context,
+ guint interval,
+ GSourceFunc function,
+ gpointer data);
+
+const char * lm_misc_io_condition_to_str (GIOCondition condition);
+
+
+#endif /* __LM_MISC_H__ */
+
--- a/loudmouth/lm-proxy.c Fri Feb 02 15:04:37 2007 +0100
+++ b/loudmouth/lm-proxy.c Fri Feb 02 15:16:48 2007 +0100
@@ -163,7 +163,7 @@
if (retval == TRUE) {
g_source_remove (proxy->io_watch);
- _lm_connection_succeeded ((LmConnectData *) data);
+ _lm_socket_succeeded ((LmConnectData *) data);
}
return FALSE;
@@ -204,11 +204,11 @@
if (condition == G_IO_ERR) {
len = sizeof (error);
_lm_sock_get_error (connect_data->fd, &error, &len);
- _lm_connection_failed_with_error (connect_data, error);
+ _lm_socket_failed_with_error (connect_data, error);
return FALSE;
} else if (condition == G_IO_OUT) {
if (!proxy_negotiate (lm_connection_get_proxy (connection), connect_data->fd, lm_connection_get_server (connection), lm_connection_get_port (connection))) {
- _lm_connection_failed (connect_data);
+ _lm_socket_failed (connect_data);
return FALSE;
}
--- a/loudmouth/lm-sock.c Fri Feb 02 15:04:37 2007 +0100
+++ b/loudmouth/lm-sock.c Fri Feb 02 15:16:48 2007 +0100
@@ -110,7 +110,7 @@
}
void
-_lm_sock_set_blocking (LmSocket sock,
+_lm_sock_set_blocking (LmSocketT sock,
gboolean block)
{
int res;
@@ -130,13 +130,13 @@
}
void
-_lm_sock_shutdown (LmSocket sock)
+_lm_sock_shutdown (LmSocketT sock)
{
shutdown (sock, LM_SHUTDOWN);
}
void
-_lm_sock_close (LmSocket sock)
+_lm_sock_close (LmSocketT sock)
{
#ifndef G_OS_WIN32
close (sock);
@@ -145,16 +145,16 @@
#endif /* G_OS_WIN32 */
}
-LmSocket
+LmSocketT
_lm_sock_makesocket (int af,
int type,
int protocol)
{
- return (LmSocket)socket (af, type, protocol);
+ return (LmSocketT)socket (af, type, protocol);
}
int
-_lm_sock_connect (LmSocket sock,
+_lm_sock_connect (LmSocketT sock,
const struct sockaddr *name,
int namelen)
{
@@ -190,7 +190,7 @@
}
void
-_lm_sock_get_error (LmSocket sock,
+_lm_sock_get_error (LmSocketT sock,
void *error,
socklen_t *len)
{
--- a/loudmouth/lm-socket.c Fri Feb 02 15:04:37 2007 +0100
+++ b/loudmouth/lm-socket.c Fri Feb 02 15:16:48 2007 +0100
@@ -18,348 +18,110 @@
* Boston, MA 02111-1307, USA.
*/
-#include "lm-socket.h"
+#include <config.h>
+
+#include <string.h>
-#ifndef G_OS_WIN32
-typedef int LmSock;
-#else /* G_OS_WIN32 */
-typedef SOCKET LmSock;
-#endif /* G_OS_WIN32 */
+#include "lm-debug.h"
+#include "lm-internals.h"
+#include "lm-misc.h"
+#include "lm-ssl.h"
+#include "lm-ssl-internals.h"
+#include "lm-proxy.h"
+#include "lm-socket.h"
+#include "lm-sock.h"
+#include "lm-error.h"
-/* FIXME: Integrate with the SSL stuff */
-
-/* FIXME: IO Buffering both in/out here? */
+#define IN_BUFFER_SIZE 1024
+#define MIN_PORT 1
+#define MAX_PORT 65536
struct _LmSocket {
- LmSock sock;
- GIOChannel *io_channel;
- guint io_watch;
- /* FIXME: Add the rest */
+ LmConnection *connection;
+ GMainContext *context;
- LmSSL *ssl;
-
- LmSocketFuncs funcs;
-
- LmSocketState state;
+ gchar *server;
+ guint port;
+
+ gboolean blocking;
- gboolean is_blocking;
-
- gint ref;
-};
+ LmSSL *ssl;
+ LmProxy *proxy;
-typedef void (* SocketDNSCallback) (LmSocket *socket,
- SocketDNSData *data,
- gboolean success);
+ GIOChannel *io_channel;
+ GSource *watch_in;
+ GSource *watch_err;
+ GSource *watch_hup;
-typedef struct {
- /* Used when socket tries to connect */
- struct addrinfo *resolved_addrs;
- struct addrinfo *current_addr;
+ LmSocketT fd;
+
+ GSource *watch_connect;
- /* -- Internal during DNS lookup -- */
- LmSocket *socket;
- gchar *host;
- SocketDNSCallback callback;
-} SocketDNSData;
+ gboolean cancel_open;
+
+ GSource *watch_out;
+ GString *out_buf;
+
+ LmConnectData *connect_data;
+
+ IncomingDataFunc func;
+ gpointer user_data;
+
+ guint ref_count;
+};
-static LmSocket * socket_create (void);
-static void socket_free (LmSocket *socket);
-static gboolean socket_channel_event (GIOChannel *source,
- GIOCondition condition,
- LmSocket *socket);
-static void socket_dns_lookup (LmSocket *socket,
- const gchar *host,
- SocketDNSCallback callback);
-static SocketDNSData * socket_dns_data_new (LmSocket *socket,
- const gchar *host,
- SocketDNSCallback callbac);
-static void socket_dns_data_free (SocketDNSData *data);
-static void socket_start_connect (LmSocket *socket,
- SocketDNSData *data,
- gboolean success);
-
-static LmSocket *
-socket_create (void)
-{
- LmSocket *socket;
-
- socket = g_new0 (LmSocket, 1);
- socket->ref_count = 1;
- socket->is_blocking = FALSE;
- socket->state = LM_SOCKET_STATE_CLOSED;
- socket->ssl = NULL;
-
- return socket;
-}
+static void socket_free (LmSocket *socket);
+static void socket_do_connect (LmConnectData *connect_data);
+static gboolean socket_connect_cb (GIOChannel *source,
+ GIOCondition condition,
+ LmConnectData *connect_data);
+static gboolean socket_in_event (GIOChannel *source,
+ GIOCondition condition,
+ LmSocket *socket);
+static gboolean socket_hup_event (GIOChannel *source,
+ GIOCondition condition,
+ LmSocket *socket);
+static gboolean
+socket_buffered_write_cb (GIOChannel *source,
+ GIOCondition condition,
+ LmSocket *socket);
static void
socket_free (LmSocket *socket)
{
- /* FIXME: Free up the rest of the memory */
+ g_free (socket->server);
- if (socket->io_channel) {
- if (socket->io_watch != 0) {
- g_source_destroy (g_main_context_find_source_by_id (socket->context),
- socket->io_watch);
- socket->io_watch = 0;
- }
-
- g_io_channel_unref (socket->io_channel);
- socket->io_channel = NULL;
-
- socket->fd = -1;
+ if (socket->ssl) {
+ lm_ssl_unref (socket->ssl);
}
- if (socket->ssl) {
- _lm_ssl_unref (socket->ssl);
+ if (socket->proxy) {
+ lm_proxy_unref (socket->proxy);
+ }
+
+ if (socket->out_buf) {
+ g_string_free (socket->out_buf, TRUE);
}
- g_free (socket->host);
g_free (socket);
}
-static gboolean
-socket_channel_event (GIOChannel *source,
- GIOCondition condition,
- LmSocket *socket)
-{
- if (condition & G_IO_IN) {
- socket_signal_read_available ();
- }
- if (condition & G_IO_OUT) {
- socket_attempt_write ();
- }
- if (condition & G_IO_ERR ||
- condition & G_IO_HUP) {
- socket_disconnected ();
- }
-}
-
-static void
-socket_dns_lookup (LmSocket *socket,
- const gchar *host,
- SocketDNSCallback callback);
-{
- SocketDNSData *data;
- struct addrinfo req;
- struct addrinfo ans;
- int err;
-
- /* FIXME: This should not be synchronous */
- data = socket_dns_data_new (socket, host, callback);
-
- memset (&req, 0, sizeof (req));
- req.ai_family = AF_UNSPEC;
- req.ai_socktype = SOCK_STREAM;
- req.ai_protocol = IPPROTO_TCP;
-
- err = getaddrinfo (data->host, NULL, &req, &ans);
- if (err != 0) {
- /* FIXME: Handle error */
- data->callback (data->socket, data, FALSE);
- }
-
- data->resolved_addrs = ans;
- data->current_addr = ans;
-
- data->callback (data->socket, data, TRUE);
-}
-
-static SocketDNSData *
-socket_dns_data_new (LmSocket *socket,
- const gchar *host,
- SocketDNSCallback callbac)
-{
- SocketDNSData *data;
-
- data = g_new0 (SocketDNSData, 1);
-
- data->socket = lm_socket_ref (socket);
- data->host = g_strdup (host);
- data->callback = callback;
-
- data->resolved_addrs = data->current_addr = NULL;
-
- return data;
-}
-
-static void
-socket_dns_data_free (SocketDNSData *data)
-{
- lm_socket_unref (data->socket);
- g_free (data->host);
-
- if (data->resolved_addrs) {
- freeaddrinfo (data->resolved_addrs);;
- deta->resolved_addrs = NULL;
- }
-
- g_free (data);
-}
-
-static void
-socket_start_connect (LmSocket *socket,
- SocketDNSData *data,
- gboolean success)
-{
- struct addrinfo *addr;
- int port;
- char name[NI_MAXHOST];
- char portname[NI_MAXSERV];
- gint fd;
-
- if (!success) {
- socket_dns_data_free (data);
- /* FIXME: Report error */
- return;
- }
-
- addr = data->current_addr;
-
- if (socket->proxy) {
- port = htons (lm_proxy_get_port (socket->proxy));
- } else {
- port = htons (socket->port);
- }
-
- ((struct sockaddr_in *) addr->ai_addr)->sin_port = port;
-
- res = getnameinfo (addr->ai_addr,
- (socklen_t) addr->ai_addrlen,
- name, sizeof (name),
- portname, sizeof (portname),
- NI_NUMERICHOST | NI_NUMERICSERV);
-
- if (res < 0) {
- /* FIXME: Report failure */
- return;
- }
-
- fd = _lm_sock_makesocket (addr->ai_family,
- addr->ai_socktype,
- addr->ai_protocol);
-
- if (!_LM_SOCK_VALID (fd)) {
- g_log (LM_LOG_DOMAIN, LM_LOG_LEVEL_NET,
- "Failed making socket, error:%d...\n",
- _lm_sock_get_last_error ());
-
- /* FIXME: Report failure */
- return;
- }
-
- /* Even though it says _unix_new(), it is supported by glib on
- * win32 because glib does some cool stuff to find out if it
- * can treat it as a FD or a windows SOCKET.
- */
- socket->fd = fd;
- socket->io_channel = g_io_channel_unix_new (fd);
-
- g_io_channel_set_encoding (connect_data->io_channel, NULL, NULL);
- g_io_channel_set_buffered (connect_data->io_channel, FALSE);
-
- _lm_sock_set_blocking (socket->fd, FALSE);
-
- if (socket->proxy) {
- socket->io_watch_connect =
- socket_add_watch (socket,
- socket->io_channel,
- G_IO_IN|G_IO_OUT|G_IO_ERR|G_IO_HUP,
- (GIOFunc) _lm_proxy_connect_cb,
- socket);
- } else {
- socket->io_watch_connect =
- socket_add_watch (socket,
- socket->io_channel,
- G_IO_IN|G_IO_OUT|G_IO_ERR|G_IO_HUP,
- (GIOFunc) socket_connect_cb,
- socket);
- }
-
- /* FIXME: Continue and connect */
-}
-
-LmSocket *
-lm_socket_new (LmSocketFuncs funcs, const gchar *host, guint port)
-{
- LmSocket *socket;
-
- socket = socket_create ();
-
- socket->funcs = funcs;
- socket->host = g_strdup (host);
- socket->port = port;
- socket->is_blocking = FALSE;
-
- return socket;
-}
-
-void
-lm_socket_open (LmSocket *socket)
-{
- g_return_if_fail (socket != NULL);
-
- socket_dns_lookup (socket, socket->host, socket_start_connect);
-}
-
-int
-lm_socket_get_fd (LmSocket *socket)
-{
- g_return_val_if_fail (socket != NULL, -1);
-
- return socket->fd;
-}
-
-gboolean
-lm_socket_get_is_blocking (LmSocket *socket)
-{
- return socket->is_blocking;
-}
-
-void
-lm_socket_set_is_blocking (LmSocket *socket, gboolean is_block)
-{
- int res;
-
- g_return_if_fail (socket != NULL);
-
- /* FIXME: Don't unset all flags */
-
-#ifndef G_OS_WIN32
- res = fcntl (socket->sock, F_SETFL, is_block ? 0 : O_NONBLOCK);
-#else /* G_OS_WIN32 */
- u_long mode = (is_block ? 0 : 1);
- res = ioctlsocket (socket->sock, FIONBIO, &mode);
-#endif /* G_OS_WIN32 */
-
- if (res != 0) {
- g_log (LM_LOG_DOMAIN, LM_LOG_LEVEL_NET,
- "Could not set socket to be %s\n",
- block ? "blocking" : "non-blocking");
- }
-
- socket->is_blocking = is_block;
-}
-
-int
-lm_socket_write (LmSocket *socket,
- gsize size,
- gchar *buf,
- GError **error)
+gint
+lm_socket_do_write (LmSocket *socket,
+ const gchar *buf,
+ gint len)
{
gint b_written;
-
- g_return_val_if_fail (socket != NULL, -1);
if (socket->ssl) {
b_written = _lm_ssl_send (socket->ssl, buf, len);
} else {
GIOStatus io_status = G_IO_STATUS_AGAIN;
- gsize bytes_written = 0;
+ gsize bytes_written;
while (io_status == G_IO_STATUS_AGAIN) {
- io_status = g_io_channel_write_chars (socket->io_channel,
- buf, size,
+ io_status = g_io_channel_write_chars (socket->io_channel,
+ buf, len,
&bytes_written,
NULL);
}
@@ -374,46 +136,615 @@
return b_written;
}
-int
-lm_socket_read (LmSocket *socket,
- gsize size,
- gchar *buf,
- GError **error)
+static gboolean
+socket_in_event (GIOChannel *source,
+ GIOCondition condition,
+ LmSocket *socket)
{
- gsize bytes_read = 0;
- GIOStatus status = G_IO_STATUS_AGAIN;
+ gchar buf[IN_BUFFER_SIZE];
+ gsize bytes_read;
+ GIOStatus status;
+
+ if (!socket->io_channel) {
+ return FALSE;
+ }
+
+ if (socket->ssl) {
+ status = _lm_ssl_read (socket->ssl,
+ buf, IN_BUFFER_SIZE - 1, &bytes_read);
+ } else {
+ status = g_io_channel_read_chars (socket->io_channel,
+ buf, IN_BUFFER_SIZE - 1,
+ &bytes_read,
+ NULL);
+ }
+
+ if (status != G_IO_STATUS_NORMAL || bytes_read < 0) {
+ gint reason;
+
+ switch (status) {
+ case G_IO_STATUS_EOF:
+ reason = LM_DISCONNECT_REASON_HUP;
+ break;
+ case G_IO_STATUS_AGAIN:
+ return TRUE;
+ break;
+ case G_IO_STATUS_ERROR:
+ reason = LM_DISCONNECT_REASON_ERROR;
+ break;
+ default:
+ reason = LM_DISCONNECT_REASON_UNKNOWN;
+ }
+
+ _lm_connection_do_close (socket->connection);
+ _lm_connection_signal_disconnect (socket->connection, reason);
+
+ return FALSE;
+ }
+
+ buf[bytes_read] = '\0';
+ g_log (LM_LOG_DOMAIN, LM_LOG_LEVEL_NET, "\nRECV [%d]:\n",
+ (int)bytes_read);
+ g_log (LM_LOG_DOMAIN, LM_LOG_LEVEL_NET,
+ "-----------------------------------\n");
+ g_log (LM_LOG_DOMAIN, LM_LOG_LEVEL_NET, "'%s'\n", buf);
+ g_log (LM_LOG_DOMAIN, LM_LOG_LEVEL_NET,
+ "-----------------------------------\n");
+
+ lm_verbose ("Read: %d chars\n", (int)bytes_read);
+
+ (socket->func) (socket, buf, socket->user_data);
+
+ return TRUE;
+}
+
+
+
+static gboolean
+socket_hup_event (GIOChannel *source,
+ GIOCondition condition,
+ LmSocket *socket)
+{
+ lm_verbose ("HUP event: %d->'%s'\n",
+ condition, lm_misc_io_condition_to_str (condition));
+
+ if (!socket->io_channel) {
+ return FALSE;
+ }
+
+ _lm_connection_do_close (socket->connection);
+ _lm_connection_signal_disconnect (socket->connection,
+ LM_DISCONNECT_REASON_HUP);
+
+ return TRUE;
+}
+
+void
+_lm_socket_succeeded (LmConnectData *connect_data)
+{
+ LmSocket *socket;
+
+ socket = connect_data->socket;
+
+ if (socket->watch_connect) {
+ g_source_destroy (socket->watch_connect);
+ socket->watch_connect = NULL;
+ }
+
+ /* Need some way to report error/success */
+ if (socket->cancel_open) {
+ lm_verbose ("Cancelling connection...\n");
+ return;
+ }
+
+ socket->fd = connect_data->fd;
+ socket->io_channel = connect_data->io_channel;
+
+ freeaddrinfo (connect_data->resolved_addrs);
+ socket->connect_data = NULL;
+ g_free (connect_data);
+
+ if (socket->ssl) {
+ GError *error = NULL;
+
+ lm_verbose ("Setting up SSL...\n");
+
+#ifdef HAVE_GNUTLS
+ /* GNU TLS requires the socket to be blocking */
+ _lm_sock_set_blocking (socket->fd, TRUE);
+#endif
- g_return_val_if_fail (socket != NULL, -1);
+ if (!_lm_ssl_begin (socket->ssl, socket->fd,
+ socket->server,
+ &error)) {
+ lm_verbose ("Could not begin SSL\n");
+
+ if (error) {
+ g_log (LM_LOG_DOMAIN, LM_LOG_LEVEL_NET,
+ "%s\n", error->message);
+ g_error_free (error);
+ }
+
+ _lm_sock_shutdown (socket->fd);
+ _lm_sock_close (socket->fd);
+
+ _lm_connection_do_close (socket->connection);
+
+ return;
+ }
+
+#ifdef HAVE_GNUTLS
+ _lm_sock_set_blocking (socket->fd, FALSE);
+#endif
+ }
+
+ socket->watch_in =
+ lm_misc_add_io_watch (socket->context,
+ socket->io_channel,
+ G_IO_IN,
+ (GIOFunc) socket_in_event,
+ socket);
+
+ /* FIXME: if we add these, we don't get ANY
+ * response from the server, this is to do with the way that
+ * windows handles watches, see bug #331214.
+ */
+#ifndef G_OS_WIN32
+ socket->watch_err =
+ lm_misc_add_io_watch (socket->context,
+ socket->io_channel,
+ G_IO_ERR,
+ (GIOFunc) _lm_connection_error_event,
+ socket);
+
+ socket->watch_hup =
+ lm_misc_add_io_watch (socket->context,
+ socket->io_channel,
+ G_IO_HUP,
+ (GIOFunc) socket_hup_event,
+ socket);
+#endif
+
+ _lm_connection_socket_result (socket->connection, TRUE);
+}
- while (status == G_IO_STATUS_AGAIN) {
- if (socket->ssl) {
- status = _lm_ssl_read (socket->ssl,
- buf, size, &bytes_read);
- } else {
- status = g_io_channel_read_chars (socket->io_channel,
- buf, size,
- &bytes_read,
- NULL);
+void
+_lm_socket_failed_with_error (LmConnectData *connect_data, int error)
+{
+ LmSocket *socket;
+
+ g_log (LM_LOG_DOMAIN, LM_LOG_LEVEL_NET,
+ "Connection failed: %s (error %d)\n",
+ _lm_sock_get_error_str (error), error);
+
+ socket = connect_data->socket;
+
+ connect_data->current_addr = connect_data->current_addr->ai_next;
+
+ if (socket->watch_connect) {
+ g_source_destroy (socket->watch_connect);
+ socket->watch_connect = NULL;
+ }
+
+ if (connect_data->io_channel != NULL) {
+ g_io_channel_unref (connect_data->io_channel);
+ /* FIXME: need to check for last unref and close the socket */
+ }
+
+ if (connect_data->current_addr == NULL) {
+ _lm_connection_socket_result (socket->connection, FALSE);
+
+ freeaddrinfo (connect_data->resolved_addrs);
+ socket->connect_data = NULL;
+ g_free (connect_data);
+ } else {
+ /* try to connect to the next host */
+ socket_do_connect (connect_data);
+ }
+}
+
+void
+_lm_socket_failed (LmConnectData *connect_data)
+{
+ _lm_socket_failed_with_error (connect_data, _lm_sock_get_last_error());
+}
+
+static gboolean
+socket_connect_cb (GIOChannel *source,
+ GIOCondition condition,
+ LmConnectData *connect_data)
+{
+ LmSocket *socket;
+ struct addrinfo *addr;
+ int err;
+ socklen_t len;
+ LmSocketT fd;
+
+ socket = connect_data->socket;
+ addr = connect_data->current_addr;
+ fd = g_io_channel_unix_get_fd (source);
+
+ if (condition == G_IO_ERR) {
+ len = sizeof (err);
+ _lm_sock_get_error (fd, &err, &len);
+ if (!_lm_sock_is_blocking_error (err)) {
+ g_log (LM_LOG_DOMAIN, LM_LOG_LEVEL_NET,
+ "Connection failed.\n");
+
+ _lm_socket_failed_with_error (connect_data, err);
+
+ socket->watch_connect = NULL;
+ return FALSE;
}
}
- if (status != G_IO_STATUS_NORMAL || bytes_read < 0) {
- /* FIXME: Set error */
+ if (_lm_connection_async_connect_waiting (socket->connection)) {
+ gint res;
+
+ fd = g_io_channel_unix_get_fd (source);
+
+ res = _lm_sock_connect (fd, addr->ai_addr, (int)addr->ai_addrlen);
+ if (res < 0) {
+ err = _lm_sock_get_last_error ();
+ if (_lm_sock_is_blocking_success (err)) {
+ _lm_connection_set_async_connect_waiting (socket->connection, FALSE);
+
+ g_log (LM_LOG_DOMAIN, LM_LOG_LEVEL_NET,
+ "Connection success (1).\n");
+
+ _lm_socket_succeeded (connect_data);
+ }
+
+ if (_lm_connection_async_connect_waiting (socket->connection) &&
+ !_lm_sock_is_blocking_error (err)) {
+ g_log (LM_LOG_DOMAIN, LM_LOG_LEVEL_NET,
+ "Connection failed.\n");
+
+ _lm_sock_close (connect_data->fd);
+ _lm_socket_failed_with_error (connect_data, err);
- return -1;
+ socket->watch_connect = NULL;
+ return FALSE;
+ }
+ }
+ } else {
+ /* for blocking sockets, G_IO_OUT means we are connected */
+ g_log (LM_LOG_DOMAIN, LM_LOG_LEVEL_NET,
+ "Connection success (2).\n");
+
+ _lm_socket_succeeded (connect_data);
+ }
+ return TRUE;
+}
+
+static void
+socket_do_connect (LmConnectData *connect_data)
+{
+ LmSocket *socket;
+ LmSocketT fd;
+ int res, err;
+ int port;
+ char name[NI_MAXHOST];
+ char portname[NI_MAXSERV];
+ struct addrinfo *addr;
+
+ socket = connect_data->socket;
+ addr = connect_data->current_addr;
+
+ if (socket->proxy) {
+ port = htons (lm_proxy_get_port (socket->proxy));
+ } else {
+ port = htons (socket->port);
+ }
+
+ ((struct sockaddr_in *) addr->ai_addr)->sin_port = port;
+
+ res = getnameinfo (addr->ai_addr,
+ (socklen_t)addr->ai_addrlen,
+ name, sizeof (name),
+ portname, sizeof (portname),
+ NI_NUMERICHOST | NI_NUMERICSERV);
+
+ if (res < 0) {
+ _lm_socket_failed (connect_data);
+ return;
}
- return bytes_read;
+ g_log (LM_LOG_DOMAIN, LM_LOG_LEVEL_NET,
+ "Trying %s port %s...\n", name, portname);
+
+ fd = _lm_sock_makesocket (addr->ai_family,
+ addr->ai_socktype,
+ addr->ai_protocol);
+
+ if (!_LM_SOCK_VALID (fd)) {
+ g_log (LM_LOG_DOMAIN, LM_LOG_LEVEL_NET,
+ "Failed making socket, error:%d...\n",
+ _lm_sock_get_last_error ());
+
+ _lm_socket_failed (connect_data);
+
+ return;
+ }
+
+ /* Even though it says _unix_new(), it is supported by glib on
+ * win32 because glib does some cool stuff to find out if it
+ * can treat it as a FD or a windows SOCKET.
+ */
+ connect_data->fd = fd;
+ connect_data->io_channel = g_io_channel_unix_new (fd);
+
+ g_io_channel_set_encoding (connect_data->io_channel, NULL, NULL);
+ g_io_channel_set_buffered (connect_data->io_channel, FALSE);
+
+ _lm_sock_set_blocking (connect_data->fd, socket->blocking);
+
+ if (socket->proxy) {
+ socket->watch_connect =
+ lm_misc_add_io_watch (socket->context,
+ connect_data->io_channel,
+ G_IO_OUT|G_IO_ERR,
+ (GIOFunc) _lm_proxy_connect_cb,
+ connect_data);
+ } else {
+ socket->watch_connect =
+ lm_misc_add_io_watch (socket->context,
+ connect_data->io_channel,
+ G_IO_OUT|G_IO_ERR,
+ (GIOFunc) socket_connect_cb,
+ connect_data);
+ }
+
+ _lm_connection_set_async_connect_waiting (socket->connection, !socket->blocking);
+
+ res = _lm_sock_connect (connect_data->fd,
+ addr->ai_addr, (int)addr->ai_addrlen);
+ if (res < 0) {
+ err = _lm_sock_get_last_error ();
+ if (!_lm_sock_is_blocking_error (err)) {
+ _lm_sock_close (connect_data->fd);
+ _lm_socket_failed_with_error (connect_data, err);
+
+ return;
+ }
+ }
}
gboolean
-lm_socket_close (LmSocket *socket, GError **error)
+lm_socket_output_is_buffered (LmSocket *socket,
+ const gchar *buffer,
+ gint len)
+{
+ if (socket->out_buf) {
+ lm_verbose ("Appending %d bytes to output buffer\n", len);
+ g_string_append_len (socket->out_buf, buffer, len);
+ return TRUE;
+ }
+
+ return FALSE;
+}
+
+void
+lm_socket_setup_output_buffer (LmSocket *socket,
+ const gchar *buffer,
+ gint len)
+{
+ lm_verbose ("OUTPUT BUFFER ENABLED\n");
+
+ socket->out_buf = g_string_new_len (buffer, len);
+
+ socket->watch_out =
+ lm_misc_add_io_watch (socket->context,
+ socket->io_channel,
+ G_IO_OUT,
+ (GIOFunc) socket_buffered_write_cb,
+ socket);
+}
+
+static gboolean
+socket_buffered_write_cb (GIOChannel *source,
+ GIOCondition condition,
+ LmSocket *socket)
+{
+ gint b_written;
+ GString *out_buf;
+ /* FIXME: Do the writing */
+
+ out_buf = socket->out_buf;
+ if (!out_buf) {
+ /* Should not be possible */
+ return FALSE;
+ }
+
+ b_written = lm_socket_do_write (socket, out_buf->str, out_buf->len);
+
+ if (b_written < 0) {
+ _lm_connection_error_event (socket,
+ G_IO_HUP, socket->connection);
+ return FALSE;
+ }
+
+ g_string_erase (out_buf, 0, (gsize) b_written);
+ if (out_buf->len == 0) {
+ lm_verbose ("Output buffer is empty, going back to normal output\n");
+
+ if (socket->watch_out) {
+ g_source_destroy (socket->watch_out);
+ socket->watch_out = NULL;
+ }
+
+ g_string_free (out_buf, TRUE);
+ socket->out_buf = NULL;
+ return FALSE;
+ }
+
+ return TRUE;
+}
+
+LmSocket *
+lm_socket_create (GMainContext *context,
+ IncomingDataFunc func,
+ gpointer user_data,
+ LmConnection *connection,
+ gboolean blocking,
+ const gchar *server,
+ guint port,
+ LmSSL *ssl,
+ LmProxy *proxy,
+ GError **error)
{
-#ifndef G_OS_WIN32
- close (socket->sock);
-#else /* G_OS_WIN32 */
- closesocket (socket->sock);
-#endif /* G_OS_WIN32 */
+ LmSocket *socket;
+ struct addrinfo req;
+ struct addrinfo *ans;
+ LmConnectData *data;
+
+ g_return_val_if_fail (server != NULL, NULL);
+ g_return_val_if_fail ((port >= MIN_PORT && port <= MAX_PORT), NULL);
+ g_return_val_if_fail (func != NULL, NULL);
+
+ socket = g_new0 (LmSocket, 1);
+
+ memset (&req, 0, sizeof(req));
+
+ req.ai_family = AF_UNSPEC;
+ req.ai_socktype = SOCK_STREAM;
+ req.ai_protocol = IPPROTO_TCP;
+
+ socket->ref_count = 1;
+
+ socket->connection = connection;
+ socket->server = g_strdup (server);
+ socket->port = port;
+ socket->cancel_open = FALSE;
+ socket->ssl = NULL;
+ socket->proxy = NULL;
+ socket->blocking = blocking;
+ socket->func = func;
+ socket->user_data = user_data;
+
+ if (context) {
+ socket->context = g_main_context_ref (context);
+ }
+
+ if (proxy) {
+ int err;
+ const gchar *proxy_server;
+
+ socket->proxy = lm_proxy_ref (proxy);
+
+ proxy_server = lm_proxy_get_server (socket->proxy);
+
+ /* Connect through proxy */
+ g_log (LM_LOG_DOMAIN, LM_LOG_LEVEL_NET,
+ "Going to connect to proxy %s\n", proxy_server);
+
+ err = getaddrinfo (proxy_server, NULL, &req, &ans);
+ if (err != 0) {
+ const char *str;
+
+ str = _lm_sock_addrinfo_get_error_str (err);
+ g_set_error (error,
+ LM_ERROR,
+ LM_ERROR_CONNECTION_FAILED,
+ str);
+ return NULL;
+ }
+ } else {
+ int err;
+
+ /* Connect directly */
+ g_log (LM_LOG_DOMAIN, LM_LOG_LEVEL_NET,
+ "Going to connect to %s\n",
+ socket->server);
+
+ err = getaddrinfo (socket->server,
+ NULL, &req, &ans);
+ if (err != 0) {
+ const char *str;
+
+ str = _lm_sock_addrinfo_get_error_str (err);
+ g_set_error (error,
+ LM_ERROR,
+ LM_ERROR_CONNECTION_FAILED,
+ str);
+ return NULL;
+ }
+ }
+
+ if (ssl) {
+ socket->ssl = lm_ssl_ref (ssl);
+ _lm_ssl_initialize (socket->ssl);
+ }
+
+ /* Prepare and do the nonblocking connection */
+ data = g_new (LmConnectData, 1);
+
+ data->socket = socket;
+ data->connection = connection;
+ data->resolved_addrs = ans;
+ data->current_addr = ans;
+ data->io_channel = NULL;
+ data->fd = -1;
+
+ socket->connect_data = data;
+
+ socket_do_connect (data);
+
+ return socket;
+}
+
+void
+lm_socket_flush (LmSocket *socket)
+{
+ g_return_if_fail (socket != NULL);
+ g_return_if_fail (socket->io_channel != NULL);
+
+ g_io_channel_flush (socket->io_channel, NULL);
+}
+
+void
+lm_socket_close (LmSocket *socket)
+{
+ LmConnectData *data;
+
+ if (socket->watch_connect) {
+ g_source_destroy (socket->watch_connect);
+ socket->watch_connect = NULL;
+ }
+
+ data = socket->connect_data;
+ if (data) {
+ freeaddrinfo (data->resolved_addrs);
+ socket->connect_data = NULL;
+ g_free (data);
+ }
+
+ if (socket->io_channel) {
+ if (socket->watch_in) {
+ g_source_destroy (socket->watch_in);
+ socket->watch_in = NULL;
+ }
+
+ if (socket->watch_err) {
+ g_source_destroy (socket->watch_err);
+ socket->watch_err = NULL;
+ }
+
+ if (socket->watch_hup) {
+ g_source_destroy (socket->watch_hup);
+ socket->watch_hup = NULL;
+ }
+
+ if (socket->watch_out) {
+ g_source_destroy (socket->watch_out);
+ socket->watch_out = NULL;
+ }
+
+ g_io_channel_unref (socket->io_channel);
+ socket->io_channel = NULL;
+
+ socket->fd = -1;
+ }
}
LmSocket *
@@ -421,8 +752,8 @@
{
g_return_val_if_fail (socket != NULL, NULL);
- socket->ref++;
-
+ socket->ref_count++;
+
return socket;
}
@@ -430,10 +761,10 @@
lm_socket_unref (LmSocket *socket)
{
g_return_if_fail (socket != NULL);
-
- socket->ref--;
-
- if (socket->ref <= 0) {
+
+ socket->ref_count--;
+
+ if (socket->ref_count <= 0) {
socket_free (socket);
}
}
--- a/loudmouth/lm-socket.h Fri Feb 02 15:04:37 2007 +0100
+++ b/loudmouth/lm-socket.h Fri Feb 02 15:16:48 2007 +0100
@@ -18,46 +18,43 @@
* Boston, MA 02111-1307, USA.
*/
+#ifndef __LM_SOCKET_H__
+#define __LM_SOCKET_H__
+
#include <glib.h>
-#ifndef __LM_SOCKET_H__
-#define __LM_SOCKET_H__
+#include "lm-internals.h"
typedef struct _LmSocket LmSocket;
-typedef struct {
- /* ConnectCB */
- /* InCB */
- /* HupCB */
-} LmSocketFuncs;
+typedef void (* IncomingDataFunc) (LmSocket *socket,
+ const gchar *buf,
+ gpointer user_data);
-typedef enum {
- LM_SOCKET_STATE_CLOSED,
- LM_SOCKET_STATE_DNS_LOOKUP,
- LM_SOCKET_STATE_OPENING,
- LM_SOCKET_STATE_OPEN
-} LmSocketState;
+gboolean lm_socket_output_is_buffered (LmSocket *socket,
+ const gchar *buffer,
+ gint len);
+void lm_socket_setup_output_buffer (LmSocket *socket,
+ const gchar *buffer,
+ gint len);
+gint lm_socket_do_write (LmSocket *socket,
+ const gchar *buf,
+ gint len);
-LmSocket * lm_socket_new (LmSocketFuncs funcs,
- const gchar *host,
- guint port);
-void lm_socket_open (LmSocket *socket);
-int lm_socket_get_fd (LmSocket *socket);
-gboolean lm_socket_get_is_blocking (LmSocket *socket);
-void lm_socket_set_is_blocking (LmSocket *socket,
- gboolean is_block);
-int lm_socket_write (LmSocket *socket,
- gsize size,
- gchar *buf,
- GError **error);
-int lm_socket_read (LmSocket *socket,
- gsize size,
- gchar *buf,
- GError **error);
-gboolean lm_socket_close (LmSocket *socket,
- GError **error);
-LmSock * lm_socket_ref (LmSocket *socket);
-void lm_socket_unref (LmSocket *socket);
+LmSocket * lm_socket_create (GMainContext *context,
+ IncomingDataFunc func,
+ gpointer user_data,
+ LmConnection *connection,
+ gboolean blocking,
+ const gchar *server,
+ guint port,
+ LmSSL *ssl,
+ LmProxy *proxy,
+ GError **error);
+void lm_socket_flush (LmSocket *socket);
+void lm_socket_close (LmSocket *socket);
+LmSocket * lm_socket_ref (LmSocket *socket);
+void lm_socket_unref (LmSocket *socket);
#endif /* __LM_SOCKET_H__ */
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/tests/.gitignore Fri Feb 02 15:16:48 2007 +0100
@@ -0,0 +1,2 @@
+test-objects
+test-parser