#!/usr/bin/env python
# vim: set fileencoding=utf-8 sts=4 sw=4 et :
"""
The world's worst XMPP console user interface.

Pass it a Gabble account name; type some words; get minimalistic error
reporting.

Copyright © 2011–2013 Collabora Ltd.

This library is free software; you can redistribute it and/or
modify it under the terms of the GNU Lesser General Public
License as published by the Free Software Foundation; either
version 2.1 of the License, or (at your option) any later version.

This library is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
Lesser General Public License for more details.

You should have received a copy of the GNU Lesser General Public
License along with this library; if not, write to the Free Software
Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
"""

import sys
from xml.dom import minidom

from gi.repository import Gtk, GLib, Gio, GtkSource
from gi.repository import TelepathyGLib as Tp

# Spacing (in pixels) used between grid cells and as widget margins.
PADDING = 6

# D-Bus interface of the console; used both as the requested channel type
# and as the interface of the proxy that the pages talk to.
CONSOLE_IFACE = "org.freedesktop.Telepathy.Gabble.Plugin.Console"


class StanzaViewer(Gtk.ScrolledWindow):
    """Scrollable, XML-highlighted text view for stanzas and comments.

    The view is read-only by default; callers that want an editor flip
    ``self.view.set_editable(True)`` themselves.
    """

    def __init__(self):
        Gtk.ScrolledWindow.__init__(self)

        self.b = GtkSource.Buffer()
        self.view = GtkSource.View.new_with_buffer(self.b)

        self.b.set_language(
            GtkSource.LanguageManager.get_default().get_language('xml'))
        self.b.set_highlight_matching_brackets(False)
        self.view.set_editable(False)
        self.view.set_wrap_mode(Gtk.WrapMode.WORD_CHAR)
        self.view.set_property('expand', True)

        self.add(self.view)

    def clear(self):
        """Remove all text from the buffer."""
        self.b.set_text("")

    def append_stanza(self, xml):
        """Parse *xml* and append it, pretty-printed, to the buffer.

        Raises whatever ``minidom.parseString`` raises when *xml* is not
        well-formed.
        """
        document = minidom.parseString(xml)
        # Serialise from the root element rather than the Document so that
        # no '<?xml ...?>' declaration is emitted, while keeping the line
        # breaks and indentation that make the output "pretty".
        pretty = document.documentElement.toprettyxml()
        self.b.insert(self.b.get_end_iter(), pretty + '\n')

    def append_comment(self, text):
        """Append *text* to the buffer, wrapped in an XML comment."""
        self.b.insert(self.b.get_end_iter(), '<!-- %s -->\n' % text)

    def tell_me_everything(self):
        """Return the buffer's entire text."""
        return self.b.get_property('text')


class SpinWrapper(Gtk.Notebook):
    """Tab-less notebook that shows either a widget or a busy spinner."""

    PRIMARY_PAGE = 0
    SPINNER_PAGE = 1

    def __init__(self, main_widget):
        Gtk.Notebook.__init__(self)
        self.set_show_tabs(False)
        self.set_show_border(False)
        self.insert_page(main_widget, None, self.PRIMARY_PAGE)

        self.spinner = Gtk.Spinner()
        self.spinner.set_property('halign', Gtk.Align.CENTER)
        self.spinner.set_property('valign', Gtk.Align.CENTER)
        self.spinner.set_property('width-request', 32)
        self.spinner.set_property('height-request', 32)
        self.insert_page(self.spinner, None, self.SPINNER_PAGE)

    def start_spinning(self):
        """Switch to the spinner page and start the animation."""
        self.set_current_page(self.SPINNER_PAGE)
        self.spinner.start()

    def stop_spinning(self):
        """Stop the animation and switch back to the wrapped widget."""
        self.spinner.stop()
        self.set_current_page(self.PRIMARY_PAGE)


class Page(Gtk.Grid):
    """Base class for the notebook pages: a grid plus label helpers."""

    def __init__(self, console_proxy):
        Gtk.Grid.__init__(self)
        self.console_proxy = console_proxy

        self.set_column_spacing(PADDING)
        self.set_row_spacing(PADDING)

    def _attach_below(self, widget, below, width):
        """Attach *widget* at the grid origin, or under *below* if given."""
        if below is None:
            self.attach(widget, 0, 0, width, 1)
        else:
            self.attach_next_to(widget, below, Gtk.PositionType.BOTTOM,
                                width, 1)

    def add_title(self, title, below=None):
        """Add a two-column-wide, left-aligned markup label; return it."""
        label = Gtk.Label()
        label.set_markup("%s" % title)
        label.set_property('xalign', 0)
        self._attach_below(label, below, 2)
        return label

    def add_label(self, title, below=None):
        """Add a one-column, left-aligned, left-padded label; return it."""
        label = Gtk.Label(title)
        label.set_property('margin-left', PADDING)
        label.set_property('xalign', 0)
        self._attach_below(label, below, 1)
        return label


