Join GitHub today
GitHub is home to over 50 million developers working together to host and review code, manage projects, and build software together.
Sign upmaster
cpython/Lib/os.py/Jump to
Code definitions _existsFunction_get_exports_listFunctiondelFunctiondelFunctiondelFunction_addFunctiondelFunctiondelFunctiondelFunctiondelFunctionmakedirsFunctionremovedirsFunctionrenamesFunctionwalkFunction_walkFunctionfwalkFunction_fwalkFunctionassertFunctionexeclFunctionexecleFunctionexeclpFunctionexeclpeFunctionexecvpFunctionexecvpeFunction_execvpeFunctionget_exec_pathFunction_EnvironClass__init__Function__getitem__Function__setitem__Function__delitem__FunctiondelFunction__iter__Function__len__Function__repr__FunctioncopyFunctionsetdefaultFunction__ior__Function__or__Function__ror__Function_createenvironFunctioncheck_strFunctionencodekeyFunctionencodeFunctiondecodeFunctiondelFunctiongetenvFunction_check_bytesFunctiondelFunctiongetenvbFunction_fscodecFunctionfsencodeFunctionfsdecodeFunctiondelFunction_spawnvefFunctionspawnvFunctionspawnveFunctionspawnvpFunctionspawnvpeFunctionspawnlFunctionspawnleFunctionspawnlpFunctionspawnlpeFunctionpopenFunction_wrap_closeClass__init__FunctioncloseFunction__enter__Function__exit__Function__getattr__Function__iter__FunctionfdopenFunction_fspathFunctionPathLikeClass__fspath__Function__subclasshook__Function_AddedDllDirectoryClass__init__FunctioncloseFunction__enter__Function__exit__Function__repr__Functionadd_dll_directoryFunction
Go to fileJump to
- Go to file
Copy path
This implements things like `list[int]`, which returns an object of type `types.GenericAlias`. This object mostly acts as a proxy for `list`, but has attributes `__origin__` and `__args__` that allow recovering the parts (with values `list` and `(int,)`). There is also an approximate notion of type variables; e.g. `list[T]` has a `__parameters__` attribute equal to `(T,)`. Type variables are objects of type `typing.TypeVar`.
r"""OS routines for NT or Posix depending on what system we're on.

This exports:
  - all functions from posix or nt, e.g. unlink, stat, etc.
  - os.path is either posixpath or ntpath
  - os.name is either 'posix' or 'nt'
  - os.curdir is a string representing the current directory (always '.')
  - os.pardir is a string representing the parent directory (always '..')
  - os.sep is the (or a most common) pathname separator ('/' or '\\')
  - os.extsep is the extension separator (always '.')
  - os.altsep is the alternate pathname separator (None or '/')
  - os.pathsep is the component separator used in $PATH etc
  - os.linesep is the line separator in text files ('\r' or '\n' or '\r\n')
  - os.defpath is the default search path for executables
  - os.devnull is the file path of the null device ('/dev/null', etc.)

Programs that import and use 'os' stand a better chance of being
portable between different platforms.  Of course, they must then
only use functions that are defined by all platforms (e.g., unlink
and opendir), and leave all pathname manipulation to os.path
(e.g., split and join).
"""

#'
import abc
import sys
import stat as st

from _collections_abc import _check_methods

# The type of parameterized builtin generics such as list[int]; used below
# to give PathLike a __class_getitem__.
GenericAlias = type(list[int])

# Names of the modules compiled into the interpreter; consulted below to
# decide which platform module ('posix' or 'nt') to build on.
_names = sys.builtin_module_names

# Note:  more names are added to __all__ later.
__all__ = ["altsep", "curdir", "pardir", "sep", "pathsep", "linesep",
           "defpath", "name", "path", "devnull", "SEEK_SET", "SEEK_CUR",
           "SEEK_END", "fsencode", "fsdecode", "get_exec_path", "fdopen",
           "popen", "extsep"]
def _exists(name):
    """Return True if *name* is bound in this module's global namespace."""
    return name in globals()
def _get_exports_list(module):
    """Return the list of names that *module* exports.

    Uses ``module.__all__`` when the module defines it; otherwise falls
    back to every name reported by ``dir(module)`` that does not start
    with an underscore.
    """
    try:
        return list(module.__all__)
    except AttributeError:
        return [n for n in dir(module) if n[0] != '_']
# Any new dependencies of the os module and/or changes in path separator
# requires updating importlib as well.
if 'posix' in _names:
    name = 'posix'
    linesep = '\n'
    from posix import *
    try:
        # _exit starts with an underscore, so the star-import above does
        # not bring it in; import and export it explicitly.
        from posix import _exit
        __all__.append('_exit')
    except ImportError:
        pass
    import posixpath as path

    try:
        from posix import _have_functions
    except ImportError:
        pass

    import posix
    __all__.extend(_get_exports_list(posix))
    del posix

elif 'nt' in _names:
    name = 'nt'
    linesep = '\r\n'
    from nt import *
    try:
        from nt import _exit
        __all__.append('_exit')
    except ImportError:
        pass
    import ntpath as path

    import nt
    __all__.extend(_get_exports_list(nt))
    del nt

    try:
        from nt import _have_functions
    except ImportError:
        pass

else:
    raise ImportError('no os specific module found')

# Register the chosen path module so that "import os.path" works.
sys.modules['os.path'] = path
from os.path import (curdir, pardir, sep, pathsep, defpath, extsep, altsep,
    devnull)

