Movatterモバイル変換


[0]ホーム

URL:


Google Git
Sign in
chromium /external /github.com /python /cpython /refs/tags/v2.0.1 /. /Lib /pty.py
blob: 12c9093a24ffa1ddd6fb8c5e0c1f7e02a8706426 [file] [log] [blame]
"""Pseudo terminal utilities."""
# Bugs: No signal handling. Doesn't set slave termios and window size.
#Only tested on Linux.
# See: W. Richard Stevens. 1992. Advanced Programming in the
#UNIX Environment. Chapter 19.
# Author: Steen Lumholt -- with additions by Guido.
from selectimport select
import os, FCNTL
import tty
STDIN_FILENO=0
STDOUT_FILENO=1
STDERR_FILENO=2
CHILD=0
def openpty():
"""openpty() -> (master_fd, slave_fd)
Open a pty master/slave pair, using os.openpty() if possible."""
try:
return os.openpty()
except(AttributeError,OSError):
pass
master_fd, slave_name= _open_terminal()
slave_fd= slave_open(slave_name)
return master_fd, slave_fd
def master_open():
"""master_open() -> (master_fd, slave_name)
Open a pty master and return the fd, and the filename of the slave end.
Deprecated, use openpty() instead."""
try:
master_fd, slave_fd= os.openpty()
except(AttributeError,OSError):
pass
else:
slave_name= os.ttyname(slave_fd)
os.close(slave_fd)
return master_fd, slave_name
return _open_terminal()
def _open_terminal():
"""Open pty master and return (master_fd, tty_name).
SGI and generic BSD version, for when openpty() fails."""
try:
import sgi
exceptImportError:
pass
else:
try:
tty_name, master_fd= sgi._getpty(FCNTL.O_RDWR,0666,0)
exceptIOError, msg:
raise os.error, msg
return master_fd, tty_name
for xin'pqrstuvwxyzPQRST':
for yin'0123456789abcdef':
pty_name='/dev/pty'+ x+ y
try:
fd= os.open(pty_name, FCNTL.O_RDWR)
except os.error:
continue
return(fd,'/dev/tty'+ x+ y)
raise os.error,'out of pty devices'
def slave_open(tty_name):
"""slave_open(tty_name) -> slave_fd
Open the pty slave and acquire the controlling terminal, returning
opened filedescriptor.
Deprecated, use openpty() instead."""
return os.open(tty_name, FCNTL.O_RDWR)
def fork():
"""fork() -> (pid, master_fd)
Fork and make the child a session leader with a controlling terminal."""
try:
pid, fd= os.forkpty()
except(AttributeError,OSError):
pass
else:
if pid== CHILD:
try:
os.setsid()
exceptOSError:
# os.forkpty() already set us session leader
pass
return pid, fd
master_fd, slave_fd= openpty()
pid= os.fork()
if pid== CHILD:
# Establish a new session.
os.setsid()
os.close(master_fd)
# Slave becomes stdin/stdout/stderr of child.
os.dup2(slave_fd, STDIN_FILENO)
os.dup2(slave_fd, STDOUT_FILENO)
os.dup2(slave_fd, STDERR_FILENO)
if(slave_fd> STDERR_FILENO):
os.close(slave_fd)
# Parent and child process.
return pid, master_fd
def _writen(fd, data):
"""Write all the data to a descriptor."""
while data!='':
n= os.write(fd, data)
data= data[n:]
def _read(fd):
"""Default read function."""
return os.read(fd,1024)
def _copy(master_fd, master_read=_read, stdin_read=_read):
"""Parent copy loop.
Copies
pty master -> standard output(master_read)
standard input -> pty master(stdin_read)"""
while1:
rfds, wfds, xfds= select(
[master_fd, STDIN_FILENO],[],[])
if master_fdin rfds:
data= master_read(master_fd)
os.write(STDOUT_FILENO, data)
if STDIN_FILENOin rfds:
data= stdin_read(STDIN_FILENO)
_writen(master_fd, data)
def spawn(argv, master_read=_read, stdin_read=_read):
"""Create a spawned process."""
if type(argv)== type(''):
argv=(argv,)
pid, master_fd= fork()
if pid== CHILD:
apply(os.execlp,(argv[0],)+ argv)
mode= tty.tcgetattr(STDIN_FILENO)
tty.setraw(STDIN_FILENO)
try:
_copy(master_fd, master_read, stdin_read)
except:
tty.tcsetattr(STDIN_FILENO, tty.TCSAFLUSH, mode)

[8]ページ先頭

©2009-2025 Movatter.jp