• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
#    Copyright 2013-2015 ARM Limited
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
15
16
17import time
18from contextlib import contextmanager
19from logging import Logger
20
21import serial
22
23import pexpect
24from distutils.version import StrictVersion as V
25if V(pexpect.__version__) < V('4.0.0'):
26    import fdpexpect
27else:
28    from pexpect import fdpexpect
29# Adding pexpect exceptions into this module's namespace
30from pexpect import EOF, TIMEOUT  # NOQA pylint: disable=W0611
31
32from devlib.exception import HostError
33
34
def pulse_dtr(conn, state=True, duration=0.1):
    """Hold the DTR line of a serial connection in ``state`` for ``duration``.

    The line is driven to ``state``, held for ``duration`` seconds, and then
    driven to the opposite state. The initial state of the line is *not*
    checked.

    :param conn: object exposing ``setDTR(bool)``.
    :param state: the state the line is held in during the pulse.
    :param duration: how long, in seconds, to hold the line.
    """
    conn.setDTR(state)
    try:
        time.sleep(duration)
    finally:
        # Always release the line, even if the sleep is interrupted or
        # rejects the duration, so DTR is never left stuck mid-pulse.
        conn.setDTR(not state)
41
42
def get_connection(timeout, init_dtr=None, logcls=Logger,
                   *args, **kwargs):
    """Open a serial port and wrap its file descriptor in an fdpexpect spawn.

    :param timeout: timeout for the fdpexpect spawn object.
    :param init_dtr: if not ``None``, ``dsrdtr=True`` is passed to
                     ``serial.Serial`` and the DTR line is set to this state
                     once the port is open.
    :param logcls: callable invoked as ``logcls('read')`` and
                   ``logcls('send')``; the results are installed as the spawn
                   object's ``logfile_read`` and ``logfile_send``.

    Any remaining positional and keyword arguments are passed to
    ``serial.Serial``.

    :returns: a ``(target, conn)`` tuple: the fdpexpect spawn object and the
              underlying ``serial.Serial`` instance.
    :raises HostError: if ``serial.Serial`` raises ``SerialException``.
    """
    if init_dtr is not None:
        kwargs['dsrdtr'] = True
    try:
        conn = serial.Serial(*args, **kwargs)
    except serial.SerialException as e:
        # Exceptions have no ``message`` attribute on Python 3; str() works
        # on every version and keeps the original error text.
        raise HostError(str(e))
    if init_dtr is not None:
        conn.setDTR(init_dtr)
    conn.nonblocking()
    conn.flushOutput()
    target = fdpexpect.fdspawn(conn.fileno(), timeout=timeout)
    target.logfile_read = logcls('read')
    target.logfile_send = logcls('send')

    # Monkey-patching sendline to introduce a short delay after
    # characters are sent to the serial. If two sendlines are issued
    # one after another the second one might start putting characters
    # into the serial device before the first one has finished, causing
    # corruption. The delay prevents that.
    tsln = target.sendline

    def sendline(x=''):
        # The default mirrors pexpect's own sendline(), so a bare
        # ``target.sendline()`` keeps working after the patch.
        tsln(x)
        time.sleep(0.1)

    target.sendline = sendline
    return target, conn
72
73
def write_characters(conn, line, delay=0.05):
    """Write a single line out to serial character-by-character.

    Sending one character at a time, with a pause after each, ensures that
    nothing is dropped for longer lines.

    :param conn: object exposing ``send`` and ``sendline`` (as a pexpect
                 spawn object does).
    :param line: the text to send; trailing ``\\r``/``\\n`` characters are
                 stripped before sending.
    :param delay: pause, in seconds, after each character.
    """
    line = line.rstrip('\r\n')
    for c in line:
        conn.send(c)
        time.sleep(delay)
    # Terminate the line once, via sendline, after all characters are out.
    conn.sendline('')
82
83
@contextmanager
def open_serial_connection(timeout, get_conn=False, init_dtr=None,
                           logcls=Logger, *args, **kwargs):
    """
    Opens a serial connection to a device.

    :param timeout: timeout for the fdpexpect spawn object.
    :param get_conn: ``bool`` that specifies whether the underlying connection
                     object should be yielded as well.
    :param init_dtr: specifies the initial DTR state that should be set.
    :param logcls: passed through to ``get_connection`` to create the
                   read/send log objects of the spawn object.

    All remaining arguments are passed into the __init__ of serial.Serial. See
    pyserial documentation for details:

        http://pyserial.sourceforge.net/pyserial_api.html#serial.Serial

    :returns: a pexpect spawn object connected to the device, or a
              ``(spawn, connection)`` tuple when ``get_conn`` is true.
              See: http://pexpect.sourceforge.net/pexpect.html

    """
    # init_dtr and logcls are forwarded positionally: passing them as
    # keywords alongside *args would make any extra positional argument
    # collide with init_dtr and raise TypeError.
    target, conn = get_connection(timeout, init_dtr, logcls,
                                  *args, **kwargs)
    try:
        if get_conn:
            yield target, conn
        else:
            yield target
    finally:
        # Runs even when the with-body raises, so the descriptor never leaks.
        target.close()  # Closes the file descriptor used by the conn.
        del conn
114