del _names
# Build the supports_* sets: each one collects the module-level functions
# that accept a particular optional feature (dir_fd, effective_ids, an
# open fd instead of a path, follow_symlinks) on this platform.
if _exists("_have_functions"):
    _globals = globals()

    def _add(flag, fn):
        # Add the function named *fn* to the set currently bound to _set,
        # provided the function exists here and *flag* is listed in
        # _have_functions.
        if (fn in _globals) and (flag in _have_functions):
            _set.add(_globals[fn])

    _set = set()
    _add("HAVE_FACCESSAT",  "access")
    _add("HAVE_FCHMODAT",   "chmod")
    _add("HAVE_FCHOWNAT",   "chown")
    _add("HAVE_FSTATAT",    "stat")
    _add("HAVE_FUTIMESAT",  "utime")
    _add("HAVE_LINKAT",     "link")
    _add("HAVE_MKDIRAT",    "mkdir")
    _add("HAVE_MKFIFOAT",   "mkfifo")
    _add("HAVE_MKNODAT",    "mknod")
    _add("HAVE_OPENAT",     "open")
    _add("HAVE_READLINKAT", "readlink")
    _add("HAVE_RENAMEAT",   "rename")
    _add("HAVE_SYMLINKAT",  "symlink")
    _add("HAVE_UNLINKAT",   "unlink")
    _add("HAVE_UNLINKAT",   "rmdir")
    _add("HAVE_UTIMENSAT",  "utime")
    supports_dir_fd = _set

    _set = set()
    _add("HAVE_FACCESSAT",  "access")
    supports_effective_ids = _set

    _set = set()
    _add("HAVE_FCHDIR",     "chdir")
    _add("HAVE_FCHMOD",     "chmod")
    _add("HAVE_FCHOWN",     "chown")
    _add("HAVE_FDOPENDIR",  "listdir")
    _add("HAVE_FDOPENDIR",  "scandir")
    _add("HAVE_FEXECVE",    "execve")
    _set.add(stat)  # fstat always works
    _add("HAVE_FTRUNCATE",  "truncate")
    _add("HAVE_FUTIMENS",   "utime")
    _add("HAVE_FUTIMES",    "utime")
    _add("HAVE_FPATHCONF",  "pathconf")
    if _exists("statvfs") and _exists("fstatvfs"):  # mac os x10.3
        _add("HAVE_FSTATVFS", "statvfs")
    supports_fd = _set

    _set = set()
    _add("HAVE_FACCESSAT",  "access")
    # Some platforms don't support lchmod().  Often the function exists
    # anyway, as a stub that always returns ENOSUP or perhaps EOPNOTSUPP.
    # (No, I don't know why that's a good design.)  ./configure will detect
    # this and reject it--so HAVE_LCHMOD still won't be defined on such
    # platforms.  This is Very Helpful.
    #
    # However, sometimes platforms without a working lchmod() *do* have
    # fchmodat().  (Examples: Linux kernel 3.2 with glibc 2.15,
    # OpenIndiana 3.x.)  And fchmodat() has a flag that theoretically makes
    # it behave like lchmod().  So in theory it would be a suitable
    # replacement for lchmod().  But when lchmod() doesn't work, fchmodat()'s
    # flag doesn't work *either*.  Sadly ./configure isn't sophisticated
    # enough to detect this condition--it only determines whether or not
    # fchmodat() minimally works.
    #
    # Therefore we simply ignore fchmodat() when deciding whether or not
    # os.chmod supports follow_symlinks.  Just checking lchmod() is
    # sufficient.  After all--if you have a working fchmodat(), your
    # lchmod() almost certainly works too.
    #
    # _add("HAVE_FCHMODAT",   "chmod")
    _add("HAVE_FCHOWNAT",   "chown")
    _add("HAVE_FSTATAT",    "stat")
    _add("HAVE_LCHFLAGS",   "chflags")
    _add("HAVE_LCHMOD",     "chmod")
    if _exists("lchown"):  # mac os x10.3
        _add("HAVE_LCHOWN", "chown")
    _add("HAVE_LINKAT",     "link")
    _add("HAVE_LUTIMES",    "utime")
    _add("HAVE_LSTAT",      "stat")
    _add("HAVE_FSTATAT",    "stat")
    _add("HAVE_UTIMENSAT",  "utime")
    _add("MS_WINDOWS",      "stat")
    supports_follow_symlinks = _set

    # The helpers were only needed to build the sets above.
    del _set
    del _have_functions
    del _globals
    del _add
# Python uses fixed values for the SEEK_ constants; they are mapped
# to native constants if necessary in posixmodule.c
# Other possible SEEK values are directly imported from posixmodule.c
SEEK_SET = 0
SEEK_CUR = 1
SEEK_END = 2
# Super directory utilities.
# (Inspired by Eric Raymond; the doc strings are mostly his)

