1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
|
import re
import subprocess
import os
import shlex
import shutil
import sys
from .terminal import terminals
from .conffile import load
def cli_from_config(config, terminal_choice):
    """Translate an FVP configuration into a command line (list of strings).

    The executable comes first (joined onto ``fvp-bindir`` when that entry is
    set), followed by one ``--parameter`` per configured parameter, one
    ``--data`` per data entry, one ``--application`` per application, one
    ``--parameter`` per terminal, and finally the entries of ``args``.

    A terminal is given a ``terminal_command`` looked up in ``terminals`` when
    *terminal_choice* is not ``"none"`` and the terminal has a name; otherwise
    its telnet is switched off with ``start_telnet=0``.
    """
    bindir = config["fvp-bindir"]
    executable = os.path.join(bindir, config["exe"]) if bindir else config["exe"]
    argv = [executable]

    for key, setting in config["parameters"].items():
        argv += ["--parameter", f"{key}={setting}"]

    for blob in config["data"]:
        argv += ["--data", blob]

    for key, image in config["applications"].items():
        argv += ["--application", f"{key}={image}"]

    for terminal, name in config["terminals"].items():
        wanted = terminal_choice != "none" and name
        if not wanted:
            # Terminals are switched off, or this one was left unnamed.
            argv += ["--parameter", f"{terminal}.start_telnet=0"]
            continue

        # TODO if raw mode
        # argv += ["--parameter", f"{terminal}.mode=raw"]
        # TODO put name into terminal title
        command = terminals[terminal_choice].command
        argv += ["--parameter", f"{terminal}.terminal_command={command}"]

    argv.extend(config["args"])
    return argv
def check_telnet():
    """Raise RuntimeError unless ``shutil.which`` can locate ``telnet``."""
    telnet_path = shutil.which("telnet")
    if telnet_path:
        return
    raise RuntimeError("Cannot find telnet, this is needed to connect to the FVP.")
class ConsolePortParser:
    """Find the port each FVP console listens on by scanning output lines.

    Lines of the form ``<console>: Listening for serial connection on port
    <port>`` are recognised; everything else is skipped. Every announcement
    seen is remembered, so a console can be looked up repeatedly and in any
    order.
    """

    # Announcement emitted once per console: group 1 is the console name,
    # group 2 the port number.
    _PORT_LINE = re.compile(r"^(\S+): Listening for serial connection on port (\d+)$")

    def __init__(self, lines):
        """Wrap *lines*, an iterator of bytes lines (they are decoded here)."""
        self._lines = lines
        self._console_ports = {}

    def parse_port(self, console):
        """Return the port announced for *console* as an int.

        Lines are consumed from the iterator until the announcement for
        *console* has been seen. Ports of other consoles met on the way are
        cached, and so is the requested one: asking for the same console
        again returns immediately instead of waiting for a second
        announcement that will never come.

        This call does not return until the console has been announced; an
        iterator that raises StopIteration is simply polled again.
        """
        while console not in self._console_ports:
            try:
                raw = next(self._lines)
            except StopIteration:
                # self._lines might be a growing log file, so keep polling.
                continue

            line = raw.strip().decode(errors='ignore')
            m = self._PORT_LINE.match(line)
            if m:
                self._console_ports[m.group(1)] = int(m.group(2))

        return self._console_ports[console]
# Backport of shlex.join, which first appeared in Python 3.8. Once OE-core no
# longer supports earlier Python versions, drop this helper and call
# shlex.join directly at the call sites.
def shlex_join(split_command):
    """Quote every item of *split_command* for the shell and join with spaces."""
    quoted = map(shlex.quote, split_command)
    return ' '.join(quoted)
