1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
|
"""Test IBB tube support in the context of a MUC."""
import base64
import dbus
from dbus.connection import Connection
from dbus.lowlevel import SignalMessage
from servicetest import call_async, EventPattern, assertContains, assertEquals, wrap_channel
from gabbletest import exec_test, acknowledge_iq, elem, make_muc_presence, sync_stream
import ns
import constants as cs
import tubetestutil as t
from twisted.words.xish import xpath
from mucutil import join_muc, echo_muc_presence
# Parameters passed to Offer() when the tube is offered in test();
# check_tube_in_presence() expects to find each of them serialized in the
# presence announcing the tube.  Each key is the D-Bus type code of its value.
sample_parameters = dbus.Dictionary({
    's': 'hello',
    'ay': dbus.ByteArray('hello'),
    'u': dbus.UInt32(123),
    'i': dbus.Int32(-123),
    }, signature='sv')
def check_tube_in_presence(presence, initiator):
    """Check that a presence stanza announces exactly one D-Bus tube.

    The stanza must contain a single <tubes/> element in the ns.TUBES
    namespace, holding a single <tube/> whose type, initiator, service and
    parameters match what this test offers.

    presence: the presence stanza to inspect.
    initiator: expected value of the tube's 'initiator' attribute.

    Returns a (stream-id, dbus-name, id) tuple read from the <tube/>
    attributes.  Raises AssertionError if any expectation is not met.
    """
    tubes_nodes = xpath.queryForNodes('/presence/tubes[@xmlns="%s"]'
        % ns.TUBES, presence)
    assert tubes_nodes is not None
    assert len(tubes_nodes) == 1

    tube_nodes = xpath.queryForNodes('/tubes/tube', tubes_nodes[0])
    assert tube_nodes is not None
    assert len(tube_nodes) == 1
    tube = tube_nodes[0]

    # Verify the announced type instead of overwriting it: an assignment here
    # would silently accept any tube type and mutate the received stanza.
    assert tube['type'] == 'dbus'
    assert tube['initiator'] == initiator
    assert tube['service'] == 'com.example.TestCase'
    dbus_stream_id = tube['stream-id']
    my_bus_name = tube['dbus-name']
    dbus_tube_id = tube['id']

    parameter_nodes = xpath.queryForNodes('/tube/parameters/parameter', tube)
    # Same guard as for the other queries, so a tube announced without
    # parameters fails with a clear assertion rather than while iterating.
    assert parameter_nodes is not None

    # Map each parameter name to its (type, serialized value) pair, rejecting
    # duplicated names.
    params = {}
    for node in parameter_nodes:
        assert node['name'] not in params
        params[node['name']] = (node['type'], str(node))

    assert params == {'ay': ('bytes', 'aGVsbG8='),
                      's': ('str', 'hello'),
                      'i': ('int', '-123'),
                      'u': ('uint', '123'),
                     }

    return dbus_stream_id, my_bus_name, dbus_tube_id