class IQPage(Page):
    """Form for sending an IQ (recipient, get/set, body) and viewing the reply."""

    def __init__(self, console_proxy):
        Page.__init__(self, console_proxy)

        request_label = self.add_title("Request")

        recipient_label, recipient_entry = self.add_label_entry_pair(
            'To:', below=request_label)
        self.recipient_entry = recipient_entry

        type_label = self.add_label('IQ Type:', below=recipient_label)
        self.get_button = Gtk.RadioButton.new_with_label([], "get")
        self.get_button.set_active(True)
        self.set_button = Gtk.RadioButton.new_with_label_from_widget(
            self.get_button, "set")

        box = Gtk.ButtonBox.new(Gtk.Orientation.HORIZONTAL)
        box.set_layout(Gtk.ButtonBoxStyle.START)
        box.add(self.get_button)
        box.add(self.set_button)
        self.attach_next_to(box, type_label, Gtk.PositionType.RIGHT, 1, 1)

        body_label, body_entry = self.add_label_entry_pair(
            'Body:', below=type_label)
        body_entry.set_text("")
        body_entry.set_icon_from_stock(
            Gtk.EntryIconPosition.SECONDARY, Gtk.STOCK_GO_FORWARD)
        body_entry.set_icon_tooltip_text(
            Gtk.EntryIconPosition.SECONDARY, "Send this IQ")
        self.body_entry = body_entry

        reply_label = self.add_title("Reply", below=body_label)

        self.stanza_viewer = StanzaViewer()
        self.stanza_viewer.append_comment("send a request to see the reply here")

        self.result_nb = SpinWrapper(self.stanza_viewer)
        self.attach_next_to(self.result_nb, reply_label,
                            Gtk.PositionType.BOTTOM, 2, 1)

        # Both pressing Enter and clicking the entry's icon send the IQ.
        body_entry.connect('activate', self.send_iq)
        body_entry.connect('icon-release', self.send_iq)

    def add_label_entry_pair(self, title, below):
        """Add a label under *below* with an expanding entry to its right.

        Returns the ``(label, entry)`` pair.
        """
        label = self.add_label(title, below)

        entry = Gtk.Entry()
        entry.set_property('margin-right', PADDING)
        entry.set_property('hexpand', True)

        self.attach_next_to(entry, label, Gtk.PositionType.RIGHT, 1, 1)
        return label, entry

    def send_iq(self, *misc):
        """Send the IQ described by the form; show the spinner meanwhile.

        *misc* absorbs whatever arguments the connected signal passes.
        """
        iq_type = 'get' if self.get_button.get_active() else 'set'
        to = self.recipient_entry.get_text()
        body = self.body_entry.get_text()

        self.console_proxy.SendIQ('(sss)', iq_type, to, body,
                                  result_handler=self.send_iq_cb)
        self.result_nb.start_spinning()

    def send_iq_cb(self, proxy, result, user_data):
        """Show the reply (or the error) for the IQ sent by send_iq()."""
        try:
            self.stanza_viewer.clear()

            if isinstance(result, Exception):
                self.stanza_viewer.append_comment("error:\n%s" % result)
            else:
                _reply_type, reply = result
                self.stanza_viewer.append_stanza(reply)
        finally:
            # Always bring the viewer back, even if rendering the reply
            # failed; otherwise the page would be stuck on the spinner.
            self.result_nb.stop_spinning()


class StanzaPage(Page):
    """Editor for sending an arbitrary, complete stanza."""

    def __init__(self, console_proxy):
        Page.__init__(self, console_proxy)

        title = self.add_title("Enter a complete stanza:")

        self.sv = StanzaViewer()
        self.sv.view.set_editable(True)
        # The sample must be well-formed XML: append_stanza() parses it.
        self.sv.append_stanza(
            "<message><body>Been on any nice boats recently?</body></message>")

        self.spin_wrapper = SpinWrapper(self.sv)
        self.attach_next_to(self.spin_wrapper, title,
                            Gtk.PositionType.BOTTOM, 2, 1)

        self.result_label = self.add_label('', self.spin_wrapper)
        self.result_label.set_property('hexpand', True)
        self.result_label.set_line_wrap(True)

        b = Gtk.Button.new_with_mnemonic("_Send")
        b.connect('clicked', self.__send_stanza)
        b.set_property('hexpand', False)
        self.attach_next_to(b, self.result_label,
                            Gtk.PositionType.RIGHT, 1, 1)

    def __send_stanza(self, button):
        """Send the editor's contents; show the spinner meanwhile."""
        self.console_proxy.SendStanza('(s)', self.sv.tell_me_everything(),
                                      result_handler=self.__send_stanza_cb)
        self.spin_wrapper.start_spinning()

    def __send_stanza_cb(self, proxy, result, user_data):
        """Report the outcome of __send_stanza() in the result label."""
        if isinstance(result, Exception):
            # FIXME: this sucks. You can't just get the free text bit without
            # the D-Bus error bit.
            # Not every exception type carries a 'message' attribute, so
            # fall back to its string form.
            t = getattr(result, 'message', None) or str(result)
        else:
            t = "yes sir, captain tightpants"

        self.result_label.set_text(t)
        self.spin_wrapper.stop_spinning()


