summaryrefslogtreecommitdiff
path: root/.gitlab-ci/lava/lava_job_submitter.py
blob: a8f2ace8deddc111c39b949d8460af23ddc50d2a (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
#!/usr/bin/env python3
#
# Copyright (C) 2020, 2021 Collabora Limited
# Author: Gustavo Padovan <gustavo.padovan@collabora.com>
#
# Permission is hereby granted, free of charge, to any person obtaining a
# copy of this software and associated documentation files (the "Software"),
# to deal in the Software without restriction, including without limitation
# the rights to use, copy, modify, merge, publish, distribute, sublicense,
# and/or sell copies of the Software, and to permit persons to whom the
# Software is furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice (including the next
# paragraph) shall be included in all copies or substantial portions of the
# Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.  IN NO EVENT SHALL
# THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.

"""Send a job to LAVA, track it and collect its log back."""

import argparse
import os
import sys
import time
import traceback
import urllib.parse
import xmlrpc
# `import xmlrpc` alone does not load the `client` submodule; the code below
# uses `xmlrpc.client.*`, which previously resolved only because lavacli
# imports it as a side effect.  Import it explicitly.
import xmlrpc.client

from datetime import datetime, timedelta

import lavacli
import yaml

from lavacli.utils import loader

# Timeout in minutes to decide if the device from the dispatched LAVA job has
# hung or not due to the lack of new log output.
DEVICE_HANGING_TIMEOUT_MIN = 5

# How many seconds the script should wait before trying a new polling
# iteration to check if the dispatched LAVA job is running or waiting in the
# job queue.
WAIT_FOR_DEVICE_POLLING_TIME_SEC = 10

# How many seconds to wait between log output LAVA RPC calls.
LOG_POLLING_TIME_SEC = 5

# How many retries should be made when a hang timeout happens (so the job is
# attempted at most NUMBER_OF_RETRIES_TIMEOUT_DETECTION + 1 times).
NUMBER_OF_RETRIES_TIMEOUT_DETECTION = 2


def print_log(msg):
    """Print *msg* to stdout prefixed with the current timestamp."""
    print(f"{datetime.now()}: {msg}")

def fatal_err(msg):
    """Log *msg* with a timestamp and abort the script with exit status 1."""
    print_log(msg)
    # Equivalent to sys.exit(1): raises SystemExit(1).
    raise SystemExit(1)

def generate_lava_yaml(args):
    """Render the full LAVA job definition as a YAML string from *args*.

    The job has three actions: deploy (kernel over TFTP + rootfs over NFS),
    boot, and a single inline 'mesa' test that fetches the build artifacts
    and executes the CI init scripts on the device.
    """
    # General metadata and permissions, plus also inexplicably kernel arguments
    values = {
        'job_name': 'mesa: {}'.format(args.pipeline_info),
        'device_type': args.device_type,
        'visibility': { 'group': [ args.visibility_group ] },
        'priority': 75,
        'context': {
            # Extra kernel cmdline args appended to the NFS root arguments.
            'extra_nfsroot_args': ' init=/init rootwait minio_results={}'.format(args.job_artifacts_base)
        },
        'timeouts': {
            'job': {
                'minutes': 30
            }
        },
    }

    # Optional comma-separated device tags (e.g. to pin specific boards).
    if args.lava_tags:
        values['tags'] = args.lava_tags.split(',')

    # URLs to our kernel rootfs to boot from, both generated by the base
    # container build
    deploy = {
      'timeout': { 'minutes': 10 },
      'to': 'tftp',
      'os': 'oe',
      'kernel': {
        'url': '{}/{}'.format(args.base_system_url_prefix, args.kernel_image_name),
      },
      'nfsrootfs': {
        'url': '{}/lava-rootfs.tgz'.format(args.base_system_url_prefix),
        'compression': 'gz',
      }
    }
    # Kernel image type (e.g. 'zimage') and device tree are device-specific
    # and therefore optional.
    if args.kernel_image_type:
        deploy['kernel']['type'] = args.kernel_image_type
    if args.dtb:
        deploy['dtb'] = {
          'url': '{}/{}.dtb'.format(args.base_system_url_prefix, args.dtb)
        }

    # always boot over NFS
    boot = {
      'timeout': { 'minutes': 25 },
      'method': args.boot_method,
      'commands': 'nfs',
      'prompts': ['lava-shell:'],
    }

    # skeleton test definition: only declaring each job as a single 'test'
    # since LAVA's test parsing is not useful to us
    test = {
      'timeout': { 'minutes': 30 },
      'failure_retry': 1,
      'definitions': [ {
        'name': 'mesa',
        'from': 'inline',
        'path': 'inline/mesa.yaml',
        'repository': {
          'metadata': {
            'name': 'mesa',
            'description': 'Mesa test plan',
            'os': [ 'oe' ],
            'scope': [ 'functional' ],
            'format': 'Lava-Test Test Definition 1.0',
          },
          'parse': {
            # Each 'hwci: <case>: <pass|fail>' line printed by the device
            # becomes a LAVA test-case result.
            'pattern': r'hwci: (?P<test_case_id>\S*):\s+(?P<result>(pass|fail))'
          },
          'run': {
          },
        },
      } ],
    }

    # job execution script:
    #   - inline .gitlab-ci/common/init-stage1.sh
    #   - fetch and unpack per-pipeline build artifacts from build job
    #   - fetch and unpack per-job environment from lava-submit.sh
    #   - exec .gitlab-ci/common/init-stage2.sh
    init_lines = []
    # Inline the first-stage init script, skipping comments and blank lines.
    with open(args.first_stage_init, 'r') as init_sh:
      init_lines += [ x.rstrip() for x in init_sh if not x.startswith('#') and x.rstrip() ]
    init_lines += [
      'mkdir -p {}'.format(args.ci_project_dir),
      'wget -S --progress=dot:giga -O- {} | tar -xz -C {}'.format(args.mesa_build_url, args.ci_project_dir),
      'wget -S --progress=dot:giga -O- {} | tar -xz -C /'.format(args.job_rootfs_overlay_url),
      # Disable tracing around the JWT export so the secret never appears in
      # the serial log.
      'set +x',
      'export CI_JOB_JWT="{}"'.format(args.jwt),
      'set -x',
      'exec /init-stage2.sh',
    ]
    test['definitions'][0]['repository']['run']['steps'] = init_lines

    values['actions'] = [
      { 'deploy': deploy },
      { 'boot': boot },
      { 'test': test },
    ]

    # Large width keeps long shell one-liners from being line-wrapped, which
    # would corrupt the inline script.
    return yaml.dump(values, width=10000000)


def setup_lava_proxy():
    """Create an authenticated XML-RPC proxy from the default lavacli config."""
    config = lavacli.load_config("default")
    parsed = urllib.parse.urlparse(config.get("uri"))
    # Embed the username/token credentials into the server URL.
    auth_uri = "{}://{}:{}@{}{}".format(
        parsed.scheme,
        config.get("username"),
        config.get("token"),
        parsed.netloc,
        parsed.path,
    )
    transport = lavacli.RequestsTransport(
        parsed.scheme,
        config.get("proxy"),
        config.get("timeout", 120.0),
        config.get("verify_ssl_cert", True),
    )
    proxy = xmlrpc.client.ServerProxy(auth_uri, allow_none=True, transport=transport)

    print_log("Proxy for {} created.".format(config['uri']))

    return proxy


def _call_proxy(fn, *args):
    retries = 60
    for n in range(1, retries + 1):
        try:
            return fn(*args)
        except xmlrpc.client.ProtocolError as err:
            if n == retries:
                traceback.print_exc()
                fatal_err("A protocol error occurred (Err {} {})".format(err.errcode, err.errmsg))
            else:
                time.sleep(15)
                pass
        except xmlrpc.client.Fault as err:
            traceback.print_exc()
            fatal_err("FATAL: Fault: {} (code: {})".format(err.faultString, err.faultCode))


def get_job_results(proxy, job_id, test_suite, test_case):
    """Check LAVA results for the job; return False when a retry is wanted.

    Returns True when the requested test case passed; exits the script when
    the result is missing or the test failed outright.
    """
    # First scan the whole-job results for infrastructure-level failures,
    # which warrant a resubmission rather than a hard CI failure.
    raw = _call_proxy(proxy.results.get_testjob_results_yaml, job_id)
    for res in yaml.load(raw, Loader=loader(False)):
        metadata = res['metadata']
        if metadata.get('result') != 'fail':
            continue
        if metadata.get('error_type') == "Infrastructure":
            print_log("LAVA job {} failed with Infrastructure Error. Retry.".format(job_id))
            return False
        if metadata.get('case') == "validate":
            print_log("LAVA job {} failed validation (possible download error). Retry.".format(job_id))
            return False

    # Now look up the single test case we care about.
    raw = _call_proxy(proxy.results.get_testcase_results_yaml, job_id, test_suite, test_case)
    results = yaml.load(raw, Loader=loader(False))
    if not results:
        fatal_err("LAVA: no result for test_suite '{}', test_case '{}'".format(test_suite, test_case))

    print_log("LAVA: result for test_suite '{}', test_case '{}': {}".format(test_suite, test_case, results[0]['result']))
    if results[0]['result'] != 'pass':
        fatal_err("FAIL")

    return True

def wait_until_job_is_started(proxy, job_id):
    """Poll the scheduler until the job leaves the submission/scheduling queue."""
    print_log(f"Waiting for job {job_id} to start.")
    pending_states = {"Submitted", "Scheduling", "Scheduled"}
    state = "Submitted"
    while state in pending_states:
        state = _call_proxy(proxy.scheduler.job_state, job_id)["job_state"]
        # Throttle the polling so we don't hammer the scheduler.
        time.sleep(WAIT_FOR_DEVICE_POLLING_TIME_SEC)
    print_log(f"Job {job_id} started.")

def follow_job_execution(proxy, job_id):
    """Stream the job's log to stdout until it finishes.

    Returns True when the job completed, False when no new output appeared
    for DEVICE_HANGING_TIMEOUT_MIN minutes (device presumed hung).
    """
    hang_limit = timedelta(minutes=DEVICE_HANGING_TIMEOUT_MIN)
    lines_seen = 0
    last_activity = datetime.now()
    finished = False
    while not finished:
        (finished, data) = _call_proxy(proxy.scheduler.jobs.logs, job_id, lines_seen)
        entries = yaml.load(str(data), Loader=loader(False))
        if entries:
            # Fresh output arrived: reset the hang timer and echo it.
            last_activity = datetime.now()
            for entry in entries:
                print("{} {}".format(entry["dt"], entry["msg"]))
            lines_seen += len(entries)
        elif datetime.now() - last_activity > hang_limit:
            print_log("LAVA job {} doesn't advance (machine got hung?). Retry.".format(job_id))
            return False

        # `proxy.scheduler.jobs.logs` does not block, even when there is no
        # new log to be fetched. To avoid dosing the LAVA dispatcher
        # machine, let's add a sleep to save them some stamina.
        time.sleep(LOG_POLLING_TIME_SEC)

    return True

def show_job_data(proxy, job_id):
    """Print every field the scheduler reports about the job, one per line."""
    job_info = _call_proxy(proxy.scheduler.jobs.show, job_id)
    for field, value in job_info.items():
        print("{}\t: {}".format(field, value))


def validate_job(proxy, job_file):
    """Ask the LAVA server to validate the YAML job definition.

    Returns the server's validation result, or False when validation (or the
    RPC call itself) fails.  SystemExit must be caught here because
    `_call_proxy` reports unrecoverable RPC errors through `fatal_err`
    (i.e. `sys.exit`); swallowing it lets the caller emit a more specific
    error message instead.
    """
    try:
        return _call_proxy(proxy.scheduler.jobs.validate, job_file, True)
    # A bare `except:` would also swallow KeyboardInterrupt/GeneratorExit;
    # restrict the net to real errors plus the SystemExit described above.
    except (SystemExit, Exception):
        return False

def submit_job(proxy, job_file):
    """Submit the YAML job definition to LAVA and return the new job id."""
    return _call_proxy(proxy.scheduler.jobs.submit, job_file)


def main(args):
    """Generate, submit and track the LAVA job, retrying on hang detection."""
    proxy = setup_lava_proxy()

    yaml_file = generate_lava_yaml(args)

    if args.dump_yaml:
        # Dump a copy with the JWT censored so the secret never reaches the
        # CI log.  Copy the namespace first: assigning through `args` itself
        # (the previous behavior) clobbered the real token for any later use.
        censored_args = argparse.Namespace(**vars(args))
        censored_args.jwt = "jwt-hidden"
        print(generate_lava_yaml(censored_args))

    if args.validate_only:
        if not validate_job(proxy, yaml_file):
            fatal_err("Error in LAVA job definition")
        print("LAVA job definition validated successfully")
        return

    retry_count = NUMBER_OF_RETRIES_TIMEOUT_DETECTION

    while retry_count >= 0:
        job_id = submit_job(proxy, yaml_file)

        print_log("LAVA job id: {}".format(job_id))

        wait_until_job_is_started(proxy, job_id)

        if not follow_job_execution(proxy, job_id):
            print_log(f"Job {job_id} has timed out. Cancelling it.")
            # Cancel the job as it is considered unreachable by Mesa CI.
            proxy.scheduler.jobs.cancel(job_id)

            retry_count -= 1
            continue

        show_job_data(proxy, job_id)

        # Infrastructure failures return False and resubmit without burning
        # a hang-detection retry; real failures exit inside get_job_results.
        if get_job_results(proxy, job_id, "0_mesa", "mesa"):
            break
    else:
        # Every attempt tripped the hang detector; fail the CI job explicitly
        # instead of silently exiting with success.
        fatal_err("Job hung {} consecutive times. Giving up.".format(
            NUMBER_OF_RETRIES_TIMEOUT_DETECTION + 1))


if __name__ == '__main__':
    # given that we proxy from DUT -> LAVA dispatcher -> LAVA primary -> us ->
    # GitLab runner -> GitLab primary -> user, safe to say we don't need any
    # more buffering
    sys.stdout.reconfigure(line_buffering=True)
    sys.stderr.reconfigure(line_buffering=True)
    parser = argparse.ArgumentParser("LAVA job submitter")

    # Pipeline identification and artifact URLs (filled in by lava-submit.sh).
    parser.add_argument("--pipeline-info")
    parser.add_argument("--base-system-url-prefix")
    parser.add_argument("--mesa-build-url")
    parser.add_argument("--job-rootfs-overlay-url")
    parser.add_argument("--job-artifacts-base")
    parser.add_argument("--first-stage-init")
    parser.add_argument("--ci-project-dir")
    # Device/boot description for the LAVA job definition.
    parser.add_argument("--device-type")
    parser.add_argument("--dtb", nargs='?', default="")
    parser.add_argument("--kernel-image-name")
    parser.add_argument("--kernel-image-type", nargs='?', default="")
    parser.add_argument("--boot-method")
    parser.add_argument("--lava-tags", nargs='?', default="")
    # Secret GitLab CI token exported to the device environment.
    parser.add_argument("--jwt")
    parser.add_argument("--validate-only", action='store_true')
    parser.add_argument("--dump-yaml", action='store_true')
    parser.add_argument("--visibility-group")

    parser.set_defaults(func=main)
    args = parser.parse_args()
    args.func(args)