def fire_signal_on_tube(q, tube, chatroom, dbus_stream_id, my_bus_name):
    """Send D-Bus signals over the tube and check the stanzas sent to the room.

    A small signal carrying UInt32(42) must show up as a single <data/>
    element (ns.MUC_BYTESTREAM namespace) in a groupchat message addressed to
    chatroom; its decoded payload is checked against the expected D-Bus
    message header, body and sender name.  A second, large signal must then
    arrive as three messages whose 'frag' attributes are 'first', 'middle'
    and 'last'.

    q: event queue used to wait for outgoing stanzas.
    tube: connection on which the signals are sent.
    chatroom: JID the groupchat messages must be addressed to.
    dbus_stream_id: expected 'sid' attribute of the <data/> element.
    my_bus_name: bus name expected to appear in the serialized message.
    """
    data_query = '/message/data[@xmlns="%s"]' % ns.MUC_BYTESTREAM

    def next_data_nodes():
        # Wait for the next groupchat message to the room and return its
        # <data/> nodes.
        message_event = q.expect('stream-message', to=chatroom,
            message_type='groupchat')
        return xpath.queryForNodes(data_query, message_event.stanza)

    small_signal = SignalMessage('/', 'foo.bar', 'baz')
    small_signal.append(42, signature='u')
    tube.send_message(small_signal)

    nodes = next_data_nodes()
    assert nodes is not None
    assert len(nodes) == 1
    payload_node = nodes[0]
    assert payload_node['sid'] == dbus_stream_id

    raw = base64.b64decode(str(payload_node))

    # little and big endian versions of: SIGNAL, NO_REPLY, protocol v1,
    # 4-byte payload
    assert raw.startswith(('l\x04\x01\x01' '\x04\x00\x00\x00',
                           'B\x04\x01\x01' '\x00\x00\x00\x04'))

    # little and big endian versions of the 4-byte payload, UInt32(42)
    if raw[0] == 'l':
        assert raw.endswith('\x2a\x00\x00\x00')
    else:
        assert raw[0] == 'B' and raw.endswith('\x00\x00\x00\x2a')

    # XXX: verify that it's actually in the "sender" slot, rather than just
    # being in the message somewhere
    assert my_bus_name in raw

    # Send another, big signal which has to be split across 3 stanzas
    big_signal = SignalMessage('/', 'foo.bar', 'baz')
    big_signal.append('a' * 100000, signature='s')
    tube.send_message(big_signal)

    for expected_frag in ('first', 'middle', 'last'):
        assertEquals(next_data_nodes()[0]['frag'], expected_frag)
