aboutsummaryrefslogtreecommitdiff
path: root/xmpp-vala
diff options
context:
space:
mode:
authorfiaxh <git@mx.ax.lt>2017-11-22 20:06:50 +0100
committerfiaxh <git@mx.ax.lt>2017-11-23 01:28:29 +0100
commit9165c4db278b2d3da636d53e89c3b80cff66977f (patch)
tree84f227a12389ec2b3c3c7a578f913bc763760b53 /xmpp-vala
parentde133218dab0ac1729dbfc32177979133c15f53b (diff)
downloaddino-9165c4db278b2d3da636d53e89c3b80cff66977f.tar.gz
dino-9165c4db278b2d3da636d53e89c3b80cff66977f.zip
Async service lookup, connect and write
Diffstat (limited to 'xmpp-vala')
-rw-r--r--xmpp-vala/CMakeLists.txt2
-rw-r--r--xmpp-vala/src/core/stanza_writer.vala42
-rw-r--r--xmpp-vala/src/core/xmpp_stream.vala28
-rw-r--r--xmpp-vala/src/glib_fixes.vapi42
-rw-r--r--xmpp-vala/src/module/iq/module.vala6
-rw-r--r--xmpp-vala/src/module/tls.vala6
-rw-r--r--xmpp-vala/src/module/util.vala25
-rw-r--r--xmpp-vala/src/module/xep/0368_srv_records_tls.vala10
8 files changed, 110 insertions, 51 deletions
diff --git a/xmpp-vala/CMakeLists.txt b/xmpp-vala/CMakeLists.txt
index 2bee94ab..6b08c765 100644
--- a/xmpp-vala/CMakeLists.txt
+++ b/xmpp-vala/CMakeLists.txt
@@ -8,6 +8,8 @@ find_packages(ENGINE_PACKAGES REQUIRED
vala_precompile(ENGINE_VALA_C
SOURCES
+ "src/glib_fixes.vapi"
+
"src/core/namespace_state.vala"
"src/core/stanza_attribute.vala"
"src/core/stanza_node.vala"
diff --git a/xmpp-vala/src/core/stanza_writer.vala b/xmpp-vala/src/core/stanza_writer.vala
index e67920db..270d898d 100644
--- a/xmpp-vala/src/core/stanza_writer.vala
+++ b/xmpp-vala/src/core/stanza_writer.vala
@@ -2,26 +2,48 @@ namespace Xmpp.Core {
public class StanzaWriter {
private OutputStream output;
+ private Queue<SourceFuncWrapper> queue = new Queue<SourceFuncWrapper>();
+ private bool running = false;
+
public StanzaWriter.for_stream(OutputStream output) {
this.output = output;
}
- public void write_node(StanzaNode node) throws XmlError {
- try {
- lock(output) {
- output.write_all(node.to_xml().data, null);
- }
- } catch (GLib.IOError e) {
- throw new XmlError.IO_ERROR(@"IOError in GLib: $(e.message)");
- }
+ public async void write_node(StanzaNode node) throws XmlError {
+ yield write_data(node.to_xml().data);
}
public async void write(string s) throws XmlError {
+ yield write_data(s.data);
+ }
+
+ private async void write_data(uint8[] data) throws XmlError {
+ if (running) {
+ queue.push_tail(new SourceFuncWrapper(write_data.callback));
+ yield;
+ }
+ running = true;
try {
- output.write_all(s.data, null);
- } catch (GLib.IOError e) {
+ yield output.write_all_async(data, 0, null, null);
+ SourceFuncWrapper? sfw = queue.pop_head();
+ if (sfw != null) {
+ sfw.sfun();
+ }
+ } catch (GLib.Error e) {
throw new XmlError.IO_ERROR(@"IOError in GLib: $(e.message)");
+ } finally {
+ running = false;
}
}
}
+
+public class SourceFuncWrapper : Object {
+
+ public SourceFunc sfun;
+
+ public SourceFuncWrapper(owned SourceFunc sfun) {
+ this.sfun = (owned)sfun;
+ }
+}
+
}
diff --git a/xmpp-vala/src/core/xmpp_stream.vala b/xmpp-vala/src/core/xmpp_stream.vala
index fc4e7fd7..ea186a72 100644
--- a/xmpp-vala/src/core/xmpp_stream.vala
+++ b/xmpp-vala/src/core/xmpp_stream.vala
@@ -49,7 +49,7 @@ public class XmppStream {
int min_priority = -1;
ConnectionProvider? best_provider = null;
foreach (ConnectionProvider connection_provider in connection_providers) {
- int? priority = connection_provider.get_priority(remote_name);
+ int? priority = yield connection_provider.get_priority(remote_name);
if (priority != null && (priority < min_priority || min_priority == -1)) {
min_priority = priority;
best_provider = connection_provider;
@@ -57,9 +57,9 @@ public class XmppStream {
}
IOStream? stream = null;
if (best_provider != null) {
- stream = best_provider.connect(this);
+ stream = yield best_provider.connect(this);
} else {
- stream = (new SocketClient()).connect(new NetworkService("xmpp-client", "tcp", this.remote_name));
+ stream = yield (new SocketClient()).connect_async(new NetworkService("xmpp-client", "tcp", this.remote_name));
}
if (stream == null) throw new IOStreamError.CONNECT("client.connect() returned null");
reset_stream((!)stream);
@@ -108,12 +108,16 @@ public class XmppStream {
}
}
- public void write(StanzaNode node) throws IOStreamError {
+ public void write(StanzaNode node) {
+ write_async.begin(node);
+ }
+
+ public async void write_async(StanzaNode node) throws IOStreamError {
StanzaWriter? writer = this.writer;
if (writer == null) throw new IOStreamError.WRITE("trying to write, but no stream open");
try {
log.node("OUT", node);
- ((!)writer).write_node(node);
+ yield ((!)writer).write_node(node);
} catch (XmlError e) {
throw new IOStreamError.WRITE(e.message);
}
@@ -342,19 +346,19 @@ public abstract class XmppStreamNegotiationModule : XmppStreamModule {
}
public abstract class ConnectionProvider {
- public abstract int? get_priority(string remote_name);
- public abstract IOStream? connect(XmppStream stream);
+ public async abstract int? get_priority(string remote_name);
+ public async abstract IOStream? connect(XmppStream stream);
public abstract string get_id();
}
public class StartTlsConnectionProvider : ConnectionProvider {
private SrvTarget? srv_target;
- public override int? get_priority(string remote_name) {
+ public async override int? get_priority(string remote_name) {
GLib.List<SrvTarget>? xmpp_target = null;
try {
- Resolver resolver = Resolver.get_default();
- xmpp_target = resolver.lookup_service("xmpp-client", "tcp", remote_name, null);
+ GLibFixes.Resolver resolver = GLibFixes.Resolver.get_default();
+ xmpp_target = yield resolver.lookup_service_async("xmpp-client", "tcp", remote_name, null);
} catch (Error e) {
return null;
}
@@ -363,10 +367,10 @@ public class StartTlsConnectionProvider : ConnectionProvider {
return xmpp_target.nth(0).data.get_priority();
}
- public override IOStream? connect(XmppStream stream) {
+ public async override IOStream? connect(XmppStream stream) {
try {
SocketClient client = new SocketClient();
- return client.connect_to_host(srv_target.get_hostname(), srv_target.get_port());
+ return yield client.connect_to_host_async(srv_target.get_hostname(), srv_target.get_port());
} catch (Error e) {
return null;
}
diff --git a/xmpp-vala/src/glib_fixes.vapi b/xmpp-vala/src/glib_fixes.vapi
new file mode 100644
index 00000000..9d31fba1
--- /dev/null
+++ b/xmpp-vala/src/glib_fixes.vapi
@@ -0,0 +1,42 @@
+[CCode (cprefix = "G", gir_namespace = "Gio", gir_version = "2.0", lower_case_cprefix = "g_")]
+namespace GLibFixes {
+
+ [CCode (cheader_filename = "gio/gio.h", type_id = "g_resolver_get_type ()")]
+ public class Resolver : GLib.Object {
+ [CCode (has_construct_function = false)]
+ protected Resolver();
+
+ [Version (since = "2.22")]
+ public static Resolver get_default();
+
+ [Version (since = "2.22")]
+ public virtual string lookup_by_address(GLib.InetAddress address, GLib.Cancellable? cancellable = null) throws GLib.Error ;
+
+ [Version (since = "2.22")]
+ public virtual async string lookup_by_address_async(GLib.InetAddress address, GLib.Cancellable? cancellable = null) throws GLib.Error ;
+
+ [Version (since = "2.22")]
+ public virtual GLib.List<GLib.InetAddress> lookup_by_name(string hostname, GLib.Cancellable? cancellable = null) throws GLib.Error ;
+
+ [Version (since = "2.22")]
+ public virtual async GLib.List<GLib.InetAddress> lookup_by_name_async(string hostname, GLib.Cancellable? cancellable = null) throws GLib.Error ;
+
+ [Version (since = "2.34")]
+ public virtual GLib.List<GLib.Variant> lookup_records(string rrname, GLib.ResolverRecordType record_type, GLib.Cancellable? cancellable = null) throws GLib.Error ;
+
+ [Version (since = "2.34")]
+ public virtual async GLib.List<GLib.Variant> lookup_records_async(string rrname, GLib.ResolverRecordType record_type, GLib.Cancellable? cancellable = null) throws GLib.Error ;
+
+ [Version (since = "2.22")]
+ public virtual GLib.List<GLib.SrvTarget> lookup_service(string service, string protocol, string domain, GLib.Cancellable? cancellable = null) throws GLib.Error ;
+
+ [CCode (finish_vfunc_name = "lookup_service_finish", vfunc_name = "lookup_service_async")]
+ public async GLib.List<GLib.SrvTarget> lookup_service_async (string service, string protocol, string domain, GLib.Cancellable? cancellable = null) throws GLib.Error;
+
+ [Version (since = "2.22")]
+ public void set_default();
+
+ public virtual signal void reload ();
+ }
+
+}
diff --git a/xmpp-vala/src/module/iq/module.vala b/xmpp-vala/src/module/iq/module.vala
index c19cc2e5..7a2425b4 100644
--- a/xmpp-vala/src/module/iq/module.vala
+++ b/xmpp-vala/src/module/iq/module.vala
@@ -13,11 +13,7 @@ namespace Xmpp.Iq {
public delegate void OnResult(XmppStream stream, Iq.Stanza iq);
public void send_iq(XmppStream stream, Iq.Stanza iq, owned OnResult? listener = null) {
- try {
- stream.write(iq.stanza);
- } catch (IOStreamError e) {
- print(@"$(e.message)\n");
- }
+ stream.write(iq.stanza);
if (listener != null) {
responseListeners[iq.id] = new ResponseListener((owned) listener);
}
diff --git a/xmpp-vala/src/module/tls.vala b/xmpp-vala/src/module/tls.vala
index dd4fd82d..dcd7ab40 100644
--- a/xmpp-vala/src/module/tls.vala
+++ b/xmpp-vala/src/module/tls.vala
@@ -53,11 +53,7 @@ namespace Xmpp.Tls {
server_requires_tls = true;
}
if (server_requires_tls || require) {
- try {
- stream.write(new StanzaNode.build("starttls", NS_URI).add_self_xmlns());
- } catch (IOStreamError e) {
- stderr.printf("Failed to request TLS: %s\n", e.message);
- }
+ stream.write(new StanzaNode.build("starttls", NS_URI).add_self_xmlns());
}
if (identity == null) {
identity = new NetworkService("xmpp-client", "tcp", stream.remote_name);
diff --git a/xmpp-vala/src/module/util.vala b/xmpp-vala/src/module/util.vala
index 812b09ad..e42c4768 100644
--- a/xmpp-vala/src/module/util.vala
+++ b/xmpp-vala/src/module/util.vala
@@ -48,16 +48,6 @@ public class StanzaListenerHolder<T> : Object {
}
}
- private Gee.List<StanzaListener<T>> set_minus(Gee.List<StanzaListener<T>> main_set, Gee.List<StanzaListener<T>> minus) {
- Gee.List<StanzaListener<T>> res = new ArrayList<StanzaListener<T>>();
- foreach (StanzaListener<T> l in main_set) {
- if (!minus.contains(l)) {
- res.add(l);
- }
- }
- return res;
- }
-
private bool set_contains_action(Gee.List<StanzaListener<T>> s, string[] actions) {
foreach(StanzaListener<T> l in s) {
if (l.action_group in actions) {
@@ -69,16 +59,23 @@ public class StanzaListenerHolder<T> : Object {
private void resort_list() {
ArrayList<StanzaListener<T>> new_list = new ArrayList<StanzaListener<T>>();
- while (listeners.size > new_list.size) {
+ ArrayList<StanzaListener<T>> remaining = new ArrayList<StanzaListener<T>>();
+ remaining.add_all(listeners);
+ while (remaining.size > 0) {
bool changed = false;
- foreach (StanzaListener<T> l in listeners) {
- Gee.List<StanzaListener<T>> remaining = set_minus(listeners, new_list);
+ Gee.Iterator<StanzaListener<T>> iter = remaining.iterator();
+ while (iter.has_next()) {
+ if (!iter.valid) {
+ iter.next();
+ }
+ StanzaListener<T> l = iter.get();
if (!set_contains_action(remaining, l.after_actions)) {
new_list.add(l);
+ iter.remove();
changed = true;
}
}
- if (!changed) warning("Can't sort listeners");
+ if (!changed) error("Can't sort listeners");
}
listeners = new_list;
}
diff --git a/xmpp-vala/src/module/xep/0368_srv_records_tls.vala b/xmpp-vala/src/module/xep/0368_srv_records_tls.vala
index 4c24c63e..4d34e750 100644
--- a/xmpp-vala/src/module/xep/0368_srv_records_tls.vala
+++ b/xmpp-vala/src/module/xep/0368_srv_records_tls.vala
@@ -22,11 +22,11 @@ public class Module : XmppStreamNegotiationModule {
public class TlsConnectionProvider : ConnectionProvider {
private SrvTarget? srv_target;
- public override int? get_priority(string remote_name) {
+ public async override int? get_priority(string remote_name) {
GLib.List<SrvTarget>? xmpp_target = null;
try {
- Resolver resolver = Resolver.get_default();
- xmpp_target = resolver.lookup_service("xmpps-client", "tcp", remote_name, null);
+ GLibFixes.Resolver resolver = GLibFixes.Resolver.get_default();
+ xmpp_target = yield resolver.lookup_service_async("xmpps-client", "tcp", remote_name, null);
} catch (Error e) {
return null;
}
@@ -35,10 +35,10 @@ public class TlsConnectionProvider : ConnectionProvider {
return xmpp_target.nth(0).data.get_priority();
}
- public override IOStream? connect(XmppStream stream) {
+ public async override IOStream? connect(XmppStream stream) {
SocketClient client = new SocketClient();
try {
- IOStream? io_stream = client.connect_to_host(srv_target.get_hostname(), srv_target.get_port());
+ IOStream? io_stream = yield client.connect_to_host_async(srv_target.get_hostname(), srv_target.get_port());
io_stream = TlsClientConnection.new(io_stream, new NetworkAddress(srv_target.get_hostname(), srv_target.get_port()));
stream.add_flag(new Tls.Flag() { finished=true });
return io_stream;