class SnoopyPage(Page):
    """Stanza monitor: a switch bound to 'SpewStanzas' plus a live log."""

    def __init__(self, console_proxy):
        Page.__init__(self, console_proxy)

        label = self.add_label("Stanza monitor:")
        label.set_property('hexpand', True)

        switch = Gtk.Switch()
        self.attach_next_to(switch, label, Gtk.PositionType.RIGHT, 1, 1)

        self.stanza_viewer = StanzaViewer()
        self.attach_next_to(self.stanza_viewer, label,
                            Gtk.PositionType.BOTTOM, 2, 1)

        # Initialise the switch before connecting the handler so that the
        # initial sync does not trigger __switch_switched_cb.
        switch.set_active(self.get_remote_active())
        switch.connect('notify::active', self.__switch_switched_cb)

        self.console_proxy.connect('g-signal', self.__g_signal_cb)

    def teardown(self):
        """Turn off the monitor when we quit."""
        self.__set_spew(False)

    def __set_spew(self, spew):
        """Synchronously set the remote 'SpewStanzas' property to *spew*."""
        args = GLib.Variant("(ssv)", (CONSOLE_IFACE, "SpewStanzas",
                                      GLib.Variant.new_boolean(spew)))
        self.console_proxy.call_sync(
            "org.freedesktop.DBus.Properties.Set",
            args,
            0, -1, None)

    def get_remote_active(self):
        """Return the proxy's cached 'SpewStanzas' value.

        Returns False when the proxy has no cached value for the property.
        """
        cached = self.console_proxy.get_cached_property('SpewStanzas')
        if cached is None:
            return False
        return cached.get_boolean()

    def __switch_switched_cb(self, switch, pspec):
        """Push the switch state to the remote side if the two differ."""
        remote = self.get_remote_active()
        new_local = switch.get_active()

        if new_local != remote:
            self.__set_spew(new_local)
            self.stanza_viewer.append_comment(
                'started monitoring' if new_local else 'stopped monitoring')

    def __g_signal_cb(self, console_proxy, sender_name, signal_name,
                      parameters):
        """Log stanzas announced by StanzaSent / StanzaReceived signals."""
        if signal_name in ('StanzaSent', 'StanzaReceived'):
            outgoing = (signal_name == 'StanzaSent')
            (xml,) = parameters

            self.stanza_viewer.append_comment(
                'sent' if outgoing else 'received')
            self.stanza_viewer.append_stanza(xml)