def makedirs(name, mode=0o777, exist_ok=False):
    """makedirs(name [, mode=0o777][, exist_ok=False])

    Super-mkdir; create a leaf directory and all intermediate ones.  Works like
    mkdir, except that any intermediate path segment (not just the rightmost)
    will be created if it does not exist. If the target directory already
    exists, raise an OSError if exist_ok is False. Otherwise no exception is
    raised.  This is recursive.

    Note that *mode* is passed only to the mkdir() call for the leaf;
    intermediate directories are created by the recursive call, which
    uses the default mode.
    """
    head, tail = path.split(name)
    if not tail:
        # name ended with a separator; split again to get the real leaf.
        head, tail = path.split(head)
    if head and tail and not path.exists(head):
        try:
            makedirs(head, exist_ok=exist_ok)
        except FileExistsError:
            # Defeats race condition when another thread created the path
            pass
        cdir = curdir
        if isinstance(tail, bytes):
            cdir = bytes(curdir, 'ASCII')
        if tail == cdir:           # xxx/newdir/. exists if xxx/newdir exists
            return
    try:
        mkdir(name, mode)
    except OSError:
        # Cannot rely on checking for EEXIST, since the operating system
        # could give priority to other errors like EACCES or EROFS
        if not exist_ok or not path.isdir(name):
            raise
def removedirs(name):
    """removedirs(name)

    Super-rmdir; remove a leaf directory and all empty intermediate
    ones.  Works like rmdir except that, if the leaf directory is
    successfully removed, directories corresponding to rightmost path
    segments will be pruned away until either the whole path is
    consumed or an error occurs.  Errors during this latter phase are
    ignored -- they generally mean that a directory was not empty.

    An OSError from removing the leaf itself is propagated.
    """
    rmdir(name)
    head, tail = path.split(name)
    if not tail:
        # name ended with a separator; split again to get the real leaf.
        head, tail = path.split(head)
    while head and tail:
        try:
            rmdir(head)
        except OSError:
            break
        head, tail = path.split(head)
def renames(old, new):
    """renames(old, new)

    Super-rename; create directories as necessary and delete any left
    empty.  Works like rename, except creation of any intermediate
    directories needed to make the new pathname good is attempted
    first.  After the rename, directories corresponding to rightmost
    path segments of the old name will be pruned until either the
    whole path is consumed or a nonempty directory is found.

    Note: this function can fail with the new directory structure made
    if you lack permissions needed to unlink the leaf directory or
    file.
    """
    head, tail = path.split(new)
    if head and tail and not path.exists(head):
        makedirs(head)
    rename(old, new)
    head, tail = path.split(old)
    if head and tail:
        try:
            removedirs(head)
        except OSError:
            # Pruning the old location is best-effort only.
            pass
# Export the directory helpers defined in this module.
__all__.extend(["makedirs", "removedirs", "renames"])
def walk(top, topdown=True, onerror=None, followlinks=False):
    """Directory tree generator.

    For each directory in the directory tree rooted at top (including top
    itself, but excluding '.' and '..'), yields a 3-tuple

        dirpath, dirnames, filenames

    dirpath is a string, the path to the directory.  dirnames is a list of
    the names of the subdirectories in dirpath (excluding '.' and '..').
    filenames is a list of the names of the non-directory files in dirpath.
    Note that the names in the lists are just names, with no path components.
    To get a full path (which begins with top) to a file or directory in
    dirpath, do os.path.join(dirpath, name).

    If optional arg 'topdown' is true or not specified, the triple for a
    directory is generated before the triples for any of its subdirectories
    (directories are generated top down).  If topdown is false, the triple
    for a directory is generated after the triples for all of its
    subdirectories (directories are generated bottom up).

    When topdown is true, the caller can modify the dirnames list in-place
    (e.g., via del or slice assignment), and walk will only recurse into the
    subdirectories whose names remain in dirnames; this can be used to prune the
    search, or to impose a specific order of visiting.  Modifying dirnames when
    topdown is false has no effect on the behavior of os.walk(), since the
    directories in dirnames have already been generated by the time dirnames
    itself is generated. No matter the value of topdown, the list of
    subdirectories is retrieved before the tuples for the directory and its
    subdirectories are generated.

    By default errors from the os.scandir() call are ignored.  If
    optional arg 'onerror' is specified, it should be a function; it
    will be called with one argument, an OSError instance.  It can
    report the error to continue with the walk, or raise the exception
    to abort the walk.  Note that the filename is available as the
    filename attribute of the exception object.

    By default, os.walk does not follow symbolic links to subdirectories on
    systems that support them.  In order to get this functionality, set the
    optional argument 'followlinks' to true.

    Caution:  if you pass a relative pathname for top, don't change the
    current working directory between resumptions of walk.  walk never
    changes the current directory, and assumes that the client doesn't
    either.

    Example:

    import os
    from os.path import join, getsize
    for root, dirs, files in os.walk('python/Lib/email'):
        print(root, "consumes", end="")
        print(sum(getsize(join(root, name)) for name in files), end="")
        print("bytes in", len(files), "non-directory files")
        if 'CVS' in dirs:
            dirs.remove('CVS')  # don't visit CVS directories

    """
    sys.audit("os.walk", top, topdown, onerror, followlinks)
    # The audit event fires at call time; the traversal itself is done
    # lazily by the _walk() generator.
    return _walk(fspath(top), topdown, onerror, followlinks)
