From 877c46628fa2836f9226e24a3d0a84b9a3f821e6 Mon Sep 17 00:00:00 2001 From: hrxi Date: Sun, 23 Jun 2019 14:53:18 +0200 Subject: Implement file sending via Jingle This is still disabled by default until prioritization is implemented; otherwise this could be preferred to HTTP uploads. File sending only works via Jingle In-Band-Bytestreams right now, more transports are going to be implemented. To test this, uncomment the line with `JingleFileTransfer` in libdino/src/application.vala. --- xmpp-vala/CMakeLists.txt | 8 +- xmpp-vala/src/module/message/stanza.vala | 4 +- .../src/module/xep/0047_in_band_bytestreams.vala | 172 +++++++++++ xmpp-vala/src/module/xep/0166_jingle.vala | 328 +++++++++++++++++++++ .../src/module/xep/0234_jingle_file_transfer.vala | 100 +++++++ .../xep/0261_jingle_in_band_bytestreams.vala | 75 +++++ 6 files changed, 683 insertions(+), 4 deletions(-) create mode 100644 xmpp-vala/src/module/xep/0047_in_band_bytestreams.vala create mode 100644 xmpp-vala/src/module/xep/0166_jingle.vala create mode 100644 xmpp-vala/src/module/xep/0234_jingle_file_transfer.vala create mode 100644 xmpp-vala/src/module/xep/0261_jingle_in_band_bytestreams.vala (limited to 'xmpp-vala') diff --git a/xmpp-vala/CMakeLists.txt b/xmpp-vala/CMakeLists.txt index faff9e79..528c84a6 100644 --- a/xmpp-vala/CMakeLists.txt +++ b/xmpp-vala/CMakeLists.txt @@ -49,8 +49,9 @@ SOURCES "src/module/xep/0045_muc/flag.vala" "src/module/xep/0045_muc/module.vala" "src/module/xep/0045_muc/status_code.vala" - "src/module/xep/0048_bookmarks/module.vala" + "src/module/xep/0047_in_band_bytestreams.vala" "src/module/xep/0048_bookmarks/conference.vala" + "src/module/xep/0048_bookmarks/module.vala" "src/module/xep/0049_private_xml_storage.vala" "src/module/xep/0054_vcard/module.vala" "src/module/xep/0060_pubsub.vala" @@ -60,11 +61,14 @@ SOURCES "src/module/xep/0084_user_avatars.vala" "src/module/xep/0085_chat_state_notifications.vala" "src/module/xep/0115_entitiy_capabilities.vala" + "src/module/xep/0166_jingle.vala" + "src/module/xep/0184_message_delivery_receipts.vala" "src/module/xep/0191_blocking_command.vala" "src/module/xep/0198_stream_management.vala" "src/module/xep/0199_ping.vala" - "src/module/xep/0184_message_delivery_receipts.vala" "src/module/xep/0203_delayed_delivery.vala" + "src/module/xep/0234_jingle_file_transfer.vala" + "src/module/xep/0261_jingle_in_band_bytestreams.vala" "src/module/xep/0280_message_carbons.vala" "src/module/xep/0313_message_archive_management.vala" "src/module/xep/0333_chat_markers.vala" diff --git a/xmpp-vala/src/module/message/stanza.vala b/xmpp-vala/src/module/message/stanza.vala index 640f2796..053c44dd 100644 --- a/xmpp-vala/src/module/message/stanza.vala +++ b/xmpp-vala/src/module/message/stanza.vala @@ -18,7 +18,7 @@ public class MessageStanza : Xmpp.Stanza { public string body { get { StanzaNode? body_node = stanza.get_subnode(NODE_BODY); - return body_node == null? null : body_node.get_string_content(); + return body_node == null ? null : body_node.get_string_content(); } set { StanzaNode? body_node = stanza.get_subnode(NODE_BODY); @@ -65,4 +65,4 @@ public abstract class MessageFlag : Object { public abstract string get_id(); } -} \ No newline at end of file +} diff --git a/xmpp-vala/src/module/xep/0047_in_band_bytestreams.vala b/xmpp-vala/src/module/xep/0047_in_band_bytestreams.vala new file mode 100644 index 00000000..0e1dd6be --- /dev/null +++ b/xmpp-vala/src/module/xep/0047_in_band_bytestreams.vala @@ -0,0 +1,172 @@ +using Gee; +using Xmpp; +using Xmpp.Xep; + +namespace Xmpp.Xep.InBandBytestreams { + +private const string NS_URI = "http://jabber.org/protocol/ibb"; +private const int SEQ_MODULUS = 65536; + +public class Module : XmppStreamModule { + public static Xmpp.ModuleIdentity IDENTITY = new Xmpp.ModuleIdentity(NS_URI, "0047_in_band_bytestreams"); + + public override void attach(XmppStream stream) { + stream.add_flag(new Flag()); + } + public override void detach(XmppStream stream) { } + + public void on_iq_set(XmppStream stream, Iq.Stanza iq) { + StanzaNode? data = iq.stanza.get_subnode("data", NS_URI); + string? sid = data != null ? data.get_attribute("sid") : null; + if (data == null || sid == null) { + stream.get_module(Iq.Module.IDENTITY).send_iq(stream, new Iq.Stanza.error(iq, new ErrorStanza.bad_request("missing data node or sid"))); + return; + } + Connection? conn = stream.get_flag(Flag.IDENTITY).get_connection(sid); + if (conn == null) { + stream.get_module(Iq.Module.IDENTITY).send_iq(stream, new Iq.Stanza.error(iq, new ErrorStanza.item_not_found())); + return; + } + + int seq = data.get_attribute_int("seq"); + // TODO(hrxi): return an error on malformed base64 (need to do this + // according to the xep) + uint8[] content = Base64.decode(data.get_string_content()); + if (seq < 0 || seq != conn.remote_seq) { + // TODO(hrxi): send an error and close the connection + return; + } + conn.remote_seq = (conn.remote_seq + 1) % SEQ_MODULUS; + + stream.get_module(Iq.Module.IDENTITY).send_iq(stream, new Iq.Stanza.result(iq)); + conn.on_data(stream, content); + } + + public override string get_ns() { return NS_URI; } + public override string get_id() { return IDENTITY.id; } +} + +public class Connection { + // TODO(hrxi): implement half-open states + public enum State { + UNCONNECTED, + CONNECTING, + CONNECTED, + DISCONNECTING, + DISCONNECTED, + ERROR, + } + State state = UNCONNECTED; + Jid receiver_full_jid; + public string sid { get; private set; } + int block_size; + int local_seq = 0; + int remote_ack = 0; + internal int remote_seq = 0; + + public signal void on_error(XmppStream stream, string error); + public signal void on_data(XmppStream stream, uint8[] data); + public signal void on_ready(XmppStream stream); + + public Connection(Jid receiver_full_jid, string sid, int block_size) { + this.receiver_full_jid = receiver_full_jid; + this.sid = sid; + this.block_size = block_size; + } + + public void connect(XmppStream stream) { + assert(state == UNCONNECTED); + state = CONNECTING; + + StanzaNode open = new StanzaNode.build("open", NS_URI) + .add_self_xmlns() + .put_attribute("block-size", block_size.to_string()) + .put_attribute("sid", sid); + + Iq.Stanza iq = new Iq.Stanza.set(open) { to=receiver_full_jid }; + stream.get_module(Iq.Module.IDENTITY).send_iq(stream, iq, (stream, iq) => { + assert(state == CONNECTING); + if (!iq.is_error()) { + state = CONNECTED; + stream.get_flag(Flag.IDENTITY).add_connection(this); + on_ready(stream); + } else { + set_error(stream, "connection failed"); + } + }); + } + + void set_error(XmppStream stream, string error) { + // TODO(hrxi): Send disconnect? + state = ERROR; + on_error(stream, error); + } + + public void send(XmppStream stream, uint8[] bytes) { + assert(state == CONNECTED); + // TODO(hrxi): rate-limiting/merging? + int seq = local_seq; + local_seq = (local_seq + 1) % SEQ_MODULUS; + StanzaNode data = new StanzaNode.build("data", NS_URI) + .add_self_xmlns() + .put_attribute("sid", sid) + .put_attribute("seq", seq.to_string()) + .put_node(new StanzaNode.text(Base64.encode(bytes))); + Iq.Stanza iq = new Iq.Stanza.set(data) { to=receiver_full_jid }; + stream.get_module(Iq.Module.IDENTITY).send_iq(stream, iq, (stream, iq) => { + if (iq.is_error()) { + set_error(stream, "sending failed"); + return; + } + if (remote_ack != seq) { + set_error(stream, "out of order acks"); + return; + } + remote_ack = (remote_ack + 1) % SEQ_MODULUS; + if (local_seq == remote_ack) { + on_ready(stream); + } + }); + } + + public void close(XmppStream stream) { + assert(state == CONNECTED); + state = DISCONNECTING; + // TODO(hrxi): should not do this, might still receive data + stream.get_flag(Flag.IDENTITY).remove_connection(this); + StanzaNode close = new StanzaNode.build("close", NS_URI) + .add_self_xmlns() + .put_attribute("sid", sid); + Iq.Stanza iq = new Iq.Stanza.set(close) { to=receiver_full_jid }; + stream.get_module(Iq.Module.IDENTITY).send_iq(stream, iq, (stream, iq) => { + assert(state == DISCONNECTING); + if (iq.is_error()) { + set_error(stream, "disconnecting failed"); + return; + } + state = DISCONNECTED; + }); + } +} + + +public class Flag : XmppStreamFlag { + public static FlagIdentity IDENTITY = new FlagIdentity(NS_URI, "in_band_bytestreams"); + + private HashMap active = new HashMap(); + + public void add_connection(Connection conn) { + active[conn.sid] = conn; + } + public Connection? get_connection(string sid) { + return active.has_key(sid) ? active[sid] : null; + } + public void remove_connection(Connection conn) { + active.unset(conn.sid); + } + + public override string get_ns() { return NS_URI; } + public override string get_id() { return IDENTITY.id; } +} + +} diff --git a/xmpp-vala/src/module/xep/0166_jingle.vala b/xmpp-vala/src/module/xep/0166_jingle.vala new file mode 100644 index 00000000..5c086399 --- /dev/null +++ b/xmpp-vala/src/module/xep/0166_jingle.vala @@ -0,0 +1,328 @@ +using Gee; +using Xmpp.Xep; +using Xmpp; + +namespace Xmpp.Xep.Jingle { + +private const string NS_URI = "urn:xmpp:jingle:1"; +private const string ERROR_NS_URI = "urn:xmpp:jingle:errors:1"; + +public errordomain CreateConnectionError { + BAD_REQUEST, + NOT_ACCEPTABLE, +} + +public errordomain Error { + GENERAL, + BAD_REQUEST, + INVALID_PARAMETERS, + UNSUPPORTED_TRANSPORT, + NO_SHARED_PROTOCOLS, + TRANSPORT_ERROR, +} + +public class Module : XmppStreamModule, Iq.Handler { + public static Xmpp.ModuleIdentity IDENTITY = new Xmpp.ModuleIdentity(NS_URI, "0166_jingle"); + + public override void attach(XmppStream stream) { + stream.add_flag(new Flag()); + stream.get_module(ServiceDiscovery.Module.IDENTITY).add_feature(stream, NS_URI); + stream.get_module(Iq.Module.IDENTITY).register_for_namespace(NS_URI, this); + } + public override void detach(XmppStream stream) { } + + public void add_transport(XmppStream stream, Transport transport) { + stream.get_flag(Flag.IDENTITY).add_transport(transport); + } + public Transport? select_transport(XmppStream stream, TransportType type, Jid receiver_full_jid) { + return stream.get_flag(Flag.IDENTITY).select_transport(stream, type, receiver_full_jid); + } + + private bool is_jingle_available(XmppStream stream, Jid full_jid) { + bool? has_jingle = stream.get_flag(ServiceDiscovery.Flag.IDENTITY).has_entity_feature(full_jid, NS_URI); + return has_jingle != null && has_jingle; + } + + public bool is_available(XmppStream stream, TransportType type, Jid full_jid) { + return is_jingle_available(stream, full_jid) && select_transport(stream, type, full_jid) != null; + } + + public Session create_session(XmppStream stream, TransportType type, Jid receiver_full_jid, Senders senders, string content_name, StanzaNode description) throws Error { + if (!is_jingle_available(stream, receiver_full_jid)) { + throw new Error.NO_SHARED_PROTOCOLS("No Jingle support"); + } + Transport? transport = select_transport(stream, type, receiver_full_jid); + if (transport == null) { + throw new Error.NO_SHARED_PROTOCOLS("No suitable transports"); + } + Jid? my_jid = stream.get_flag(Bind.Flag.IDENTITY).my_jid; + if (my_jid == null) { + throw new Error.GENERAL("Couldn't determine own JID"); + } + Session session = new Session(random_uuid(), type, receiver_full_jid); + StanzaNode content = new StanzaNode.build("content", NS_URI) + .put_attribute("creator", "initiator") + .put_attribute("name", content_name) + .put_attribute("senders", senders.to_string()) + .put_node(description) + .put_node(transport.to_transport_stanza_node()); + StanzaNode jingle = new StanzaNode.build("jingle", NS_URI) + .add_self_xmlns() + .put_attribute("action", "session-initiate") + .put_attribute("initiator", my_jid.to_string()) + .put_attribute("sid", session.sid) + .put_node(content); + Iq.Stanza iq = new Iq.Stanza.set(jingle) { to=receiver_full_jid }; + + stream.get_module(Iq.Module.IDENTITY).send_iq(stream, iq, (stream, iq) => { + stream.get_flag(Flag.IDENTITY).add_session(session); + }); + + return session; + } + + public void on_iq_set(XmppStream stream, Iq.Stanza iq) { + StanzaNode? jingle = iq.stanza.get_subnode("jingle", NS_URI); + string? sid = jingle != null ? jingle.get_attribute("sid") : null; + string? action = jingle != null ? jingle.get_attribute("action") : null; + if (jingle == null || sid == null || action == null) { + stream.get_module(Iq.Module.IDENTITY).send_iq(stream, new Iq.Stanza.error(iq, new ErrorStanza.bad_request("missing jingle node, sid or action"))); + return; + } + Session? session = stream.get_flag(Flag.IDENTITY).get_session(sid); + if (session == null) { + StanzaNode unknown_session = new StanzaNode.build("unknown-session", ERROR_NS_URI).add_self_xmlns(); + stream.get_module(Iq.Module.IDENTITY).send_iq(stream, new Iq.Stanza.error(iq, new ErrorStanza.item_not_found(unknown_session))); + return; + } + session.handle_iq_set(stream, action, jingle, iq); + } + + public override string get_ns() { return NS_URI; } + public override string get_id() { return IDENTITY.id; } +} + +public enum TransportType { + DATAGRAM, + STREAMING, +} + +public enum Senders { + BOTH, + INITIATOR, + NONE, + RESPONDER; + + public string to_string() { + switch (this) { + case BOTH: return "both"; + case INITIATOR: return "initiator"; + case NONE: return "none"; + case RESPONDER: return "responder"; + } + assert_not_reached(); + } +} + +public interface Transport : Object { + public abstract bool is_transport_available(XmppStream stream, Jid full_jid); + public abstract TransportType transport_type(); + public abstract StanzaNode to_transport_stanza_node(); + public abstract Connection? create_transport_connection(XmppStream stream, Jid peer_full_jid, StanzaNode content) throws CreateConnectionError; +} + +public class Session { + public enum State { + PENDING, + ACTIVE, + ENDED, + } + + public State state { get; private set; } + Connection? conn; + + public string sid { get; private set; } + public Type type_ { get; private set; } + public Jid peer_full_jid { get; private set; } + + public Session(string sid, Type type, Jid peer_full_jid) { + this.state = PENDING; + this.conn = null; + this.sid = sid; + this.type_ = type; + this.peer_full_jid = peer_full_jid; + } + + public signal void on_error(XmppStream stream, Error error); + public signal void on_data(XmppStream stream, uint8[] data); + // Signals that the stream is ready to send (more) data. + public signal void on_ready(XmppStream stream); + + private void handle_error(XmppStream stream, Error error) { + if (state == PENDING || state == ACTIVE) { + StanzaNode reason = new StanzaNode.build("reason", NS_URI) + .put_node(new StanzaNode.build("general-error", NS_URI)) // TODO(hrxi): Is this the right error? + .put_node(new StanzaNode.build("text", NS_URI) + .put_node(new StanzaNode.text(error.message)) + ); + terminate(stream, reason); + } + } + + delegate void SendIq(Iq.Stanza iq); + public void handle_iq_set(XmppStream stream, string action, StanzaNode jingle, Iq.Stanza iq) { + SendIq send_iq = (iq) => stream.get_module(Iq.Module.IDENTITY).send_iq(stream, iq); + if (state != PENDING || action != "session-accept") { + return; + } + StanzaNode? content = jingle.get_subnode("content"); + if (content == null) { + // TODO(hrxi): here and below, should we terminate the session? + send_iq(new Iq.Stanza.error(iq, new ErrorStanza.bad_request("no content element"))); + return; + } + string? responder_str = jingle.get_attribute("responder"); + Jid responder; + if (responder_str != null) { + responder = Jid.parse(responder_str) ?? iq.from; + } else { + responder = iq.from; // TODO(hrxi): and above, can we assume iq.from != null + // TODO(hrxi): more sanity checking, perhaps replace who we're talking to + } + if (!responder.is_full()) { + send_iq(new Iq.Stanza.error(iq, new ErrorStanza.bad_request("invalid responder JID"))); + return; + } + try { + conn = stream.get_flag(Flag.IDENTITY).create_connection(stream, type_, peer_full_jid, content); + } catch (CreateConnectionError e) { + if (e is CreateConnectionError.BAD_REQUEST) { + send_iq(new Iq.Stanza.error(iq, new ErrorStanza.bad_request(e.message))); + } else if (e is CreateConnectionError.NOT_ACCEPTABLE) { + send_iq(new Iq.Stanza.error(iq, new ErrorStanza.not_acceptable(e.message))); + } + return; + } + send_iq(new Iq.Stanza.result(iq)); + if (conn == null) { + terminate(stream, new StanzaNode.build("reason", NS_URI) + .put_node(new StanzaNode.build("unsupported-transports", NS_URI))); + return; + } + conn.on_error.connect((stream, error) => on_error(stream, error)); + conn.on_data.connect((stream, data) => on_data(stream, data)); + conn.on_ready.connect((stream) => on_ready(stream)); + on_error.connect((stream, error) => handle_error(stream, error)); + conn.connect(stream); + state = ACTIVE; + } + + public void send(XmppStream stream, uint8[] data) { + if (state != ACTIVE) { + return; // TODO(hrxi): what to do? + } + conn.send(stream, data); + } + + public void set_application_error(XmppStream stream, StanzaNode? application_reason = null) { + StanzaNode reason = new StanzaNode.build("reason", NS_URI) + .put_node(new StanzaNode.build("failed-application", NS_URI)); + if (application_reason != null) { + reason.put_node(application_reason); + } + terminate(stream, reason); + } + + public void close_connection(XmppStream stream) { + if (state != ACTIVE) { + return; // TODO(hrxi): what to do? + } + conn.close(stream); + } + + public void terminate(XmppStream stream, StanzaNode reason) { + if (state != PENDING && state != ACTIVE) { + // TODO(hrxi): what to do? + return; + } + if (conn != null) { + conn.close(stream); + } + + StanzaNode jingle = new StanzaNode.build("jingle", NS_URI) + .add_self_xmlns() + .put_attribute("action", "session-terminate") + .put_attribute("sid", sid) + .put_node(reason); + Iq.Stanza iq = new Iq.Stanza.set(jingle) { to=peer_full_jid }; + stream.get_module(Iq.Module.IDENTITY).send_iq(stream, iq); + + state = ENDED; + // Immediately remove the session from the open sessions as per the + // XEP, don't wait for confirmation. + stream.get_flag(Flag.IDENTITY).remove_session(sid); + } +} + +public abstract class Connection { + public Jid? peer_full_jid { get; private set; } + + public Connection(Jid peer_full_jid) { + this.peer_full_jid = peer_full_jid; + } + + public signal void on_error(XmppStream stream, Error error); + public signal void on_data(XmppStream stream, uint8[] data); + public signal void on_ready(XmppStream stream); + + public abstract void connect(XmppStream stream); + public abstract void send(XmppStream stream, uint8[] data); + public abstract void close(XmppStream stream); +} + +public class Flag : XmppStreamFlag { + public static FlagIdentity IDENTITY = new FlagIdentity(NS_URI, "jingle"); + + private Gee.List transports = new ArrayList(); + private HashMap sessions = new HashMap(); + + public void add_transport(Transport transport) { transports.add(transport); } + public Transport? select_transport(XmppStream stream, TransportType type, Jid receiver_full_jid) { + foreach (Transport transport in transports) { + if (transport.transport_type() != type) { + continue; + } + // TODO(hrxi): prioritization + if (transport.is_transport_available(stream, receiver_full_jid)) { + return transport; + } + } + return null; + } + public void add_session(Session session) { + sessions[session.sid] = session; + } + public Connection? create_connection(XmppStream stream, Type type, Jid peer_full_jid, StanzaNode content) throws CreateConnectionError { + foreach (Transport transport in transports) { + if (transport.transport_type() != type) { + continue; + } + Connection? conn = transport.create_transport_connection(stream, peer_full_jid, content); + if (conn != null) { + return conn; + } + } + return null; + } + public Session? get_session(string sid) { + return sessions.has_key(sid) ? sessions[sid] : null; + } + public void remove_session(string sid) { + sessions.unset(sid); + } + + public override string get_ns() { return NS_URI; } + public override string get_id() { return IDENTITY.id; } +} + +} diff --git a/xmpp-vala/src/module/xep/0234_jingle_file_transfer.vala b/xmpp-vala/src/module/xep/0234_jingle_file_transfer.vala new file mode 100644 index 00000000..cd249017 --- /dev/null +++ b/xmpp-vala/src/module/xep/0234_jingle_file_transfer.vala @@ -0,0 +1,100 @@ +using Gee; +using Xmpp; +using Xmpp.Xep; + +namespace Xmpp.Xep.JingleFileTransfer { + +private const string NS_URI = "urn:xmpp:jingle:apps:file-transfer:5"; + +public errordomain Error { + FILE_INACCESSIBLE, +} + +public class Module : XmppStreamModule { + public static Xmpp.ModuleIdentity IDENTITY = new Xmpp.ModuleIdentity(NS_URI, "0234_jingle_file_transfer"); + + public override void attach(XmppStream stream) { + stream.add_flag(new Flag()); + stream.get_module(ServiceDiscovery.Module.IDENTITY).add_feature(stream, NS_URI); + } + public override void detach(XmppStream stream) { } + + public bool is_available(XmppStream stream, Jid full_jid) { + bool? has_feature = stream.get_flag(ServiceDiscovery.Flag.IDENTITY).has_entity_feature(full_jid, NS_URI); + if (has_feature == null || !(!)has_feature) { + return false; + } + return stream.get_module(Jingle.Module.IDENTITY).is_available(stream, Jingle.TransportType.STREAMING, full_jid); + } + + public void offer_file(XmppStream stream, Jid receiver_full_jid, string path) throws Error { + File file = File.new_for_path(path); + FileInputStream input_stream; + int64 size; + try { + input_stream = file.read(); + } catch (GLib.Error e) { + throw new Error.FILE_INACCESSIBLE(@"could not open the file \"$path\" for reading: $(e.message)"); + } + try { + size = input_stream.query_info(FileAttribute.STANDARD_SIZE).get_size(); + } catch (GLib.Error e) { + throw new Error.FILE_INACCESSIBLE(@"could not read the size: $(e.message)"); + } + + offer_file_stream(stream, receiver_full_jid, input_stream, file.get_basename(), size); + } + + public void offer_file_stream(XmppStream stream, Jid receiver_full_jid, InputStream input_stream, string basename, int64 size) { + StanzaNode description = new StanzaNode.build("description", NS_URI) + .add_self_xmlns() + .put_node(new StanzaNode.build("file", NS_URI) + .put_node(new StanzaNode.build("name", NS_URI).put_node(new StanzaNode.text(basename))) + .put_node(new StanzaNode.build("size", NS_URI).put_node(new StanzaNode.text(size.to_string())))); + // TODO(hrxi): Add the mandatory hash field + + Jingle.Session? session = stream.get_module(Jingle.Module.IDENTITY) + .create_session(stream, Jingle.TransportType.STREAMING, receiver_full_jid, Jingle.Senders.INITIATOR, "a-file-offer", description); // TODO(hrxi): Why "a-file-offer"? + + FileTransfer transfer = new FileTransfer(input_stream); + session.on_ready.connect(transfer.send_data); + stream.get_flag(Flag.IDENTITY).add_file_transfer(transfer); + } + + public override string get_ns() { return NS_URI; } + public override string get_id() { return IDENTITY.id; } +} + +public class FileTransfer : Object { + InputStream input_stream; + public FileTransfer(InputStream input_stream) { + this.input_stream = input_stream; + } + public void send_data(Jingle.Session session, XmppStream stream) { + uint8 buffer[4096]; + ssize_t read; + try { + if((read = input_stream.read(buffer)) != 0) { + session.send(stream, buffer[0:read]); + } else { + session.close_connection(stream); + } + } catch (GLib.IOError e) { + session.set_application_error(stream); + } + // TODO(hrxi): remove file transfer + } +} + +public class Flag : XmppStreamFlag { + public static FlagIdentity IDENTITY = new FlagIdentity(NS_URI, "jingle_file_transfer"); + + private Gee.List transfers = new ArrayList(); + + public void add_file_transfer(FileTransfer transfer) { transfers.add(transfer); } + + public override string get_ns() { return NS_URI; } + public override string get_id() { return IDENTITY.id; } +} + +} diff --git a/xmpp-vala/src/module/xep/0261_jingle_in_band_bytestreams.vala b/xmpp-vala/src/module/xep/0261_jingle_in_band_bytestreams.vala new file mode 100644 index 00000000..57dbaaa3 --- /dev/null +++ b/xmpp-vala/src/module/xep/0261_jingle_in_band_bytestreams.vala @@ -0,0 +1,75 @@ +using Xmpp; +using Xmpp.Xep; + +namespace Xmpp.Xep.JingleInBandBytestreams { + +private const string NS_URI = "urn:xmpp:jingle:transports:ibb:1"; +private const int DEFAULT_BLOCKSIZE = 4096; + +public class Module : Jingle.Transport, XmppStreamModule { + public static Xmpp.ModuleIdentity IDENTITY = new Xmpp.ModuleIdentity(NS_URI, "0261_jingle_in_band_bytestreams"); + + public override void attach(XmppStream stream) { + stream.get_module(Jingle.Module.IDENTITY).add_transport(stream, this); + stream.get_module(ServiceDiscovery.Module.IDENTITY).add_feature(stream, NS_URI); + } + public override void detach(XmppStream stream) { } + + public override string get_ns() { return NS_URI; } + public override string get_id() { return IDENTITY.id; } + + public bool is_transport_available(XmppStream stream, Jid full_jid) { + bool? result = stream.get_flag(ServiceDiscovery.Flag.IDENTITY).has_entity_feature(full_jid, NS_URI); + return result != null && result; + } + + public Jingle.TransportType transport_type() { + return Jingle.TransportType.STREAMING; + } + public StanzaNode to_transport_stanza_node() { + return new StanzaNode.build("transport", NS_URI) + .add_self_xmlns() + .put_attribute("block-size", DEFAULT_BLOCKSIZE.to_string()) + .put_attribute("sid", random_uuid()); + } + + public Jingle.Connection? create_transport_connection(XmppStream stream, Jid peer_full_jid, StanzaNode content) throws Jingle.CreateConnectionError { + StanzaNode? transport = content.get_subnode("transport", NS_URI); + if (transport == null) { + return null; + } + string? sid = transport.get_attribute("sid"); + int block_size = transport.get_attribute_int("block-size"); + if (sid == null || block_size <= 0) { + throw new Jingle.CreateConnectionError.BAD_REQUEST("Invalid IBB parameters"); + } + if (block_size > DEFAULT_BLOCKSIZE) { + throw new Jingle.CreateConnectionError.NOT_ACCEPTABLE("Invalid IBB parameters: peer increased block size"); + } + return new Connection(peer_full_jid, new InBandBytestreams.Connection(peer_full_jid, sid, block_size)); + } +} + +public class Connection : Jingle.Connection { + InBandBytestreams.Connection inner; + + public Connection(Jid full_jid, InBandBytestreams.Connection inner) { + base(full_jid); + inner.on_error.connect((stream, error) => on_error(stream, new Jingle.Error.TRANSPORT_ERROR(error))); + inner.on_data.connect((stream, data) => on_data(stream, data)); + inner.on_ready.connect((stream) => on_ready(stream)); + this.inner = inner; + } + + public override void connect(XmppStream stream) { + inner.connect(stream); + } + public override void send(XmppStream stream, uint8[] data) { + inner.send(stream, data); + } + public override void close(XmppStream stream) { + inner.close(stream); + } +} + +} -- cgit v1.2.3-70-g09d2