def test(q, bus, conn, stream, access_control):
    """Offer a D-Bus tube in a MUC, exercise it, close it, then rejoin.

    Steps, in order:
      * request a D-Bus tube channel while joining the room and check the
        properties announced by NewChannel;
      * check that Offer() with an unsupported access control fails with
        cs.INVALID_ARGUMENT, then offer the tube using access_control;
      * simulate Bob joining the tube, send signals over it, then simulate
        Bob leaving the tube;
      * close the channel, rejoin the room while Bob still announces the
        tube, and check that a non-requested tube channel shows up.

    q, bus, conn, stream: the objects exec_test() passes to its callback.
    access_control: socket access control passed to Offer().
    """
    iq_event = q.expect('stream-iq', to=None, query_ns='vcard-temp',
        query_name='vCard')

    acknowledge_iq(stream, iq_event.stanza)

    # check if we can request muc D-Bus tube
    t.check_conn_properties(q, conn)

    # Not used below; the calls are kept because they round-trip to the
    # connection and removing them would change what the test exercises.
    self_handle = conn.Properties.Get(cs.CONN, "SelfHandle")
    self_name = conn.inspect_contact_sync(self_handle)

    # offer a D-Bus tube to another room using new API
    muc = 'chat2@conf.localhost'
    self_muc_jid = muc + '/test'
    bob_muc_jid = muc + '/bob'
    request = {
        cs.CHANNEL_TYPE: cs.CHANNEL_TYPE_DBUS_TUBE,
        cs.TARGET_HANDLE_TYPE: cs.HT_ROOM,
        cs.TARGET_ID: muc,
        cs.DBUS_TUBE_SERVICE_NAME: 'com.example.TestCase',
    }
    join_muc(q, bus, conn, stream, muc, request=request)

    e = q.expect('dbus-signal', signal='NewChannel')

    path, prop = e.args
    assert prop[cs.CHANNEL_TYPE] == cs.CHANNEL_TYPE_DBUS_TUBE
    assert prop[cs.INITIATOR_ID] == self_muc_jid
    assert prop[cs.REQUESTED] == True
    assert prop[cs.TARGET_HANDLE_TYPE] == cs.HT_ROOM
    assert prop[cs.TARGET_ID] == muc
    assert prop[cs.DBUS_TUBE_SERVICE_NAME] == 'com.example.TestCase'
    assert prop[cs.DBUS_TUBE_SUPPORTED_ACCESS_CONTROLS] == [
        cs.SOCKET_ACCESS_CONTROL_CREDENTIALS,
        cs.SOCKET_ACCESS_CONTROL_LOCALHOST]

    # check that the tube channel is in the channels list
    all_channels = conn.Get(cs.CONN_IFACE_REQUESTS, 'Channels',
        dbus_interface=cs.PROPERTIES_IFACE, byte_arrays=True)
    assertContains((path, prop), all_channels)

    tube_chan = wrap_channel(bus.get_object(conn.bus_name, path), 'DBusTube1')
    tube_props = tube_chan.Properties.GetAll(cs.CHANNEL_IFACE_TUBE,
        byte_arrays=True)

    assert tube_props['State'] == cs.TUBE_CHANNEL_STATE_NOT_OFFERED

    # try to offer using a wrong access control
    try:
        tube_chan.DBusTube1.Offer(sample_parameters,
            cs.SOCKET_ACCESS_CONTROL_PORT)
    except dbus.DBusException as error:
        assertEquals(error.get_dbus_name(), cs.INVALID_ARGUMENT)
    else:
        # Raised explicitly so the check survives "python -O" and the
        # failure explains itself.
        raise AssertionError(
            'Offer() with an unsupported access control should have failed')

    # offer the tube
    call_async(q, tube_chan.DBusTube1, 'Offer', sample_parameters,
        access_control)

    presence_event, return_event, status_event, dbus_changed_event = \
        q.expect_many(
            EventPattern('stream-presence', to=self_muc_jid,
                predicate=t.presence_contains_tube),
            EventPattern('dbus-return', method='Offer'),
            EventPattern('dbus-signal', signal='TubeChannelStateChanged',
                args=[cs.TUBE_CHANNEL_STATE_OPEN]),
            EventPattern('dbus-signal', signal='DBusNamesChanged',
                interface=cs.CHANNEL_TYPE_DBUS_TUBE))

    tube_self_handle = tube_chan.Properties.Get(cs.CHANNEL_IFACE_GROUP,
        'SelfHandle')
    assert tube_self_handle != 0

    # handle presence_event
    # We announce our newly created tube in our muc presence
    presence = presence_event.stanza
    dbus_stream_id, my_bus_name, dbus_tube_id = check_tube_in_presence(
        presence, self_muc_jid)

    # handle dbus_changed_event
    added, removed = dbus_changed_event.args
    assert added == {tube_self_handle: my_bus_name}
    assert removed == []

    dbus_tube_adr = return_event.value[0]

    bob_bus_name = ':2.Ym9i'
    bob_handle = conn.get_contact_handle_sync(bob_muc_jid)

    def bob_in_tube():
        # Send a presence from Bob announcing that he is in our tube.
        presence = elem('presence', from_=bob_muc_jid, to=muc)(
            elem('x', xmlns=ns.MUC_USER),
            elem('tubes', xmlns=ns.TUBES)(
                elem('tube', type='dbus', initiator=self_muc_jid,
                    service='com.example.TestCase', id=str(dbus_tube_id))(
                        elem('parameters')(
                            elem('parameter', name='ay', type='bytes')(u'aGVsbG8='),
                            elem('parameter', name='s', type='str')(u'hello'),
                            elem('parameter', name='i', type='int')(u'-123'),
                            elem('parameter', name='u', type='uint')(u'123')
                            ))))

        # have to add the stream-id and dbus-name attributes manually, as a
        # keyword argument name cannot contain '-'
        tube_node = xpath.queryForNodes('/presence/tubes/tube', presence)[0]
        tube_node['stream-id'] = dbus_stream_id
        tube_node['dbus-name'] = bob_bus_name
        stream.send(presence)

    # Bob joins the tube
    bob_in_tube()

    dbus_changed_event = q.expect('dbus-signal', signal='DBusNamesChanged',
        interface=cs.CHANNEL_TYPE_DBUS_TUBE)

    added, removed = dbus_changed_event.args
    assert added == {bob_handle: bob_bus_name}
    assert removed == []

    tube = Connection(dbus_tube_adr)
    fire_signal_on_tube(q, tube, muc, dbus_stream_id, my_bus_name)

    names = tube_chan.Get(cs.CHANNEL_TYPE_DBUS_TUBE, 'DBusNames',
        dbus_interface=cs.PROPERTIES_IFACE)
    assert names == {tube_self_handle: my_bus_name, bob_handle: bob_bus_name}

    # Bob leaves the tube: his presence no longer announces it
    presence = elem('presence', from_=bob_muc_jid, to=muc)(
        elem('x', xmlns=ns.MUC_USER),
        elem('tubes', xmlns=ns.TUBES))
    stream.send(presence)

    dbus_changed_event = q.expect('dbus-signal', signal='DBusNamesChanged',
        interface=cs.CHANNEL_TYPE_DBUS_TUBE)

    added, removed = dbus_changed_event.args
    assert added == {}
    assert removed == [bob_handle]

    names = tube_chan.Get(cs.CHANNEL_TYPE_DBUS_TUBE, 'DBusNames',
        dbus_interface=cs.PROPERTIES_IFACE)
    assert names == {tube_self_handle: my_bus_name}

    tube_chan.Channel.Close()
    _, _, event = q.expect_many(
        EventPattern('dbus-signal', signal='Closed'),
        EventPattern('dbus-signal', signal='ChannelClosed'),
        EventPattern('stream-presence', to=self_muc_jid,
            presence_type='unavailable'))

    # we must echo the MUC presence so the room will actually close
    # and we should wait to make sure gabble has actually parsed our
    # echo before trying to rejoin
    echo_muc_presence(q, stream, event.stanza, 'none', 'participant')
    sync_stream(q, stream)

    # rejoin the room
    call_async(q, conn.Requests, 'CreateChannel',
        { cs.CHANNEL_TYPE: cs.CHANNEL_TYPE_TEXT,
          cs.TARGET_HANDLE_TYPE: cs.HT_ROOM,
          cs.TARGET_ID: muc })

    q.expect('stream-presence', to=self_muc_jid)

    # Bob is in the room and in the tube
    bob_in_tube()

    # Send presence for own membership of room.
    stream.send(make_muc_presence('none', 'participant', muc, 'test'))

    def new_tube(e):
        # Match NewChannel signals announcing a D-Bus tube channel.
        path, props = e.args
        return props[cs.CHANNEL_TYPE] == cs.CHANNEL_TYPE_DBUS_TUBE

    def new_text(e):
        # Match NewChannel signals announcing a text channel.
        path, props = e.args
        return props[cs.CHANNEL_TYPE] == cs.CHANNEL_TYPE_TEXT

    # both the tube and the text channels are created
    text_event, tube_event = q.expect_many(
        EventPattern('dbus-signal', signal='NewChannel', predicate=new_text),
        EventPattern('dbus-signal', signal='NewChannel', predicate=new_tube))

    tube_path, props = tube_event.args
    assertEquals(cs.CHANNEL_TYPE_DBUS_TUBE, props[cs.CHANNEL_TYPE])
    assertEquals(self_muc_jid, props[cs.INITIATOR_ID])
    assertEquals(False, props[cs.REQUESTED])
    assertEquals(cs.HT_ROOM, props[cs.TARGET_HANDLE_TYPE])
    assertEquals('com.example.TestCase', props[cs.DBUS_TUBE_SERVICE_NAME])

    _, props = text_event.args
    assertEquals(cs.CHANNEL_TYPE_TEXT, props[cs.CHANNEL_TYPE])
    assertEquals(True, props[cs.REQUESTED])

    # tube is local-pending
    tube_chan = bus.get_object(conn.bus_name, tube_path)
    state = tube_chan.Get(cs.CHANNEL_IFACE_TUBE, 'State',
        dbus_interface=cs.PROPERTIES_IFACE)
    # NOTE(review): the other State checks in this function use the
    # cs.TUBE_CHANNEL_STATE_* family; confirm that cs.TUBE_STATE_LOCAL_PENDING
    # is the intended constant here.
    assertEquals(cs.TUBE_STATE_LOCAL_PENDING, state)
if __name__ == '__main__':
    # t.exec_dbus_tube_test() cannot be used here because only the muc
    # bytestream is usable, so run the test once per access control.
    for access_control in (cs.SOCKET_ACCESS_CONTROL_CREDENTIALS,
                           cs.SOCKET_ACCESS_CONTROL_LOCALHOST):
        # Bind the current value as a default so each callback keeps its own
        # access control regardless of when exec_test() invokes it.
        exec_test(lambda q, bus, conn, stream, ac=access_control:
            test(q, bus, conn, stream, ac))
|