def _walk(top, topdown, onerror, followlinks):
    """Generator implementing walk() for an already-converted *top* path.

    Yields (top, dirs, nondirs) tuples.  OSError raised while opening or
    iterating the directory is passed to *onerror* (when it is not None)
    and ends the walk of that directory without yielding it.
    """
    dirs = []
    nondirs = []
    walk_dirs = []

    # We may not have read permission for top, in which case we can't
    # get a list of the files the directory contains.  os.walk
    # always suppressed the exception then, rather than blow up for a
    # minor reason when (say) a thousand readable directories are still
    # left to visit.  That logic is copied here.
    try:
        # Note that scandir is global in this module due
        # to earlier import-*.
        scandir_it = scandir(top)
    except OSError as error:
        if onerror is not None:
            onerror(error)
        return

    with scandir_it:
        while True:
            try:
                try:
                    entry = next(scandir_it)
                except StopIteration:
                    break
            except OSError as error:
                if onerror is not None:
                    onerror(error)
                return

            try:
                is_dir = entry.is_dir()
            except OSError:
                # If is_dir() raises an OSError, consider that the entry is not
                # a directory, same behaviour than os.path.isdir().
                is_dir = False

            if is_dir:
                dirs.append(entry.name)
            else:
                nondirs.append(entry.name)

            if not topdown and is_dir:
                # Bottom-up: recurse into sub-directory, but exclude symlinks to
                # directories if followlinks is False
                if followlinks:
                    walk_into = True
                else:
                    try:
                        is_symlink = entry.is_symlink()
                    except OSError:
                        # If is_symlink() raises an OSError, consider that the
                        # entry is not a symbolic link, same behaviour than
                        # os.path.islink().
                        is_symlink = False
                    walk_into = not is_symlink

                if walk_into:
                    walk_dirs.append(entry.path)

    # Yield before recursion if going top down
    if topdown:
        yield top, dirs, nondirs

        # Recurse into sub-directories
        islink, join = path.islink, path.join
        for dirname in dirs:
            new_path = join(top, dirname)
            # Issue #23605: os.path.islink() is used instead of caching
            # entry.is_symlink() result during the loop on os.scandir() because
            # the caller can replace the directory entry during the "yield"
            # above.
            if followlinks or not islink(new_path):
                yield from _walk(new_path, topdown, onerror, followlinks)
    else:
        # Recurse into sub-directories
        for new_path in walk_dirs:
            yield from _walk(new_path, topdown, onerror, followlinks)
        # Yield after recursion if going bottom up
        yield top, dirs, nondirs
# Export the directory tree generator.
__all__.append("walk")
# fwalk() is only defined when open() and stat() support dir_fd and
# scandir() and stat() accept an open file descriptor.
if {open, stat} <= supports_dir_fd and {scandir, stat} <= supports_fd:

    def fwalk(top=".", topdown=True, onerror=None, *, follow_symlinks=False, dir_fd=None):
        """Directory tree generator.

        This behaves exactly like walk(), except that it yields a 4-tuple

            dirpath, dirnames, filenames, dirfd

        `dirpath`, `dirnames` and `filenames` are identical to walk() output,
        and `dirfd` is a file descriptor referring to the directory `dirpath`.

        The advantage of fwalk() over walk() is that it's safe against symlink
        races (when follow_symlinks is False).

        If dir_fd is not None, it should be a file descriptor open to a directory,
          and top should be relative; top will then be relative to that directory.
          (dir_fd is always supported for fwalk.)

        Caution:
        Since fwalk() yields file descriptors, those are only valid until the
        next iteration step, so you should dup() them if you want to keep them
        for a longer period.

        Example:

        import os
        for root, dirs, files, rootfd in os.fwalk('python/Lib/email'):
            print(root, "consumes", end="")
            print(sum(os.stat(name, dir_fd=rootfd).st_size for name in files),
                  end="")
            print("bytes in", len(files), "non-directory files")
            if 'CVS' in dirs:
                dirs.remove('CVS')  # don't visit CVS directories
        """
        sys.audit("os.fwalk", top, topdown, onerror, follow_symlinks, dir_fd)
        # NOTE(review): every int has __index__, so this condition reduces to
        # "not isinstance(top, int)"; kept as written to preserve behaviour.
        if not isinstance(top, int) or not hasattr(top, '__index__'):
            top = fspath(top)
        # Note: To guard against symlink races, we use the standard
        # lstat()/open()/fstat() trick.
        if not follow_symlinks:
            orig_st = stat(top, follow_symlinks=False, dir_fd=dir_fd)
        topfd = open(top, O_RDONLY, dir_fd=dir_fd)
        try:
            # orig_st is only bound when follow_symlinks is false; the
            # short-circuiting "or" keeps it from being read otherwise.
            if (follow_symlinks or (st.S_ISDIR(orig_st.st_mode) and
                                    path.samestat(orig_st, stat(topfd)))):
                yield from _fwalk(topfd, top, isinstance(top, bytes),
                                  topdown, onerror, follow_symlinks)
        finally:
            close(topfd)

    def _fwalk(topfd, toppath, isbytes, topdown, onerror, follow_symlinks):
        # Note: This uses O(depth of the directory tree) file descriptors: if
        # necessary, it can be adapted to only require O(1) FDs, see issue
        # #13734.

        scandir_it = scandir(topfd)
        dirs = []
        nondirs = []
        # Directory entries are kept only for the bottom-up walk without
        # follow_symlinks, where they are reused for the stat below.
        entries = None if topdown or follow_symlinks else []
        for entry in scandir_it:
            name = entry.name
            if isbytes:
                name = fsencode(name)
            try:
                if entry.is_dir():
                    dirs.append(name)
                    if entries is not None:
                        entries.append(entry)
                else:
                    nondirs.append(name)
            except OSError:
                try:
                    # Add dangling symlinks, ignore disappeared files
                    if entry.is_symlink():
                        nondirs.append(name)
                except OSError:
                    pass

        if topdown:
            yield toppath, dirs, nondirs, topfd

        for name in dirs if entries is None else zip(dirs, entries):
            try:
                if not follow_symlinks:
                    if topdown:
                        orig_st = stat(name, dir_fd=topfd, follow_symlinks=False)
                    else:
                        assert entries is not None
                        name, entry = name
                        orig_st = entry.stat(follow_symlinks=False)
                dirfd = open(name, O_RDONLY, dir_fd=topfd)
            except OSError as err:
                if onerror is not None:
                    onerror(err)
                continue
            try:
                if follow_symlinks or path.samestat(orig_st, stat(dirfd)):
                    dirpath = path.join(toppath, name)
                    yield from _fwalk(dirfd, dirpath, isbytes,
                                      topdown, onerror, follow_symlinks)
            finally:
                close(dirfd)

        if not topdown:
            yield toppath, dirs, nondirs, topfd

    __all__.append("fwalk")
