From 9165c4db278b2d3da636d53e89c3b80cff66977f Mon Sep 17 00:00:00 2001 From: fiaxh Date: Wed, 22 Nov 2017 20:06:50 +0100 Subject: Async service lookup, connect and write --- xmpp-vala/CMakeLists.txt | 2 ++ xmpp-vala/src/core/stanza_writer.vala | 42 ++++++++++++++++------ xmpp-vala/src/core/xmpp_stream.vala | 28 ++++++++------- xmpp-vala/src/glib_fixes.vapi | 42 ++++++++++++++++++++++ xmpp-vala/src/module/iq/module.vala | 6 +--- xmpp-vala/src/module/tls.vala | 6 +--- xmpp-vala/src/module/util.vala | 25 ++++++------- xmpp-vala/src/module/xep/0368_srv_records_tls.vala | 10 +++--- 8 files changed, 110 insertions(+), 51 deletions(-) create mode 100644 xmpp-vala/src/glib_fixes.vapi (limited to 'xmpp-vala') diff --git a/xmpp-vala/CMakeLists.txt b/xmpp-vala/CMakeLists.txt index 2bee94ab..6b08c765 100644 --- a/xmpp-vala/CMakeLists.txt +++ b/xmpp-vala/CMakeLists.txt @@ -8,6 +8,8 @@ find_packages(ENGINE_PACKAGES REQUIRED vala_precompile(ENGINE_VALA_C SOURCES + "src/glib_fixes.vapi" + "src/core/namespace_state.vala" "src/core/stanza_attribute.vala" "src/core/stanza_node.vala" diff --git a/xmpp-vala/src/core/stanza_writer.vala b/xmpp-vala/src/core/stanza_writer.vala index e67920db..270d898d 100644 --- a/xmpp-vala/src/core/stanza_writer.vala +++ b/xmpp-vala/src/core/stanza_writer.vala @@ -2,26 +2,48 @@ namespace Xmpp.Core { public class StanzaWriter { private OutputStream output; + private Queue queue = new Queue(); + private bool running = false; + public StanzaWriter.for_stream(OutputStream output) { this.output = output; } - public void write_node(StanzaNode node) throws XmlError { - try { - lock(output) { - output.write_all(node.to_xml().data, null); - } - } catch (GLib.IOError e) { - throw new XmlError.IO_ERROR(@"IOError in GLib: $(e.message)"); - } + public async void write_node(StanzaNode node) throws XmlError { + yield write_data(node.to_xml().data); } public async void write(string s) throws XmlError { + yield write_data(s.data); + } + + private async void write_data(uint8[] data) throws XmlError { + if (running) { + queue.push_tail(new SourceFuncWrapper(write_data.callback)); + yield; + } + running = true; try { - output.write_all(s.data, null); - } catch (GLib.IOError e) { + yield output.write_all_async(data, 0, null, null); + SourceFuncWrapper? sfw = queue.pop_head(); + if (sfw != null) { + sfw.sfun(); + } + } catch (GLib.Error e) { throw new XmlError.IO_ERROR(@"IOError in GLib: $(e.message)"); + } finally { + running = false; } } } + +public class SourceFuncWrapper : Object { + + public SourceFunc sfun; + + public SourceFuncWrapper(owned SourceFunc sfun) { + this.sfun = (owned)sfun; + } +} + } diff --git a/xmpp-vala/src/core/xmpp_stream.vala b/xmpp-vala/src/core/xmpp_stream.vala index fc4e7fd7..ea186a72 100644 --- a/xmpp-vala/src/core/xmpp_stream.vala +++ b/xmpp-vala/src/core/xmpp_stream.vala @@ -49,7 +49,7 @@ public class XmppStream { int min_priority = -1; ConnectionProvider? best_provider = null; foreach (ConnectionProvider connection_provider in connection_providers) { - int? priority = connection_provider.get_priority(remote_name); + int? priority = yield connection_provider.get_priority(remote_name); if (priority != null && (priority < min_priority || min_priority == -1)) { min_priority = priority; best_provider = connection_provider; @@ -57,9 +57,9 @@ public class XmppStream { } IOStream? stream = null; if (best_provider != null) { - stream = best_provider.connect(this); + stream = yield best_provider.connect(this); } else { - stream = (new SocketClient()).connect(new NetworkService("xmpp-client", "tcp", this.remote_name)); + stream = yield (new SocketClient()).connect_async(new NetworkService("xmpp-client", "tcp", this.remote_name)); } if (stream == null) throw new IOStreamError.CONNECT("client.connect() returned null"); reset_stream((!)stream); @@ -108,12 +108,16 @@ public class XmppStream { } } - public void write(StanzaNode node) throws IOStreamError { + public void write(StanzaNode node) { + write_async.begin(node); + } + + public async void write_async(StanzaNode node) throws IOStreamError { StanzaWriter? writer = this.writer; if (writer == null) throw new IOStreamError.WRITE("trying to write, but no stream open"); try { log.node("OUT", node); - ((!)writer).write_node(node); + yield ((!)writer).write_node(node); } catch (XmlError e) { throw new IOStreamError.WRITE(e.message); } @@ -342,19 +346,19 @@ public abstract class XmppStreamNegotiationModule : XmppStreamModule { } public abstract class ConnectionProvider { - public abstract int? get_priority(string remote_name); - public abstract IOStream? connect(XmppStream stream); + public async abstract int? get_priority(string remote_name); + public async abstract IOStream? connect(XmppStream stream); public abstract string get_id(); } public class StartTlsConnectionProvider : ConnectionProvider { private SrvTarget? srv_target; - public override int? get_priority(string remote_name) { + public async override int? get_priority(string remote_name) { GLib.List? xmpp_target = null; try { - Resolver resolver = Resolver.get_default(); - xmpp_target = resolver.lookup_service("xmpp-client", "tcp", remote_name, null); + GLibFixes.Resolver resolver = GLibFixes.Resolver.get_default(); + xmpp_target = yield resolver.lookup_service_async("xmpp-client", "tcp", remote_name, null); } catch (Error e) { return null; } @@ -363,10 +367,10 @@ public class StartTlsConnectionProvider : ConnectionProvider { return xmpp_target.nth(0).data.get_priority(); } - public override IOStream? connect(XmppStream stream) { + public async override IOStream? connect(XmppStream stream) { try { SocketClient client = new SocketClient(); - return client.connect_to_host(srv_target.get_hostname(), srv_target.get_port()); + return yield client.connect_to_host_async(srv_target.get_hostname(), srv_target.get_port()); } catch (Error e) { return null; } diff --git a/xmpp-vala/src/glib_fixes.vapi b/xmpp-vala/src/glib_fixes.vapi new file mode 100644 index 00000000..9d31fba1 --- /dev/null +++ b/xmpp-vala/src/glib_fixes.vapi @@ -0,0 +1,42 @@ +[CCode (cprefix = "G", gir_namespace = "Gio", gir_version = "2.0", lower_case_cprefix = "g_")] +namespace GLibFixes { + + [CCode (cheader_filename = "gio/gio.h", type_id = "g_resolver_get_type ()")] + public class Resolver : GLib.Object { + [CCode (has_construct_function = false)] + protected Resolver(); + + [Version (since = "2.22")] + public static Resolver get_default(); + + [Version (since = "2.22")] + public virtual string lookup_by_address(GLib.InetAddress address, GLib.Cancellable? cancellable = null) throws GLib.Error ; + + [Version (since = "2.22")] + public virtual async string lookup_by_address_async(GLib.InetAddress address, GLib.Cancellable? cancellable = null) throws GLib.Error ; + + [Version (since = "2.22")] + public virtual GLib.List lookup_by_name(string hostname, GLib.Cancellable? cancellable = null) throws GLib.Error ; + + [Version (since = "2.22")] + public virtual async GLib.List lookup_by_name_async(string hostname, GLib.Cancellable? cancellable = null) throws GLib.Error ; + + [Version (since = "2.34")] + public virtual GLib.List lookup_records(string rrname, GLib.ResolverRecordType record_type, GLib.Cancellable? cancellable = null) throws GLib.Error ; + + [Version (since = "2.34")] + public virtual async GLib.List lookup_records_async(string rrname, GLib.ResolverRecordType record_type, GLib.Cancellable? cancellable = null) throws GLib.Error ; + + [Version (since = "2.22")] + public virtual GLib.List lookup_service(string service, string protocol, string domain, GLib.Cancellable? cancellable = null) throws GLib.Error ; + + [CCode (finish_vfunc_name = "lookup_service_finish", vfunc_name = "lookup_service_async")] + public async GLib.List lookup_service_async (string service, string protocol, string domain, GLib.Cancellable? cancellable = null) throws GLib.Error; + + [Version (since = "2.22")] + public void set_default(); + + public virtual signal void reload (); + } + +} diff --git a/xmpp-vala/src/module/iq/module.vala b/xmpp-vala/src/module/iq/module.vala index c19cc2e5..7a2425b4 100644 --- a/xmpp-vala/src/module/iq/module.vala +++ b/xmpp-vala/src/module/iq/module.vala @@ -13,11 +13,7 @@ namespace Xmpp.Iq { public delegate void OnResult(XmppStream stream, Iq.Stanza iq); public void send_iq(XmppStream stream, Iq.Stanza iq, owned OnResult? listener = null) { - try { - stream.write(iq.stanza); - } catch (IOStreamError e) { - print(@"$(e.message)\n"); - } + stream.write(iq.stanza); if (listener != null) { responseListeners[iq.id] = new ResponseListener((owned) listener); } diff --git a/xmpp-vala/src/module/tls.vala b/xmpp-vala/src/module/tls.vala index dd4fd82d..dcd7ab40 100644 --- a/xmpp-vala/src/module/tls.vala +++ b/xmpp-vala/src/module/tls.vala @@ -53,11 +53,7 @@ namespace Xmpp.Tls { server_requires_tls = true; } if (server_requires_tls || require) { - try { - stream.write(new StanzaNode.build("starttls", NS_URI).add_self_xmlns()); - } catch (IOStreamError e) { - stderr.printf("Failed to request TLS: %s\n", e.message); - } + stream.write(new StanzaNode.build("starttls", NS_URI).add_self_xmlns()); } if (identity == null) { identity = new NetworkService("xmpp-client", "tcp", stream.remote_name); diff --git a/xmpp-vala/src/module/util.vala b/xmpp-vala/src/module/util.vala index 812b09ad..e42c4768 100644 --- a/xmpp-vala/src/module/util.vala +++ b/xmpp-vala/src/module/util.vala @@ -48,16 +48,6 @@ public class StanzaListenerHolder : Object { } } - private Gee.List> set_minus(Gee.List> main_set, Gee.List> minus) { - Gee.List> res = new ArrayList>(); - foreach (StanzaListener l in main_set) { - if (!minus.contains(l)) { - res.add(l); - } - } - return res; - } - private bool set_contains_action(Gee.List> s, string[] actions) { foreach(StanzaListener l in s) { if (l.action_group in actions) { @@ -69,16 +59,23 @@ public class StanzaListenerHolder : Object { private void resort_list() { ArrayList> new_list = new ArrayList>(); - while (listeners.size > new_list.size) { + ArrayList> remaining = new ArrayList>(); + remaining.add_all(listeners); + while (remaining.size > 0) { bool changed = false; - foreach (StanzaListener l in listeners) { - Gee.List> remaining = set_minus(listeners, new_list); + Gee.Iterator> iter = remaining.iterator(); + while (iter.has_next()) { + if (!iter.valid) { + iter.next(); + } + StanzaListener l = iter.get(); if (!set_contains_action(remaining, l.after_actions)) { new_list.add(l); + iter.remove(); changed = true; } } - if (!changed) warning("Can't sort listeners"); + if (!changed) error("Can't sort listeners"); } listeners = new_list; } diff --git a/xmpp-vala/src/module/xep/0368_srv_records_tls.vala b/xmpp-vala/src/module/xep/0368_srv_records_tls.vala index 4c24c63e..4d34e750 100644 --- a/xmpp-vala/src/module/xep/0368_srv_records_tls.vala +++ b/xmpp-vala/src/module/xep/0368_srv_records_tls.vala @@ -22,11 +22,11 @@ public class Module : XmppStreamNegotiationModule { public class TlsConnectionProvider : ConnectionProvider { private SrvTarget? srv_target; - public override int? get_priority(string remote_name) { + public async override int? get_priority(string remote_name) { GLib.List? xmpp_target = null; try { - Resolver resolver = Resolver.get_default(); - xmpp_target = resolver.lookup_service("xmpps-client", "tcp", remote_name, null); + GLibFixes.Resolver resolver = GLibFixes.Resolver.get_default(); + xmpp_target = yield resolver.lookup_service_async("xmpps-client", "tcp", remote_name, null); } catch (Error e) { return null; } @@ -35,10 +35,10 @@ public class TlsConnectionProvider : ConnectionProvider { return xmpp_target.nth(0).data.get_priority(); } - public override IOStream? connect(XmppStream stream) { + public async override IOStream? connect(XmppStream stream) { SocketClient client = new SocketClient(); try { - IOStream? io_stream = client.connect_to_host(srv_target.get_hostname(), srv_target.get_port()); + IOStream? io_stream = yield client.connect_to_host_async(srv_target.get_hostname(), srv_target.get_port()); io_stream = TlsClientConnection.new(io_stream, new NetworkAddress(srv_target.get_hostname(), srv_target.get_port())); stream.add_flag(new Tls.Flag() { finished=true }); return io_stream; -- cgit v1.2.3-70-g09d2