Belle II Software development
writer.py
1# MIT License
2# Copyright (c) 2016 Western Digital Corporation
3
4# Permission is hereby granted, free of charge, to any person obtaining a copy
5# of this software and associated documentation files (the "Software"), to deal
6# in the Software without restriction, including without limitation the rights
7# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
8# copies of the Software, and to permit persons to whom the Software is
9# furnished to do so, subject to the following conditions:
10
11# The above copyright notice and this permission notice shall be included in all
12# copies or substantial portions of the Software.
13
14# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
15# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
16# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
17# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
18# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
19# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
20# SOFTWARE.
21
22"""Write Value Change Dump files.
23
24This module provides :class:`VCDWriter` for writing VCD files.
25
26"""
27from collections import OrderedDict
28from collections.abc import Sequence
29from numbers import Number
30import datetime
31
32from itertools import zip_longest
33
34
35class VCDPhaseError(Exception):
36 """Indicating a :class:`VCDWriter` method was called in the wrong phase.
37
38 For example, calling :meth:`register_var()` after :meth:`close()` will
39 raise this exception.
40
41 """
42
43
44class VCDWriter:
45 """Value Change Dump writer.
46
47 A VCD file captures time-ordered changes to the value of variables.
48
49 :param file file: A file-like object to write the VCD data.
50 :param timescale:
51 Scale of the VCD timestamps. The timescale may either be a string or a
52 tuple containing an (int, str) pair.
53 :type timescale: str, tuple
54 :param str date: Optional `$date` string used in the VCD header.
55 :param str comment: Optional `$comment` string used in the VCD header.
56 :param str version: Optional `$version` string used in the VCD header.
57 :param str default_scope_type: Scope type for scopes where
58 :meth:`set_scope_type()` is not called explicitly.
59 :param str scope_sep: Separator for scopes specified as strings.
60 :param int init_timestamp: The initial timestamp. default=0
61 :raises ValueError: for invalid timescale values
62
63 """
64
65
66 SCOPE_TYPES = ['begin', 'fork', 'function', 'module', 'task']
67
68
69 VAR_TYPES = ['event', 'integer', 'parameter', 'real', 'realtime', 'reg',
70 'supply0', 'supply1', 'time', 'tri', 'triand', 'trior',
71 'trireg', 'tri0', 'tri1', 'wand', 'wire', 'wor']
72
73
74 TIMESCALE_NUMS = [1, 10, 100]
75
76
77 TIMESCALE_UNITS = ['s', 'ms', 'us', 'ns', 'ps', 'fs']
78
79 def __init__(self, file, timescale='1 us', date=None, comment='',
80 version='', default_scope_type='module', scope_sep='.',
81 check_values=True, init_timestamp=0):
82 """Initialization of VCDWriter"""
83
84 self._ofile = file
85
87 '$timescale': self._check_timescale(timescale),
88 '$date': str(datetime.datetime.now()) if date is None else date,
89 '$comment': comment,
90 '$version': version,
91 }
92 if default_scope_type not in self.SCOPE_TYPES:
93 raise ValueError(f'Invalid default scope type ({default_scope_type})')
94
95 self._default_scope_type = default_scope_type
96
97 self._scope_sep = scope_sep
98
99 self._check_values = check_values
100
101 self._registering = True
102
103 self._closed = False
104
105 self._dumping = True
106
108
110
112
113 self._scope_types = {}
114
115 self._ident_values = OrderedDict()
116
117 self._timestamp = int(init_timestamp)
118
119 def set_scope_type(self, scope, scope_type):
120 """Set the scope_type for a given scope.
121
122 The scope's type may be set to one of the valid :const:`SCOPE_TYPES`.
123 VCD viewer applications may display different scope types differently.
124
125 :param scope: The scope to set the type of.
126 :type scope: str or sequence of str
127 :param str scope_type: A valid scope type string.
128 :raises ValueError: for invalid `scope_type`
129
130 """
131 if scope_type is not None and scope_type not in self.SCOPE_TYPES:
132 raise ValueError(f'Invalid scope_type "{scope_type}"')
133 scope_tuple = self._get_scope_tuple(scope)
134 self._scope_types[scope_tuple] = scope_type
135
136 def register_var(self, scope, name, var_type, size=None, init=None,
137 ident=None):
138 """Register a VCD variable and return function to change value.
139
140 All VCD variables must be registered prior to any value changes.
141
142 .. Note::
143
144 The variable `name` differs from the variable's `ident`
145 (identifier). The `name` (also known as `ref`) is meant to refer to
146 the variable name in the code being traced and is visible in VCD
147 viewer applications. The `ident`, however, is only used within the
148 VCD file and can be auto-generated (by specifying ``ident=None``)
149 for most applications.
150
151 :param scope: The hierarchical scope that the variable belongs within.
152 :type scope: str or sequence of str
153 :param str name: Name of the variable.
154 :param str var_type: One of :const:`VAR_TYPES`.
155 :param size:
156 Size, in bits, of the variable. The *size* may be expressed as an
157 int or, for vector variable types, a tuple of int. When the size is
158 expressed as a tuple, the *value* passed to :meth:`change()` must
159 also be a tuple of same arity as the *size* tuple. Some variable
160 types ('integer', 'real', 'realtime', and 'event') have a default
161 size and thus *size* may be ``None`` for those variable types.
162 :type size: int or tuple(int) or None
163 :param init: Optional initial value; defaults to 'x'.
164 :param str ident: Optional identifier for use in the VCD stream.
165 :raises VCDPhaseError: if any values have been changed
166 :raises ValueError: for invalid var_type value
167 :raises TypeError: for invalid parameter types
168 :raises KeyError: for duplicate var name
169 :returns: :class:`Variable` instance appropriate for use with
170 :meth:`change()`.
171
172 """
173 if self._closed:
174 raise VCDPhaseError('Cannot register after close().')
175 elif not self._registering:
176 raise VCDPhaseError('Cannot register after time 0.')
177 elif var_type not in self.VAR_TYPES:
178 raise ValueError(f'Invalid var_type "{var_type}"')
179
180 scope_tuple = self._get_scope_tuple(scope)
181
182 scope_names = self._scope_var_names.setdefault(scope_tuple, [])
183 if name in scope_names:
184 raise KeyError(f'Duplicate var {name} in scope {scope}')
185
186 if ident is None:
187 ident = format(self._next_var_id, 'x')
188
189 if size is None:
190 if var_type in ['integer', 'real', 'realtime']:
191 size = 64
192 elif var_type == 'event':
193 size = 1
194 else:
195 raise ValueError(
196 f'Must supply size for {var_type} var_type')
197
198 if isinstance(size, Sequence):
199 size = tuple(size)
200 var_size = sum(size)
201 else:
202 var_size = size
203
204 var_str = f'$var {var_type} {var_size} {ident} {name} $end'
205
206 if init is None:
207 if var_type == 'real':
208 init = 0.0
209 elif isinstance(size, tuple):
210 init = tuple('x' * len(size))
211 else:
212 init = 'x'
213
214 if size == 1:
215 var = ScalarVariable(ident, var_type, size)
216 elif var_type == 'real':
217 var = RealVariable(ident, var_type, size)
218 else:
219 var = VectorVariable(ident, var_type, size)
220
221 if var_type != 'event':
222 self.change(var, self._timestamp, init)
223
224 # Only alter state after change_func() succeeds
225 self._next_var_id += 1
226 self._scope_var_strs.setdefault(scope_tuple, []).append(var_str)
227 scope_names.append(name)
228
229 return var
230
231 def dump_off(self, timestamp):
232 """Suspend dumping to VCD file."""
233 if self._dumping and not self._registering and self._ident_values:
234 """set dump_off"""
235 self._dump_off(timestamp)
236 self._dumping = False
237
238 def _dump_off(self, timestamp):
239 """Stop dumping to VCD file."""
240 print('#' + str(int(timestamp)), file=self._ofile)
241 print('$dumpoff', file=self._ofile)
242 for ident, val_str in self._ident_values.items():
243 if val_str[0] == 'b':
244 print('bx', ident, file=self._ofile)
245 elif val_str[0] == 'r':
246 pass # real variables cannot have 'z' or 'x' state
247 else:
248 print('x', ident, sep='', file=self._ofile)
249 print('$end', file=self._ofile)
250
251 def dump_on(self, timestamp):
252 """Resume dumping to VCD file."""
253 if not self._dumping and not self._registering and self._ident_values:
254 print('#' + str(int(timestamp)), file=self._ofile)
255 """set dump_values"""
256 self._dump_values('$dumpon')
257 self._dumping = True
258
259 def _dump_values(self, keyword):
260 """Dump values to VCD file."""
261 print(keyword, file=self._ofile)
262 # TODO: events should be excluded
263 print(*self._ident_values.values(),
264 sep='\n', file=self._ofile)
265 print('$end', file=self._ofile)
266
267 def change(self, var, timestamp, value):
268 """Change variable's value in VCD stream.
269
270 This is the fundamental behavior of a :class:`VCDWriter` instance. Each
271 time a variable's value changes, this method should be called.
272
273 The *timestamp* must be in-order relative to timestamps from previous
274 calls to :meth:`change()`. It is okay to call :meth:`change()` multiple
275 times with the same *timestamp*, but never with a past *timestamp*.
276
277 .. Note::
278
279 :meth:`change()` may be called multiple times before the timestamp
280 progresses past 0. The last value change for each variable will go
281 into the $dumpvars section.
282
283 :param Variable var: :class:`Variable` instance (i.e. from
284 :meth:`register_var()`).
285 :param int timestamp: Current simulation time.
286 :param value:
287 New value for *var*. For :class:`VectorVariable`, if the variable's
288 *size* is a tuple, then *value* must be a tuple of the same arity.
289
290 :raises ValueError: if the value is not valid for *var*.
291 :raises VCDPhaseError: if the timestamp is out of order or the
292 :class:`VCDWriter` instance is closed.
293
294 """
295 if timestamp < self._timestamp:
296 raise VCDPhaseError(f'Out of order value change ({var})')
297 elif self._closed:
298 raise VCDPhaseError('Cannot change value after close()')
299
300 val_str = var.format_value(value, self._check_values)
301 ts_int = int(timestamp)
302
303 if ts_int > self._timestamp:
304 if self._registering:
306 if self._dumping:
307 print('#', ts_int, sep='', file=self._ofile)
308 self._timestamp = ts_int
309
310 if self._dumping and not self._registering:
311 print(val_str, file=self._ofile)
312 else:
313 self._ident_values[var.ident] = val_str
314
315 def _get_scope_tuple(self, scope):
316 """get scope tuple function of the VCDWrite"""
317 if isinstance(scope, str):
318 return tuple(scope.split(self._scope_sep))
319 if isinstance(scope, (list, tuple)):
320 return tuple(scope)
321 else:
322 raise TypeError(f'Invalid scope {scope}')
323
324 @classmethod
325 def _check_timescale(cls, timescale):
326 """check time scale function of the VCDWrite"""
327 if isinstance(timescale, (list, tuple)):
328 if len(timescale) == 1:
329 num_str = '1'
330 unit = timescale[0]
331 elif len(timescale) == 2:
332 num, unit = timescale
333 if num not in cls.TIMESCALE_NUMS:
334 raise ValueError(f'Invalid timescale num {num}')
335 num_str = str(num)
336 else:
337 raise ValueError(f'Invalid timescale {timescale}')
338 elif isinstance(timescale, str):
339 if timescale in cls.TIMESCALE_UNITS:
340 num_str = '1'
341 unit = timescale
342 else:
343 for num in sorted(cls.TIMESCALE_NUMS, reverse=True):
344 num_str = str(num)
345 if timescale.startswith(num_str):
346 unit = timescale[len(num_str):].lstrip(' ')
347 break
348 else:
349 raise ValueError(
350 f'Invalid timescale num {timescale}')
351 else:
352 raise TypeError(f'Invalid timescale type {type(timescale).__name__}')
353 if unit not in cls.TIMESCALE_UNITS:
354 raise ValueError(f'Invalid timescale unit "{unit}"')
355 return ' '.join([num_str, unit])
356
357 def __enter__(self):
358 """enter of VCDWriter"""
359 return self
360
361 def __exit__(self, exc_type, exc_val, exc_tb):
362 """exit of VCDWriter"""
363 self.close()
364
365 def close(self, timestamp=None):
366 """Close VCD writer.
367
368 Any buffered VCD data is flushed to the output file. After
369 :meth:`close()`, no variable registration or value changes will be
370 accepted.
371
372 :param int timestamp: optional final timestamp to insert into VCD
373 stream.
374
375 .. Note::
376
377 The output file is not automatically closed. It is up to the user
378 to ensure the output file is closed after the :class:`VCDWriter`
379 instance is closed.
380
381 """
382 if not self._closed:
383 self.flush(timestamp)
384 self._closed = True
385
386 def flush(self, timestamp=None):
387 """Flush any buffered VCD data to output file.
388
389 If the VCD header has not already been written, calling `flush()` will
390 force the header to be written thus disallowing any further variable
391 registration.
392
393 :param int timestamp: optional timestamp to insert into VCD stream.
394
395 """
396 if self._closed:
397 raise VCDPhaseError('Cannot flush() after close()')
398 if self._registering:
400 if timestamp is not None and timestamp > self._timestamp:
401 print("#", int(timestamp), sep='', file=self._ofile)
402 self._ofile.flush()
403
404 def _gen_header(self):
405 """generate header for VCDWriter"""
406 for kwname, kwvalue in sorted(self._header_keywords.items()):
407 if not kwvalue:
408 continue
409 lines = kwvalue.split('\n')
410 if len(lines) == 1:
411 yield f'{kwname} {lines[0]} $end'
412 else:
413 yield kwname
414 for line in lines:
415 yield '\t' + line
416 yield '$end'
417
418 prev_scope = []
419 for scope in sorted(self._scope_var_strs):
420 var_strs = self._scope_var_strs.pop(scope)
421
422 for i, (prev, this) in enumerate(zip_longest(prev_scope, scope)):
423 if prev != this:
424 for _ in prev_scope[i:]:
425 yield '$upscope $end'
426
427 for j, name in enumerate(scope[i:]):
428 scope_type = self._scope_types.get(
429 scope[:i + j + 1], self._default_scope_type)
430 yield f'$scope {scope_type} {name} $end'
431 break
432 else:
433 assert scope != prev_scope # pragma no cover
434
435 yield from var_strs
436
437 prev_scope = scope
438
439 for _ in prev_scope:
440 yield '$upscope $end'
441
442 yield '$enddefinitions $end'
443
445 """finalize registration of VCDWriter"""
446 assert self._registering
447 print(*self._gen_header(), sep='\n', file=self._ofile)
448 if self._ident_values:
449 print('#' + str(int(self._timestamp)), file=self._ofile)
450 self._dump_values('$dumpvars')
451 if not self._dumping:
452 self._dump_off(self._timestamp)
453 self._registering = False
454
455 # This state is not needed after registration phase.
456 self._header_keywords.clear()
457 self._scope_types.clear()
458 self._scope_var_names.clear()
459
460
462 """VCD variable details needed to call :meth:`VCDWriter.change()`."""
463
464
465 __slots__ = ('ident', 'type', 'size')
466
467 def __init__(self, ident, type, size):
468 """Initialization of Variable function"""
469
470 self.ident = ident
471
472 self.type = type
473
474 self.size = size
475
476 def format_value(self, value, check=True):
477 """Format value change for use in VCD stream."""
478 raise NotImplementedError
479
480
482 """One-bit VCD scalar.
483
484 This is a 4-state variable and thus may have values of 0, 1, 'z', or 'x'.
485
486 """
487
488
489 __slots__ = ()
490
491 def format_value(self, value, check=True):
492 """Format scalar value change for VCD stream.
493
494 :param value: 1-bit (4-state) scalar value.
495 :type value: str, bool, int, or None
496 :raises ValueError: for invalid *value*.
497 :returns: string representing value change for use in a VCD stream.
498
499 """
500 if isinstance(value, str):
501 if check and (len(value) != 1 or value not in '01xzXZ'):
502 raise ValueError(f'Invalid scalar value ({value})')
503 return value + self.ident
504 elif value is None:
505 return 'z' + self.ident
506 elif value:
507 return '1' + self.ident
508 else:
509 return '0' + self.ident
510
511
513 """Real (IEEE-754 double-precision floating point) variable.
514
515 Values must be numeric and cannot be 'x' or 'z' states.
516
517 """
518
519
520 __slots__ = ()
521
522 def format_value(self, value, check=True):
523 """Format real value change for VCD stream.
524
525 :param value: Numeric changed value.
526 :param type: float or int
527 :raises ValueError: for invalid real *value*.
528 :returns: string representing value change for use in a VCD stream.
529
530 """
531 if not check or isinstance(value, Number):
532 return f'r{value:.16g} {self.ident}'
533 else:
534 raise ValueError(f'Invalid real value ({value})')
535
536
538 """Bit vector variable type.
539
540 This is for the various non-scalar and non-real variable types including
541 integer, register, wire, etc.
542
543 """
544
545
546 __slots__ = ()
547
548 def format_value(self, value, check=True):
549 """Format value change for VCD stream.
550
551 :param value: New value for the variable.
552 :types value: int, str, or None
553 :raises ValueError: for *some* invalid values.
554
555 A *value* of `None` is the same as `'z'`.
556
557 .. Warning::
558
559 If *value* is of type :py:class:`str`, all characters must be one
560 of `'01xzXZ'`. For the sake of performance, checking **is not**
561 done to ensure value strings only contain conforming characters.
562 Thus it is possible to produce invalid VCD streams with invalid
563 string values.
564
565 """
566 if isinstance(self.size, tuple):
567 # The string is built-up right-to-left in order to minimize/avoid
568 # left-extension in the final value string.
569 vstr_list = []
570 vstr_len = 0
571 size_sum = 0
572 for i, (v, size) in enumerate(zip(reversed(value),
573 reversed(self.size))):
574 vstr = self._format_value(v, size, check)
575 if not vstr_list:
576 vstr_list.insert(0, vstr)
577 vstr_len += len(vstr)
578 else:
579 leftc = vstr_list[0][0]
580 rightc = vstr[0]
581 if len(vstr) > 1 or ((rightc != leftc or leftc == '1') and
582 (rightc != '0' or leftc != '1')):
583 extendc = '0' if leftc == '1' else leftc
584 extend_size = size_sum - vstr_len
585 vstr_list.insert(0, extendc * extend_size)
586 vstr_list.insert(0, vstr)
587 vstr_len += extend_size + len(vstr)
588 size_sum += size
589 value_str = ''.join(vstr_list)
590 else:
591 value_str = self._format_value(value, self.size, check)
592 return f'b{value_str} {self.ident}'
593
594 def _format_value(self, value, size, check):
595 """format value function of VCDWriter"""
596 if isinstance(value, int):
597 max_val = 1 << size
598 if check and (-value > (max_val >> 1) or value >= max_val):
599 raise ValueError(f'Value ({value}) not representable in {size} bits')
600 if value < 0:
601 value += max_val
602 return format(value, 'b')
603 elif value is None:
604 return 'z'
605 else:
606 if check and (not isinstance(value, str) or
607 len(value) > size or
608 any(c not in '01xzXZ-' for c in value)):
609 raise ValueError(f'Invalid vector value ({value})')
610 return value
def format_value(self, value, check=True)
Definition: writer.py:522
def format_value(self, value, check=True)
Definition: writer.py:491
_dumping
set dumping
Definition: writer.py:105
_registering
set registering
Definition: writer.py:101
_timestamp
set time_stamp
Definition: writer.py:117
def __exit__(self, exc_type, exc_val, exc_tb)
Definition: writer.py:361
def _gen_header(self)
Definition: writer.py:404
def _check_timescale(cls, timescale)
Definition: writer.py:325
_scope_sep
set scope_sep
Definition: writer.py:97
def _dump_values(self, keyword)
Definition: writer.py:259
def dump_off(self, timestamp)
Definition: writer.py:231
_next_var_id
set next_var_id
Definition: writer.py:107
def _finalize_registration(self)
Definition: writer.py:444
list TIMESCALE_NUMS
Valid timescale numbers.
Definition: writer.py:74
def dump_on(self, timestamp)
Definition: writer.py:251
_scope_types
set scopr_types
Definition: writer.py:113
def change(self, var, timestamp, value)
Definition: writer.py:267
def __init__(self, file, timescale='1 us', date=None, comment='', version='', default_scope_type='module', scope_sep='.', check_values=True, init_timestamp=0)
Definition: writer.py:81
list TIMESCALE_UNITS
Valid timescale units.
Definition: writer.py:77
list VAR_TYPES
Valid VCD variable types.
Definition: writer.py:69
_check_values
set check_values
Definition: writer.py:99
def register_var(self, scope, name, var_type, size=None, init=None, ident=None)
Definition: writer.py:137
_ofile
output file
Definition: writer.py:84
def close(self, timestamp=None)
Definition: writer.py:365
def flush(self, timestamp=None)
Definition: writer.py:386
_scope_var_names
set scope_var_names
Definition: writer.py:111
_default_scope_type
set default_scope_type
Definition: writer.py:95
def _dump_off(self, timestamp)
Definition: writer.py:238
def set_scope_type(self, scope, scope_type)
Definition: writer.py:119
list SCOPE_TYPES
Valid VCD scope types.
Definition: writer.py:66
def _get_scope_tuple(self, scope)
Definition: writer.py:315
_scope_var_strs
set scope_var_strs
Definition: writer.py:109
_ident_values
set ident_values
Definition: writer.py:115
_header_keywords
header keywords
Definition: writer.py:86
ident
Identifier used in VCD output stream.
Definition: writer.py:470
type
VCD variable type; one of :const:VCDWriter.VAR_TYPES.
Definition: writer.py:472
size
Size, in bits, of variable.
Definition: writer.py:474
def __init__(self, ident, type, size)
Definition: writer.py:467
def format_value(self, value, check=True)
Definition: writer.py:476
def _format_value(self, value, size, check)
Definition: writer.py:594
def format_value(self, value, check=True)
Definition: writer.py:548