def execl(file, *args):
    """execl(file, *args)

    Execute the executable file with argument list args, replacing the
    current process. """
    execv(file, args)
def execle(file, *args):
    """execle(file, *args, env)

    Execute the executable file with argument list args and
    environment env, replacing the current process. """
    # The environment mapping is passed as the last positional argument.
    env = args[-1]
    execve(file, args[:-1], env)
def execlp(file, *args):
    """execlp(file, *args)

    Execute the executable file (which is searched for along $PATH)
    with argument list args, replacing the current process. """
    execvp(file, args)
def execlpe(file, *args):
    """execlpe(file, *args, env)

    Execute the executable file (which is searched for along $PATH)
    with argument list args and environment env, replacing the current
    process. """
    # The environment mapping is passed as the last positional argument.
    env = args[-1]
    execvpe(file, args[:-1], env)
def execvp(file, args):
    """execvp(file, args)

    Execute the executable file (which is searched for along $PATH)
    with argument list args, replacing the current process.
    args may be a list or tuple of strings. """
    _execvpe(file, args)
def execvpe(file, args, env):
    """execvpe(file, args, env)

    Execute the executable file (which is searched for along $PATH)
    with argument list args and environment env, replacing the
    current process.
    args may be a list or tuple of strings. """
    _execvpe(file, args, env)
# Export the exec*() convenience wrappers.
__all__.extend(["execl", "execle", "execlp", "execlpe", "execvp", "execvpe"])
def _execvpe(file, args, env=None):
    """Shared implementation of execvp() and execvpe().

    When *env* is None, execv() is used and the search path is taken from
    os.environ; otherwise execve() is used with *env*.  If *file* has a
    directory component it is executed directly.  Otherwise each
    directory returned by get_exec_path() is tried in order.  If every
    attempt fails, the first error other than FileNotFoundError or
    NotADirectoryError is raised; if there was none, the last error is
    raised.
    """
    if env is not None:
        exec_func = execve
        argrest = (args, env)
    else:
        exec_func = execv
        argrest = (args,)
        env = environ

    if path.dirname(file):
        exec_func(file, *argrest)
        return
    saved_exc = None
    path_list = get_exec_path(env)
    if name != 'nt':
        file = fsencode(file)
        path_list = map(fsencode, path_list)
    for directory in path_list:
        fullname = path.join(directory, file)
        try:
            exec_func(fullname, *argrest)
        except (FileNotFoundError, NotADirectoryError) as e:
            last_exc = e
        except OSError as e:
            last_exc = e
            if saved_exc is None:
                saved_exc = e
    if saved_exc is not None:
        raise saved_exc
    raise last_exc
def get_exec_path(env=None):
    """Returns the sequence of directories that will be searched for the
    named executable (similar to a shell) when launching a process.

    *env* must be an environment variable dict or None.  If *env* is None,
    os.environ will be used.

    Raises ValueError if *env* contains both a 'PATH' and a b'PATH' key
    (only checked where supports_bytes_environ is true).  Falls back to
    os.defpath when no PATH entry is found.
    """
    # Use a local import instead of a global import to limit the number of
    # modules loaded at startup: the os module is always loaded at startup by
    # Python. It may also avoid a bootstrap issue.
    import warnings

    if env is None:
        env = environ

    # {b'PATH': ...}.get('PATH') and {'PATH': ...}.get(b'PATH') emit a
    # BytesWarning when using python -b or python -bb: ignore the warning
    with warnings.catch_warnings():
        warnings.simplefilter("ignore", BytesWarning)

        try:
            path_list = env.get('PATH')
        except TypeError:
            path_list = None

        if supports_bytes_environ:
            try:
                path_listb = env[b'PATH']
            except (KeyError, TypeError):
                pass
            else:
                if path_list is not None:
                    raise ValueError(
                        "env cannot contain 'PATH' and b'PATH' keys")
                path_list = path_listb

            if path_list is not None and isinstance(path_list, bytes):
                path_list = fsdecode(path_list)

    if path_list is None:
        path_list = defpath
    return path_list.split(pathsep)
# Change environ to automatically call putenv() and unsetenv()
from _collections_abc import MutableMapping, Mapping