class Window(Gtk.Window):
    """Top-level window: requests a console channel, then builds the pages."""

    IQ_PAGE = 0
    STANZA_PAGE = 1
    SNOOPY_PAGE = 2

    def __init__(self, account):
        Gtk.Window.__init__(self)

        self.set_title('XMPP Console')
        self.set_default_size(600, 371)

        # The UI is only built (and the window shown) once the channel
        # request completes; see __create_cb.
        request = Tp.AccountChannelRequest.new(
            account,
            {Tp.PROP_CHANNEL_CHANNEL_TYPE: CONSOLE_IFACE},
            0)
        request.create_and_handle_channel_async(None, self.__create_cb, None)

        # Connected via the class, so the emitting widget arrives as 'self'.
        self.connect('destroy', Window.__destroy_cb)

    def __build_ui(self):
        """Create the notebook, its three pages and the hidden info bar."""
        # Build up the UI
        self.grid = Gtk.Grid()
        self.add(self.grid)

        self.nb = Gtk.Notebook()
        self.grid.attach(self.nb, 0, 0, 1, 1)

        self.iq = IQPage(self.console_proxy)
        self.nb.insert_page(self.iq,
                            Gtk.Label.new_with_mnemonic("_IQ console"),
                            self.IQ_PAGE)

        self.stanza = StanzaPage(self.console_proxy)
        self.nb.insert_page(self.stanza,
                            Gtk.Label.new_with_mnemonic("Send a s_tanza"),
                            self.STANZA_PAGE)

        self.snoopy = SnoopyPage(self.console_proxy)
        self.nb.insert_page(
            self.snoopy,
            Gtk.Label.new_with_mnemonic("_Monitor network traffic"),
            self.SNOOPY_PAGE)

        self.infobar = Gtk.InfoBar()
        self.infobar.set_message_type(Gtk.MessageType.WARNING)
        # Keep the bar hidden through show_all(); it is shown explicitly
        # when the channel is invalidated.
        self.infobar.set_no_show_all(True)
        label = Gtk.Label("The connection went away! Time to leave.")
        label.show()
        self.infobar.get_content_area().add(label)
        self.infobar_close_button = self.infobar.add_button(
            "Close", Gtk.ResponseType.CLOSE)
        self.infobar.connect('response',
                             lambda infobar, response: Gtk.main_quit())
        self.infobar.connect('close', lambda infobar: Gtk.main_quit())

        self.grid.attach_next_to(self.infobar, self.nb,
                                 Gtk.PositionType.BOTTOM, 1, 1)

    def __create_cb(self, request, result, _):
        """Finish the channel request, create the proxy and show the UI.

        Exits with status 2 if the channel or the proxy cannot be created.
        """
        try:
            channel, context = request.create_and_handle_channel_finish(result)
            channel.connect('invalidated', self.__channel_invalidated_cb)

            bus_name = channel.get_bus_name()
            sidecar_path = channel.get_object_path()

            bus = Gio.bus_get_sync(Gio.BusType.SESSION, None)
            self.console_proxy = Gio.DBusProxy.new_sync(
                bus, 0, None, bus_name, sidecar_path, CONSOLE_IFACE, None)
        except GLib.GError as e:
            print("""
Couldn't connect to the XMPP console interface on '%(name)s':
%(e)s
Check that you have the console plugin installed.""" % {
                'name': request.get_account().get_path_suffix(),
                'e': e,
            })
            raise SystemExit(2)

        self.__build_ui()
        self.show_all()

    def __channel_invalidated_cb(self, channel, domain, code, message):
        """Show the warning bar and disable the pages."""
        self.infobar.show()
        self.infobar_close_button.grab_focus()
        self.nb.set_sensitive(False)
        # TODO: try to reconnect?

    def __destroy_cb(self):
        """Turn the monitor off (best effort) and leave the main loop."""
        try:
            self.snoopy.teardown()
        except GLib.GError as e:
            print("Couldn't turn off the monitor (maybe the connection went away?)")
            print(e)
        Gtk.main_quit()


def usage(am):
    """Print usage plus the known Gabble account identifiers; exit 1."""
    xmpp_accounts = sorted(
        account.get_path_suffix()
        for account in am.dup_valid_accounts()
        if account.get_cm_name() == 'gabble')

    print("""
Usage:

 %(arg0)s gabble/jabber/blahblah

Here are some account identifiers:

 %(accounts)s
""" % {
        'arg0': sys.argv[0],
        'accounts': '\n '.join(xmpp_accounts),
    })
    raise SystemExit(1)


def am_prepared_cb(am, result, account_suffix):
    """Open a Window for *account_suffix* once the account manager is ready.

    Exits with status 2 if preparation fails or the account is offline, and
    via usage() if no (or an unknown) account identifier was given.
    """
    try:
        am.prepare_finish(result)
    except GLib.GError as e:
        print(e)
        raise SystemExit(2)

    if account_suffix is None:
        usage(am)

    for account in am.dup_valid_accounts():
        if account.get_path_suffix() == account_suffix:
            if account.get_connection() is None:
                print("%s is not online." % account_suffix)
                raise SystemExit(2)
            else:
                win = Window(account)
                return

    usage(am)


if __name__ == '__main__':
    account_suffix = sys.argv[1] if len(sys.argv) == 2 else None

    am = Tp.AccountManager.dup()
    am.prepare_async([], am_prepared_cb, account_suffix)

    Gtk.main()

# Decorative ASCII art (spacing approximate):
#
#                              .,,.
#                            ,;;*;;;;,
#                           .-'``;-');;.
#                          /'  .-.  /*;;
#                        .'    \d    \;;               .;;;,
#                       / o      `    \;    ,__.     ,;*;;;*;,
#                       \__, _.__,'   \_.-') __)--.;;;;;*;;;;,
#                        `""`;;;\       /-')_) __)  `\' ';;;;;;
#                           ;*;;;        -') `)_)  |\ |  ;;;;*;
#                           ;;;;|        `---`    O | | ;;*;;;
#                           *;*;\|                 O  / ;;;;;*
#                          ;;;;;/|    .-------\      / ;*;;;;;
#                         ;;;*;/ \    |        '.   (`. ;;;*;;;
#                         ;;;;;'. ;   |          )   \ | ;;;;;;
#                         ,;*;;;;\/   |.        /   /` | ';;;*;
#                          ;;;;;;/    |/       /   /__/   ';;;
#                          '*jgs/     |       /    |      ;*;
#                               `""""`        `""""`     ;'