summaryrefslogtreecommitdiff
path: root/.gitlab-ci/bare-metal/poe-set.py
blob: 77e5d74d51c14405e634771b39e04c2686ef2992 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
#!/usr/bin/env python3

# Copyright © 2021 Igalia, S.L.
#
# Permission is hereby granted, free of charge, to any person obtaining a
# copy of this software and associated documentation files (the "Software"),
# to deal in the Software without restriction, including without limitation
# the rights to use, copy, modify, merge, publish, distribute, sublicense,
# and/or sell copies of the Software, and to permit persons to whom the
# Software is furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice (including the next
# paragraph) shall be included in all copies or substantial portions of the
# Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.  IN NO EVENT SHALL
# THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
# IN THE SOFTWARE.

import argparse
import fcntl
import os
import telnetlib
import time

class Telnet:
    """Telnet session to the PoE switch, serialised through a lock file.

    Connection parameters are read from the environment:

    * ``BM_POE_ADDRESS``  -- host handed to ``telnetlib.Telnet``
    * ``BM_POE_USERNAME`` -- login name (encoded as ASCII)
    * ``BM_POE_PASSWORD`` -- login password (encoded as ASCII)

    An exclusive ``flock`` on ``/var/run/poe.lock`` is taken in ``login()``
    and held until ``logout()``, so only one process on this machine drives
    the switch at a time.
    """

    def __init__(self, args):
        """Read the configuration; no connection is opened here.

        args: parsed command-line namespace; only ``args.debug`` is read.

        Raises OSError when one of the BM_POE_* variables is not set.
        """
        self.retries = 3
        self.lock_file = "/var/run/poe.lock"
        self.debug = args.debug
        self.lock = None
        self.tn = None
        try:
            self.host = os.environ['BM_POE_ADDRESS']
            self.username = os.environ['BM_POE_USERNAME'].encode('ascii')
            self.password = os.environ['BM_POE_PASSWORD'].encode('ascii')
        except KeyError as k:
            raise OSError("envvar " + str(k) + " undefined") from k

    def _authenticate(self):
        """Take the lock, connect and send the credentials once.

        Returns True when the switch answered with the expected ``Back``
        marker within 3 seconds, False otherwise.  The lock and connection
        are left open either way; the caller releases them on failure.
        """
        self.lock = open(self.lock_file, 'w')
        fcntl.flock(self.lock, fcntl.LOCK_EX)
        self.tn = telnetlib.Telnet(self.host)
        self.tn.set_debuglevel(1 if self.debug else 0)
        # NOTE: no timeout is given here, so a switch that never sends the
        # prompt keeps this call (and the lock) blocked until the peer
        # closes the connection.
        self.tn.read_until(b'Password:')
        self.tn.write(self.username + b'\t' + self.password + b'\r')
        reply = self.tn.read_until(b'Back', 3)
        # Compare bytes directly: decoding is not needed for a suffix test
        # and would raise on any non-ASCII byte in the reply.
        return reply.endswith(b'Back')

    def login(self):
        """Take the lock, connect and log in to the switch command line.

        The first authentication step sometimes fails, so it is attempted
        up to ``self.retries`` times, waiting 3 seconds between attempts
        with the lock released.

        Raises OSError("Can not log in") when every attempt fails.  Any
        exception raised while talking to the switch is propagated after
        the connection and the lock have been released.
        """
        logged = False
        for attempt in range(self.retries):
            if attempt:
                # Pause only between attempts, never after the last one.
                time.sleep(3)
            try:
                logged = self._authenticate()
            except BaseException:
                self.logout()
                raise
            if logged:
                break
            self.logout()

        if not logged:
            raise OSError("Can not log in")

        try:
            # 0x01 is Ctrl-A; the code then expects a '>' prompt
            # (presumably this leaves the switch's menu -- confirm against
            # the switch documentation).
            self.tn.write(b'\x01')
            self.tn.read_until(b'>')
            # Second login, this time into the 'lcli' command line.
            self.tn.write(b'lcli\r')
            self.tn.read_until(b'User Name:')
            self.tn.write(self.username + b'\r')
            self.tn.read_until(b'Password:')
            self.tn.write(self.password + b'\r')
            self.tn.read_until(b'#')
        except BaseException:
            # Do not keep the lock or the socket if the dialogue breaks.
            self.logout()
            raise

    def logout(self):
        """Close the connection, then release the lock.

        The connection is closed first so the lock is never free while
        this session is still open.  Safe to call before ``login()`` or
        more than once.
        """
        tn, self.tn = self.tn, None
        lock, self.lock = self.lock, None
        try:
            if tn is not None:
                tn.close()
        finally:
            if lock is not None:
                # Closing the file also drops the flock held on it.
                lock.close()

    def select_port(self, port):
        """Enter configuration mode and select interface ``GE <port>``."""
        self.tn.write(b'configure\r')
        self.tn.read_until(b'#')

        self.tn.write(b'interface GE ' + str(port).encode('ascii') + b'\r')
        self.tn.read_until(b'#')

    def poe_on(self):
        """Send ``power inline auto`` for the selected port."""
        self.tn.write(b'power inline auto\r')
        self.tn.read_until(b'#')

    def poe_off(self):
        """Send ``power inline never`` for the selected port."""
        self.tn.write(b'power inline never\r')
        self.tn.read_until(b'#')

    def poe_reset(self):
        """Power-cycle the selected port: off, wait 3 seconds, on."""
        self.poe_off()
        time.sleep(3)
        self.poe_on()

def main():
    """Command-line entry point: parse arguments and run one PoE operation.

    Positional arguments are the switch port (int) and the operation
    (``on``, ``off`` or ``reset``); ``-d``/``--debug`` enables telnet debug
    output.  Any exception is reported on stdout as ``Error! <message>``.
    """
    cli = argparse.ArgumentParser(description='Powers on/off switch port')
    cli.add_argument('-d', '--debug', action='store_true', help='Enable debug')
    cli.add_argument('port', type=int, help='Port to turn on/off')
    cli.add_argument('operation',
                     choices=['on', 'off', 'reset'],
                     help='Operation to perform')
    args = cli.parse_args()

    # Operation keyword -> name of the Telnet method that carries it out.
    handlers = {
        "on": "poe_on",
        "off": "poe_off",
        "reset": "poe_reset",
    }

    try:
        telnet = Telnet(args)
        telnet.login()
        telnet.select_port(args.port)

        handler = handlers.get(args.operation)
        if handler is not None:
            getattr(telnet, handler)()

        telnet.logout()
    except Exception as err:
        print("Error! " + str(err))

# Run the command-line entry point only when this file is executed as a
# script, not when it is imported as a module.
if __name__ == '__main__':
    main()