class _Environ(MutableMapping):
    """Mapping view of the process environment.

    Keys and values are converted with the supplied encode/decode
    callables on the way into and out of the underlying *data* mapping.
    Assigning or deleting an item also calls putenv() or unsetenv().
    """

    def __init__(self, data, encodekey, decodekey, encodevalue, decodevalue):
        self.encodekey = encodekey
        self.decodekey = decodekey
        self.encodevalue = encodevalue
        self.decodevalue = decodevalue
        self._data = data

    def __getitem__(self, key):
        try:
            value = self._data[self.encodekey(key)]
        except KeyError:
            # raise KeyError with the original key value
            raise KeyError(key) from None
        return self.decodevalue(value)

    def __setitem__(self, key, value):
        key = self.encodekey(key)
        value = self.encodevalue(value)
        putenv(key, value)
        self._data[key] = value

    def __delitem__(self, key):
        encodedkey = self.encodekey(key)
        unsetenv(encodedkey)
        try:
            del self._data[encodedkey]
        except KeyError:
            # raise KeyError with the original key value
            raise KeyError(key) from None

    def __iter__(self):
        # list() from dict object is an atomic operation
        keys = list(self._data)
        for key in keys:
            yield self.decodekey(key)

    def __len__(self):
        return len(self._data)

    def __repr__(self):
        return 'environ({{{}}})'.format(', '.join(
            ('{!r}: {!r}'.format(self.decodekey(key), self.decodevalue(value))
            for key, value in self._data.items())))

    def copy(self):
        """Return a plain dict holding the decoded keys and values."""
        return dict(self)

    def setdefault(self, key, value):
        """Set *key* to *value* if it is missing; return the stored value."""
        if key not in self:
            self[key] = value
        return self[key]

    def __ior__(self, other):
        self.update(other)
        return self

    def __or__(self, other):
        if not isinstance(other, Mapping):
            return NotImplemented
        new = dict(self)
        new.update(other)
        return new

    def __ror__(self, other):
        if not isinstance(other, Mapping):
            return NotImplemented
        new = dict(other)
        new.update(self)
        return new
def _createenviron():
    """Build the str-based os.environ mapping around the raw environ.

    On 'nt' keys are upper-cased and values are stored as str in a new
    dict; elsewhere keys and values are encoded to bytes with the
    filesystem encoding and 'surrogateescape', and the raw environ
    mapping is reused as the backing store.
    """
    if name == 'nt':
        # Where Env Var Names Must Be UPPERCASE
        def check_str(value):
            if not isinstance(value, str):
                raise TypeError("str expected, not %s" % type(value).__name__)
            return value
        encode = check_str
        decode = str
        def encodekey(key):
            return encode(key).upper()
        data = {}
        for key, value in environ.items():
            data[encodekey(key)] = value
    else:
        # Where Env Var Names Can Be Mixed Case
        encoding = sys.getfilesystemencoding()
        def encode(value):
            if not isinstance(value, str):
                raise TypeError("str expected, not %s" % type(value).__name__)
            return value.encode(encoding, 'surrogateescape')
        def decode(value):
            return value.decode(encoding, 'surrogateescape')
        encodekey = encode
        data = environ
    return _Environ(data,
        encodekey, decode,
        encode, decode)

# unicode environ
environ = _createenviron()
del _createenviron
def getenv(key, default=None):
    """Get an environment variable, return None if it doesn't exist.
    The optional second argument can specify an alternate default.
    key, default and the result are str."""
    return environ.get(key, default)
# A bytes view of the environment (environb) is offered everywhere
# except on 'nt'.
supports_bytes_environ = (name != 'nt')
__all__.extend(("getenv", "supports_bytes_environ"))
if supports_bytes_environ:
    def _check_bytes(value):
        # Used as both encoder and key encoder for environb: values must
        # already be bytes and are stored unchanged.
        if not isinstance(value, bytes):
            raise TypeError("bytes expected, not %s" % type(value).__name__)
        return value

    # bytes environ
    # Shares the backing mapping of environ, so both views see the same
    # entries.
    environb = _Environ(environ._data,
        _check_bytes, bytes,
        _check_bytes, bytes)
    del _check_bytes

    def getenvb(key, default=None):
        """Get an environment variable, return None if it doesn't exist.
        The optional second argument can specify an alternate default.
        key, default and the result are bytes."""
        return environb.get(key, default)

    __all__.extend(("environb", "getenvb"))
def _fscodec():
    """Create fsencode() and fsdecode() bound to the filesystem codec.

    The encoding and error handler are read once from sys and captured
    in the closures that are returned.
    """
    encoding = sys.getfilesystemencoding()
    errors = sys.getfilesystemencodeerrors()

    def fsencode(filename):
        """Encode filename (an os.PathLike, bytes, or str) to the filesystem
        encoding with 'surrogateescape' error handler, return bytes unchanged.
        On Windows, use 'strict' error handler if the file system encoding is
        'mbcs' (which is the default encoding).
        """
        filename = fspath(filename)  # Does type-checking of `filename`.
        if isinstance(filename, str):
            return filename.encode(encoding, errors)
        else:
            return filename

    def fsdecode(filename):
        """Decode filename (an os.PathLike, bytes, or str) from the filesystem
        encoding with 'surrogateescape' error handler, return str unchanged. On
        Windows, use 'strict' error handler if the file system encoding is
        'mbcs' (which is the default encoding).
        """
        filename = fspath(filename)  # Does type-checking of `filename`.
        if isinstance(filename, bytes):
            return filename.decode(encoding, errors)
        else:
            return filename

    return fsencode, fsdecode