class FVPRunner:
    """Launch an FVP process and manage the telnet consoles attached to it."""

    def __init__(self, logger):
        """Create an idle runner that reports its progress through *logger*."""
        self._logger = logger
        self._fvp_process = None
        self._telnets = []
        self._pexpects = []
        self._config = None

    def start(self, fvpconf, extra_args=[], terminal_choice="none", stdout=subprocess.PIPE):
        """Load the configuration file *fvpconf* and spawn the FVP.

        *extra_args* are appended to the command line generated from the
        configuration (the default list is never mutated here).
        *terminal_choice* is forwarded to cli_from_config. *stdout* is handed
        to subprocess.Popen; stderr is merged into it and stdin is detached.
        """
        self._logger.debug(f"Loading {fvpconf}")
        self._config = load(fvpconf)

        cli = cli_from_config(self._config, terminal_choice)
        cli += extra_args

        # Pass through environment variables needed for GUI applications, such
        # as xterm, to work. Work on a copy so that the configuration returned
        # by getConfig() is not polluted with values from the host.
        env = dict(self._config['env'])
        for name in ('DISPLAY', 'PATH', 'WAYLAND_DISPLAY', 'XAUTHORITY'):
            if name in os.environ:
                env[name] = os.environ[name]

        # Allow filepath to be relative to fvp configuration file
        cwd = os.path.dirname(fvpconf) or None
        self._logger.debug(f"FVP call will be executed in working directory: {cwd}")

        self._logger.debug(f"Constructed FVP call: {shlex_join(cli)}")
        self._fvp_process = subprocess.Popen(
            cli,
            stdin=subprocess.DEVNULL, stdout=stdout, stderr=subprocess.STDOUT,
            env=env,
            cwd=cwd)

    def _terminate(self, process, timeout, label=None):
        """Terminate *process*, falling back to kill() after *timeout* seconds.

        When *label* is given, the fallback is reported on the debug log.
        """
        try:
            process.terminate()
            process.wait(timeout)
        except subprocess.TimeoutExpired:
            if label:
                self._logger.debug(f"Killing {label} PID {process.pid}")
            process.kill()
            # Reap the killed child: without this it is left unreaped and its
            # returncode stays None.
            process.wait()
        except ProcessLookupError:
            pass

    def stop(self, timeout=10.0):
        """Shut down the FVP and every console created through this runner.

        Each process is asked to terminate and is killed if it has not exited
        after *timeout* seconds. Consoles are forgotten once cleaned up, so
        calling stop() again does not touch them a second time.

        Returns the FVP's exit code when it is positive, otherwise 0.
        """
        if self._fvp_process:
            self._logger.debug(f"Terminating FVP PID {self._fvp_process.pid}")
            self._terminate(self._fvp_process, timeout, label="FVP")

        # Detach the lists before cleaning up so that a repeated stop() call
        # finds nothing left to do.
        telnets, self._telnets = self._telnets, []
        for telnet in telnets:
            self._terminate(telnet, timeout)

        consoles, self._pexpects = self._pexpects, []
        if consoles:
            import pexpect
            for console in consoles:
                # Ensure pexpect logs all remaining output to the logfile
                console.expect(pexpect.EOF, timeout=5.0)
                console.close()

        returncode = self._fvp_process.returncode if self._fvp_process else None
        if returncode and returncode > 0:
            # Return codes < 0 indicate that the process was explicitly
            # terminated above.
            self._logger.info(f"FVP quit with code {returncode}")
            return returncode
        return 0

    def wait(self, timeout):
        """Wait up to *timeout* seconds for the FVP process to exit."""
        self._fvp_process.wait(timeout)

    def getConfig(self):
        """Return the configuration loaded by start(), or None before it."""
        return self._config

    @property
    def stdout(self):
        """The stdout stream of the FVP process, as set up by start()."""
        return self._fvp_process.stdout

    def create_telnet(self, port):
        """Start a telnet client on localhost:*port* wired to this process's stdio.

        Raises RuntimeError (from check_telnet) when telnet is not installed.
        The client is terminated by stop().
        """
        check_telnet()
        telnet = subprocess.Popen(["telnet", "localhost", str(port)], stdin=sys.stdin, stdout=sys.stdout)
        self._telnets.append(telnet)
        return telnet

    def create_pexpect(self, port, **kwargs):
        """Spawn a pexpect-driven telnet session to localhost:*port*.

        *kwargs* are forwarded to pexpect.spawn. The session is drained and
        closed by stop().
        """
        # Imported lazily so pexpect is only needed when this method is used.
        import pexpect
        instance = pexpect.spawn(f"telnet localhost {port}", **kwargs)
        self._pexpects.append(instance)
        return instance

    def pid(self):
        """Return the PID of the FVP process."""
        return self._fvp_process.pid
|