summaryrefslogtreecommitdiff
path: root/meta-arm/meta-arm/lib/fvp/runner.py
blob: e7c1358553a7ddc7a6134dfde75ea875ed552e6e (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
import re
import subprocess
import os
import shlex
import shutil
import sys

from .terminal import terminals
from .conffile import load

def cli_from_config(config, terminal_choice):
    """Build the FVP command line, as a list of strings, from *config*.

    *config* is indexed with the keys "fvp-bindir", "exe", "parameters",
    "data", "applications", "terminals" and "args". *terminal_choice* is
    either "none", which disables every terminal, or a key into the
    ``terminals`` registry whose ``command`` attribute is handed to the FVP.
    """
    bindir = config["fvp-bindir"]
    executable = config["exe"]
    # Only qualify the executable when a binary directory is configured
    argv = [os.path.join(bindir, executable) if bindir else executable]

    for key, setting in config["parameters"].items():
        argv += ["--parameter", f"{key}={setting}"]

    for blob in config["data"]:
        argv += ["--data", blob]

    for key, image in config["applications"].items():
        argv += ["--application", f"{key}={image}"]

    terminals_enabled = terminal_choice != "none"
    for terminal, name in config["terminals"].items():
        if not (terminals_enabled and name):
            # Terminals are switched off globally, or this one is unnamed:
            # disable it
            argv += ["--parameter", f"{terminal}.start_telnet=0"]
            continue

        # TODO if raw mode
        # cli.extend(["--parameter", f"{terminal}.mode=raw"])
        # TODO put name into terminal title
        launcher = terminals[terminal_choice].command
        argv += ["--parameter", f"{terminal}.terminal_command={launcher}"]

    # Free-form arguments from the configuration always come last
    argv += config["args"]
    return argv

def check_telnet():
    """Raise RuntimeError unless shutil.which() can locate a telnet executable."""
    telnet_path = shutil.which("telnet")
    if telnet_path:
        return
    raise RuntimeError("Cannot find telnet, this is needed to connect to the FVP.")


class ConsolePortParser:
    """Extract console port numbers from the FVP's output.

    *lines* is an iterator yielding bytes lines. Lines of the form
    ``<console>: Listening for serial connection on port <port>`` are
    recognised and every console/port pair seen is remembered, so consoles
    may be queried in any order and any number of times.
    """

    # Compiled once instead of being looked up again for every line
    _PORT_RE = re.compile(r"^(\S+): Listening for serial connection on port (\d+)$")

    def __init__(self, lines):
        self._lines = lines
        # Maps console name -> port number for every announcement seen so far
        self._console_ports = {}

    def parse_port(self, console):
        """Return the port number announced for *console*.

        Consumes lines until the announcement for *console* has been seen.
        This blocks (polling the iterator) for as long as the announcement
        has not appeared; there is no timeout.
        """
        import time

        while console not in self._console_ports:
            try:
                raw = next(self._lines)
            except StopIteration:
                # self._lines might be a growing log file: back off briefly
                # rather than spinning at full CPU, then try again
                time.sleep(0.01)
                continue

            m = self._PORT_RE.match(raw.strip().decode(errors='ignore'))
            if m:
                # Remember every console, including the requested one, so a
                # repeated query is answered from the cache instead of
                # waiting for an announcement that was already consumed
                self._console_ports[m.group(1)] = int(m.group(2))

        return self._console_ports[console]


# This function is backported from Python 3.8. Remove it and replace call sites
# with shlex.join once OE-core support for earlier Python versions is dropped.
def shlex_join(split_command):
    """Quote each item of *split_command* for the shell and join with spaces."""
    quoted_args = map(shlex.quote, split_command)
    return ' '.join(quoted_args)


class FVPRunner:
    """Launch an FVP from a configuration file and manage attached consoles.

    The runner owns the FVP subprocess plus any telnet subprocesses and
    pexpect instances created through it; stop() tears all of them down.
    """

    def __init__(self, logger):
        """Create an idle runner that reports progress through *logger*."""
        self._logger = logger
        # Popen handle of the FVP; stays None until start() is called
        self._fvp_process = None
        # telnet subprocesses created by create_telnet()
        self._telnets = []
        # pexpect instances created by create_pexpect()
        self._pexpects = []
        # Result of load(fvpconf); stays None until start() is called
        self._config = None

    def start(self, fvpconf, extra_args=None, terminal_choice="none", stdout=subprocess.PIPE):
        """Load *fvpconf* and spawn the FVP it describes.

        *extra_args* (optional iterable of strings) is appended to the command
        line built from the configuration. *terminal_choice* is forwarded to
        cli_from_config(). *stdout* is passed to subprocess.Popen; stderr is
        merged into it and stdin is connected to DEVNULL.
        """
        self._logger.debug(f"Loading {fvpconf}")
        self._config = load(fvpconf)

        cli = cli_from_config(self._config, terminal_choice)
        if extra_args is not None:
            cli.extend(extra_args)

        # Pass through environment variables needed for GUI applications, such
        # as xterm, to work. Note that this updates the loaded configuration's
        # 'env' mapping in place.
        env = self._config['env']
        for name in ('DISPLAY', 'PATH', 'WAYLAND_DISPLAY', 'XAUTHORITY'):
            if name in os.environ:
                env[name] = os.environ[name]

        # Allow filepath to be relative to fvp configuration file
        cwd = os.path.dirname(fvpconf) or None
        self._logger.debug(f"FVP call will be executed in working directory: {cwd}")

        self._logger.debug(f"Constructed FVP call: {shlex_join(cli)}")
        self._fvp_process = subprocess.Popen(
            cli,
            stdin=subprocess.DEVNULL, stdout=stdout, stderr=subprocess.STDOUT,
            env=env,
            cwd=cwd)

    def stop(self):
        """Terminate the FVP and all consoles; return the FVP's exit status.

        Returns the FVP's return code when it is positive (the FVP exited with
        an error by itself) and 0 otherwise, including when the FVP was never
        started or had to be terminated here. Calling stop() again afterwards
        is harmless.
        """
        if self._fvp_process:
            self._logger.debug(f"Terminating FVP PID {self._fvp_process.pid}")
            try:
                self._fvp_process.terminate()
                self._fvp_process.wait(10.0)
            except subprocess.TimeoutExpired:
                self._logger.debug(f"Killing FVP PID {self._fvp_process.pid}")
                self._fvp_process.kill()
                # Reap the killed process so it does not linger as a zombie
                # and so that returncode is populated
                self._fvp_process.wait()
            except ProcessLookupError:
                pass

        for telnet in self._telnets:
            try:
                telnet.terminate()
                telnet.wait(10.0)
            except subprocess.TimeoutExpired:
                telnet.kill()
                # Reap the killed telnet as well
                telnet.wait()
            except ProcessLookupError:
                pass
        # Forget the telnets so a repeated stop() does not revisit them
        self._telnets.clear()

        if self._pexpects:
            # Imported lazily: pexpect is only needed once create_pexpect()
            # has been used
            import pexpect
            for console in self._pexpects:
                # Ensure pexpect logs all remaining output to the logfile
                console.expect(pexpect.EOF, timeout=5.0)
                console.close()
            # A closed console must not be expect()ed on again by a later stop()
            self._pexpects.clear()

        if self._fvp_process and self._fvp_process.returncode and \
                self._fvp_process.returncode > 0:
            # Return codes < 0 indicate that the process was explicitly
            # terminated above.
            self._logger.info(f"FVP quit with code {self._fvp_process.returncode}")
            return self._fvp_process.returncode
        else:
            return 0

    def wait(self, timeout):
        """Wait up to *timeout* seconds for the FVP process to exit.

        Propagates subprocess.TimeoutExpired from Popen.wait(). Requires that
        start() has been called.
        """
        self._fvp_process.wait(timeout)

    def getConfig(self):
        """Return the configuration loaded by start(), or None before that."""
        return self._config

    @property
    def stdout(self):
        """The stdout attribute of the FVP's Popen object (start() required)."""
        return self._fvp_process.stdout

    def create_telnet(self, port):
        """Spawn ``telnet localhost <port>`` wired to this process's stdin/stdout.

        Raises RuntimeError (from check_telnet) when telnet is not installed.
        The subprocess is tracked so that stop() terminates it.
        """
        check_telnet()
        telnet = subprocess.Popen(["telnet", "localhost", str(port)], stdin=sys.stdin, stdout=sys.stdout)
        self._telnets.append(telnet)
        return telnet

    def create_pexpect(self, port, **kwargs):
        """Return a pexpect.spawn of ``telnet localhost <port>``.

        *kwargs* are forwarded to pexpect.spawn. The instance is tracked so
        that stop() drains and closes it.
        """
        import pexpect
        instance = pexpect.spawn(f"telnet localhost {port}", **kwargs)
        self._pexpects.append(instance)
        return instance

    def pid(self):
        """Return the PID of the FVP process (start() required)."""
        return self._fvp_process.pid