fsencode, fsdecode = _fscodec()
del _fscodec
# Supply spawn*() (probably only for Unix)
if _exists("fork") and not _exists("spawnv") and _exists("execv"):

    P_WAIT = 0
    P_NOWAIT = P_NOWAITO = 1

    __all__.extend(["P_WAIT", "P_NOWAIT", "P_NOWAITO"])

    # XXX Should we support P_DETACH?  I suppose it could fork()**2
    # and close the std I/O streams.  Also, P_OVERLAY is the same
    # as execv*()?

    def _spawnvef(mode, file, args, env, func):
        # Internal helper; func is the exec*() function to use
        if not isinstance(args, (tuple, list)):
            raise TypeError('argv must be a tuple or a list')
        if not args or not args[0]:
            raise ValueError('argv first element cannot be empty')
        pid = fork()
        if not pid:
            # Child
            try:
                if env is None:
                    func(file, args)
                else:
                    func(file, args, env)
            except:
                # Deliberately catch everything: the child must never
                # return into the caller's code, so it exits at once.
                _exit(127)
        else:
            # Parent
            if mode == P_NOWAIT:
                return pid  # Caller is responsible for waiting!
            while True:
                wpid, sts = waitpid(pid, 0)
                if WIFSTOPPED(sts):
                    continue

                return waitstatus_to_exitcode(sts)

    def spawnv(mode, file, args):
        """spawnv(mode, file, args) -> integer

Execute file with arguments from args in a subprocess.
If mode == P_NOWAIT return the pid of the process.
If mode == P_WAIT return the process's exit code if it exits normally;
otherwise return -SIG, where SIG is the signal that killed it. """
        return _spawnvef(mode, file, args, None, execv)

    def spawnve(mode, file, args, env):
        """spawnve(mode, file, args, env) -> integer

Execute file with arguments from args in a subprocess with the
specified environment.
If mode == P_NOWAIT return the pid of the process.
If mode == P_WAIT return the process's exit code if it exits normally;
otherwise return -SIG, where SIG is the signal that killed it. """
        return _spawnvef(mode, file, args, env, execve)

    # Note: spawnvp[e] isn't currently supported on Windows

    def spawnvp(mode, file, args):
        """spawnvp(mode, file, args) -> integer

Execute file (which is looked for along $PATH) with arguments from
args in a subprocess.
If mode == P_NOWAIT return the pid of the process.
If mode == P_WAIT return the process's exit code if it exits normally;
otherwise return -SIG, where SIG is the signal that killed it. """
        return _spawnvef(mode, file, args, None, execvp)

    def spawnvpe(mode, file, args, env):
        """spawnvpe(mode, file, args, env) -> integer

Execute file (which is looked for along $PATH) with arguments from
args in a subprocess with the supplied environment.
If mode == P_NOWAIT return the pid of the process.
If mode == P_WAIT return the process's exit code if it exits normally;
otherwise return -SIG, where SIG is the signal that killed it. """
        return _spawnvef(mode, file, args, env, execvpe)

    __all__.extend(["spawnv", "spawnve", "spawnvp", "spawnvpe"])
if _exists("spawnv"):
    # These aren't supplied by the basic Windows code
    # but can be easily implemented in Python

    def spawnl(mode, file, *args):
        """spawnl(mode, file, *args) -> integer

Execute file with arguments from args in a subprocess.
If mode == P_NOWAIT return the pid of the process.
If mode == P_WAIT return the process's exit code if it exits normally;
otherwise return -SIG, where SIG is the signal that killed it. """
        return spawnv(mode, file, args)

    def spawnle(mode, file, *args):
        """spawnle(mode, file, *args, env) -> integer

Execute file with arguments from args in a subprocess with the
supplied environment.
If mode == P_NOWAIT return the pid of the process.
If mode == P_WAIT return the process's exit code if it exits normally;
otherwise return -SIG, where SIG is the signal that killed it. """
        # The environment mapping is passed as the last positional argument.
        env = args[-1]
        return spawnve(mode, file, args[:-1], env)

    __all__.extend(["spawnl", "spawnle"])
if _exists("spawnvp"):
    # At the moment, Windows doesn't implement spawnvp[e],
    # so it won't have spawnlp[e] either.
    def spawnlp(mode, file, *args):
        """spawnlp(mode, file, *args) -> integer

Execute file (which is looked for along $PATH) with arguments from
args in a subprocess.
If mode == P_NOWAIT return the pid of the process.
If mode == P_WAIT return the process's exit code if it exits normally;
otherwise return -SIG, where SIG is the signal that killed it. """
        return spawnvp(mode, file, args)

    def spawnlpe(mode, file, *args):
        """spawnlpe(mode, file, *args, env) -> integer

Execute file (which is looked for along $PATH) with arguments from
args in a subprocess with the supplied environment.
If mode == P_NOWAIT return the pid of the process.
If mode == P_WAIT return the process's exit code if it exits normally;
otherwise return -SIG, where SIG is the signal that killed it. """
        # The environment mapping is passed as the last positional argument.
        env = args[-1]
        return spawnvpe(mode, file, args[:-1], env)

    __all__.extend(["spawnlp", "spawnlpe"])
