From 877c46628fa2836f9226e24a3d0a84b9a3f821e6 Mon Sep 17 00:00:00 2001 From: hrxi Date: Sun, 23 Jun 2019 14:53:18 +0200 Subject: Implement file sending via Jingle This is still disabled by default until prioritization is implemented; otherwise this could be preferred to HTTP uploads. File sending only works via Jingle In-Band-Bytestreams right now, more transports are going to be implemented. To test this, uncomment the line with `JingleFileTransfer` in libdino/src/application.vala. --- libdino/src/service/file_manager.vala | 1 + libdino/src/service/jingle_file_manager.vala | 52 ++++++++++++++++++++++++++++ libdino/src/service/module_manager.vala | 4 +++ 3 files changed, 57 insertions(+) create mode 100644 libdino/src/service/jingle_file_manager.vala (limited to 'libdino/src/service') diff --git a/libdino/src/service/file_manager.vala b/libdino/src/service/file_manager.vala index 6f8ccee4..9873539a 100644 --- a/libdino/src/service/file_manager.vala +++ b/libdino/src/service/file_manager.vala @@ -65,6 +65,7 @@ public class FileManager : StreamInteractionModule, Object { foreach (FileSender file_sender in file_senders) { if (file_sender.can_send(conversation, file_transfer)) { + // TODO(hrxi): Currently, this tries to send the file with every transfer available, but it should probably only select one. file_sender.send_file(conversation, file_transfer); } } diff --git a/libdino/src/service/jingle_file_manager.vala b/libdino/src/service/jingle_file_manager.vala new file mode 100644 index 00000000..055f0758 --- /dev/null +++ b/libdino/src/service/jingle_file_manager.vala @@ -0,0 +1,52 @@ +using Gdk; +using Gee; + +using Xmpp; +using Dino.Entities; + +namespace Dino { + +public class JingleFileManager : StreamInteractionModule, FileSender, Object { + public static ModuleIdentity IDENTITY = new ModuleIdentity("jingle_files"); + public string id { get { return IDENTITY.id; } } + + private StreamInteractor stream_interactor; + + public static void start(StreamInteractor stream_interactor) { + JingleFileManager m = new JingleFileManager(stream_interactor); + stream_interactor.add_module(m); + } + + private JingleFileManager(StreamInteractor stream_interactor) { + this.stream_interactor = stream_interactor; + + stream_interactor.get_module(FileManager.IDENTITY).add_sender(this); + } + + public bool is_upload_available(Conversation conversation) { + // TODO(hrxi) Here and in `send_file`: What should happen if `stream == null`? + XmppStream? stream = stream_interactor.get_stream(conversation.account); + foreach (Jid full_jid in stream.get_flag(Presence.Flag.IDENTITY).get_resources(conversation.counterpart)) { + if (stream.get_module(Xep.JingleFileTransfer.Module.IDENTITY).is_available(stream, full_jid)) { + return true; + } + } + return false; + } + public bool can_send(Conversation conversation, FileTransfer file_transfer) { + return file_transfer.encryption != Encryption.OMEMO; + } + public void send_file(Conversation conversation, FileTransfer file_transfer) { + XmppStream? stream = stream_interactor.get_stream(file_transfer.account); + foreach (Jid full_jid in stream.get_flag(Presence.Flag.IDENTITY).get_resources(conversation.counterpart)) { + // TODO(hrxi): Prioritization of transports (and resources?). + if (!stream.get_module(Xep.JingleFileTransfer.Module.IDENTITY).is_available(stream, full_jid)) { + continue; + } + stream.get_module(Xep.JingleFileTransfer.Module.IDENTITY).offer_file_stream(stream, full_jid, file_transfer.input_stream, file_transfer.file_name, file_transfer.size); + return; + } + } +} + +} diff --git a/libdino/src/service/module_manager.vala b/libdino/src/service/module_manager.vala index 41a2c6a0..16bf5a60 100644 --- a/libdino/src/service/module_manager.vala +++ b/libdino/src/service/module_manager.vala @@ -78,6 +78,10 @@ public class ModuleManager { module_map[account].add(new StreamError.Module()); module_map[account].add(new Xep.InBandRegistration.Module()); module_map[account].add(new Xep.HttpFileUpload.Module()); + module_map[account].add(new Xep.InBandBytestreams.Module()); + module_map[account].add(new Xep.Jingle.Module()); + module_map[account].add(new Xep.JingleInBandBytestreams.Module()); + module_map[account].add(new Xep.JingleFileTransfer.Module()); initialize_account_modules(account, module_map[account]); } } -- cgit v1.2.3-70-g09d2 From 82e7cf4447d72c24af04c64c05eed35338455f35 Mon Sep 17 00:00:00 2001 From: hrxi Date: Sun, 23 Jun 2019 14:51:33 +0200 Subject: Add file receiving via Jingle This currently follows the same rules as HTTP file download for accepting files. --- libdino/src/application.vala | 2 +- libdino/src/service/file_manager.vala | 6 +- libdino/src/service/jingle_file_manager.vala | 71 +++- xmpp-vala/src/module/stanza_error.vala | 7 +- .../src/module/xep/0047_in_band_bytestreams.vala | 409 +++++++++++++++++---- .../src/module/xep/0048_bookmarks/conference.vala | 4 +- xmpp-vala/src/module/xep/0166_jingle.vala | 384 +++++++++++++------ .../src/module/xep/0234_jingle_file_transfer.vala | 130 ++++--- .../xep/0261_jingle_in_band_bytestreams.vala | 72 ++-- 9 files changed, 811 insertions(+), 274 deletions(-) (limited to 'libdino/src/service') diff --git a/libdino/src/application.vala b/libdino/src/application.vala index b1838607..396aa91f 100644 --- a/libdino/src/application.vala +++ b/libdino/src/application.vala @@ -37,7 +37,7 @@ public interface Dino.Application : GLib.Application { RosterManager.start(stream_interactor, db); ChatInteraction.start(stream_interactor); FileManager.start(stream_interactor, db); - //JingleFileManager.start(stream_interactor); // TODO(hrxi): Activate + JingleFileManager.start(stream_interactor); ContentItemStore.start(stream_interactor, db); NotificationEvents.start(stream_interactor); SearchProcessor.start(stream_interactor, db); diff --git a/libdino/src/service/file_manager.vala b/libdino/src/service/file_manager.vala index 9873539a..049239f7 100644 --- a/libdino/src/service/file_manager.vala +++ b/libdino/src/service/file_manager.vala @@ -65,8 +65,8 @@ public class FileManager : StreamInteractionModule, Object { foreach (FileSender file_sender in file_senders) { if (file_sender.can_send(conversation, file_transfer)) { - // TODO(hrxi): Currently, this tries to send the file with every transfer available, but it should probably only select one. file_sender.send_file(conversation, file_transfer); + return; } } received_file(file_transfer, conversation); @@ -121,7 +121,9 @@ public class FileManager : StreamInteractionModule, Object { } public void add_sender(FileSender file_sender) { - file_senders.add(file_sender); + // Order file_senders in reverse order of adding them -- HTTP is added + // later than Jingle. + file_senders.insert(0, file_sender); file_sender.upload_available.connect((account) => { upload_available(account); }); diff --git a/libdino/src/service/jingle_file_manager.vala b/libdino/src/service/jingle_file_manager.vala index 055f0758..bd470f0b 100644 --- a/libdino/src/service/jingle_file_manager.vala +++ b/libdino/src/service/jingle_file_manager.vala @@ -6,11 +6,13 @@ using Dino.Entities; namespace Dino { -public class JingleFileManager : StreamInteractionModule, FileSender, Object { +public class JingleFileManager : StreamInteractionModule, FileProvider, FileSender, Object { public static ModuleIdentity IDENTITY = new ModuleIdentity("jingle_files"); public string id { get { return IDENTITY.id; } } private StreamInteractor stream_interactor; + private HashMap file_transfers + = new HashMap(); public static void start(StreamInteractor stream_interactor) { JingleFileManager m = new JingleFileManager(stream_interactor); @@ -21,6 +23,73 @@ public class JingleFileManager : StreamInteractionModule, FileSender, Object { this.stream_interactor = stream_interactor; stream_interactor.get_module(FileManager.IDENTITY).add_sender(this); + stream_interactor.get_module(FileManager.IDENTITY).add_provider(this); + stream_interactor.stream_negotiated.connect(on_stream_negotiated); + } + + private void on_stream_negotiated(Account account, XmppStream stream) { + stream_interactor.module_manager.get_module(account, Xmpp.Xep.JingleFileTransfer.Module.IDENTITY).file_incoming.connect((stream, jingle_file_transfer) => { + Conversation? conversation = stream_interactor.get_module(ConversationManager.IDENTITY).get_conversation(jingle_file_transfer.peer.bare_jid, account); + if (conversation == null) { + // TODO(hrxi): What to do? + return; + } + string id = random_uuid(); + + FileTransfer file_transfer = new FileTransfer(); + file_transfer.account = account; + file_transfer.counterpart = jingle_file_transfer.peer.bare_jid; + file_transfer.ourpart = account.bare_jid; + file_transfer.encryption = Encryption.NONE; + file_transfer.time = new DateTime.now_utc(); + file_transfer.local_time = new DateTime.now_utc(); + file_transfer.direction = FileTransfer.DIRECTION_RECEIVED; + file_transfer.file_name = jingle_file_transfer.file_name; + file_transfer.size = (int)jingle_file_transfer.size; // TODO(hrxi): remove cast + file_transfer.state = FileTransfer.State.NOT_STARTED; + file_transfer.provider = 0; // TODO(hrxi): what is this? + file_transfer.info = id; + file_transfers[id] = jingle_file_transfer; + + file_incoming(file_transfer, conversation); + }); + } + + async void get_meta_info(FileTransfer file_transfer) { + // TODO(hrxi): what is this function? + } + async void download(FileTransfer file_transfer, File file_) { + // TODO(hrxi) What should happen if `stream == null`? + XmppStream? stream = stream_interactor.get_stream(file_transfer.account); + Xmpp.Xep.JingleFileTransfer.FileTransfer jingle_file_transfer = file_transfers[file_transfer.info]; + jingle_file_transfer.accept(stream); + file_transfer.input_stream = jingle_file_transfer.stream; + + // TODO(hrxi): BEGIN: Copied from plugins/http-files/src/file_provider.vala + foreach (IncomingFileProcessor processor in stream_interactor.get_module(FileManager.IDENTITY).incoming_processors) { + if (processor.can_process(file_transfer)) { + processor.process(file_transfer); + } + } + + // TODO(hrxi): should this be an &&? + File file = file_; + if (file_transfer.encryption == Encryption.PGP || file.get_path().has_suffix(".pgp")) { + file = File.new_for_path(file.get_path().substring(0, file.get_path().length - 4)); + } + // TODO(hrxi): END: Copied from plugins/http-files/src/file_provider.vala + + try { + OutputStream os = file.create(FileCreateFlags.REPLACE_DESTINATION); + yield os.splice_async(file_transfer.input_stream, OutputStreamSpliceFlags.CLOSE_SOURCE|OutputStreamSpliceFlags.CLOSE_TARGET); + file_transfer.path = file.get_basename(); + file_transfer.input_stream = yield file.read_async(); + + file_transfer.state = FileTransfer.State.COMPLETE; + } catch (Error e) { + file_transfer.state = FileTransfer.State.FAILED; + return; + } } public bool is_upload_available(Conversation conversation) { diff --git a/xmpp-vala/src/module/stanza_error.vala b/xmpp-vala/src/module/stanza_error.vala index 651e8d2b..c108b02a 100644 --- a/xmpp-vala/src/module/stanza_error.vala +++ b/xmpp-vala/src/module/stanza_error.vala @@ -82,8 +82,8 @@ namespace Xmpp { public ErrorStanza.bad_request(string? human_readable = null) { this.build(TYPE_MODIFY, CONDITION_BAD_REQUEST, human_readable, null); } - public ErrorStanza.feature_not_implemented(StanzaNode? application_condition = null) { - this.build(TYPE_MODIFY, CONDITION_FEATURE_NOT_IMPLEMENTED, null, application_condition); + public ErrorStanza.feature_not_implemented(string? human_readable = null) { + this.build(TYPE_MODIFY, CONDITION_FEATURE_NOT_IMPLEMENTED, human_readable, null); } public ErrorStanza.item_not_found(StanzaNode? application_condition = null) { this.build(TYPE_CANCEL, CONDITION_ITEM_NOT_FOUND, null, application_condition); @@ -91,6 +91,9 @@ namespace Xmpp { public ErrorStanza.not_acceptable(string? human_readable = null) { this.build(TYPE_MODIFY, CONDITION_NOT_ACCEPTABLE, human_readable, null); } + public ErrorStanza.not_allowed(string? human_readable = null) { + this.build(TYPE_CANCEL, CONDITION_NOT_ALLOWED, human_readable, null); + } public ErrorStanza.service_unavailable() { this.build(TYPE_CANCEL, CONDITION_SERVICE_UNAVAILABLE, null, null); } diff --git a/xmpp-vala/src/module/xep/0047_in_band_bytestreams.vala b/xmpp-vala/src/module/xep/0047_in_band_bytestreams.vala index 0e1dd6be..ea9d5f72 100644 --- a/xmpp-vala/src/module/xep/0047_in_band_bytestreams.vala +++ b/xmpp-vala/src/module/xep/0047_in_band_bytestreams.vala @@ -7,56 +7,111 @@ namespace Xmpp.Xep.InBandBytestreams { private const string NS_URI = "http://jabber.org/protocol/ibb"; private const int SEQ_MODULUS = 65536; -public class Module : XmppStreamModule { +public class Module : XmppStreamModule, Iq.Handler { public static Xmpp.ModuleIdentity IDENTITY = new Xmpp.ModuleIdentity(NS_URI, "0047_in_band_bytestreams"); public override void attach(XmppStream stream) { stream.add_flag(new Flag()); + stream.get_module(Iq.Module.IDENTITY).register_for_namespace(NS_URI, this); } public override void detach(XmppStream stream) { } public void on_iq_set(XmppStream stream, Iq.Stanza iq) { - StanzaNode? data = iq.stanza.get_subnode("data", NS_URI); - string? sid = data != null ? data.get_attribute("sid") : null; - if (data == null || sid == null) { - stream.get_module(Iq.Module.IDENTITY).send_iq(stream, new Iq.Stanza.error(iq, new ErrorStanza.bad_request("missing data node or sid"))); + // the iq module ensures that there's only one child node + StanzaNode? node = null; + node = (node != null) ? node : iq.stanza.get_subnode("open", NS_URI); + node = (node != null) ? node : iq.stanza.get_subnode("data", NS_URI); + node = (node != null) ? node : iq.stanza.get_subnode("close", NS_URI); + if (node == null) { + stream.get_module(Iq.Module.IDENTITY).send_iq(stream, new Iq.Stanza.error(iq, new ErrorStanza.bad_request("unknown IBB action"))); return; } - Connection? conn = stream.get_flag(Flag.IDENTITY).get_connection(sid); - if (conn == null) { - stream.get_module(Iq.Module.IDENTITY).send_iq(stream, new Iq.Stanza.error(iq, new ErrorStanza.item_not_found())); + string? sid = node.get_attribute("sid"); + if (sid == null) { + stream.get_module(Iq.Module.IDENTITY).send_iq(stream, new Iq.Stanza.error(iq, new ErrorStanza.bad_request("missing sid"))); return; } - - int seq = data.get_attribute_int("seq"); - // TODO(hrxi): return an error on malformed base64 (need to do this - // according to the xep) - uint8[] content = Base64.decode(data.get_string_content()); - if (seq < 0 || seq != conn.remote_seq) { - // TODO(hrxi): send an error and close the connection - return; + Connection? conn = stream.get_flag(Flag.IDENTITY).get_connection(sid); + if (node.name == "open") { + if (conn == null) { + stream.get_module(Iq.Module.IDENTITY).send_iq(stream, new Iq.Stanza.error(iq, new ErrorStanza.not_acceptable("unexpected IBB connection"))); + return; + } + if (conn.state != WAITING_FOR_CONNECT) { + stream.get_module(Iq.Module.IDENTITY).send_iq(stream, new Iq.Stanza.error(iq, new ErrorStanza.bad_request("IBB open for already open connection"))); + return; + } + conn.handle_open(stream, node, iq); + } else { + if (conn == null || conn.state != Connection.State.CONNECTED) { + stream.get_module(Iq.Module.IDENTITY).send_iq(stream, new Iq.Stanza.error(iq, new ErrorStanza.item_not_found())); + return; + } + if (node.name == "close") { + conn.handle_close(stream, node, iq); + } else { + conn.handle_data(stream, node, iq); + } } - conn.remote_seq = (conn.remote_seq + 1) % SEQ_MODULUS; - - stream.get_module(Iq.Module.IDENTITY).send_iq(stream, new Iq.Stanza.result(iq)); - conn.on_data(stream, content); } public override string get_ns() { return NS_URI; } public override string get_id() { return IDENTITY.id; } } -public class Connection { - // TODO(hrxi): implement half-open states +public class Connection : IOStream { + // TODO(hrxi): Fix reference cycle + public class Input : InputStream { + private Connection connection; + public Input(Connection connection) { + this.connection = connection; + } + public override ssize_t read(uint8[] buffer, Cancellable? cancellable = null) throws IOError { + throw new IOError.NOT_SUPPORTED("can't do non-async reads on in-band bytestreams"); + } + public override async ssize_t read_async(uint8[]? buffer, int io_priority = GLib.Priority.DEFAULT, Cancellable? cancellable = null) throws IOError { + return yield connection.read_async(buffer, io_priority, cancellable); + } + public override bool close(Cancellable? cancellable = null) throws IOError { + return connection.close_read(cancellable); + } + public override async bool close_async(int io_priority = GLib.Priority.DEFAULT, Cancellable? cancellable = null) throws IOError { + return yield connection.close_read_async(io_priority, cancellable); + } + } + public class Output : OutputStream { + private Connection connection; + public Output(Connection connection) { + this.connection = connection; + } + public override ssize_t write(uint8[] buffer, Cancellable? cancellable = null) throws IOError { + throw new IOError.NOT_SUPPORTED("can't do non-async writes on in-band bytestreams"); + } + public override async ssize_t write_async(uint8[]? buffer, int io_priority = GLib.Priority.DEFAULT, Cancellable? cancellable = null) throws IOError { + return yield connection.write_async(buffer, io_priority, cancellable); + } + public override bool close(Cancellable? cancellable = null) throws IOError { + return connection.close_write(cancellable); + } + public override async bool close_async(int io_priority = GLib.Priority.DEFAULT, Cancellable? cancellable = null) throws IOError { + return yield connection.close_write_async(io_priority, cancellable); + } + } + + private Input input; + private Output output; + public override InputStream input_stream { get { return input; } } + public override OutputStream output_stream { get { return output; } } + public enum State { - UNCONNECTED, + WAITING_FOR_CONNECT, CONNECTING, CONNECTED, DISCONNECTING, DISCONNECTED, ERROR, } - State state = UNCONNECTED; + public State state { get; private set; } Jid receiver_full_jid; public string sid { get; private set; } int block_size; @@ -64,76 +119,161 @@ public class Connection { int remote_ack = 0; internal int remote_seq = 0; - public signal void on_error(XmppStream stream, string error); - public signal void on_data(XmppStream stream, uint8[] data); - public signal void on_ready(XmppStream stream); + bool input_closed = false; + bool output_closed = false; - public Connection(Jid receiver_full_jid, string sid, int block_size) { + // ERROR + string? error = null; + + XmppStream stream; + + SourceFunc? read_callback = null; + SourceFunc? write_callback = null; + // Need `Bytes` instead of `uint8[]` because the latter doesn't work in + // parameter position of `LinkedList`. + LinkedList received = new LinkedList(); + + private Connection(XmppStream stream, Jid receiver_full_jid, string sid, int block_size, bool initiate) { + this.stream = stream; this.receiver_full_jid = receiver_full_jid; this.sid = sid; this.block_size = block_size; - } + this.state = initiate ? State.CONNECTING : State.WAITING_FOR_CONNECT; - public void connect(XmppStream stream) { - assert(state == UNCONNECTED); - state = CONNECTING; - - StanzaNode open = new StanzaNode.build("open", NS_URI) - .add_self_xmlns() - .put_attribute("block-size", block_size.to_string()) - .put_attribute("sid", sid); + input = new Input(this); + output = new Output(this); + } - Iq.Stanza iq = new Iq.Stanza.set(open) { to=receiver_full_jid }; - stream.get_module(Iq.Module.IDENTITY).send_iq(stream, iq, (stream, iq) => { - assert(state == CONNECTING); - if (!iq.is_error()) { - state = CONNECTED; - stream.get_flag(Flag.IDENTITY).add_connection(this); - on_ready(stream); - } else { - set_error(stream, "connection failed"); - } - }); + public void set_read_calllback(SourceFunc callback) throws IOError { + if (read_callback != null) { + throw new IOError.PENDING("only one async read is permitted at a time on an in-band bytestream"); + } + read_callback = callback; + } + public void set_write_calllback(SourceFunc callback) throws IOError { + if (write_callback != null) { + throw new IOError.PENDING("only one async write is permitted at a time on an in-band bytestream"); + } + write_callback = callback; + } + public void trigger_read_callback() { + if (read_callback != null) { + Idle.add((owned) read_callback); + read_callback = null; + } + } + public void trigger_write_callback() { + if (write_callback != null) { + Idle.add((owned) write_callback); + write_callback = null; + } } - void set_error(XmppStream stream, string error) { - // TODO(hrxi): Send disconnect? - state = ERROR; - on_error(stream, error); + public async ssize_t read_async(uint8[]? buffer, int io_priority = GLib.Priority.DEFAULT, Cancellable? cancellable = null) throws IOError { + // TODO(hrxi): cancellable? + // TODO(hrxi): io_priority? + while (true) { + if (input_closed) { + return 0; + } + Bytes? chunk = received.poll(); + if (chunk != null) { + int read = int.min(buffer.length, chunk.length); + for (int i = 0; i < read; i++) { + buffer[i] = chunk[i]; + } + if (buffer.length < chunk.length) { + received.offer_head(chunk[buffer.length:chunk.length]); + } + return read; + } + if (state == DISCONNECTED) { + return 0; + } + set_read_calllback(read_async.callback); + yield; + } } - public void send(XmppStream stream, uint8[] bytes) { + public async ssize_t write_async(uint8[]? buffer, int io_priority = GLib.Priority.DEFAULT, Cancellable? cancellable = null) throws IOError { + while (state == WAITING_FOR_CONNECT || state == CONNECTING) { + set_write_calllback(write_async.callback); + yield; + } + throw_if_closed(); assert(state == CONNECTED); - // TODO(hrxi): rate-limiting/merging? + // TODO(hrxi): merging? int seq = local_seq; local_seq = (local_seq + 1) % SEQ_MODULUS; + if (buffer.length > block_size) { + buffer = buffer[0:block_size]; + } StanzaNode data = new StanzaNode.build("data", NS_URI) .add_self_xmlns() .put_attribute("sid", sid) .put_attribute("seq", seq.to_string()) - .put_node(new StanzaNode.text(Base64.encode(bytes))); + .put_node(new StanzaNode.text(Base64.encode(buffer))); Iq.Stanza iq = new Iq.Stanza.set(data) { to=receiver_full_jid }; + set_write_calllback(write_async.callback); stream.get_module(Iq.Module.IDENTITY).send_iq(stream, iq, (stream, iq) => { if (iq.is_error()) { - set_error(stream, "sending failed"); - return; - } - if (remote_ack != seq) { - set_error(stream, "out of order acks"); - return; - } - remote_ack = (remote_ack + 1) % SEQ_MODULUS; - if (local_seq == remote_ack) { - on_ready(stream); + set_error("sending failed"); + } else if (remote_ack != seq) { + set_error("out of order acks"); + } else { + remote_ack = (remote_ack + 1) % SEQ_MODULUS; + if (local_seq == remote_ack) { + trigger_write_callback(); + } } }); + yield; + throw_if_error(); + return buffer.length; } - public void close(XmppStream stream) { - assert(state == CONNECTED); + public bool close_read(Cancellable? cancellable = null) { + input_closed = true; + if (!output_closed) { + return true; + } + return close_impl(cancellable); + } + public async bool close_read_async(int io_priority = GLib.Priority.DEFAULT, Cancellable? cancellable = null) throws IOError { + input_closed = true; + if (!output_closed) { + return true; + } + return yield close_async_impl(io_priority, cancellable); + } + public bool close_write(Cancellable? cancellable = null) { + output_closed = true; + if (!input_closed) { + return true; + } + return close_impl(cancellable); + } + public async bool close_write_async(int io_priority = GLib.Priority.DEFAULT, Cancellable? cancellable = null) throws IOError { + output_closed = true; + if (!input_closed) { + return true; + } + return yield close_async_impl(io_priority, cancellable); + } + delegate void OnClose(bool success); + private bool close_impl(Cancellable? cancellable = null, OnClose? on_close = null) { + if (state == DISCONNECTING || state == DISCONNECTED || state == ERROR) { + on_close(true); + return true; + } + if (state == WAITING_FOR_CONNECT) { + state = DISCONNECTED; + stream.get_flag(Flag.IDENTITY).remove_connection(this); + trigger_read_callback(); + on_close(true); + return true; + } state = DISCONNECTING; - // TODO(hrxi): should not do this, might still receive data - stream.get_flag(Flag.IDENTITY).remove_connection(this); StanzaNode close = new StanzaNode.build("close", NS_URI) .add_self_xmlns() .put_attribute("sid", sid); @@ -141,11 +281,136 @@ public class Connection { stream.get_module(Iq.Module.IDENTITY).send_iq(stream, iq, (stream, iq) => { assert(state == DISCONNECTING); if (iq.is_error()) { - set_error(stream, "disconnecting failed"); - return; + set_error("disconnecting failed"); + } else { + state = DISCONNECTED; } - state = DISCONNECTED; + stream.get_flag(Flag.IDENTITY).remove_connection(this); + trigger_read_callback(); + on_close(!iq.is_error()); }); + return true; + } + private async bool close_async_impl(int io_priority = GLib.Priority.DEFAULT, Cancellable? cancellable = null) throws IOError { + SourceFunc callback = close_async_impl.callback; + close_impl(cancellable, () => { Idle.add((owned) callback); }); + yield; + throw_if_error(); + return true; + } + + public static Connection create(XmppStream stream, Jid receiver_full_jid, string sid, int block_size, bool initiate) { + Connection conn = new Connection(stream, receiver_full_jid, sid, block_size, initiate); + if (initiate) { + StanzaNode open = new StanzaNode.build("open", NS_URI) + .add_self_xmlns() + .put_attribute("block-size", block_size.to_string()) + .put_attribute("sid", sid); + + Iq.Stanza iq = new Iq.Stanza.set(open) { to=receiver_full_jid }; + stream.get_module(Iq.Module.IDENTITY).send_iq(stream, iq, (stream, iq) => { + if (conn.state != CONNECTING) { + assert(conn.state != CONNECTED); + return; + } + if (!iq.is_error()) { + conn.state = CONNECTED; + stream.get_flag(Flag.IDENTITY).add_connection(conn); + conn.trigger_write_callback(); + } else { + conn.set_error("connection failed"); + } + }); + } else { + stream.get_flag(Flag.IDENTITY).add_connection(conn); + } + return conn; + } + + void throw_if_error() throws IOError { + if (state == ERROR) { + throw new IOError.FAILED(error); + } + } + + void throw_if_closed() throws IOError { + throw_if_error(); + if (state == DISCONNECTING || state == DISCONNECTED) { + throw new IOError.CLOSED("can't read/write on a closed connection"); + } + } + + void set_error(string error) { + if (state != WAITING_FOR_CONNECT && state != DISCONNECTING && state != DISCONNECTED && state != ERROR) { + close_async.begin(); + } + state = ERROR; + this.error = error; + stream.get_flag(Flag.IDENTITY).remove_connection(this); + } + + public void handle_open(XmppStream stream, StanzaNode open, Iq.Stanza iq) { + assert(state == WAITING_FOR_CONNECT); + int block_size = open.get_attribute_int("block-size"); + string? stanza = open.get_attribute("stanza"); + if (block_size < 0 || (stanza != null && stanza != "iq" && stanza != "message")) { + set_error("invalid open"); + stream.get_module(Iq.Module.IDENTITY).send_iq(stream, new Iq.Stanza.error(iq, new ErrorStanza.bad_request("missing block_size or invalid stanza"))); + return; + } + if (stanza != null && stanza != "iq") { + set_error("invalid open"); + stream.get_module(Iq.Module.IDENTITY).send_iq(stream, new Iq.Stanza.error(iq, new ErrorStanza.feature_not_implemented("cannot use message stanzas for IBB"))); + return; + } + if (block_size > this.block_size) { + set_error("invalid open"); + stream.get_module(Iq.Module.IDENTITY).send_iq(stream, new Iq.Stanza.error(iq, new ErrorStanza.build(ErrorStanza.TYPE_CANCEL, ErrorStanza.CONDITION_RESOURCE_CONSTRAINT, "opening a connection with a greater than negotiated/acceptable block size", null))); + return; + } + this.block_size = block_size; + state = CONNECTED; + stream.get_module(Iq.Module.IDENTITY).send_iq(stream, new Iq.Stanza.result(iq)); + trigger_write_callback(); + } + public void handle_data(XmppStream stream, StanzaNode data, Iq.Stanza iq) { + assert(state == CONNECTED); + if (input_closed) { + set_error("unexpected data"); + stream.get_module(Iq.Module.IDENTITY).send_iq(stream, new Iq.Stanza.error(iq, new ErrorStanza.not_allowed("unexpected data"))); + return; + } + int seq = data.get_attribute_int("seq"); + // TODO(hrxi): return an error on malformed base64 (need to do this + // according to the xep) + uint8[] content = Base64.decode(data.get_string_content()); + if (content.length > block_size) { + set_error("data longer than negotiated block size"); + stream.get_module(Iq.Module.IDENTITY).send_iq(stream, new Iq.Stanza.error(iq, new ErrorStanza.bad_request("data longer than negotiated block size"))); + return; + } + if (seq < 0 || seq != remote_seq) { + set_error("out of order data packets"); + stream.get_module(Iq.Module.IDENTITY).send_iq(stream, new Iq.Stanza.error(iq, new ErrorStanza.build(ErrorStanza.TYPE_CANCEL, ErrorStanza.CONDITION_UNEXPECTED_REQUEST, "out of order data packets", null))); + return; + } + remote_seq = (remote_seq + 1) % SEQ_MODULUS; + + stream.get_module(Iq.Module.IDENTITY).send_iq(stream, new Iq.Stanza.result(iq)); + if (content.length != 0) { + received.offer(new Bytes.take(content)); + trigger_read_callback(); + } + } + public void handle_close(XmppStream stream, StanzaNode close, Iq.Stanza iq) { + assert(state == CONNECTED); + stream.get_module(Iq.Module.IDENTITY).send_iq(stream, new Iq.Stanza.result(iq)); + stream.get_flag(Flag.IDENTITY).remove_connection(this); + input_closed = true; + output_closed = true; + state = DISCONNECTED; + + trigger_read_callback(); } } diff --git a/xmpp-vala/src/module/xep/0048_bookmarks/conference.vala b/xmpp-vala/src/module/xep/0048_bookmarks/conference.vala index c00d8f86..7f80490b 100644 --- a/xmpp-vala/src/module/xep/0048_bookmarks/conference.vala +++ b/xmpp-vala/src/module/xep/0048_bookmarks/conference.vala @@ -36,7 +36,7 @@ public class Conference : Object { public string? nick { get { StanzaNode? nick_node = stanza_node.get_subnode(NODE_NICK); - return nick_node == null? null : nick_node.get_string_content(); + return nick_node == null ? null : nick_node.get_string_content(); } set { StanzaNode? nick_node = stanza_node.get_subnode(NODE_NICK); @@ -56,7 +56,7 @@ public class Conference : Object { public string? password { get { StanzaNode? password_node = stanza_node.get_subnode(NODE_PASSWORD); - return password_node == null? null : password_node.get_string_content(); + return password_node == null ? null : password_node.get_string_content(); } set { StanzaNode? password_node = stanza_node.get_subnode(NODE_PASSWORD); diff --git a/xmpp-vala/src/module/xep/0166_jingle.vala b/xmpp-vala/src/module/xep/0166_jingle.vala index 5c086399..7413ff4f 100644 --- a/xmpp-vala/src/module/xep/0166_jingle.vala +++ b/xmpp-vala/src/module/xep/0166_jingle.vala @@ -7,9 +7,28 @@ namespace Xmpp.Xep.Jingle { private const string NS_URI = "urn:xmpp:jingle:1"; private const string ERROR_NS_URI = "urn:xmpp:jingle:errors:1"; -public errordomain CreateConnectionError { +public errordomain IqError { BAD_REQUEST, NOT_ACCEPTABLE, + NOT_IMPLEMENTED, + OUT_OF_ORDER, +} + +void send_iq_error(IqError iq_error, XmppStream stream, Iq.Stanza iq) { + ErrorStanza error; + if (iq_error is IqError.BAD_REQUEST) { + error = new ErrorStanza.bad_request(iq_error.message); + } else if (iq_error is IqError.NOT_ACCEPTABLE) { + error = new ErrorStanza.not_acceptable(iq_error.message); + } else if (iq_error is IqError.NOT_IMPLEMENTED) { + error = new ErrorStanza.feature_not_implemented(iq_error.message); + } else if (iq_error is IqError.OUT_OF_ORDER) { + StanzaNode out_of_order = new StanzaNode.build("out-of-order", ERROR_NS_URI).add_self_xmlns(); + error = new ErrorStanza.build(ErrorStanza.TYPE_MODIFY, ErrorStanza.CONDITION_UNEXPECTED_REQUEST, iq_error.message, out_of_order); + } else { + assert_not_reached(); + } + stream.get_module(Iq.Module.IDENTITY).send_iq(stream, new Iq.Stanza.error(iq, error)); } public errordomain Error { @@ -21,9 +40,28 @@ public errordomain Error { TRANSPORT_ERROR, } +StanzaNode get_single_node_anyns(StanzaNode parent, string node_name) throws IqError { + StanzaNode? result = null; + foreach (StanzaNode child in parent.get_all_subnodes()) { + if (child.name == node_name) { + if (result != null) { + throw new IqError.BAD_REQUEST(@"multiple $(node_name) nodes"); + } + result = child; + } + } + if (result == null) { + throw new IqError.BAD_REQUEST(@"missing $(node_name) node"); + } + return result; +} + public class Module : XmppStreamModule, Iq.Handler { public static Xmpp.ModuleIdentity IDENTITY = new Xmpp.ModuleIdentity(NS_URI, "0166_jingle"); + private HashMap content_types = new HashMap(); + private HashMap transports = new HashMap(); + public override void attach(XmppStream stream) { stream.add_flag(new Flag()); stream.get_module(ServiceDiscovery.Module.IDENTITY).add_feature(stream, NS_URI); @@ -31,11 +69,35 @@ public class Module : XmppStreamModule, Iq.Handler { } public override void detach(XmppStream stream) { } - public void add_transport(XmppStream stream, Transport transport) { - stream.get_flag(Flag.IDENTITY).add_transport(transport); + public void register_content_type(ContentType content_type) { + content_types[content_type.content_type_ns_uri()] = content_type; + } + public ContentType? get_content_type(string ns_uri) { + if (!content_types.has_key(ns_uri)) { + return null; + } + return content_types[ns_uri]; + } + public void register_transport(Transport transport) { + transports[transport.transport_ns_uri()] = transport; + } + public Transport? get_transport(string ns_uri) { + if (!transports.has_key(ns_uri)) { + return null; + } + return transports[ns_uri]; } public Transport? select_transport(XmppStream stream, TransportType type, Jid receiver_full_jid) { - return stream.get_flag(Flag.IDENTITY).select_transport(stream, type, receiver_full_jid); + foreach (Transport transport in transports.values) { + if (transport.transport_type() != type) { + continue; + } + // TODO(hrxi): prioritization + if (transport.is_transport_available(stream, receiver_full_jid)) { + return transport; + } + } + return null; } private bool is_jingle_available(XmppStream stream, Jid full_jid) { @@ -59,13 +121,14 @@ public class Module : XmppStreamModule, Iq.Handler { if (my_jid == null) { throw new Error.GENERAL("Couldn't determine own JID"); } - Session session = new Session(random_uuid(), type, receiver_full_jid); + TransportParameters transport_params = transport.create_transport_parameters(); + Session session = new Session.initiate_sent(random_uuid(), type, transport_params, receiver_full_jid, content_name); StanzaNode content = new StanzaNode.build("content", NS_URI) .put_attribute("creator", "initiator") .put_attribute("name", content_name) .put_attribute("senders", senders.to_string()) .put_node(description) - .put_node(transport.to_transport_stanza_node()); + .put_node(transport_params.to_transport_stanza_node()); StanzaNode jingle = new StanzaNode.build("jingle", NS_URI) .add_self_xmlns() .put_attribute("action", "session-initiate") @@ -75,21 +138,84 @@ public class Module : XmppStreamModule, Iq.Handler { Iq.Stanza iq = new Iq.Stanza.set(jingle) { to=receiver_full_jid }; stream.get_module(Iq.Module.IDENTITY).send_iq(stream, iq, (stream, iq) => { + // TODO(hrxi): handle errors stream.get_flag(Flag.IDENTITY).add_session(session); }); return session; } + public void handle_session_initiate(XmppStream stream, string sid, StanzaNode jingle, Iq.Stanza iq) throws IqError { + Gee.List contents = jingle.get_subnodes("content"); + if (contents.size == 0) { + throw new IqError.BAD_REQUEST("missing content node"); + } + if (contents.size > 1) { + throw new IqError.NOT_IMPLEMENTED("can't process multiple content nodes"); + } + StanzaNode content = contents[0]; + string? name = content.get_attribute("name"); + StanzaNode description = get_single_node_anyns(content, "description"); + StanzaNode transport_node = get_single_node_anyns(content, "transport"); + if (name == null) { + throw new IqError.BAD_REQUEST("missing name"); + } + + Transport? transport = get_transport(transport_node.ns_uri); + TransportParameters? transport_params = null; + if (transport != null) { + transport_params = transport.parse_transport_parameters(transport_node); + } else { + // terminate the session below + } + + ContentType? content_type = get_content_type(description.ns_uri); + if (content_type == null) { + // TODO(hrxi): how do we signal an unknown content type? + throw new IqError.NOT_IMPLEMENTED("unknown content type"); + } + ContentParameters content_params = content_type.parse_content_parameters(description); + + TransportType type = content_type.content_type_transport_type(); + Session session = new Session.initiate_received(sid, type, transport_params, iq.from, name); + stream.get_flag(Flag.IDENTITY).add_session(session); + stream.get_module(Iq.Module.IDENTITY).send_iq(stream, new Iq.Stanza.result(iq)); + + if (transport == null || transport.transport_type() != type) { + StanzaNode reason = new StanzaNode.build("reason", NS_URI) + .put_node(new StanzaNode.build("unsupported-transports", NS_URI)); + session.terminate(stream, reason); + return; + } + + content_params.on_session_initiate(stream, session); + } + public void on_iq_set(XmppStream stream, Iq.Stanza iq) { + try { + handle_iq_set(stream, iq); + } catch (IqError e) { + send_iq_error(e, stream, iq); + } + } + + public void handle_iq_set(XmppStream stream, Iq.Stanza iq) throws IqError { StanzaNode? jingle = iq.stanza.get_subnode("jingle", NS_URI); string? sid = jingle != null ? jingle.get_attribute("sid") : null; string? action = jingle != null ? jingle.get_attribute("action") : null; if (jingle == null || sid == null || action == null) { - stream.get_module(Iq.Module.IDENTITY).send_iq(stream, new Iq.Stanza.error(iq, new ErrorStanza.bad_request("missing jingle node, sid or action"))); - return; + throw new IqError.BAD_REQUEST("missing jingle node, sid or action"); } Session? session = stream.get_flag(Flag.IDENTITY).get_session(sid); + if (action == "session-initiate") { + if (session != null) { + // TODO(hrxi): Info leak if other clients use predictable session IDs? + stream.get_module(Iq.Module.IDENTITY).send_iq(stream, new Iq.Stanza.error(iq, new ErrorStanza.build(ErrorStanza.TYPE_MODIFY, ErrorStanza.CONDITION_CONFLICT, "session ID already in use", null))); + return; + } + handle_session_initiate(stream, sid, jingle, iq); + return; + } if (session == null) { StanzaNode unknown_session = new StanzaNode.build("unknown-session", ERROR_NS_URI).add_self_xmlns(); stream.get_module(Iq.Module.IDENTITY).send_iq(stream, new Iq.Stanza.error(iq, new ErrorStanza.item_not_found(unknown_session))); @@ -125,62 +251,118 @@ public enum Senders { } public interface Transport : Object { + public abstract string transport_ns_uri(); public abstract bool is_transport_available(XmppStream stream, Jid full_jid); public abstract TransportType transport_type(); + public abstract TransportParameters create_transport_parameters(); + public abstract TransportParameters parse_transport_parameters(StanzaNode transport) throws IqError; +} + +public interface TransportParameters : Object { + public abstract string transport_ns_uri(); public abstract StanzaNode to_transport_stanza_node(); - public abstract Connection? create_transport_connection(XmppStream stream, Jid peer_full_jid, StanzaNode content) throws CreateConnectionError; + public abstract void update_transport(StanzaNode transport) throws IqError; + public abstract IOStream create_transport_connection(XmppStream stream, Jid peer_full_jid, Role role); +} + +public enum Role { + INITIATOR, + RESPONDER; + + public string to_string() { + switch (this) { + case INITIATOR: return "initiator"; + case RESPONDER: return "responder"; + } + assert_not_reached(); + } +} + +public interface ContentType : Object { + public abstract string content_type_ns_uri(); + public abstract TransportType content_type_transport_type(); + public abstract ContentParameters parse_content_parameters(StanzaNode description) throws IqError; +} + +public interface ContentParameters : Object { + public abstract void on_session_initiate(XmppStream stream, Session session); } + public class Session { + // INITIATE_SENT -> ACTIVE -> ENDED + // INITIATE_RECEIVED -> ACTIVE -> ENDED public enum State { - PENDING, + INITIATE_SENT, + INITIATE_RECEIVED, ACTIVE, ENDED, } public State state { get; private set; } - Connection? conn; public string sid { get; private set; } public Type type_ { get; private set; } public Jid peer_full_jid { get; private set; } + public string content_name { get; private set; } - public Session(string sid, Type type, Jid peer_full_jid) { - this.state = PENDING; - this.conn = null; + // INITIATE_SENT | INITIATE_RECEIVED + TransportParameters? transport = null; + + // ACTIVE + public IOStream? conn { get; private set; } + + // Only interesting in INITIATE_SENT. + // Signals that the session has been accepted by the peer. + public signal void accepted(XmppStream stream); + + public Session.initiate_sent(string sid, Type type, TransportParameters transport, Jid peer_full_jid, string content_name) { + this.state = INITIATE_SENT; this.sid = sid; this.type_ = type; this.peer_full_jid = peer_full_jid; + this.content_name = content_name; + this.transport = transport; + this.conn = null; } - public signal void on_error(XmppStream stream, Error error); - public signal void on_data(XmppStream stream, uint8[] data); - // Signals that the stream is ready to send (more) data. - public signal void on_ready(XmppStream stream); - - private void handle_error(XmppStream stream, Error error) { - if (state == PENDING || state == ACTIVE) { - StanzaNode reason = new StanzaNode.build("reason", NS_URI) - .put_node(new StanzaNode.build("general-error", NS_URI)) // TODO(hrxi): Is this the right error? - .put_node(new StanzaNode.build("text", NS_URI) - .put_node(new StanzaNode.text(error.message)) - ); - terminate(stream, reason); - } + public Session.initiate_received(string sid, Type type, TransportParameters? transport, Jid peer_full_jid, string content_name) { + this.state = INITIATE_RECEIVED; + this.sid = sid; + this.type_ = type; + this.peer_full_jid = peer_full_jid; + this.content_name = content_name; + this.transport = transport; + this.conn = null; } - delegate void SendIq(Iq.Stanza iq); - public void handle_iq_set(XmppStream stream, string action, StanzaNode jingle, Iq.Stanza iq) { - SendIq send_iq = (iq) => stream.get_module(Iq.Module.IDENTITY).send_iq(stream, iq); - if (state != PENDING || action != "session-accept") { - return; - } - StanzaNode? content = jingle.get_subnode("content"); - if (content == null) { - // TODO(hrxi): here and below, should we terminate the session? - send_iq(new Iq.Stanza.error(iq, new ErrorStanza.bad_request("no content element"))); - return; + public void handle_iq_set(XmppStream stream, string action, StanzaNode jingle, Iq.Stanza iq) throws IqError { + switch (action) { + case "session-accept": + if (state != INITIATE_SENT) { + throw new IqError.OUT_OF_ORDER("got session-accept while not waiting for one"); + } + handle_session_accept(stream, jingle, iq); + break; + case "session-terminate": + handle_session_terminate(stream, jingle, iq); + break; + case "content-accept": + case "content-add": + case "content-modify": + case "content-reject": + case "content-remove": + case "security-info": + case "transport-accept": + case "transport-info": + case "transport-reject": + case "transport-replace": + throw new IqError.NOT_IMPLEMENTED(@"$(action) is not implemented"); + default: + throw new IqError.BAD_REQUEST("invalid action"); } + } + void handle_session_accept(XmppStream stream, StanzaNode jingle, Iq.Stanza iq) throws IqError { string? responder_str = jingle.get_attribute("responder"); Jid responder; if (responder_str != null) { @@ -190,38 +372,64 @@ public class Session { // TODO(hrxi): more sanity checking, perhaps replace who we're talking to } if (!responder.is_full()) { - send_iq(new Iq.Stanza.error(iq, new ErrorStanza.bad_request("invalid responder JID"))); - return; + throw new IqError.BAD_REQUEST("invalid responder JID"); } - try { - conn = stream.get_flag(Flag.IDENTITY).create_connection(stream, type_, peer_full_jid, content); - } catch (CreateConnectionError e) { - if (e is CreateConnectionError.BAD_REQUEST) { - send_iq(new Iq.Stanza.error(iq, new ErrorStanza.bad_request(e.message))); - } else if (e is CreateConnectionError.NOT_ACCEPTABLE) { - send_iq(new Iq.Stanza.error(iq, new ErrorStanza.not_acceptable(e.message))); - } - return; + Gee.List contents = jingle.get_subnodes("content"); + if (contents.size == 0) { + // TODO(hrxi): here and below, should we terminate the session? + throw new IqError.BAD_REQUEST("missing content node"); } - send_iq(new Iq.Stanza.result(iq)); - if (conn == null) { - terminate(stream, new StanzaNode.build("reason", NS_URI) - .put_node(new StanzaNode.build("unsupported-transports", NS_URI))); - return; + if (contents.size > 1) { + throw new IqError.NOT_IMPLEMENTED("can't process multiple content nodes"); + } + StanzaNode content = contents[0]; + StanzaNode description = get_single_node_anyns(content, "description"); + StanzaNode transport_node = get_single_node_anyns(content, "transport"); + if (transport_node.ns_uri != transport.transport_ns_uri()) { + throw new IqError.BAD_REQUEST("session-accept with unnegotiated transport method"); } - conn.on_error.connect((stream, error) => on_error(stream, error)); - conn.on_data.connect((stream, data) => on_data(stream, data)); - conn.on_ready.connect((stream) => on_ready(stream)); - on_error.connect((stream, error) => handle_error(stream, error)); - conn.connect(stream); + transport.update_transport(transport_node); + conn = transport.create_transport_connection(stream, peer_full_jid, Role.INITIATOR); + transport = null; + stream.get_module(Iq.Module.IDENTITY).send_iq(stream, new Iq.Stanza.result(iq)); state = ACTIVE; + accepted(stream); + } + void handle_session_terminate(XmppStream stream, StanzaNode jingle, Iq.Stanza iq) throws IqError { + stream.get_module(Iq.Module.IDENTITY).send_iq(stream, new Iq.Stanza.result(iq)); + // TODO(hrxi): also handle presence type=unavailable } - public void send(XmppStream stream, uint8[] data) { - if (state != ACTIVE) { + public void accept(XmppStream stream, StanzaNode description) { + if (state != INITIATE_RECEIVED) { + return; // TODO(hrxi): what to do? + } + StanzaNode jingle = new StanzaNode.build("jingle", NS_URI) + .add_self_xmlns() + .put_attribute("action", "session-accept") + .put_attribute("sid", sid) + .put_node(new StanzaNode.build("content", NS_URI) + .put_attribute("creator", "initiator") + .put_attribute("name", content_name) + .put_node(description) + .put_node(transport.to_transport_stanza_node()) + ); + Iq.Stanza iq = new Iq.Stanza.set(jingle) { to=peer_full_jid }; + stream.get_module(Iq.Module.IDENTITY).send_iq(stream, iq); + + conn = transport.create_transport_connection(stream, peer_full_jid, Role.RESPONDER); + transport = null; + + state = ACTIVE; + } + + public void reject(XmppStream stream) { + if (state != INITIATE_RECEIVED) { return; // TODO(hrxi): what to do? } - conn.send(stream, data); + StanzaNode reason = new StanzaNode.build("reason", NS_URI) + .put_node(new StanzaNode.build("decline", NS_URI)); + terminate(stream, reason); } public void set_application_error(XmppStream stream, StanzaNode? application_reason = null) { @@ -237,16 +445,16 @@ public class Session { if (state != ACTIVE) { return; // TODO(hrxi): what to do? } - conn.close(stream); + conn.close(); } public void terminate(XmppStream stream, StanzaNode reason) { - if (state != PENDING && state != ACTIVE) { + if (state != INITIATE_SENT && state != INITIATE_RECEIVED && state != ACTIVE) { // TODO(hrxi): what to do? return; } - if (conn != null) { - conn.close(stream); + if (state == ACTIVE) { + conn.close(); } StanzaNode jingle = new StanzaNode.build("jingle", NS_URI) @@ -264,56 +472,14 @@ public class Session { } } -public abstract class Connection { - public Jid? peer_full_jid { get; private set; } - - public Connection(Jid peer_full_jid) { - this.peer_full_jid = peer_full_jid; - } - - public signal void on_error(XmppStream stream, Error error); - public signal void on_data(XmppStream stream, uint8[] data); - public signal void on_ready(XmppStream stream); - - public abstract void connect(XmppStream stream); - public abstract void send(XmppStream stream, uint8[] data); - public abstract void close(XmppStream stream); -} - public class Flag : XmppStreamFlag { public static FlagIdentity IDENTITY = new FlagIdentity(NS_URI, "jingle"); - private Gee.List transports = new ArrayList(); private HashMap sessions = new HashMap(); - public void add_transport(Transport transport) { transports.add(transport); } - public Transport? select_transport(XmppStream stream, TransportType type, Jid receiver_full_jid) { - foreach (Transport transport in transports) { - if (transport.transport_type() != type) { - continue; - } - // TODO(hrxi): prioritization - if (transport.is_transport_available(stream, receiver_full_jid)) { - return transport; - } - } - return null; - } public void add_session(Session session) { sessions[session.sid] = session; } - public Connection? create_connection(XmppStream stream, Type type, Jid peer_full_jid, StanzaNode content) throws CreateConnectionError { - foreach (Transport transport in transports) { - if (transport.transport_type() != type) { - continue; - } - Connection? conn = transport.create_transport_connection(stream, peer_full_jid, content); - if (conn != null) { - return conn; - } - } - return null; - } public Session? get_session(string sid) { return sessions.has_key(sid) ? sessions[sid] : null; } diff --git a/xmpp-vala/src/module/xep/0234_jingle_file_transfer.vala b/xmpp-vala/src/module/xep/0234_jingle_file_transfer.vala index cd249017..57222bae 100644 --- a/xmpp-vala/src/module/xep/0234_jingle_file_transfer.vala +++ b/xmpp-vala/src/module/xep/0234_jingle_file_transfer.vala @@ -6,19 +6,27 @@ namespace Xmpp.Xep.JingleFileTransfer { private const string NS_URI = "urn:xmpp:jingle:apps:file-transfer:5"; -public errordomain Error { - FILE_INACCESSIBLE, -} - -public class Module : XmppStreamModule { +public class Module : Jingle.ContentType, XmppStreamModule { public static Xmpp.ModuleIdentity IDENTITY = new Xmpp.ModuleIdentity(NS_URI, "0234_jingle_file_transfer"); public override void attach(XmppStream stream) { - stream.add_flag(new Flag()); stream.get_module(ServiceDiscovery.Module.IDENTITY).add_feature(stream, NS_URI); + stream.get_module(Jingle.Module.IDENTITY).register_content_type(this); } public override void detach(XmppStream stream) { } + public string content_type_ns_uri() { + return NS_URI; + } + public Jingle.TransportType content_type_transport_type() { + return Jingle.TransportType.STREAMING; + } + public Jingle.ContentParameters parse_content_parameters(StanzaNode description) throws Jingle.IqError { + return Parameters.parse(this, description); + } + + public signal void file_incoming(XmppStream stream, FileTransfer file_transfer); + public bool is_available(XmppStream stream, Jid full_jid) { bool? has_feature = stream.get_flag(ServiceDiscovery.Flag.IDENTITY).has_entity_feature(full_jid, NS_URI); if (has_feature == null || !(!)has_feature) { @@ -27,25 +35,7 @@ public class Module : XmppStreamModule { return stream.get_module(Jingle.Module.IDENTITY).is_available(stream, Jingle.TransportType.STREAMING, full_jid); } - public void offer_file(XmppStream stream, Jid receiver_full_jid, string path) throws Error { - File file = File.new_for_path(path); - FileInputStream input_stream; - int64 size; - try { - input_stream = file.read(); - } catch (GLib.Error e) { - throw new Error.FILE_INACCESSIBLE(@"could not open the file \"$path\" for reading: $(e.message)"); - } - try { - size = input_stream.query_info(FileAttribute.STANDARD_SIZE).get_size(); - } catch (GLib.Error e) { - throw new Error.FILE_INACCESSIBLE(@"could not read the size: $(e.message)"); - } - - offer_file_stream(stream, receiver_full_jid, input_stream, file.get_basename(), size); - } - - public void offer_file_stream(XmppStream stream, Jid receiver_full_jid, InputStream input_stream, string basename, int64 size) { + public async void offer_file_stream(XmppStream stream, Jid receiver_full_jid, InputStream input_stream, string basename, int64 size) throws IOError { StanzaNode description = new StanzaNode.build("description", NS_URI) .add_self_xmlns() .put_node(new StanzaNode.build("file", NS_URI) @@ -53,48 +43,88 @@ public class Module : XmppStreamModule { .put_node(new StanzaNode.build("size", NS_URI).put_node(new StanzaNode.text(size.to_string())))); // TODO(hrxi): Add the mandatory hash field - Jingle.Session? session = stream.get_module(Jingle.Module.IDENTITY) + Jingle.Session session = stream.get_module(Jingle.Module.IDENTITY) .create_session(stream, Jingle.TransportType.STREAMING, receiver_full_jid, Jingle.Senders.INITIATOR, "a-file-offer", description); // TODO(hrxi): Why "a-file-offer"? - FileTransfer transfer = new FileTransfer(input_stream); - session.on_ready.connect(transfer.send_data); - stream.get_flag(Flag.IDENTITY).add_file_transfer(transfer); + SourceFunc callback = offer_file_stream.callback; + session.accepted.connect((stream) => { + session.conn.input_stream.close(); + Idle.add((owned) callback); + }); + yield; + + // TODO(hrxi): catch errors + yield session.conn.output_stream.splice_async(input_stream, OutputStreamSpliceFlags.CLOSE_SOURCE|OutputStreamSpliceFlags.CLOSE_TARGET); } public override string get_ns() { return NS_URI; } public override string get_id() { return IDENTITY.id; } } -public class FileTransfer : Object { - InputStream input_stream; - public FileTransfer(InputStream input_stream) { - this.input_stream = input_stream; +public class Parameters : Jingle.ContentParameters, Object { + Module parent; + string? media_type; + public string? name { get; private set; } + public int64 size { get; private set; } + public StanzaNode original_description { get; private set; } + public Parameters(Module parent, StanzaNode original_description, string? media_type, string? name, int64? size) { + this.parent = parent; + this.original_description = original_description; + this.media_type = media_type; + this.name = name; + this.size = size; } - public void send_data(Jingle.Session session, XmppStream stream) { - uint8 buffer[4096]; - ssize_t read; - try { - if((read = input_stream.read(buffer)) != 0) { - session.send(stream, buffer[0:read]); - } else { - session.close_connection(stream); + public static Parameters parse(Module parent, StanzaNode description) throws Jingle.IqError { + Gee.List files = description.get_subnodes("file", NS_URI); + if (files.size != 1) { + throw new Jingle.IqError.BAD_REQUEST("there needs to be exactly one file node"); + } + StanzaNode file = files[0]; + StanzaNode? media_type_node = file.get_subnode("media-type", NS_URI); + StanzaNode? name_node = file.get_subnode("name", NS_URI); + StanzaNode? size_node = file.get_subnode("size", NS_URI); + string? media_type = media_type_node != null ? media_type_node.get_string_content() : null; + string? name = name_node != null ? name_node.get_string_content() : null; + string? size_raw = size_node != null ? size_node.get_string_content() : null; + // TODO(hrxi): For some reason, the ?:-expression does not work due to a type error. + //int64? size = size_raw != null ? int64.parse(size_raw) : null; // TODO(hrxi): this has no error handling + int64 size = -1; + if (size_raw != null) { + size = int64.parse(size_raw); + if (size < 0) { + throw new Jingle.IqError.BAD_REQUEST("negative file size is invalid"); } - } catch (GLib.IOError e) { - session.set_application_error(stream); } - // TODO(hrxi): remove file transfer + + return new Parameters(parent, description, media_type, name, size); + } + void on_session_initiate(XmppStream stream, Jingle.Session session) { + parent.file_incoming(stream, new FileTransfer(session, this)); } } -public class Flag : XmppStreamFlag { - public static FlagIdentity IDENTITY = new FlagIdentity(NS_URI, "jingle_file_transfer"); +public class FileTransfer : Object { + Jingle.Session session; + Parameters parameters; + + public Jid peer { get { return session.peer_full_jid; } } + public string? file_name { get { return parameters.name; } } + public int64 size { get { return parameters.size; } } - private Gee.List transfers = new ArrayList(); + public InputStream? stream { get { return session.conn != null ? session.conn.input_stream : null; } } - public void add_file_transfer(FileTransfer transfer) { transfers.add(transfer); } + public FileTransfer(Jingle.Session session, Parameters parameters) { + this.session = session; + this.parameters = parameters; + } - public override string get_ns() { return NS_URI; } - public override string get_id() { return IDENTITY.id; } + public void accept(XmppStream stream) { + session.accept(stream, parameters.original_description); + session.conn.output_stream.close(); + } + public void reject(XmppStream stream) { + session.reject(stream); + } } } diff --git a/xmpp-vala/src/module/xep/0261_jingle_in_band_bytestreams.vala b/xmpp-vala/src/module/xep/0261_jingle_in_band_bytestreams.vala index 57dbaaa3..dc2e8d7c 100644 --- a/xmpp-vala/src/module/xep/0261_jingle_in_band_bytestreams.vala +++ b/xmpp-vala/src/module/xep/0261_jingle_in_band_bytestreams.vala @@ -5,12 +5,13 @@ namespace Xmpp.Xep.JingleInBandBytestreams { private const string NS_URI = "urn:xmpp:jingle:transports:ibb:1"; private const int DEFAULT_BLOCKSIZE = 4096; +private const int MAX_BLOCKSIZE = 65535; public class Module : Jingle.Transport, XmppStreamModule { public static Xmpp.ModuleIdentity IDENTITY = new Xmpp.ModuleIdentity(NS_URI, "0261_jingle_in_band_bytestreams"); public override void attach(XmppStream stream) { - stream.get_module(Jingle.Module.IDENTITY).add_transport(stream, this); + stream.get_module(Jingle.Module.IDENTITY).register_transport(this); stream.get_module(ServiceDiscovery.Module.IDENTITY).add_feature(stream, NS_URI); } public override void detach(XmppStream stream) { } @@ -23,52 +24,53 @@ public class Module : Jingle.Transport, XmppStreamModule { return result != null && result; } + public string transport_ns_uri() { + return NS_URI; + } public Jingle.TransportType transport_type() { return Jingle.TransportType.STREAMING; } - public StanzaNode to_transport_stanza_node() { - return new StanzaNode.build("transport", NS_URI) - .add_self_xmlns() - .put_attribute("block-size", DEFAULT_BLOCKSIZE.to_string()) - .put_attribute("sid", random_uuid()); + public Jingle.TransportParameters create_transport_parameters() { + return new Parameters(random_uuid(), DEFAULT_BLOCKSIZE); + } + public Jingle.TransportParameters parse_transport_parameters(StanzaNode transport) throws Jingle.IqError { + return Parameters.parse(transport); } +} - public Jingle.Connection? create_transport_connection(XmppStream stream, Jid peer_full_jid, StanzaNode content) throws Jingle.CreateConnectionError { - StanzaNode? transport = content.get_subnode("transport", NS_URI); - if (transport == null) { - return null; - } +class Parameters : Jingle.TransportParameters, Object { + public string sid { get; private set; } + public int block_size { get; private set; } + public Parameters(string sid, int block_size) { + this.sid = sid; + this.block_size = block_size; + } + public static Parameters parse(StanzaNode transport) throws Jingle.IqError { string? sid = transport.get_attribute("sid"); int block_size = transport.get_attribute_int("block-size"); - if (sid == null || block_size <= 0) { - throw new Jingle.CreateConnectionError.BAD_REQUEST("Invalid IBB parameters"); - } - if (block_size > DEFAULT_BLOCKSIZE) { - throw new Jingle.CreateConnectionError.NOT_ACCEPTABLE("Invalid IBB parameters: peer increased block size"); + if (sid == null || block_size <= 0 || block_size > MAX_BLOCKSIZE) { + throw new Jingle.IqError.BAD_REQUEST("missing or invalid sid or blocksize"); } - return new Connection(peer_full_jid, new InBandBytestreams.Connection(peer_full_jid, sid, block_size)); + return new Parameters(sid, block_size); } -} - -public class Connection : Jingle.Connection { - InBandBytestreams.Connection inner; - - public Connection(Jid full_jid, InBandBytestreams.Connection inner) { - base(full_jid); - inner.on_error.connect((stream, error) => on_error(stream, new Jingle.Error.TRANSPORT_ERROR(error))); - inner.on_data.connect((stream, data) => on_data(stream, data)); - inner.on_ready.connect((stream) => on_ready(stream)); - this.inner = inner; + public string transport_ns_uri() { + return NS_URI; } - - public override void connect(XmppStream stream) { - inner.connect(stream); + public StanzaNode to_transport_stanza_node() { + return new StanzaNode.build("transport", NS_URI) + .add_self_xmlns() + .put_attribute("block-size", block_size.to_string()) + .put_attribute("sid", sid); } - public override void send(XmppStream stream, uint8[] data) { - inner.send(stream, data); + public void update_transport(StanzaNode transport) throws Jingle.IqError { + Parameters other = Parameters.parse(transport); + if (other.sid != sid || other.block_size > block_size) { + throw new Jingle.IqError.NOT_ACCEPTABLE("invalid IBB sid or block_size"); + } + block_size = other.block_size; } - public override void close(XmppStream stream) { - inner.close(stream); + public IOStream create_transport_connection(XmppStream stream, Jid peer_full_jid, Jingle.Role role) { + return InBandBytestreams.Connection.create(stream, peer_full_jid, sid, block_size, role == Jingle.Role.INITIATOR); } } -- cgit v1.2.3-70-g09d2 From 4b6fe6bf7f86f665238d709c30a777dbc6c81acf Mon Sep 17 00:00:00 2001 From: hrxi Date: Fri, 12 Jul 2019 01:53:28 +0200 Subject: Address pull requests comments, fix a few TODOs --- libdino/src/service/file_manager.vala | 2 +- libdino/src/service/jingle_file_manager.vala | 9 ++-- .../src/module/xep/0047_in_band_bytestreams.vala | 49 ++++++++++++++++++---- 3 files changed, 46 insertions(+), 14 deletions(-) (limited to 'libdino/src/service') diff --git a/libdino/src/service/file_manager.vala b/libdino/src/service/file_manager.vala index 049239f7..7665936c 100644 --- a/libdino/src/service/file_manager.vala +++ b/libdino/src/service/file_manager.vala @@ -66,7 +66,7 @@ public class FileManager : StreamInteractionModule, Object { foreach (FileSender file_sender in file_senders) { if (file_sender.can_send(conversation, file_transfer)) { file_sender.send_file(conversation, file_transfer); - return; + break; } } received_file(file_transfer, conversation); diff --git a/libdino/src/service/jingle_file_manager.vala b/libdino/src/service/jingle_file_manager.vala index bd470f0b..595afae0 100644 --- a/libdino/src/service/jingle_file_manager.vala +++ b/libdino/src/service/jingle_file_manager.vala @@ -45,9 +45,9 @@ public class JingleFileManager : StreamInteractionModule, FileProvider, FileSend file_transfer.local_time = new DateTime.now_utc(); file_transfer.direction = FileTransfer.DIRECTION_RECEIVED; file_transfer.file_name = jingle_file_transfer.file_name; - file_transfer.size = (int)jingle_file_transfer.size; // TODO(hrxi): remove cast + file_transfer.size = (int)jingle_file_transfer.size; file_transfer.state = FileTransfer.State.NOT_STARTED; - file_transfer.provider = 0; // TODO(hrxi): what is this? + file_transfer.provider = 1; file_transfer.info = id; file_transfers[id] = jingle_file_transfer; @@ -56,7 +56,8 @@ public class JingleFileManager : StreamInteractionModule, FileProvider, FileSend } async void get_meta_info(FileTransfer file_transfer) { - // TODO(hrxi): what is this function? + // In Jingle, all the metadata is provided up-front, so there's no more + // metadata to get. } async void download(FileTransfer file_transfer, File file_) { // TODO(hrxi) What should happen if `stream == null`? @@ -112,7 +113,7 @@ public class JingleFileManager : StreamInteractionModule, FileProvider, FileSend if (!stream.get_module(Xep.JingleFileTransfer.Module.IDENTITY).is_available(stream, full_jid)) { continue; } - stream.get_module(Xep.JingleFileTransfer.Module.IDENTITY).offer_file_stream(stream, full_jid, file_transfer.input_stream, file_transfer.file_name, file_transfer.size); + stream.get_module(Xep.JingleFileTransfer.Module.IDENTITY).offer_file_stream.begin(stream, full_jid, file_transfer.input_stream, file_transfer.file_name, file_transfer.size); return; } } diff --git a/xmpp-vala/src/module/xep/0047_in_band_bytestreams.vala b/xmpp-vala/src/module/xep/0047_in_band_bytestreams.vala index ea9d5f72..89247780 100644 --- a/xmpp-vala/src/module/xep/0047_in_band_bytestreams.vala +++ b/xmpp-vala/src/module/xep/0047_in_band_bytestreams.vala @@ -127,8 +127,14 @@ public class Connection : IOStream { XmppStream stream; + int read_callback_priority; + Cancellable? read_callback_cancellable = null; + ulong read_callback_cancellable_id; SourceFunc? read_callback = null; + int write_callback_priority; SourceFunc? write_callback = null; + ulong write_callback_cancellable_id; + Cancellable? write_callback_cancellable = null; // Need `Bytes` instead of `uint8[]` because the latter doesn't work in // parameter position of `LinkedList`. LinkedList received = new LinkedList(); @@ -144,35 +150,54 @@ public class Connection : IOStream { output = new Output(this); } - public void set_read_calllback(SourceFunc callback) throws IOError { + public void set_read_callback(SourceFunc callback, Cancellable? cancellable, int io_priority) throws IOError { if (read_callback != null) { throw new IOError.PENDING("only one async read is permitted at a time on an in-band bytestream"); } + if (cancellable != null) { + read_callback_cancellable_id = cancellable.connect(trigger_read_callback); + } read_callback = callback; + read_callback_cancellable = cancellable; + read_callback_priority = io_priority; } - public void set_write_calllback(SourceFunc callback) throws IOError { + public void set_write_callback(SourceFunc callback, Cancellable? cancellable, int io_priority) throws IOError { if (write_callback != null) { throw new IOError.PENDING("only one async write is permitted at a time on an in-band bytestream"); } + if (cancellable != null) { + write_callback_cancellable_id = cancellable.connect(trigger_write_callback); + } write_callback = callback; + write_callback_cancellable = cancellable; + write_callback_priority = io_priority; } public void trigger_read_callback() { if (read_callback != null) { - Idle.add((owned) read_callback); + Idle.add((owned) read_callback, read_callback_priority); read_callback = null; + if (read_callback_cancellable != null) { + read_callback_cancellable.disconnect(read_callback_cancellable_id); + } + read_callback_cancellable = null; } } public void trigger_write_callback() { if (write_callback != null) { - Idle.add((owned) write_callback); + Idle.add((owned) write_callback, write_callback_priority); write_callback = null; + if (write_callback_cancellable != null) { + write_callback_cancellable.disconnect(write_callback_cancellable_id); + } + write_callback_cancellable = null; } } public async ssize_t read_async(uint8[]? buffer, int io_priority = GLib.Priority.DEFAULT, Cancellable? cancellable = null) throws IOError { - // TODO(hrxi): cancellable? - // TODO(hrxi): io_priority? while (true) { + if (cancellable != null) { + cancellable.set_error_if_cancelled(); + } if (input_closed) { return 0; } @@ -190,14 +215,17 @@ public class Connection : IOStream { if (state == DISCONNECTED) { return 0; } - set_read_calllback(read_async.callback); + set_read_callback(read_async.callback, cancellable, io_priority); yield; } } public async ssize_t write_async(uint8[]? buffer, int io_priority = GLib.Priority.DEFAULT, Cancellable? cancellable = null) throws IOError { while (state == WAITING_FOR_CONNECT || state == CONNECTING) { - set_write_calllback(write_async.callback); + if (cancellable != null) { + cancellable.set_error_if_cancelled(); + } + set_write_callback(write_async.callback, cancellable, io_priority); yield; } throw_if_closed(); @@ -214,7 +242,7 @@ public class Connection : IOStream { .put_attribute("seq", seq.to_string()) .put_node(new StanzaNode.text(Base64.encode(buffer))); Iq.Stanza iq = new Iq.Stanza.set(data) { to=receiver_full_jid }; - set_write_calllback(write_async.callback); + set_write_callback(write_async.callback, cancellable, io_priority); stream.get_module(Iq.Module.IDENTITY).send_iq(stream, iq, (stream, iq) => { if (iq.is_error()) { set_error("sending failed"); @@ -228,6 +256,9 @@ public class Connection : IOStream { } }); yield; + if (cancellable != null) { + cancellable.set_error_if_cancelled(); + } throw_if_error(); return buffer.length; } -- cgit v1.2.3-70-g09d2