summaryrefslogtreecommitdiff
path: root/tests/twisted/text/initiate-requestotron.py
blob: e1fd66a5a8458b6b1b3e32e8262d93aebd24f50b (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
"""
Test text channel initiated by me, using Requests.
"""

import dbus

from sofiatest import exec_test
from servicetest import call_async, EventPattern, assertEquals, assertContains
import constants as cs

def test(q, bus, conn, stream):
    conn.Connect()
    q.expect('dbus-signal', signal='StatusChanged', args=[0, 1])

    self_handle = conn.Get(cs.CONN, 'SelfHandle', dbus_interface=cs.PROPERTIES_IFACE)
    self_uri = conn.inspect_contact_sync(self_handle)

    uri = 'sip:foo@bar.com'
    foo_handle = conn.get_contact_handle_sync(uri)

    properties = conn.GetAll(cs.CONN_IFACE_REQUESTS,
            dbus_interface=cs.PROPERTIES_IFACE)
    assertEquals([], properties.get('Channels'))

    assertContains(
            ({ cs.CHANNEL_TYPE: cs.CHANNEL_TYPE_TEXT,
               cs.TARGET_HANDLE_TYPE: cs.HT_CONTACT },
             [ cs.TARGET_HANDLE, cs.TARGET_ID ]), properties['RequestableChannelClasses'])

    requestotron = dbus.Interface(conn, cs.CONN_IFACE_REQUESTS)
    call_async(q, requestotron, 'CreateChannel',
            { cs.CHANNEL_TYPE: cs.CHANNEL_TYPE_TEXT,
              cs.TARGET_HANDLE_TYPE: cs.HT_CONTACT,
              cs.TARGET_HANDLE: foo_handle })

    ret, new_sig = q.expect_many(
        EventPattern('dbus-return', method='CreateChannel'),
        EventPattern('dbus-signal', signal='NewChannels'),
        )

    assert len(ret.value) == 2
    emitted_props = ret.value[1]
    assertEquals(cs.CHANNEL_TYPE_TEXT, emitted_props[cs.CHANNEL_TYPE])
    assertEquals(cs.HT_CONTACT, emitted_props[cs.TARGET_HANDLE_TYPE])
    assertEquals(foo_handle, emitted_props[cs.TARGET_HANDLE])
    assertEquals(uri, emitted_props[cs.TARGET_ID])
    assertEquals(True, emitted_props[cs.REQUESTED])
    assertEquals(self_handle, emitted_props[cs.INITIATOR_HANDLE])
    assertEquals(self_uri, emitted_props[cs.INITIATOR_ID])

    assert len(new_sig.args) == 1
    assert len(new_sig.args[0]) == 1        # one channel
    assert len(new_sig.args[0][0]) == 2     # two struct members
    assert new_sig.args[0][0][0] == ret.value[0]
    assert new_sig.args[0][0][1] == ret.value[1]

    properties = conn.GetAll(cs.CONN_IFACE_REQUESTS, dbus_interface=cs.PROPERTIES_IFACE)

    assert new_sig.args[0][0] in properties['Channels'], \
            (new_sig.args[0][0], properties['Channels'])

    conn.Disconnect()
    q.expect('dbus-signal', signal='StatusChanged', args=[2, 1])

if __name__ == '__main__':
    exec_test(test)