# Supply os.popen()
def popen(cmd, mode="r", buffering=-1):
    """Run *cmd* through the shell and return a text file connected to it.

    With mode "r" the returned object reads the command's standard
    output; with mode "w" it writes to the command's standard input.
    The object is a _wrap_close proxy, so closing it waits for the
    process.

    Raises TypeError if *cmd* is not a str, and ValueError if *mode* is
    not "r" or "w" or if *buffering* is 0 or None.
    """
    if not isinstance(cmd, str):
        raise TypeError("invalid cmd type (%s, expected string)" % type(cmd))
    if mode not in ("r", "w"):
        raise ValueError("invalid mode %r" % mode)
    if buffering == 0 or buffering is None:
        raise ValueError("popen() does not support unbuffered streams")
    # Imported here rather than at module level.
    import subprocess, io
    if mode == "r":
        proc = subprocess.Popen(cmd,
                                shell=True,
                                stdout=subprocess.PIPE,
                                bufsize=buffering)
        return _wrap_close(io.TextIOWrapper(proc.stdout), proc)
    else:
        proc = subprocess.Popen(cmd,
                                shell=True,
                                stdin=subprocess.PIPE,
                                bufsize=buffering)
        return _wrap_close(io.TextIOWrapper(proc.stdin), proc)
# Helper for popen() -- a proxy for a file whose close waits for the process
class _wrap_close:
    """Proxy for *stream* whose close() also waits for *proc*.

    Attribute access and iteration are forwarded to the wrapped stream.
    """

    def __init__(self, stream, proc):
        self._stream = stream
        self._proc = proc

    def close(self):
        """Close the stream, wait for the process, and return its status.

        Returns None when the process returned 0.  Otherwise returns the
        return code unchanged on 'nt', and shifted left by 8 bits
        elsewhere.
        """
        self._stream.close()
        returncode = self._proc.wait()
        if returncode == 0:
            return None
        if name == 'nt':
            return returncode
        else:
            return returncode << 8  # Shift left to match old behavior

    def __enter__(self):
        return self

    def __exit__(self, *args):
        self.close()

    def __getattr__(self, name):
        # Only called for attributes not found on the proxy itself.
        return getattr(self._stream, name)

    def __iter__(self):
        return iter(self._stream)
# Supply os.fdopen()
def fdopen(fd, *args, **kwargs):
    """Return an open file object for the file descriptor *fd*.

    All further arguments are passed through to io.open().  Raises
    TypeError if *fd* is not an int.
    """
    if not isinstance(fd, int):
        raise TypeError("invalid fd type (%s, expected integer)" % type(fd))
    import io
    return io.open(fd, *args, **kwargs)
# For testing purposes, make sure the function is available when the C
# implementation exists.
def _fspath(path):
    """Return the path representation of a path-like object.

    If str or bytes is passed in, it is returned unchanged. Otherwise the
    os.PathLike interface is used to get the path representation. If the
    path representation is not str or bytes, TypeError is raised. If the
    provided path is not str, bytes, or os.PathLike, TypeError is raised.
    """
    if isinstance(path, (str, bytes)):
        return path

    # Work from the object's type to match method resolution of other magic
    # methods.
    path_type = type(path)
    try:
        path_repr = path_type.__fspath__(path)
    except AttributeError:
        if hasattr(path_type, '__fspath__'):
            # The AttributeError came from inside __fspath__ itself.
            raise
        else:
            raise TypeError("expected str, bytes or os.PathLike object, "
                            "not " + path_type.__name__)
    if isinstance(path_repr, (str, bytes)):
        return path_repr
    else:
        raise TypeError("expected {}.__fspath__() to return str or bytes, "
                        "not {}".format(path_type.__name__,
                                        type(path_repr).__name__))
# If there is no C implementation, make the pure Python version the
# implementation as transparently as possible.
if not _exists('fspath'):
    fspath = _fspath
    fspath.__name__ = "fspath"
class PathLike(abc.ABC):

    """Abstract base class for implementing the file system path protocol."""

    @abc.abstractmethod
    def __fspath__(self):
        """Return the file system path representation of the object."""
        raise NotImplementedError

    @classmethod
    def __subclasshook__(cls, subclass):
        # Any class providing __fspath__ counts as a PathLike subclass;
        # the check applies to PathLike itself, not to its subclasses.
        if cls is PathLike:
            return _check_methods(subclass, '__fspath__')
        return NotImplemented

    # Allows subscripting such as PathLike[str].
    __class_getitem__ = classmethod(GenericAlias)
if name == 'nt':
    class _AddedDllDirectory:
        """Handle for a directory added with add_dll_directory().

        close() removes the directory from the search path again; the
        object can also be used as a context manager.
        """

        def __init__(self, path, cookie, remove_dll_directory):
            self.path = path
            self._cookie = cookie
            self._remove_dll_directory = remove_dll_directory

        def close(self):
            self._remove_dll_directory(self._cookie)
            # A cleared path makes __repr__ show the handle as empty.
            self.path = None

        def __enter__(self):
            return self

        def __exit__(self, *args):
            self.close()

        def __repr__(self):
            if self.path:
                return "<AddedDllDirectory({!r})>".format(self.path)
            return "<AddedDllDirectory()>"

    def add_dll_directory(path):
        """Add a path to the DLL search path.

        This search path is used when resolving dependencies for imported
        extension modules (the module itself is resolved through sys.path),
        and also by ctypes.

        Remove the directory by calling close() on the returned object or
        using it in a with statement.
        """
        import nt
        cookie = nt._add_dll_directory(path)
        return _AddedDllDirectory(
            path,
            cookie,
            nt._remove_dll_directory
        )