QTools  6.6.0
qutest.py
Go to the documentation of this file.
1 #-----------------------------------------------------------------------------
2 # Product: QUTest Python scripting (compatible with Python 2.7+ and 3.3+)
3 # Last updated for version 6.6.0
4 # Last updated on 2019-07-30
5 #
6 # Q u a n t u m L e a P s
7 # ------------------------
8 # Modern Embedded Software
9 #
10 # Copyright (C) 2005-2019 Quantum Leaps, LLC. All rights reserved.
11 #
12 # This program is open source software: you can redistribute it and/or
13 # modify it under the terms of the GNU General Public License as published
14 # by the Free Software Foundation, either version 3 of the License, or
15 # (at your option) any later version.
16 #
17 # Alternatively, this program may be distributed and modified under the
18 # terms of Quantum Leaps commercial licenses, which expressly supersede
19 # the GNU General Public License and are specifically designed for
20 # licensees interested in retaining the proprietary status of their code.
21 #
22 # This program is distributed in the hope that it will be useful,
23 # but WITHOUT ANY WARRANTY; without even the implied warranty of
24 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
25 # GNU General Public License for more details.
26 #
27 # You should have received a copy of the GNU General Public License
28 # along with this program. If not, see <http://www.gnu.org/licenses/>.
29 #
30 # Contact information:
31 # www.state-machine.com
32 # info@state-machine.com
33 #-----------------------------------------------------------------------------
34 
35 # for compatibility from Python 2
36 from __future__ import print_function
37 __metaclass__ = type
38 
39 from fnmatch import fnmatchcase
40 from glob import glob
41 from platform import python_version
42 from subprocess import Popen
43 from inspect import getframeinfo, stack
44 
45 import struct
46 import socket
47 import time
48 import sys
49 import traceback
50 
51 import os
52 if os.name == 'nt':
53  import msvcrt
54 else:
55  import select
56 
57 #=============================================================================
58 # QUTest test runner and state machine
59 class qutest:
60  _VERSION = 660
61 
62  # class variables
63  _host_exe = ''
64  _is_debug = False
65  _exit_on_fail = False
66  _have_target = False
67  _have_info = False
68  _need_reset = True
69  _last_record = ''
70  _num_groups = 0
71  _num_tests = 0
72  _num_failed = 0
73  _num_skipped = 0
74 
75  # states of the internal state machine
76  _INIT = 0
77  _TEST = 1
78  _FAIL = 2
79  _SKIP = 3
80 
81  # timeout value [seconds]
82  _TOUT = 1.000
83 
84  # test command options
85  _OPT_NORESET = 0x01
86 
87  def __init__(self):
88  qutest._have_target = False
89  qutest._have_info = False
90  qutest._need_reset = True
91 
92  self._state = qutest._INIT
93  self._timestamp = 0
94  self._startTime = 0
95  self._to_skip = 0
96 
97  # The following _DSL_dict dictionary defines the QUTest testing
98  # DSL (Domain Specific Language), which is documented separately
99  # in the file "qutest_dsl.py".
100  #
101  self._DSL_dict = {
102  'test': self.test,
103  'skip': self.skip,
104  'expect': self.expect,
105  'glb_filter': self.glb_filter,
106  'loc_filter': self.loc_filter,
107  'current_obj': self.current_obj,
108  'query_curr': self.query_curr,
109  'tick': self.tick,
110  'expect_pause': self.expect_pause,
111  'continue_test': self.continue_test,
112  'command': self.command,
113  'init': self.init,
114  'dispatch': self.dispatch,
115  'post': self.post,
116  'publish': self.publish,
117  'probe': self.probe,
118  'peek': self.peek,
119  'poke': self.poke,
120  'fill': self.fill,
121  'pack': struct.pack,
122  'on_reset': self._dummy_on_reset,
123  'on_setup': self._dummy_on_setup,
124  'on_teardown': self._dummy_on_teardown,
125  'VERSION': qutest._VERSION,
126  'NORESET': qutest._OPT_NORESET,
127  'OBJ_SM': qspy._OBJ_SM,
128  'OBJ_AO': qspy._OBJ_AO,
129  'OBJ_MP': qspy._OBJ_MP,
130  'OBJ_EQ': qspy._OBJ_EQ,
131  'OBJ_TE': qspy._OBJ_TE,
132  'OBJ_AP': qspy._OBJ_AP,
133  'OBJ_SM_AO': qspy._OBJ_SM_AO,
134  'GRP_OFF': 0,
135  'GRP_ON': qspy._GRP_ON,
136  'GRP_SM': qspy._GRP_SM,
137  'GRP_AO': qspy._GRP_AO,
138  'GRP_EQ': qspy._GRP_EQ,
139  'GRP_MP': qspy._GRP_MP,
140  'GRP_TE': qspy._GRP_TE,
141  'GRP_QF': qspy._GRP_QF,
142  'GRP_SC': qspy._GRP_SC,
143  'GRP_U0': qspy._GRP_U0,
144  'GRP_U1': qspy._GRP_U1,
145  'GRP_U2': qspy._GRP_U2,
146  'GRP_U3': qspy._GRP_U3,
147  'GRP_U4': qspy._GRP_U4,
148  'GRP_UA': qspy._GRP_UA
149  }
150 
151  def __del__(self):
152  #print('~qutest', self)
153  pass
154 
155  #-------------------------------------------------------------------------
156  # QUTest Domain Specific Language (DSL) commands
157 
158  # test DSL command .......................................................
159  def test(self, name, opt = 0):
160  # end the previous test
161  self._test_end()
162 
163  # start the new test...
164  self._startTime = qutest._time()
165  qutest._num_tests += 1
166  print('%s: ' %name, end = '')
167 
168  if self._to_skip > 0:
169  self._to_skip -= 1
170  qutest._num_skipped += 1
171  print('SKIPPED')
172  self._tran(qutest._SKIP)
173  return
174 
175  if opt & qutest._OPT_NORESET != 0:
176  if self._state == qutest._FAIL:
177  self._fail('NORESET-test follows a failed test')
178  return
179  if qutest._need_reset:
180  self._fail('NORESET-test needs reset')
181  return
182  self._tran(qutest._TEST)
183  else:
184  self._tran(qutest._TEST)
185  if not self._reset_target():
186  return
187 
188  if not self._on_setup():
189  self._fail('on_setup() failed')
190  return
191 
192  # expect DSL command .....................................................
193  def expect(self, match):
194  if self._to_skip > 0:
195  pass # ignore
196  elif self._state == qutest._INIT:
197  self._before_test('expect')
198  elif self._state == qutest._TEST:
199 
200  if not qspy._receive(): # timeout?
201  self._fail('expected: "%s"' %match,
202  'received: "" (timeout)')
203  return False
204 
205  if match.startswith('@timestamp') or match.startswith('%timestamp'):
206  self._timestamp += 1
207  expected = '%010d' %self._timestamp + match[10:]
208  elif match[0:9].isdigit():
209  self._timestamp += 1
210  expected = match
211  else:
212  expected = match
213 
214  received = qutest._last_record[3:].decode('utf-8')
215 
216  if fnmatchcase(received, expected):
217  return True
218  else:
219  self._fail('expected: "%s"' %expected,
220  'received: "%s"' %received)
221  return False
222  elif self._state == qutest._FAIL or self._state == qutest._SKIP:
223  pass # ignore
224  else:
225  assert 0, 'invalid state in expect: ' + match
226 
227  # glb_filter DSL command .................................................
228  def glb_filter(self, *args):
229  if self._to_skip > 0:
230  pass # ignore
231  elif self._state == qutest._INIT:
232  self._before_test('glb_filter')
233  elif self._state == qutest._TEST:
234  filter = [0, 0, 0, 0]
235 
236  for arg in args:
237  if arg == 0:
238  pass
239  elif arg < 0x7F:
240  filter[arg // 32] |= 1 << (arg % 32)
241  elif arg == qspy._GRP_ON:
242  # all filters on
243  filter[0] = 0xFFFFFFFF
244  filter[1] = 0xFFFFFFFF
245  filter[2] = 0xFFFFFFFF
246  filter[3] = 0x1FFFFFFF
247  break # no point in continuing
248  elif arg == qspy._GRP_SM: # state machines
249  filter[0] |= 0x000003FE
250  filter[1] |= 0x03800000
251  elif arg == qspy._GRP_AO: # active objects
252  filter[0] |= 0x0007FC00
253  filter[1] |= 0x00002000
254  elif arg == qspy._GRP_EQ: # raw queues
255  filter[0] |= 0x00780000
256  filter[2] |= 0x00004000
257  elif arg == qspy._GRP_MP: # raw memory pools
258  filter[0] |= 0x03000000
259  filter[2] |= 0x00008000
260  elif arg == qspy._GRP_QF: # framework
261  filter[0] |= 0xFC000000
262  filter[1] |= 0x00001FC0
263  elif arg == qspy._GRP_TE: # time events
264  filter[1] |= 0x0000007F
265  elif arg == qspy._GRP_SC: # scheduler
266  filter[1] |= 0x007F0000
267  elif arg == qspy._GRP_U0: # user 70-79
268  filter[2] |= 0x0000FFC0
269  elif arg == qspy._GRP_U1: # user 80-89
270  filter[2] |= 0x03FF0000
271  elif arg == qspy._GRP_U2: # user 90-99
272  filter[2] |= 0xFC000000
273  filter[3] |= 0x0000000F
274  elif arg == qspy._GRP_U3: # user 100-109
275  filter[3] |= 0x00003FF0
276  elif arg == qspy._GRP_U4: # user 110-124
277  filter[3] |= 0x1FFFC000
278  elif arg == qspy._GRP_UA: # user 70-124 (all)
279  filter[2] |= 0xFFFFFFC0
280  filter[3] |= 0x1FFFFFFF
281  else:
282  assert 0, 'invalid global filter'
283 
284  qspy._sendTo(struct.pack(
285  '<BBLLLL', qspy._TRGT_GLB_FILTER, 16,
286  filter[0], filter[1], filter[2], filter[3]))
287 
288  self.expect(' Trg-Ack QS_RX_GLB_FILTER')
289 
290  elif self._state == qutest._FAIL or self._state == qutest._SKIP:
291  pass # ignore
292  else:
293  assert 0, 'invalid state in glb_filter'
294 
295  # loc_filter DSL command .................................................
296  def loc_filter(self, obj_kind, obj_id):
297  if self._to_skip > 0:
298  pass # ignore
299  elif self._state == qutest._INIT:
300  self._before_test('loc_filter')
301  elif self._state == qutest._TEST:
302  fmt = '<BB' + qspy._target_info['objPtr']
303  if isinstance(obj_id, int):
304  # Send directly to Target
305  packet = struct.pack(
306  fmt, qspy._TRGT_LOC_FILTER, obj_kind, obj_id)
307  else:
308  # Have QSpy interpret obj_id string and send filter
309  packet = bytearray(struct.pack(
310  fmt, qspy._QSPY_SEND_LOC_FILTER, obj_kind, 0))
311  packet.extend(qspy._str2bin(obj_id))
312 
313  qspy._sendTo(packet)
314  self.expect(' Trg-Ack QS_RX_LOC_FILTER')
315 
316  elif self._state == qutest._FAIL or self._state == qutest._SKIP:
317  pass # ignore
318  else:
319  assert 0, 'invalid state in loc_filter'
320 
321  # current_obj DSL command ................................................
322  def current_obj(self, obj_kind, obj_id):
323  if self._to_skip > 0:
324  pass # ignore
325  elif self._state == qutest._INIT:
326  self._before_test('current_obj')
327  elif self._state == qutest._TEST:
328  fmt = '<BB' + qspy._target_info['objPtr']
329  # Build packet according to obj_id type
330  if isinstance(obj_id, int):
331  packet = struct.pack(
332  fmt, qspy._TRGT_CURR_OBJ, obj_kind, obj_id)
333  else:
334  packet = bytearray(struct.pack(
335  fmt, qspy._QSPY_SEND_CURR_OBJ, obj_kind, 0))
336  # add string object ID to end
337  packet.extend(qspy._str2bin(obj_id))
338 
339  qspy._sendTo(packet)
340  self.expect(' Trg-Ack QS_RX_CURR_OBJ')
341 
342  elif self._state == qutest._FAIL or self._state == qutest._SKIP:
343  pass # ignore
344  else:
345  assert 0, 'invalid state in current_obj'
346 
347  # query_curr DSL command .................................................
348  def query_curr(self, obj_kind):
349  if self._to_skip > 0:
350  pass # ignore
351  elif self._state == qutest._INIT:
352  self._before_test('query_curr')
353  elif self._state == qutest._TEST:
354  qspy._sendTo(struct.pack('<BB', qspy._TRGT_QUERY_CURR, obj_kind))
355  # test-specific expect
356  elif self._state == qutest._FAIL or self._state == qutest._SKIP:
357  pass # ignore
358  else:
359  assert 0, 'invalid state in query_curr'
360 
361  # expect_pause DSL command ...............................................
362  def expect_pause(self):
363  if self._to_skip > 0:
364  pass # ignore
365  elif self._state == qutest._INIT:
366  self._before_test('expect_pause')
367  elif self._state == qutest._TEST:
368  self.expect(' TstPause')
369  elif self._state == qutest._FAIL or self._state == qutest._SKIP:
370  pass # ignore
371  else:
372  assert 0, 'invalid state in expect_pause'
373 
374  # continue_test DSL command ..............................................
375  def continue_test(self):
376  if self._to_skip > 0:
377  pass # ignore
378  elif self._state == qutest._INIT:
379  self._before_test('continue_test')
380  elif self._state == qutest._TEST:
381  qspy._sendTo(struct.pack('<B', qspy._TRGT_CONTINUE))
382  self.expect(' Trg-Ack QS_RX_TEST_CONTINUE')
383  elif self._state == qutest._FAIL or self._state == qutest._SKIP:
384  pass # ignore
385  else:
386  assert 0, 'invalid state in continue_test'
387 
388  # command DSL command ....................................................
389  def command(self, cmdId, param1 = 0, param2 = 0, param3 = 0):
390  if self._to_skip > 0:
391  pass # ignore
392  if self._state == qutest._INIT:
393  self._before_test('command')
394  elif self._state == qutest._TEST:
395  fmt = '<BBIII'
396  if isinstance(cmdId, int):
397  packet = struct.pack(fmt, qspy._TRGT_COMMAND,
398  cmdId, param1, param2, param3)
399  else:
400  packet = bytearray(struct.pack(
401  fmt, qspy._QSPY_SEND_COMMAND, 0, param1, param2, param3))
402  # add string command ID to end
403  packet.extend(qspy._str2bin(cmdId))
404  qspy._sendTo(packet)
405  self.expect(' Trg-Ack QS_RX_COMMAND')
406  elif self._state == qutest._FAIL or self._state == qutest._SKIP:
407  pass # ignore
408  else:
409  assert 0, 'invalid state in command'
410 
411  # init DSL command .......................................................
412  def init(self, signal = 0, params = None):
413  if self._to_skip > 0:
414  pass # ignore
415  elif self._state == qutest._INIT:
416  self._before_test('init')
417  elif self._state == qutest._TEST:
418  qspy._sendEvt(qspy._EVT_INIT, signal, params)
419  self.expect(' Trg-Ack QS_RX_EVENT')
420  elif self._state == qutest._FAIL or self._state == qutest._SKIP:
421  pass # ignore
422  else:
423  assert 0, 'invalid state in init'
424 
425  # dispatch DSL command ...................................................
426  def dispatch(self, signal, params = None):
427  if self._to_skip > 0:
428  pass # ignore
429  elif self._state == qutest._INIT:
430  self._before_test('dispatch')
431  elif self._state == qutest._TEST:
432  qspy._sendEvt(qspy._EVT_DISPATCH, signal, params)
433  self.expect(' Trg-Ack QS_RX_EVENT')
434  elif self._state == qutest._FAIL or self._state == qutest._SKIP:
435  pass # ignore
436  else:
437  assert 0, 'invalid state in dispatch'
438 
439  # post DSL command .......................................................
440  def post(self, signal, params = None):
441  if self._to_skip > 0:
442  pass # ignore
443  elif self._state == qutest._INIT:
444  self._before_test('post')
445  elif self._state == qutest._TEST:
446  qspy._sendEvt(qspy._EVT_POST, signal, params)
447  self.expect(' Trg-Ack QS_RX_EVENT')
448  elif self._state == qutest._FAIL or self._state == qutest._SKIP:
449  pass # ignore
450  else:
451  assert 0, 'invalid state in post'
452 
453  # publish DSL command ....................................................
454  def publish(self, signal, params = None):
455  if self._to_skip > 0:
456  pass # ignore
457  elif self._state == qutest._INIT:
458  self._before_test('publish')
459  elif self._state == qutest._TEST:
460  qspy._sendEvt(qspy._EVT_PUBLISH, signal, params)
461  self.expect(' Trg-Ack QS_RX_EVENT')
462  elif self._state == qutest._FAIL or self._state == qutest._SKIP:
463  pass # ignore
464  else:
465  assert 0, 'invalid state in publish'
466 
467  # probe DSL command ......................................................
468  def probe(self, func, data):
469  if self._to_skip > 0:
470  pass # ignore
471  elif self._state == qutest._INIT:
472  self._before_test('probe')
473  elif self._state == qutest._TEST:
474  fmt = '<BI' + qspy._target_info['funPtr']
475  if isinstance(func, int):
476  # Send directly to target
477  packet = struct.pack(
478  fmt, qspy._TRGT_TEST_PROBE, data, func)
479  else:
480  # Send to QSPY to provide 'func' from Fun Dictionary
481  packet = bytearray(struct.pack(
482  fmt, qspy._QSPY_SEND_TEST_PROBE, data, 0))
483  # add string func name to end
484  packet.extend(qspy._str2bin(func))
485  qspy._sendTo(packet)
486  self.expect(' Trg-Ack QS_RX_TEST_PROBE')
487  elif self._state == qutest._FAIL or self._state == qutest._SKIP:
488  pass # ignore
489  else:
490  assert 0, 'invalid state in probe'
491 
492  # tick DSL command .......................................................
493  def tick(self, tick_rate = 0):
494  if self._to_skip > 0:
495  pass # ignore
496  elif self._state == qutest._INIT:
497  self._before_test('tick')
498  elif self._state == qutest._TEST:
499  qspy._sendTo(struct.pack('<BB', qspy._TRGT_TICK, tick_rate))
500  self.expect(' Trg-Ack QS_RX_TICK')
501  elif self._state == qutest._FAIL or self._state == qutest._SKIP:
502  pass # ignore
503  else:
504  assert 0, 'invalid state in tick'
505 
506  # peek DSL command .......................................................
507  def peek(self, offset, size, num):
508  assert size == 1 or size == 2 or size == 4, \
509  'Size must be 1, 2, or 4'
510  if self._to_skip > 0:
511  pass # ignore
512  elif self._state == qutest._INIT:
513  self._before_test('peek')
514  elif self._state == qutest._TEST:
515  qspy._sendTo(struct.pack('<BHBB', qspy._TRGT_PEEK,
516  offset, size, num))
517  # explicit expectation of peek output
518  elif self._state == qutest._FAIL or self._state == qutest._SKIP:
519  pass # ignore
520  else:
521  assert 0, 'invalid state in peek'
522 
523  # poke DSL command .......................................................
524  def poke(self, offset, size, data):
525  assert size == 1 or size == 2 or size == 4, \
526  'Size must be 1, 2, or 4'
527  if self._to_skip > 0:
528  pass # ignore
529  elif self._state == qutest._INIT:
530  self._before_test('poke')
531  elif self._state == qutest._TEST:
532  length = len(data)
533  num = length // size
534  packet = bytearray(struct.pack('<BHBB', qspy._TRGT_POKE,
535  offset, size, num))
536  packet.extend(data)
537  qspy._sendTo(packet)
538  self.expect(' Trg-Ack QS_RX_POKE')
539  elif self._state == qutest._FAIL or self._state == qutest._SKIP:
540  pass # ignore
541  else:
542  assert 0, 'invalid state in poke'
543 
544  # fill DSL command .......................................................
545  def fill(self, offset, size, num, item = 0):
546  assert size == 1 or size == 2 or size == 4, \
547  'Size must be 1, 2, or 4'
548  if self._to_skip > 0:
549  pass # ignore
550  elif self._state == qutest._INIT:
551  self._before_test('fill')
552  elif self._state == qutest._TEST:
553  if size == 1:
554  item_fmt = 'B'
555  elif size == 2:
556  item_fmt = 'H'
557  elif size == 4:
558  item_fmt = 'L'
559  else:
560  assert False, 'size for sendFill must be 1, 2, or 4!'
561  fmt = '<BHBB' + item_fmt
562  packet = struct.pack(fmt, qspy._TRGT_FILL, offset, size, num, item)
563  qspy._sendTo(packet)
564  self.expect(' Trg-Ack QS_RX_FILL')
565  elif self._state == qutest._FAIL or self._state == qutest._SKIP:
566  pass # ignore
567  else:
568  assert 0, 'invalid state in fill'
569 
570  # skip DSL command .......................................................
571  def skip(self, nTests = 9999):
572  if self._to_skip == 0: # not skipping already?
573  self._to_skip = nTests
574 
575  # dummy callbacks --------------------------------------------------------
576  def _dummy_on_reset(self):
577  #print('_dummy_on_reset')
578  pass
579 
580  def _dummy_on_setup(self):
581  #print('_dummy_on_setup')
582  pass
583 
584  def _dummy_on_teardown(self):
585  #print('_dummy_on_teardown')
586  pass
587 
588  # helper methods ---------------------------------------------------------
589  @staticmethod
590  def _run_script(fname):
591  print('--------------------------------------------------'
592  '\nGroup:', fname)
593  qutest._num_groups += 1
594 
595  err = 0 # assume no errors
596  with open(fname) as f:
597  qutest_inst = qutest()
598  try:
599  code = compile(f.read(), fname, 'exec')
600  # execute the script code in a separate instance of qutest
601  exec(code, qutest_inst._DSL_dict)
602  except (AssertionError,
603  RuntimeError,
604  OSError) as e:
605  qutest_inst._fail()
606  print(repr(e))
607  err = -2
608  except: # most likely an error in a test script
609  #exc_type, exc_value, exc_traceback = sys.exc_info()
610  qutest_inst._fail()
611  qutest._quit_host_exe()
612  traceback.print_exc(2)
613  err = -3
614 
615  qutest_inst._test_end()
616  qutest._quit_host_exe()
617  err = qutest._num_failed
618 
619  return err;
620 
621  def _tran(self, state):
622  #print('tran(%d->%d)' %(self._state, state))
623  self._state = state
624 
625  def _test_end(self):
626  if not self._state == qutest._TEST:
627  return
628 
629  if not qutest._have_info:
630  return
631 
632  qspy._sendTo(struct.pack('<B', qspy._TRGT_TEST_TEARDOWN))
633  if not qspy._receive(): # timeout?
634  self._fail('expected: end-of-test',
635  'received: "" (timeout)')
636  return
637 
638  expected = ' Trg-Ack QS_RX_TEST_TEARDOWN'
639  received = qutest._last_record[3:].decode('utf-8')
640  if received == expected:
641  self._DSL_dict['on_teardown']()
642  print('PASS (%.3fs)'
643  %(qutest._time() - self._startTime))
644  return
645  else:
646  self._fail('expected: end-of-test',
647  'received: "%s"' %received)
648  # ignore all input until timeout
649  while qspy._receive():
650  pass
651 
652  def _reset_target(self):
653  if qutest._host_exe != '': # running a host executable?
654  qutest._quit_host_exe()
655 
656  # lauch a new instance of the host executable
657  qutest._have_target = True
658  qutest._have_info = False
659  Popen([qutest._host_exe,
660  qspy._host_name + ':' + str(qspy._tcp_port)])
661 
662  else: # running an embedded target
663  qutest._have_target = True
664  qutest._have_info = False
665  if not qutest._is_debug:
666  qspy._sendTo(struct.pack('<B', qspy._TRGT_RESET))
667 
668  # ignore all input until a timeout (False)
669  while qspy._receive():
670  if qutest._have_info:
671  break
672 
673  if qutest._have_info:
674  qutest._need_reset = False;
675  else:
676  qutest._quit_host_exe()
677  raise RuntimeError('Target reset failed')
678 
679  self._timestamp = 0
680  self._DSL_dict['on_reset']()
681  return (self._state == qutest._TEST)
682 
683  def _on_setup(self):
684  assert self._state == qutest._TEST, \
685  'on_setup() outside the TEST state %d' %self._state
686  if not qutest._have_info:
687  return False
688 
689  qspy._sendTo(struct.pack('<B', qspy._TRGT_TEST_SETUP))
690  self.expect(' Trg-Ack QS_RX_TEST_SETUP')
691  if self._state == qutest._TEST:
692  self._timestamp = 0
693  self._DSL_dict['on_setup']()
694  return True
695 
696  def _before_test(self, command):
697  qutest._num_failed += 1
698  self._tran(qutest._FAIL)
699  msg = '"' + command + '" before any test'
700  raise SyntaxError(msg)
701 
702  def _fail(self, msg1 = '', msg2 = ''):
703  print('FAIL @line:%d (%.3fs):' %(
704  getframeinfo(stack()[-4][0]).lineno,
705  qutest._time() - self._startTime))
706  if msg1 != '':
707  print(' ', msg1)
708  if msg2 != '':
709  print(' ', msg2)
710  qutest._num_failed += 1
711  qutest._need_reset = True
712  self._tran(qutest._FAIL)
713 
714  @staticmethod
715  def _quit_host_exe():
716  if qutest._host_exe != '' and qutest._have_target:
717  qutest._have_target = False
718  qspy._sendTo(struct.pack('<B', qspy._TRGT_RESET))
719  time.sleep(qutest._TOUT) # wait until host-exe quits
720 
721  @staticmethod
722  def _time():
723  if sys.version_info[0] == 2: # Python 2 ?
724  return time.time()
725  else: # Python 3+
726  return time.perf_counter()
727 
728 #=============================================================================
729 # Helper class for communication with the QSPY front-end
730 #
731 class qspy:
732  # class variables...
733  _sock = None
734  _is_attached = False
735  _tx_seq = 0
736  _host_name = 'localhost'
737  _udp_port = 7701
738  _tcp_port = 6601
739  _target_info = {
740  'objPtr': 'L',
741  'funPtr': 'L',
742  'tstamp': 'L',
743  'sig': 'H',
744  'evtSize': 'H',
745  'queueCtr': 'B',
746  'poolCtr': 'H',
747  'poolBlk': 'H',
748  'tevtCtr': 'H',
749  'target' : 'UNKNOWN'
750  }
751 
752  # records to the Target...
753  _TRGT_INFO = 0
754  _TRGT_COMMAND = 1
755  _TRGT_RESET = 2
756  _TRGT_TICK = 3
757  _TRGT_PEEK = 4
758  _TRGT_POKE = 5
759  _TRGT_FILL = 6
760  _TRGT_TEST_SETUP = 7
761  _TRGT_TEST_TEARDOWN = 8
762  _TRGT_TEST_PROBE = 9
763  _TRGT_GLB_FILTER = 10
764  _TRGT_LOC_FILTER = 11
765  _TRGT_AO_FILTER = 12
766  _TRGT_CURR_OBJ = 13
767  _TRGT_CONTINUE = 14
768  _TRGT_QUERY_CURR = 15
769  _TRGT_EVENT = 16
770 
771  # packets to QSPY only...
772  _QSPY_ATTACH = 128
773  _QSPY_DETACH = 129
774  _QSPY_SAVE_DICT = 130
775  _QSPY_SCREEN_OUT = 131
776  _QSPY_BIN_OUT = 132
777  _QSPY_MATLAB_OUT = 133
778  _QSPY_MSCGEN_OUT = 134
779  _QSPY_SEND_EVENT = 135
780  _QSPY_SEND_LOC_FILTER = 136
781  _QSPY_SEND_CURR_OBJ = 137
782  _QSPY_SEND_COMMAND = 138
783  _QSPY_SEND_TEST_PROBE = 139
784 
785  # gloal filter groups...
786  _GRP_ON = 0xF0
787  _GRP_SM = 0xF1
788  _GRP_AO = 0xF2
789  _GRP_MP = 0xF3
790  _GRP_EQ = 0xF4
791  _GRP_TE = 0xF5
792  _GRP_QF = 0xF6
793  _GRP_SC = 0xF7
794  _GRP_U0 = 0xF8
795  _GRP_U1 = 0xF9
796  _GRP_U2 = 0xFA
797  _GRP_U3 = 0xFB
798  _GRP_U4 = 0xFC
799  _GRP_UA = 0xFD
800 
801  # kinds of objects (local-filter and curr-obj)...
802  _OBJ_SM = 0
803  _OBJ_AO = 1
804  _OBJ_MP = 2
805  _OBJ_EQ = 3
806  _OBJ_TE = 4
807  _OBJ_AP = 5
808  _OBJ_SM_AO = 6
809 
810  # special events for QS-RX
811  _EVT_PUBLISH = 0
812  _EVT_POST = 253
813  _EVT_INIT = 254
814  _EVT_DISPATCH = 255
815 
816  # special empty record
817  _EMPTY_RECORD = ' '
818 
819  @staticmethod
820  def _attach(channels = 0x2):
821  # channels: 1-binary, 2-text, 3-both
822  # Create socket and connect
823  qspy._sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
824  qspy._sock.connect((qspy._host_name, qspy._udp_port))
825  qspy._sock.settimeout(qutest._TOUT) # timeout for the socket
826 
827  print('Attaching to QSPY (%s:%d) ... '
828  %(qspy._host_name, qspy._udp_port), end = '')
829  qspy._is_attached = False
830  qspy._sendTo(struct.pack('<BB', qspy._QSPY_ATTACH, channels))
831  try:
832  qspy._receive()
833  except:
834  pass
835 
836  if qspy._is_attached:
837  print('OK')
838  return True
839  else:
840  print('FAIL!')
841  return False
842 
843  @staticmethod
844  def _detach():
845  if qspy._sock is None:
846  return
847  qspy._sendTo(struct.pack('<B', qspy._QSPY_DETACH))
848  time.sleep(qutest._TOUT)
849  qspy._sock.shutdown(socket.SHUT_RDWR)
850  qspy._sock.close()
851  qspy._sock = None
852 
853  @staticmethod
854  def _receive():
855  '''returns True if packet received, False if timed out'''
856 
857  if not qutest._is_debug:
858  try:
859  data = qspy._sock.recv(4096)
860  except socket.timeout:
861  qutest._last_record = qspy._EMPTY_RECORD
862  return False # timeout
863  # don't catch OSError
864  else:
865  while True:
866  try:
867  data = qspy._sock.recv(4096)
868  break
869  except socket.timeout:
870  print("\nwaiting for Target (press Enter to skip this test)...", end='')
871  if os.name == 'nt':
872  if msvcrt.kbhit():
873  if msvcrt.getch() == '\r':
874  print()
875  return False; # timeout
876  else:
877  dr,dw,de = select.select([sys.stdin], [], [], 0)
878  if dr != []:
879  sys.stdin.readline() # consue the Return key
880  print()
881  return False; # timeout
882  # don't catch OSError
883 
884  dlen = len(data)
885  if dlen < 2:
886  qutest._last_record = qspy._EMPTY_RECORD
887  raise RuntimeError('UDP packet from QSPY too short')
888 
889  # binary conversion compatible with Python 2
890  recID = struct.unpack('B', data[1:2])[0]
891 
892  if recID == 0: # text data? (most common)
893  qutest._last_record = data
894  # QS_ASSERTION?
895  if dlen > 3 and struct.unpack('B', data[2:3])[0] == 69:
896  qutest._need_reset = True
897 
898  elif recID == 64: # target info?
899  if dlen != 18:
900  qutest._last_record = qspy._EMPTY_RECORD
901  raise RuntimeError('Incorrect Target info')
902 
903  fmt = 'xBHxLxxxQ'
904  bytes = struct.unpack('>BBBBBBBBBBBBB', data[5:18])
905  qspy._target_info['objPtr'] = fmt[bytes[3] & 0x0F]
906  qspy._target_info['funPtr'] = fmt[bytes[3] >> 4]
907  qspy._target_info['tstamp'] = fmt[bytes[4] & 0x0F]
908  qspy._target_info['sig'] = fmt[bytes[0] & 0x0F]
909  qspy._target_info['evtSize'] = fmt[bytes[0] >> 4]
910  qspy._target_info['queueCtr']= fmt[bytes[1] & 0x0F]
911  qspy._target_info['poolCtr'] = fmt[bytes[2] >> 4]
912  qspy._target_info['poolBlk'] = fmt[bytes[2] & 0x0F]
913  qspy._target_info['tevtCtr'] = fmt[bytes[1] >> 4]
914  qspy._target_info['target'] = '%02d%02d%02d_%02d%02d%02d' \
915  %(bytes[12], bytes[11], bytes[10],
916  bytes[9], bytes[8], bytes[7])
917  #print('******* Target:', qspy._target_info['target'])
918  qutest._last_record = data
919  qutest._have_info = True
920 
921  elif recID == 128: # attach
922  qutest._last_record = data
923  qspy._is_attached = True
924 
925  elif recID == 129: # detach
926  qutest._quit_host_exe()
927  qutest._last_record = data
928  qspy._detach()
929  qspy._is_attached = False
930 
931  else:
932  qutest._last_record = qspy._EMPTY_RECORD
933  raise RuntimeError('Unrecognized UDP data type from QSPY')
934 
935  return True # some input received
936 
937  @staticmethod
938  def _sendTo(packet):
939  tx_packet = bytearray({qspy._tx_seq})
940  tx_packet.extend(packet)
941  qspy._sock.send(tx_packet)
942  qspy._tx_seq = (qspy._tx_seq + 1) & 0xFF
943  #print('sendTo', qspy._tx_seq)
944 
945  @staticmethod
946  def _sendEvt(ao_prio, signal, parameters = None):
947  fmt = '<BB' + qspy._target_info['sig'] + 'H'
948  if parameters is not None:
949  length = len(parameters)
950  else:
951  length = 0
952 
953  if isinstance(signal, int):
954  packet = bytearray(struct.pack(
955  fmt, qspy._TRGT_EVENT, ao_prio, signal, length))
956  if parameters is not None:
957  packet.extend(parameters)
958  else:
959  packet = bytearray(struct.pack(
960  fmt, qspy._QSPY_SEND_EVENT, ao_prio, 0, length))
961  if parameters is not None:
962  packet.extend(parameters)
963  packet.extend(qspy._str2bin(signal))
964 
965  qspy._sendTo(packet)
966 
967  @staticmethod
968  def _str2bin(str):
969  if sys.version_info[0] == 2: # Python 2 ?
970  packed = str
971  else: # Python 3+
972  packed = bytes(str, 'utf-8')
973  fmt = '%dsB' %(len(packed) + 1)
974  # Null terminate and return
975  return struct.pack(fmt, packed, 0)
976 
977 
978 #=============================================================================
979 # main entry point to qutest
980 def _main(argv):
981  startTime = qutest._time()
982 
983  print('QUTest unit testing front-end %d.%d.%d running on Python %s' \
984  %(qutest._VERSION//100,
985  (qutest._VERSION//10) % 10,
986  qutest._VERSION % 10, python_version()))
987  print('Copyright (c) 2005-2019 Quantum Leaps, www.state-machine.com')
988 
989  # list of scripts to exectute...
990  scripts = []
991 
992  # process command-line arguments...
993  args = argv[1:] # remove the 'qutest' name
994  #print(args)
995 
996  if '-h' in args or '--help' in args or '?' in args:
997  print('\nUsage: qutest [-x] [test-scripts] '
998  '[host_exe] [qspy_host[:udp_port]] [qspy_tcp_port]\n\n'
999  'help at: https://www.state-machine.com/qtools/qutest.html')
1000  return 0
1001 
1002  argc = len(args)
1003  if argc > 0 and args[0] == '-x':
1004  qutest._exit_on_fail = True
1005  args = args[1:0]
1006  argc -= 1
1007 
1008  if argc > 0:
1009  # scan args for test scripts...
1010  new_args = [] # arguments after removing all test scripts
1011  for arg in args:
1012  # if test file input uses wildcard, find matches
1013  if arg.find('*') >= 0:
1014  scripts.extend(glob(arg))
1015  elif arg.endswith('.py'):
1016  scripts.extend(glob(arg))
1017  else:
1018  new_args.append(arg)
1019 
1020  argc = len(new_args)
1021  if argc > 0:
1022  qutest._host_exe = new_args[0]
1023  if qutest._host_exe == 'DEBUG':
1024  qutest._host_exe = ''
1025  qutest._is_debug = True
1026  if argc > 1:
1027  host_port = new_args[1].split(':')
1028  if len(host_port) > 0:
1029  qspy._host_name = host_port[0]
1030  if len(host_port) > 1:
1031  qspy._udp_port = int(host_port[1])
1032  if argc > 2:
1033  qspy._tcp_port = new_args[2]
1034  else:
1035  scripts.extend(glob('*.py'))
1036 
1037  # attach to QSPY
1038  if not qspy._attach():
1039  return -1
1040 
1041  # run all the test scripts...
1042  err = 0
1043  for scr in scripts:
1044  # run the script...
1045  err = qutest._run_script(scr)
1046 
1047  # error encountered and shall we quit on failure?
1048  if err != 0 and qutest._exit_on_fail:
1049  break
1050 
1051  if qutest._num_failed == 0:
1052  status = 'OK'
1053  else:
1054  status = 'FAIL!'
1055  if qutest._have_info:
1056  print('============= Target:',
1057  qspy._target_info['target'], '==============')
1058  else:
1059  print('================= (no target ) ===================')
1060 
1061  print('%d Groups, %d Tests, %d Failures, %d Skipped (%.3fs)'
1062  '\n%s'
1063  %(qutest._num_groups, qutest._num_tests,
1064  qutest._num_failed, qutest._num_skipped,
1065  (qutest._time() - startTime),
1066  status))
1067 
1068  qutest._quit_host_exe()
1069  qspy._detach()
1070 
1071  return err
1072 
1073 #=============================================================================
1074 if __name__ == '__main__':
1075  sys.exit(_main(sys.argv))
qutest.qutest._to_skip
_to_skip
Definition: qutest.py:95
qutest.qutest.skip
def skip(self, nTests=9999)
Definition: qutest.py:571
qutest.qutest.dispatch
def dispatch(self, signal, params=None)
Definition: qutest.py:426
qutest.qutest._startTime
_startTime
Definition: qutest.py:94
qutest.qutest._reset_target
def _reset_target(self)
Definition: qutest.py:652
qutest.qutest._dummy_on_reset
def _dummy_on_reset(self)
Definition: qutest.py:576
qutest.qutest._DSL_dict
_DSL_dict
Definition: qutest.py:101
qutest.qutest.current_obj
def current_obj(self, obj_kind, obj_id)
Definition: qutest.py:322
qutest.qutest.poke
def poke(self, offset, size, data)
Definition: qutest.py:524
qutest.qutest
Definition: qutest.py:59
qutest.qutest.tick
def tick(self, tick_rate=0)
Definition: qutest.py:493
qutest.qutest.__del__
def __del__(self)
Definition: qutest.py:151
qutest.qutest.peek
def peek(self, offset, size, num)
Definition: qutest.py:507
qutest.qutest._on_setup
def _on_setup(self)
Definition: qutest.py:683
qutest.qutest.__init__
def __init__(self)
Definition: qutest.py:87
qutest.qutest._state
_state
Definition: qutest.py:92
qutest.qutest._dummy_on_teardown
def _dummy_on_teardown(self)
Definition: qutest.py:584
qutest.qutest.probe
def probe(self, func, data)
Definition: qutest.py:468
qutest.qutest._timestamp
_timestamp
Definition: qutest.py:93
qutest.qutest.init
def init(self, signal=0, params=None)
Definition: qutest.py:412
qutest.qspy
Definition: qutest.py:731
qutest.qutest.loc_filter
def loc_filter(self, obj_kind, obj_id)
Definition: qutest.py:296
qutest
Definition: qutest.py:1
qutest.qutest._fail
def _fail(self, msg1='', msg2='')
Definition: qutest.py:702
qutest.qutest._dummy_on_setup
def _dummy_on_setup(self)
Definition: qutest.py:580
qutest.qutest._before_test
def _before_test(self, command)
Definition: qutest.py:696
qutest.qutest.command
def command(self, cmdId, param1=0, param2=0, param3=0)
Definition: qutest.py:389
qutest.qutest.glb_filter
def glb_filter(self, *args)
Definition: qutest.py:228
qutest.qutest.fill
def fill(self, offset, size, num, item=0)
Definition: qutest.py:545
qutest.qutest.expect
def expect(self, match)
Definition: qutest.py:193
qutest.qutest.continue_test
def continue_test(self)
Definition: qutest.py:375
qutest.qutest._tran
def _tran(self, state)
Definition: qutest.py:621
qutest.qutest.test
def test(self, name, opt=0)
Definition: qutest.py:159
qutest.qutest.query_curr
def query_curr(self, obj_kind)
Definition: qutest.py:348
qutest.qutest.publish
def publish(self, signal, params=None)
Definition: qutest.py:454
qutest.qutest._test_end
def _test_end(self)
Definition: qutest.py:625
qutest.qutest.expect_pause
def expect_pause(self)
Definition: qutest.py:362
qutest.qutest.post
def post(self, signal, params=None)
Definition: qutest.py:440