diff options
author | fiaxh <git@mx.ax.lt> | 2017-11-22 20:06:50 +0100 |
---|---|---|
committer | fiaxh <git@mx.ax.lt> | 2017-11-23 01:28:29 +0100 |
commit | 9165c4db278b2d3da636d53e89c3b80cff66977f (patch) | |
tree | 84f227a12389ec2b3c3c7a578f913bc763760b53 /xmpp-vala/src | |
parent | de133218dab0ac1729dbfc32177979133c15f53b (diff) | |
download | dino-9165c4db278b2d3da636d53e89c3b80cff66977f.tar.gz dino-9165c4db278b2d3da636d53e89c3b80cff66977f.zip |
Async service lookup, connect and write
Diffstat (limited to 'xmpp-vala/src')
-rw-r--r-- | xmpp-vala/src/core/stanza_writer.vala | 42 | ||||
-rw-r--r-- | xmpp-vala/src/core/xmpp_stream.vala | 28 | ||||
-rw-r--r-- | xmpp-vala/src/glib_fixes.vapi | 42 | ||||
-rw-r--r-- | xmpp-vala/src/module/iq/module.vala | 6 | ||||
-rw-r--r-- | xmpp-vala/src/module/tls.vala | 6 | ||||
-rw-r--r-- | xmpp-vala/src/module/util.vala | 25 | ||||
-rw-r--r-- | xmpp-vala/src/module/xep/0368_srv_records_tls.vala | 10 |
7 files changed, 108 insertions, 51 deletions
diff --git a/xmpp-vala/src/core/stanza_writer.vala b/xmpp-vala/src/core/stanza_writer.vala index e67920db..270d898d 100644 --- a/xmpp-vala/src/core/stanza_writer.vala +++ b/xmpp-vala/src/core/stanza_writer.vala @@ -2,26 +2,48 @@ namespace Xmpp.Core { public class StanzaWriter { private OutputStream output; + private Queue<SourceFuncWrapper> queue = new Queue<SourceFuncWrapper>(); + private bool running = false; + public StanzaWriter.for_stream(OutputStream output) { this.output = output; } - public void write_node(StanzaNode node) throws XmlError { - try { - lock(output) { - output.write_all(node.to_xml().data, null); - } - } catch (GLib.IOError e) { - throw new XmlError.IO_ERROR(@"IOError in GLib: $(e.message)"); - } + public async void write_node(StanzaNode node) throws XmlError { + yield write_data(node.to_xml().data); } public async void write(string s) throws XmlError { + yield write_data(s.data); + } + + private async void write_data(uint8[] data) throws XmlError { + if (running) { + queue.push_tail(new SourceFuncWrapper(write_data.callback)); + yield; + } + running = true; try { - output.write_all(s.data, null); - } catch (GLib.IOError e) { + yield output.write_all_async(data, 0, null, null); + SourceFuncWrapper? sfw = queue.pop_head(); + if (sfw != null) { + sfw.sfun(); + } + } catch (GLib.Error e) { throw new XmlError.IO_ERROR(@"IOError in GLib: $(e.message)"); + } finally { + running = false; } } } + +public class SourceFuncWrapper : Object { + + public SourceFunc sfun; + + public SourceFuncWrapper(owned SourceFunc sfun) { + this.sfun = (owned)sfun; + } +} + } diff --git a/xmpp-vala/src/core/xmpp_stream.vala b/xmpp-vala/src/core/xmpp_stream.vala index fc4e7fd7..ea186a72 100644 --- a/xmpp-vala/src/core/xmpp_stream.vala +++ b/xmpp-vala/src/core/xmpp_stream.vala @@ -49,7 +49,7 @@ public class XmppStream { int min_priority = -1; ConnectionProvider? best_provider = null; foreach (ConnectionProvider connection_provider in connection_providers) { - int? priority = connection_provider.get_priority(remote_name); + int? priority = yield connection_provider.get_priority(remote_name); if (priority != null && (priority < min_priority || min_priority == -1)) { min_priority = priority; best_provider = connection_provider; @@ -57,9 +57,9 @@ public class XmppStream { } IOStream? stream = null; if (best_provider != null) { - stream = best_provider.connect(this); + stream = yield best_provider.connect(this); } else { - stream = (new SocketClient()).connect(new NetworkService("xmpp-client", "tcp", this.remote_name)); + stream = yield (new SocketClient()).connect_async(new NetworkService("xmpp-client", "tcp", this.remote_name)); } if (stream == null) throw new IOStreamError.CONNECT("client.connect() returned null"); reset_stream((!)stream); @@ -108,12 +108,16 @@ public class XmppStream { } } - public void write(StanzaNode node) throws IOStreamError { + public void write(StanzaNode node) { + write_async.begin(node); + } + + public async void write_async(StanzaNode node) throws IOStreamError { StanzaWriter? writer = this.writer; if (writer == null) throw new IOStreamError.WRITE("trying to write, but no stream open"); try { log.node("OUT", node); - ((!)writer).write_node(node); + yield ((!)writer).write_node(node); } catch (XmlError e) { throw new IOStreamError.WRITE(e.message); } @@ -342,19 +346,19 @@ public abstract class XmppStreamNegotiationModule : XmppStreamModule { } public abstract class ConnectionProvider { - public abstract int? get_priority(string remote_name); - public abstract IOStream? connect(XmppStream stream); + public async abstract int? get_priority(string remote_name); + public async abstract IOStream? connect(XmppStream stream); public abstract string get_id(); } public class StartTlsConnectionProvider : ConnectionProvider { private SrvTarget? srv_target; - public override int? get_priority(string remote_name) { + public async override int? get_priority(string remote_name) { GLib.List<SrvTarget>? xmpp_target = null; try { - Resolver resolver = Resolver.get_default(); - xmpp_target = resolver.lookup_service("xmpp-client", "tcp", remote_name, null); + GLibFixes.Resolver resolver = GLibFixes.Resolver.get_default(); + xmpp_target = yield resolver.lookup_service_async("xmpp-client", "tcp", remote_name, null); } catch (Error e) { return null; } @@ -363,10 +367,10 @@ public class StartTlsConnectionProvider : ConnectionProvider { return xmpp_target.nth(0).data.get_priority(); } - public override IOStream? connect(XmppStream stream) { + public async override IOStream? connect(XmppStream stream) { try { SocketClient client = new SocketClient(); - return client.connect_to_host(srv_target.get_hostname(), srv_target.get_port()); + return yield client.connect_to_host_async(srv_target.get_hostname(), srv_target.get_port()); } catch (Error e) { return null; } diff --git a/xmpp-vala/src/glib_fixes.vapi b/xmpp-vala/src/glib_fixes.vapi new file mode 100644 index 00000000..9d31fba1 --- /dev/null +++ b/xmpp-vala/src/glib_fixes.vapi @@ -0,0 +1,42 @@ +[CCode (cprefix = "G", gir_namespace = "Gio", gir_version = "2.0", lower_case_cprefix = "g_")] +namespace GLibFixes { + + [CCode (cheader_filename = "gio/gio.h", type_id = "g_resolver_get_type ()")] + public class Resolver : GLib.Object { + [CCode (has_construct_function = false)] + protected Resolver(); + + [Version (since = "2.22")] + public static Resolver get_default(); + + [Version (since = "2.22")] + public virtual string lookup_by_address(GLib.InetAddress address, GLib.Cancellable? cancellable = null) throws GLib.Error ; + + [Version (since = "2.22")] + public virtual async string lookup_by_address_async(GLib.InetAddress address, GLib.Cancellable? cancellable = null) throws GLib.Error ; + + [Version (since = "2.22")] + public virtual GLib.List<GLib.InetAddress> lookup_by_name(string hostname, GLib.Cancellable? cancellable = null) throws GLib.Error ; + + [Version (since = "2.22")] + public virtual async GLib.List<GLib.InetAddress> lookup_by_name_async(string hostname, GLib.Cancellable? cancellable = null) throws GLib.Error ; + + [Version (since = "2.34")] + public virtual GLib.List<GLib.Variant> lookup_records(string rrname, GLib.ResolverRecordType record_type, GLib.Cancellable? cancellable = null) throws GLib.Error ; + + [Version (since = "2.34")] + public virtual async GLib.List<GLib.Variant> lookup_records_async(string rrname, GLib.ResolverRecordType record_type, GLib.Cancellable? cancellable = null) throws GLib.Error ; + + [Version (since = "2.22")] + public virtual GLib.List<GLib.SrvTarget> lookup_service(string service, string protocol, string domain, GLib.Cancellable? cancellable = null) throws GLib.Error ; + + [CCode (finish_vfunc_name = "lookup_service_finish", vfunc_name = "lookup_service_async")] + public async GLib.List<GLib.SrvTarget> lookup_service_async (string service, string protocol, string domain, GLib.Cancellable? cancellable = null) throws GLib.Error; + + [Version (since = "2.22")] + public void set_default(); + + public virtual signal void reload (); + } + +} diff --git a/xmpp-vala/src/module/iq/module.vala b/xmpp-vala/src/module/iq/module.vala index c19cc2e5..7a2425b4 100644 --- a/xmpp-vala/src/module/iq/module.vala +++ b/xmpp-vala/src/module/iq/module.vala @@ -13,11 +13,7 @@ namespace Xmpp.Iq { public delegate void OnResult(XmppStream stream, Iq.Stanza iq); public void send_iq(XmppStream stream, Iq.Stanza iq, owned OnResult? listener = null) { - try { - stream.write(iq.stanza); - } catch (IOStreamError e) { - print(@"$(e.message)\n"); - } + stream.write(iq.stanza); if (listener != null) { responseListeners[iq.id] = new ResponseListener((owned) listener); } diff --git a/xmpp-vala/src/module/tls.vala b/xmpp-vala/src/module/tls.vala index dd4fd82d..dcd7ab40 100644 --- a/xmpp-vala/src/module/tls.vala +++ b/xmpp-vala/src/module/tls.vala @@ -53,11 +53,7 @@ namespace Xmpp.Tls { server_requires_tls = true; } if (server_requires_tls || require) { - try { - stream.write(new StanzaNode.build("starttls", NS_URI).add_self_xmlns()); - } catch (IOStreamError e) { - stderr.printf("Failed to request TLS: %s\n", e.message); - } + stream.write(new StanzaNode.build("starttls", NS_URI).add_self_xmlns()); } if (identity == null) { identity = new NetworkService("xmpp-client", "tcp", stream.remote_name); diff --git a/xmpp-vala/src/module/util.vala b/xmpp-vala/src/module/util.vala index 812b09ad..e42c4768 100644 --- a/xmpp-vala/src/module/util.vala +++ b/xmpp-vala/src/module/util.vala @@ -48,16 +48,6 @@ public class StanzaListenerHolder<T> : Object { } } - private Gee.List<StanzaListener<T>> set_minus(Gee.List<StanzaListener<T>> main_set, Gee.List<StanzaListener<T>> minus) { - Gee.List<StanzaListener<T>> res = new ArrayList<StanzaListener<T>>(); - foreach (StanzaListener<T> l in main_set) { - if (!minus.contains(l)) { - res.add(l); - } - } - return res; - } - private bool set_contains_action(Gee.List<StanzaListener<T>> s, string[] actions) { foreach(StanzaListener<T> l in s) { if (l.action_group in actions) { @@ -69,16 +59,23 @@ public class StanzaListenerHolder<T> : Object { private void resort_list() { ArrayList<StanzaListener<T>> new_list = new ArrayList<StanzaListener<T>>(); - while (listeners.size > new_list.size) { + ArrayList<StanzaListener<T>> remaining = new ArrayList<StanzaListener<T>>(); + remaining.add_all(listeners); + while (remaining.size > 0) { bool changed = false; - foreach (StanzaListener<T> l in listeners) { - Gee.List<StanzaListener<T>> remaining = set_minus(listeners, new_list); + Gee.Iterator<StanzaListener<T>> iter = remaining.iterator(); + while (iter.has_next()) { + if (!iter.valid) { + iter.next(); + } + StanzaListener<T> l = iter.get(); if (!set_contains_action(remaining, l.after_actions)) { new_list.add(l); + iter.remove(); changed = true; } } - if (!changed) warning("Can't sort listeners"); + if (!changed) error("Can't sort listeners"); } listeners = new_list; } diff --git a/xmpp-vala/src/module/xep/0368_srv_records_tls.vala b/xmpp-vala/src/module/xep/0368_srv_records_tls.vala index 4c24c63e..4d34e750 100644 --- a/xmpp-vala/src/module/xep/0368_srv_records_tls.vala +++ b/xmpp-vala/src/module/xep/0368_srv_records_tls.vala @@ -22,11 +22,11 @@ public class Module : XmppStreamNegotiationModule { public class TlsConnectionProvider : ConnectionProvider { private SrvTarget? srv_target; - public override int? get_priority(string remote_name) { + public async override int? get_priority(string remote_name) { GLib.List<SrvTarget>? xmpp_target = null; try { - Resolver resolver = Resolver.get_default(); - xmpp_target = resolver.lookup_service("xmpps-client", "tcp", remote_name, null); + GLibFixes.Resolver resolver = GLibFixes.Resolver.get_default(); + xmpp_target = yield resolver.lookup_service_async("xmpps-client", "tcp", remote_name, null); } catch (Error e) { return null; } @@ -35,10 +35,10 @@ public class TlsConnectionProvider : ConnectionProvider { return xmpp_target.nth(0).data.get_priority(); } - public override IOStream? connect(XmppStream stream) { + public async override IOStream? connect(XmppStream stream) { SocketClient client = new SocketClient(); try { - IOStream? io_stream = client.connect_to_host(srv_target.get_hostname(), srv_target.get_port()); + IOStream? io_stream = yield client.connect_to_host_async(srv_target.get_hostname(), srv_target.get_port()); io_stream = TlsClientConnection.new(io_stream, new NetworkAddress(srv_target.get_hostname(), srv_target.get_port())); stream.add_flag(new Tls.Flag() { finished=true }); return io_stream; |