summaryrefslogtreecommitdiff
path: root/tests/twisted/tubes/offer-muc-dbus-tube.py
blob: 0e138a22a037c129b6cbfaa3b4edc80061086c5d (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
"""Test IBB tube support in the context of a MUC."""

import base64

import dbus
from dbus.connection import Connection
from dbus.lowlevel import SignalMessage

from servicetest import call_async, EventPattern, assertContains, assertEquals, wrap_channel
from gabbletest import exec_test, acknowledge_iq, elem, make_muc_presence, sync_stream
import ns
import constants as cs
import tubetestutil as t

from twisted.words.xish import xpath

from mucutil import join_muc, echo_muc_presence

# Parameters attached to the offered tube: one string, one byte array, one
# unsigned and one signed 32-bit integer, wrapped as a D-Bus a{sv} dictionary.
# check_tube_in_presence() expects to see these announced in the MUC presence
# as 'str', 'bytes', 'uint' and 'int' parameters respectively.
sample_parameters = dbus.Dictionary(
    dict(
        s='hello',
        ay=dbus.ByteArray('hello'),
        u=dbus.UInt32(123),
        i=dbus.Int32(-123)
    ),
    signature='sv')

def check_tube_in_presence(presence, initiator):
    """Check that a MUC presence stanza announces exactly one D-Bus tube.

    The stanza must contain a single <tubes/> child in the ns.TUBES
    namespace, holding a single <tube/> whose type is 'dbus', whose
    initiator is `initiator`, whose service is 'com.example.TestCase' and
    whose parameters are the four sample parameters (bytes, str, int, uint).

    Returns a (stream-id, dbus-name, id) tuple read from the <tube/>
    attributes. Raises AssertionError if any check fails.
    """
    tubes_nodes = xpath.queryForNodes('/presence/tubes[@xmlns="%s"]'
        % ns.TUBES, presence)
    assert tubes_nodes is not None
    assert len(tubes_nodes) == 1

    tube_nodes = xpath.queryForNodes('/tubes/tube', tubes_nodes[0])
    assert tube_nodes is not None
    assert len(tube_nodes) == 1
    tube = tube_nodes[0]

    # Compare the type rather than assign it: an assignment here would
    # silently overwrite the attribute and never verify what was sent.
    assert tube['type'] == 'dbus'
    assert tube['initiator'] == initiator
    assert tube['service'] == 'com.example.TestCase'
    dbus_stream_id = tube['stream-id']
    my_bus_name = tube['dbus-name']
    dbus_tube_id = tube['id']

    # Collect every <parameter/> as name -> (type, text), refusing duplicates.
    params = {}
    parameter_nodes = xpath.queryForNodes('/tube/parameters/parameter', tube)
    # The other queries above are checked against None before use, so do the
    # same here to get a clear assertion failure instead of a TypeError.
    assert parameter_nodes is not None
    for node in parameter_nodes:
        assert node['name'] not in params
        params[node['name']] = (node['type'], str(node))
    assert params == {'ay': ('bytes', 'aGVsbG8='),
                      's': ('str', 'hello'),
                      'i': ('int', '-123'),
                      'u': ('uint', '123'),
                     }

    return dbus_stream_id, my_bus_name, dbus_tube_id


def fire_signal_on_tube(q, tube, chatroom, dbus_stream_id, my_bus_name):
    """Send D-Bus signals over the tube and check the stanzas sent to the MUC.

    A small signal (a single UInt32 of 42) must arrive as one groupchat
    message to `chatroom` carrying a MUC bytestream <data/> element whose
    'sid' is `dbus_stream_id` and whose decoded payload mentions
    `my_bus_name`. A large signal (a 100000-character string) must then
    arrive as three <data/> stanzas marked 'first', 'middle' and 'last'.
    """
    def expect_data_nodes():
        # Wait for the next groupchat message addressed to the room and
        # return its <data/> children in the MUC bytestream namespace.
        stanza = q.expect('stream-message', to=chatroom,
            message_type='groupchat').stanza
        return xpath.queryForNodes(
            '/message/data[@xmlns="%s"]' % ns.MUC_BYTESTREAM, stanza)

    small_signal = SignalMessage('/', 'foo.bar', 'baz')
    small_signal.append(42, signature='u')
    tube.send_message(small_signal)

    nodes = expect_data_nodes()
    assert nodes is not None
    assert len(nodes) == 1
    chunk = nodes[0]
    assert chunk['sid'] == dbus_stream_id
    payload = base64.b64decode(str(chunk))

    # little and big endian versions of: SIGNAL, NO_REPLY, protocol v1,
    # 4-byte payload
    assert payload.startswith(('l\x04\x01\x01' '\x04\x00\x00\x00',
                               'B\x04\x01\x01' '\x00\x00\x00\x04'))

    # little and big endian versions of the 4-byte payload, UInt32(42)
    if payload[0] == 'l':
        assert payload.endswith('\x2a\x00\x00\x00')
    else:
        assert payload[0] == 'B' and payload.endswith('\x00\x00\x00\x2a')

    # XXX: verify that it's actually in the "sender" slot, rather than just
    # being in the message somewhere
    assert my_bus_name in payload

    # Send another, big signal which has to be split across 3 stanzas
    big_signal = SignalMessage('/', 'foo.bar', 'baz')
    big_signal.append('a' * 100000, signature='s')
    tube.send_message(big_signal)

    # Each stanza carries a 'frag' attribute giving its position in the
    # split message; they must arrive in order.
    for expected_frag in ('first', 'middle', 'last'):
        frag = expect_data_nodes()[0]['frag']
        assertEquals(frag, expected_frag)

def test(q, bus, conn, stream, access_control):
    """Offer a D-Bus tube in a MUC, use it, close it, then rejoin the room.

    q, bus, conn and stream are the event queue, session bus, connection
    proxy and XMPP stream handed over by exec_test(). access_control is the
    socket access control value passed to DBusTube1.Offer().

    The scenario is:
      1. request a D-Bus tube channel to chat2@conf.localhost and check its
         properties;
      2. check that offering with an unsupported access control fails;
      3. offer the tube and check the presence announcement, the returned
         address and the DBusNamesChanged signal;
      4. let Bob join the tube, fire signals over it, let Bob leave;
      5. close the channel, rejoin the room while Bob still announces the
         tube, and check the tube channel comes back as local-pending.
    """
    iq_event = q.expect('stream-iq', to=None, query_ns='vcard-temp',
            query_name='vCard')

    acknowledge_iq(stream, iq_event.stanza)

    # check if we can request muc D-Bus tube
    t.check_conn_properties(q, conn)

    self_handle = conn.Properties.Get(cs.CONN, "SelfHandle")
    self_name = conn.inspect_contact_sync(self_handle)

    # offer a D-Bus tube to another room using new API
    muc = 'chat2@conf.localhost'
    request = {
        cs.CHANNEL_TYPE: cs.CHANNEL_TYPE_DBUS_TUBE,
        cs.TARGET_HANDLE_TYPE: cs.HT_ROOM,
        cs.TARGET_ID: 'chat2@conf.localhost',
        cs.DBUS_TUBE_SERVICE_NAME: 'com.example.TestCase',
    }
    join_muc(q, bus, conn, stream, muc, request=request)

    e = q.expect('dbus-signal', signal='NewChannel')

    path, prop = e.args
    assert prop[cs.CHANNEL_TYPE] == cs.CHANNEL_TYPE_DBUS_TUBE
    assert prop[cs.INITIATOR_ID] == 'chat2@conf.localhost/test'
    assert prop[cs.REQUESTED] == True
    assert prop[cs.TARGET_HANDLE_TYPE] == cs.HT_ROOM
    assert prop[cs.TARGET_ID] == 'chat2@conf.localhost'
    assert prop[cs.DBUS_TUBE_SERVICE_NAME] == 'com.example.TestCase'
    assert prop[cs.DBUS_TUBE_SUPPORTED_ACCESS_CONTROLS] == [cs.SOCKET_ACCESS_CONTROL_CREDENTIALS,
            cs.SOCKET_ACCESS_CONTROL_LOCALHOST]

    # check that the tube channel is in the channels list
    all_channels = conn.Get(cs.CONN_IFACE_REQUESTS, 'Channels',
        dbus_interface=cs.PROPERTIES_IFACE, byte_arrays=True)
    assertContains((path, prop), all_channels)

    tube_chan = wrap_channel(bus.get_object(conn.bus_name, path), 'DBusTube1')
    tube_props = tube_chan.Properties.GetAll(cs.CHANNEL_IFACE_TUBE, byte_arrays=True)

    assert tube_props['State'] == cs.TUBE_CHANNEL_STATE_NOT_OFFERED

    # try to offer using a wrong access control; this must be rejected
    try:
        tube_chan.DBusTube1.Offer(sample_parameters, cs.SOCKET_ACCESS_CONTROL_PORT)
    except dbus.DBusException as err:
        assertEquals(err.get_dbus_name(), cs.INVALID_ARGUMENT)
    else:
        assert False, 'Offer() with an unsupported access control should fail'

    # offer the tube
    call_async(q, tube_chan.DBusTube1, 'Offer', sample_parameters, access_control)

    presence_event, return_event, status_event, dbus_changed_event = q.expect_many(
        EventPattern('stream-presence', to='chat2@conf.localhost/test', predicate=lambda e: t.presence_contains_tube(e)),
        EventPattern('dbus-return', method='Offer'),
        EventPattern('dbus-signal', signal='TubeChannelStateChanged', args=[cs.TUBE_CHANNEL_STATE_OPEN]),
        EventPattern('dbus-signal', signal='DBusNamesChanged', interface=cs.CHANNEL_TYPE_DBUS_TUBE))

    tube_self_handle = tube_chan.Properties.Get(cs.CHANNEL_IFACE_GROUP, 'SelfHandle')
    assert tube_self_handle != 0

    # handle presence_event
    # We announce our newly created tube in our muc presence
    presence = presence_event.stanza
    dbus_stream_id, my_bus_name, dbus_tube_id = check_tube_in_presence(presence,
                                                                       'chat2@conf.localhost/test')

    # handle dbus_changed_event
    added, removed = dbus_changed_event.args
    assert added == {tube_self_handle: my_bus_name}
    assert removed == []

    # Offer() returns the address of the tube's private D-Bus connection
    dbus_tube_adr = return_event.value[0]

    bob_bus_name = ':2.Ym9i'
    bob_handle = conn.get_contact_handle_sync('chat2@conf.localhost/bob')

    def bob_in_tube():
        # Send a MUC presence from Bob announcing that he is in our tube,
        # reusing the tube id and stream id we announced ourselves.
        presence = elem('presence', from_='chat2@conf.localhost/bob', to='chat2@conf.localhost')(
            elem('x', xmlns=ns.MUC_USER),
            elem('tubes', xmlns=ns.TUBES)(
                elem('tube', type='dbus', initiator='chat2@conf.localhost/test',
                    service='com.example.TestCase', id=str(dbus_tube_id))(
                        elem('parameters')(
                            elem('parameter', name='ay', type='bytes')(u'aGVsbG8='),
                            elem('parameter', name='s', type='str')(u'hello'),
                            elem('parameter', name='i', type='int')(u'-123'),
                            elem('parameter', name='u', type='uint')(u'123')
                            ))))

        # have to add stream-id and dbus-name attributes manually as we can't use
        # keyword with '-'...
        tube_node = xpath.queryForNodes('/presence/tubes/tube', presence)[0]
        tube_node['stream-id'] = dbus_stream_id
        tube_node['dbus-name'] = bob_bus_name
        stream.send(presence)

    # Bob joins the tube
    bob_in_tube()

    dbus_changed_event = q.expect('dbus-signal', signal='DBusNamesChanged',
        interface=cs.CHANNEL_TYPE_DBUS_TUBE)

    added, removed = dbus_changed_event.args
    assert added == {bob_handle: bob_bus_name}
    assert removed == []

    tube = Connection(dbus_tube_adr)
    fire_signal_on_tube(q, tube, 'chat2@conf.localhost', dbus_stream_id, my_bus_name)

    names = tube_chan.Get(cs.CHANNEL_TYPE_DBUS_TUBE, 'DBusNames', dbus_interface=cs.PROPERTIES_IFACE)
    assert names == {tube_self_handle: my_bus_name, bob_handle: bob_bus_name}

    # Bob leaves the tube: his presence no longer lists any tube
    presence = elem('presence', from_='chat2@conf.localhost/bob', to='chat2@conf.localhost')(
        elem('x', xmlns=ns.MUC_USER),
        elem('tubes', xmlns=ns.TUBES))
    stream.send(presence)

    dbus_changed_event = q.expect('dbus-signal', signal='DBusNamesChanged',
        interface=cs.CHANNEL_TYPE_DBUS_TUBE)

    added, removed = dbus_changed_event.args
    assert added == {}
    assert removed == [bob_handle]

    names = tube_chan.Get(cs.CHANNEL_TYPE_DBUS_TUBE, 'DBusNames', dbus_interface=cs.PROPERTIES_IFACE)
    assert names == {tube_self_handle: my_bus_name}

    tube_chan.Channel.Close()
    _, _, event = q.expect_many(
        EventPattern('dbus-signal', signal='Closed'),
        EventPattern('dbus-signal', signal='ChannelClosed'),
        EventPattern('stream-presence', to='chat2@conf.localhost/test',
                     presence_type='unavailable'))

    # we must echo the MUC presence so the room will actually close
    # and we should wait to make sure gabble has actually parsed our
    # echo before trying to rejoin
    echo_muc_presence(q, stream, event.stanza, 'none', 'participant')
    sync_stream(q, stream)

    # rejoin the room
    call_async(q, conn.Requests, 'CreateChannel',
        { cs.CHANNEL_TYPE: cs.CHANNEL_TYPE_TEXT,
          cs.TARGET_HANDLE_TYPE: cs.HT_ROOM,
          cs.TARGET_ID: 'chat2@conf.localhost' })

    q.expect('stream-presence', to='chat2@conf.localhost/test')

    # Bob is in the room and in the tube
    bob_in_tube()

    # Send presence for own membership of room.
    stream.send(make_muc_presence('none', 'participant', muc, 'test'))

    def new_tube(e):
        path, props = e.args
        return props[cs.CHANNEL_TYPE] == cs.CHANNEL_TYPE_DBUS_TUBE

    def new_text(e):
        path, props = e.args
        return props[cs.CHANNEL_TYPE] == cs.CHANNEL_TYPE_TEXT

    # both the tube and the text channels are created
    text_event, tube_event = q.expect_many(EventPattern('dbus-signal', signal='NewChannel',
                                                        predicate=new_text),
                                           EventPattern('dbus-signal', signal='NewChannel',
                                                        predicate=new_tube))

    # the tube channel was not requested by us this time
    tube_path, props = tube_event.args
    assertEquals(cs.CHANNEL_TYPE_DBUS_TUBE, props[cs.CHANNEL_TYPE])
    assertEquals('chat2@conf.localhost/test', props[cs.INITIATOR_ID])
    assertEquals(False, props[cs.REQUESTED])
    assertEquals(cs.HT_ROOM, props[cs.TARGET_HANDLE_TYPE])
    assertEquals('com.example.TestCase', props[cs.DBUS_TUBE_SERVICE_NAME])

    _, props = text_event.args
    assertEquals(cs.CHANNEL_TYPE_TEXT, props[cs.CHANNEL_TYPE])
    assertEquals(True, props[cs.REQUESTED])

    # tube is local-pending
    tube_chan = bus.get_object(conn.bus_name, tube_path)
    state = tube_chan.Get(cs.CHANNEL_IFACE_TUBE, 'State',
            dbus_interface=dbus.PROPERTIES_IFACE)
    assertEquals(cs.TUBE_STATE_LOCAL_PENDING, state)

if __name__ == '__main__':
    # t.exec_dbus_tube_test() can't be used here because only the MUC
    # bytestream is usable in this test, so run the scenario directly,
    # once per supported access control.
    def _run_with_access_control(access_control):
        def runner(q, bus, conn, stream):
            return test(q, bus, conn, stream, access_control)
        exec_test(runner)

    _run_with_access_control(cs.SOCKET_ACCESS_CONTROL_CREDENTIALS)
    _run_with_access_control(cs.SOCKET_ACCESS_CONTROL_LOCALHOST)