# Source: cpython/Lib/cgi.py (master branch)
#! /usr/local/bin/python

# NOTE: the above "/usr/local/bin/python" is NOT a mistake. It is
# intentionally NOT "/usr/bin/env python". On many systems
# (e.g. Solaris), /usr/local/bin is not in $PATH as passed to CGI
# scripts, and /usr/local/bin is the default directory where Python is
# installed, so /usr/bin/env would be unable to find python. Granted,
# binary installations by Linux vendors often install Python in
# /usr/bin. So let those vendors patch cgi.py to match their choice
# of installation.

"""Support module for CGI (Common Gateway Interface) scripts.

This module defines a number of utilities for use by CGI scripts
written in Python.
"""
# History
# -------
#
# Michael McLay started this module. Steve Majewski changed the
# interface to SvFormContentDict and FormContentDict. The multipart
# parsing was inspired by code submitted by Andreas Paepcke. Guido van
# Rossum rewrote, reformatted and documented the module and is currently
# responsible for its maintenance.
#
__version__ = "2.6"


# Imports
# =======

import html
import locale
import os
import sys
import tempfile
import urllib.parse
from collections.abc import Mapping
from email.message import Message
from email.parser import FeedParser
from io import BytesIO, StringIO, TextIOWrapper

# Public names of this module.
__all__ = [
    "MiniFieldStorage", "FieldStorage", "parse", "parse_multipart",
    "parse_header", "test", "print_exception", "print_environ",
    "print_form", "print_directory", "print_arguments",
    "print_environ_usage",
]


# Logging support
# ===============

logfile = ""        # Filename to log to, if not empty
logfp = None        # File object to log to, if not None
def initlog(*allargs):
    """Open the log file if needed, pick the real logger, and log once.

    Callers should always go through the module-level name ``log``; it
    starts out bound to this function and is rebound here to ``dolog``
    (a log file object is available) or ``nolog`` (logging is disabled).

    The first argument is a format string and the remaining arguments
    are applied to it with the % operator, e.g.

        log("%s: %s", "a", "b")

    writes "a: b" plus a newline to the log file.

    If the global ``logfp`` is set it is used as the destination file
    object.  Otherwise, if the global ``logfile`` names a file, that
    file is opened in append mode.  When the file cannot be opened the
    failure is swallowed and logging is silently disabled, because there
    is no safe place to report the error.
    """
    global log, logfile, logfp
    must_open = bool(logfile) and not logfp
    if must_open:
        try:
            logfp = open(logfile, "a")
        except OSError:
            # Deliberate best effort: fall through with logging disabled.
            pass
    log = dolog if logfp else nolog
    log(*allargs)
def dolog(fmt, *args):
    """Format ``fmt % args`` and append it, newline-terminated, to logfp."""
    message = fmt % args
    logfp.write(message + "\n")
def nolog(*allargs):
    """Accept any arguments and do nothing; bound to log when disabled."""
    return None
def closelog():
    """Close the log file and reset the logging state.

    Clears ``logfile``, closes and forgets ``logfp`` when one is open,
    and rebinds ``log`` to ``initlog`` so the next log call starts over.
    """
    global log, logfile, logfp
    logfile = ''
    open_stream = logfp
    if open_stream:
        open_stream.close()
        logfp = None
    log = initlog
# The current logging function
log = initlog


# Parsing functions
# =================

# Maximum input we will accept when REQUEST_METHOD is POST
# 0 ==> unlimited input
maxlen = 0
def parse(fp=None, environ=os.environ, keep_blank_values=0,
          strict_parsing=0, separator='&'):
    """Parse a query from the environment or from a file.

    Arguments, all optional:

    fp              : file to read a POST body from; defaults to
        sys.stdin.  A TextIOWrapper is replaced by its underlying
        binary buffer, since read() must return bytes.

    environ         : environment dictionary; default: os.environ.
        Note that this function writes REQUEST_METHOD (when missing)
        and, in some branches, QUERY_STRING back into it.

    keep_blank_values: flag indicating whether blank values in
        percent-encoded forms should be kept as blank strings.  A true
        value retains them; the default false value drops them as if
        they had not been submitted.

    strict_parsing: flag indicating what to do with parsing errors.
        If false (the default), errors are silently ignored.
        If true, errors raise a ValueError exception.

    separator: str. The symbol separating the query arguments.
        Defaults to &.

    Returns the dictionary produced by urllib.parse.parse_qs(), or the
    result of parse_multipart() for a multipart/form-data POST.  Raises
    ValueError when a urlencoded POST body is longer than a non-zero
    module-level maxlen.
    """
    if fp is None:
        fp = sys.stdin

    # Field keys and values (except for files) are returned as strings,
    # so an encoding is needed to decode the bytes read from fp.
    encoding = fp.encoding if hasattr(fp, 'encoding') else 'latin-1'

    # fp.read() must return bytes
    if isinstance(fp, TextIOWrapper):
        fp = fp.buffer

    if 'REQUEST_METHOD' not in environ:
        environ['REQUEST_METHOD'] = 'GET'       # For testing stand-alone

    if environ['REQUEST_METHOD'] == 'POST':
        ctype, pdict = parse_header(environ['CONTENT_TYPE'])
        if ctype == 'multipart/form-data':
            return parse_multipart(fp, pdict, separator=separator)
        if ctype == 'application/x-www-form-urlencoded':
            body_length = int(environ['CONTENT_LENGTH'])
            if maxlen and body_length > maxlen:
                raise ValueError('Maximum content length exceeded')
            qs = fp.read(body_length).decode(encoding)
        else:
            qs = ''                     # Unknown content-type
        # Append the URL query string (or argv[1]) to the posted fields.
        if 'QUERY_STRING' in environ:
            if qs:
                qs += '&'
            qs += environ['QUERY_STRING']
        elif sys.argv[1:]:
            if qs:
                qs += '&'
            qs += sys.argv[1]
        environ['QUERY_STRING'] = qs    # XXX Shouldn't, really
    elif 'QUERY_STRING' in environ:
        qs = environ['QUERY_STRING']
    else:
        qs = sys.argv[1] if sys.argv[1:] else ""
        environ['QUERY_STRING'] = qs    # XXX Shouldn't, really

    return urllib.parse.parse_qs(qs, keep_blank_values, strict_parsing,
                                 encoding=encoding, separator=separator)
def parse_multipart(fp, pdict, encoding="utf-8", errors="replace", separator='&'):
    """Parse multipart input.

    Arguments:
    fp   : input file
    pdict: dictionary containing other parameters of content-type header.
        pdict['boundary'] is required and may be bytes or str; an
        optional pdict['CONTENT-LENGTH'] is forwarded as Content-Length.
    encoding, errors: request encoding and error handler, passed to
        FieldStorage
    separator: query-argument separator, passed to FieldStorage

    Returns a dictionary just like parse_qs(): keys are the field names, each
    value is a list of values for that field. For non-file fields, the value
    is a list of strings.

    Raises KeyError if pdict has no 'boundary' entry.
    """
    # RFC 2046, Section 5.1 : The "multipart" boundary delimiters are always
    # represented as 7bit US-ASCII.
    boundary = pdict['boundary']
    # parse_header() (and therefore parse()) supplies the boundary as str,
    # while direct callers traditionally pass bytes; accept both instead of
    # failing with AttributeError on str.
    if isinstance(boundary, (bytes, bytearray)):
        boundary = boundary.decode('ascii')
    ctype = "multipart/form-data; boundary={}".format(boundary)
    headers = Message()
    headers.set_type(ctype)
    try:
        headers['Content-Length'] = pdict['CONTENT-LENGTH']
    except KeyError:
        # The length is optional; FieldStorage then reads until the
        # closing boundary or EOF.
        pass
    fs = FieldStorage(fp, headers=headers, encoding=encoding, errors=errors,
                      environ={'REQUEST_METHOD': 'POST'}, separator=separator)
    return {k: fs.getlist(k) for k in fs}
def_parseparam(s): | |
whiles[:1]==';': | |
s=s[1:] | |
end=s.find(';') | |
whileend>0and (s.count('"',0,end)-s.count('\\"',0,end))%2: | |
end=s.find(';',end+1) | |
ifend<0: | |
end=len(s) | |
f=s[:end] | |
yieldf.strip() | |
s=s[end:] | |
def parse_header(line):
    """Parse a Content-type like header.

    Return the main content-type and a dictionary of options.  Option
    names are lower-cased; a value wrapped in double quotes has the
    quotes removed and its backslash escapes (\\\\ and \\") undone.
    Pieces without an '=' are ignored.
    """
    pieces = _parseparam(';' + line)
    key = next(pieces)
    pdict = {}
    for piece in pieces:
        raw_name, found, raw_value = piece.partition('=')
        if not found:
            continue
        name = raw_name.strip().lower()
        value = raw_value.strip()
        if len(value) >= 2 and value[0] == value[-1] == '"':
            value = value[1:-1]
            value = value.replace('\\\\', '\\').replace('\\"', '"')
        pdict[name] = value
    return key, pdict
# Classes for field storage
# =========================

class MiniFieldStorage:

    """Like FieldStorage, for use when no file uploads are possible."""

    # Dummy attributes, so instances expose the same attribute names as
    # FieldStorage objects.
    filename = None
    list = None
    type = None
    file = None
    type_options = {}
    disposition = None
    disposition_options = {}
    headers = {}

    def __init__(self, name, value):
        """Constructor from field name and value."""
        self.name = name
        self.value = value
        # self.file = StringIO(value)

    def __repr__(self):
        """Return printable representation."""
        return f"MiniFieldStorage({self.name!r}, {self.value!r})"
class FieldStorage:

    """Store a sequence of fields, reading multipart/form-data.

    This class provides naming, typing, files stored on disk, and
    more.  At the top level, it is accessible like a dictionary, whose
    keys are the field names.  (Note: None can occur as a field name.)
    The items are either a Python list (if there's multiple values) or
    another FieldStorage or MiniFieldStorage object.  If it's a single
    object, it has the following attributes:

    name: the field name, if specified; otherwise None

    filename: the filename, if specified; otherwise None; this is the
        client side filename, *not* the file name on which it is
        stored (that's a temporary file you don't deal with)

    value: the value as a *string*; for file uploads, this
        transparently reads the file every time you request the value
        and returns *bytes*

    file: the file(-like) object from which you can read the data *as
        bytes* ; None if the data is stored a simple string

    type: the content-type, or None if not specified

    type_options: dictionary of options specified on the content-type
        line

    disposition: content-disposition, or None if not specified

    disposition_options: dictionary of corresponding options

    headers: a dictionary(-like) object (sometimes email.message.Message or a
        subclass thereof) containing *all* headers

    The class is subclassable, mostly for the purpose of overriding
    the make_file() method, which is called internally to come up with
    a file open for reading and writing.  This makes it possible to
    override the default choice of storing all files in a temporary
    directory and unlinking them as soon as they have been opened.

    Internal state: ``done`` is 0 while more parts may follow, 1 once
    the closing boundary has been read and -1 when EOF was reached;
    ``bytes_read`` counts the bytes consumed from the input so far.
    """

    def __init__(self, fp=None, headers=None, outerboundary=b'',
                 environ=os.environ, keep_blank_values=0, strict_parsing=0,
                 limit=None, encoding='utf-8', errors='replace',
                 max_num_fields=None, separator='&'):
        """Constructor.  Read multipart/* until last part.

        Arguments, all optional:

        fp              : file pointer; default: sys.stdin.buffer
            (not used when the request method is GET)
            Can be :
            1. a TextIOWrapper object
            2. an object whose read() and readline() methods return bytes

        headers         : header dictionary-like object; default:
            taken from environ as per CGI spec

        outerboundary   : terminating multipart boundary
            (for internal use only)

        environ         : environment dictionary; default: os.environ

        keep_blank_values: flag indicating whether blank values in
            percent-encoded forms should be treated as blank strings.
            A true value indicates that blanks should be retained as
            blank strings.  The default false value indicates that
            blank values are to be ignored and treated as if they were
            not included.

        strict_parsing: flag indicating what to do with parsing errors.
            If false (the default), errors are silently ignored.
            If true, errors raise a ValueError exception.

        limit : used internally to read parts of multipart/form-data forms,
            to exit from the reading loop when reached. It is the difference
            between the form content-length and the number of bytes already
            read

        encoding, errors : the encoding and error handler used to decode the
            binary stream to strings. Must be the same as the charset defined
            for the page sending the form (content-type : meta http-equiv or
            header)

        max_num_fields: int. If set, then __init__ throws a ValueError
            if there are more than n fields read by parse_qsl().

        separator: str. The symbol separating urlencoded query arguments.

        Raises TypeError for a headers object that is neither a mapping
        nor an email.message.Message, for an fp without read()/readline(),
        and for a non-bytes outerboundary.  Raises ValueError when the
        content length exceeds a non-zero module-level maxlen.
        """
        method = 'GET'
        self.keep_blank_values = keep_blank_values
        self.strict_parsing = strict_parsing
        self.max_num_fields = max_num_fields
        self.separator = separator
        if 'REQUEST_METHOD' in environ:
            method = environ['REQUEST_METHOD'].upper()
        self.qs_on_post = None
        if method == 'GET' or method == 'HEAD':
            # The "body" of a GET/HEAD request is the query string (or
            # argv[1] when run from a command line).
            if 'QUERY_STRING' in environ:
                qs = environ['QUERY_STRING']
            elif sys.argv[1:]:
                qs = sys.argv[1]
            else:
                qs = ""
            qs = qs.encode(locale.getpreferredencoding(), 'surrogateescape')
            fp = BytesIO(qs)
            if headers is None:
                headers = {'content-type':
                           "application/x-www-form-urlencoded"}
        if headers is None:
            headers = {}
            if method == 'POST':
                # Set default content-type for POST to what's traditional
                headers['content-type'] = "application/x-www-form-urlencoded"
            if 'CONTENT_TYPE' in environ:
                headers['content-type'] = environ['CONTENT_TYPE']
            if 'QUERY_STRING' in environ:
                self.qs_on_post = environ['QUERY_STRING']
            if 'CONTENT_LENGTH' in environ:
                headers['content-length'] = environ['CONTENT_LENGTH']
        else:
            if not (isinstance(headers, (Mapping, Message))):
                raise TypeError("headers must be mapping or an instance of "
                                "email.message.Message")
        self.headers = headers
        if fp is None:
            self.fp = sys.stdin.buffer
        # self.fp.read() must return bytes
        elif isinstance(fp, TextIOWrapper):
            self.fp = fp.buffer
        else:
            if not (hasattr(fp, 'read') and hasattr(fp, 'readline')):
                raise TypeError("fp must be file pointer")
            self.fp = fp

        self.encoding = encoding
        self.errors = errors

        if not isinstance(outerboundary, bytes):
            raise TypeError('outerboundary must be bytes, not %s'
                            % type(outerboundary).__name__)
        self.outerboundary = outerboundary

        self.bytes_read = 0
        self.limit = limit

        # Process content-disposition header
        cdisp, pdict = "", {}
        if 'content-disposition' in self.headers:
            cdisp, pdict = parse_header(self.headers['content-disposition'])
        self.disposition = cdisp
        self.disposition_options = pdict
        self.name = None
        if 'name' in pdict:
            self.name = pdict['name']
        self.filename = None
        if 'filename' in pdict:
            self.filename = pdict['filename']
        # Parts that carry a filename are stored as bytes, others as text.
        self._binary_file = self.filename is not None

        # Process content-type header
        #
        # Honor any existing content-type header.  But if there is no
        # content-type header, use some sensible defaults.  Assume
        # outerboundary is "" at the outer level, but something non-false
        # inside a multi-part.  The default for an inner part is text/plain,
        # but for an outer part it should be urlencoded.  This should catch
        # bogus clients which erroneously forget to include a content-type
        # header.
        #
        # See below for what we do if there does exist a content-type header,
        # but it happens to be something we don't understand.
        if 'content-type' in self.headers:
            ctype, pdict = parse_header(self.headers['content-type'])
        elif self.outerboundary or method != 'POST':
            ctype, pdict = "text/plain", {}
        else:
            ctype, pdict = 'application/x-www-form-urlencoded', {}
        self.type = ctype
        self.type_options = pdict
        if 'boundary' in pdict:
            self.innerboundary = pdict['boundary'].encode(self.encoding,
                                                          self.errors)
        else:
            self.innerboundary = b""

        clen = -1
        if 'content-length' in self.headers:
            try:
                clen = int(self.headers['content-length'])
            except ValueError:
                # An unparsable length is treated as "unknown" (-1).
                pass
            if maxlen and clen > maxlen:
                raise ValueError('Maximum content length exceeded')
        self.length = clen
        if self.limit is None and clen >= 0:
            self.limit = clen

        self.list = self.file = None
        self.done = 0
        if ctype == 'application/x-www-form-urlencoded':
            self.read_urlencoded()
        elif ctype[:10] == 'multipart/':
            self.read_multi(environ, keep_blank_values, strict_parsing)
        else:
            self.read_single()

    def __del__(self):
        """Close the backing file, if there is one."""
        try:
            self.file.close()
        except AttributeError:
            # No file was created (self.file is None or was never set).
            pass

    def __enter__(self):
        """Return self so the object can be used in a with statement."""
        return self

    def __exit__(self, *args):
        """Close the backing file, if any, when leaving a with block."""
        # self.file is None for urlencoded and multipart forms; closing
        # unconditionally would raise AttributeError on leaving the block.
        backing_file = getattr(self, 'file', None)
        if backing_file is not None:
            backing_file.close()

    def __repr__(self):
        """Return a printable representation."""
        return "FieldStorage(%r, %r, %r)" % (
                self.name, self.filename, self.value)

    def __iter__(self):
        """Iterate over the field names."""
        return iter(self.keys())

    def __getattr__(self, name):
        """Compute the 'value' attribute on demand.

        Returns the full contents of the backing file (the file position
        is reset to 0 afterwards), else the list of sub-fields, else
        None.  Any other missing attribute raises AttributeError.
        """
        if name != 'value':
            raise AttributeError(name)
        if self.file:
            self.file.seek(0)
            value = self.file.read()
            self.file.seek(0)
        elif self.list is not None:
            value = self.list
        else:
            value = None
        return value

    def __getitem__(self, key):
        """Dictionary style indexing.

        Returns the single matching item, or a list when several fields
        share the name.  Raises TypeError when this object holds no
        field list and KeyError when no field matches.
        """
        if self.list is None:
            raise TypeError("not indexable")
        found = []
        for item in self.list:
            if item.name == key: found.append(item)
        if not found:
            raise KeyError(key)
        if len(found) == 1:
            return found[0]
        else:
            return found

    def getvalue(self, key, default=None):
        """Dictionary style get() method, including 'value' lookup."""
        if key in self:
            value = self[key]
            if isinstance(value, list):
                return [x.value for x in value]
            else:
                return value.value
        else:
            return default

    def getfirst(self, key, default=None):
        """ Return the first value received."""
        if key in self:
            value = self[key]
            if isinstance(value, list):
                return value[0].value
            else:
                return value.value
        else:
            return default

    def getlist(self, key):
        """ Return list of received values."""
        if key in self:
            value = self[key]
            if isinstance(value, list):
                return [x.value for x in value]
            else:
                return [value.value]
        else:
            return []

    def keys(self):
        """Dictionary style keys() method."""
        if self.list is None:
            raise TypeError("not indexable")
        return list(set(item.name for item in self.list))

    def __contains__(self, key):
        """Dictionary style __contains__ method."""
        if self.list is None:
            raise TypeError("not indexable")
        return any(item.name == key for item in self.list)

    def __len__(self):
        """Dictionary style len(x) support."""
        return len(self.keys())

    def __bool__(self):
        """Return True when at least one field was parsed."""
        if self.list is None:
            raise TypeError("Cannot be converted to bool.")
        return bool(self.list)

    def read_urlencoded(self):
        """Internal: read data in query string format."""
        qs = self.fp.read(self.length)
        if not isinstance(qs, bytes):
            raise ValueError("%s should return bytes, got %s" \
                             % (self.fp, type(qs).__name__))
        qs = qs.decode(self.encoding, self.errors)
        if self.qs_on_post:
            qs += '&' + self.qs_on_post
        query = urllib.parse.parse_qsl(
            qs, self.keep_blank_values, self.strict_parsing,
            encoding=self.encoding, errors=self.errors,
            max_num_fields=self.max_num_fields, separator=self.separator)
        self.list = [MiniFieldStorage(key, value) for key, value in query]
        self.skip_lines()

    # Class used for the parts of a multipart body; None means "use the
    # class of the current instance".
    FieldStorageClass = None

    def read_multi(self, environ, keep_blank_values, strict_parsing):
        """Internal: read a part that is itself multipart."""
        ib = self.innerboundary
        if not valid_boundary(ib):
            raise ValueError('Invalid boundary in multipart form: %r' % (ib,))
        self.list = []
        if self.qs_on_post:
            query = urllib.parse.parse_qsl(
                self.qs_on_post, self.keep_blank_values, self.strict_parsing,
                encoding=self.encoding, errors=self.errors,
                max_num_fields=self.max_num_fields, separator=self.separator)
            self.list.extend(MiniFieldStorage(key, value) for key, value in query)

        klass = self.FieldStorageClass or self.__class__
        first_line = self.fp.readline() # bytes
        if not isinstance(first_line, bytes):
            raise ValueError("%s should return bytes, got %s" \
                             % (self.fp, type(first_line).__name__))
        self.bytes_read += len(first_line)

        # Ensure that we consume the file until we've hit our inner boundary
        while (first_line.strip() != (b"--" + self.innerboundary) and
                first_line):
            first_line = self.fp.readline()
            self.bytes_read += len(first_line)

        # Propagate max_num_fields into the sub class appropriately
        max_num_fields = self.max_num_fields
        if max_num_fields is not None:
            max_num_fields -= len(self.list)

        while True:
            parser = FeedParser()
            hdr_text = b""
            # Collect this part's header lines up to the first blank line.
            while True:
                data = self.fp.readline()
                hdr_text += data
                if not data.strip():
                    break
            if not hdr_text:
                break
            # parser takes strings, not bytes
            self.bytes_read += len(hdr_text)
            parser.feed(hdr_text.decode(self.encoding, self.errors))
            headers = parser.close()

            # Some clients add Content-Length for part headers, ignore them
            if 'content-length' in headers:
                del headers['content-length']

            limit = None if self.limit is None \
                else self.limit - self.bytes_read
            part = klass(self.fp, headers, ib, environ, keep_blank_values,
                         strict_parsing, limit,
                         self.encoding, self.errors, max_num_fields, self.separator)

            if max_num_fields is not None:
                max_num_fields -= 1
                if part.list:
                    max_num_fields -= len(part.list)
                if max_num_fields < 0:
                    raise ValueError('Max number of fields exceeded')

            self.bytes_read += part.bytes_read
            self.list.append(part)
            if part.done or self.bytes_read >= self.length > 0:
                break
        self.skip_lines()

    def read_single(self):
        """Internal: read an atomic part."""
        if self.length >= 0:
            self.read_binary()
            self.skip_lines()
        else:
            self.read_lines()
        self.file.seek(0)

    bufsize = 8*1024            # I/O buffering size for copy to file

    def read_binary(self):
        """Internal: read binary data."""
        self.file = self.make_file()
        todo = self.length
        if todo >= 0:
            while todo > 0:
                data = self.fp.read(min(todo, self.bufsize)) # bytes
                if not isinstance(data, bytes):
                    raise ValueError("%s should return bytes, got %s"
                                     % (self.fp, type(data).__name__))
                self.bytes_read += len(data)
                if not data:
                    # EOF before the announced length was read.
                    self.done = -1
                    break
                self.file.write(data)
                todo = todo - len(data)

    def read_lines(self):
        """Internal: read lines until EOF or outerboundary."""
        # Start with an in-memory buffer; __write() switches to a real
        # file from make_file() once the data grows past 1000 units.
        if self._binary_file:
            self.file = self.__file = BytesIO() # store data as bytes for files
        else:
            self.file = self.__file = StringIO() # as strings for other fields
        if self.outerboundary:
            self.read_lines_to_outerboundary()
        else:
            self.read_lines_to_eof()

    def __write(self, line):
        """line is always bytes, not string"""
        if self.__file is not None:
            if self.__file.tell() + len(line) > 1000:
                # Too big for memory: move what we have to a real file.
                self.file = self.make_file()
                data = self.__file.getvalue()
                self.file.write(data)
                self.__file = None
        if self._binary_file:
            # keep bytes
            self.file.write(line)
        else:
            # decode to string
            self.file.write(line.decode(self.encoding, self.errors))

    def read_lines_to_eof(self):
        """Internal: read lines until EOF."""
        while 1:
            line = self.fp.readline(1<<16) # bytes
            self.bytes_read += len(line)
            if not line:
                self.done = -1
                break
            self.__write(line)

    def read_lines_to_outerboundary(self):
        """Internal: read lines until outerboundary.
        Data is read as bytes: boundaries and line ends must be converted
        to bytes for comparisons.
        """
        next_boundary = b"--" + self.outerboundary
        last_boundary = next_boundary + b"--"
        delim = b""
        last_line_lfend = True
        _read = 0
        while 1:

            if self.limit is not None and 0 <= self.limit <= _read:
                break
            line = self.fp.readline(1<<16) # bytes
            self.bytes_read += len(line)
            _read += len(line)
            if not line:
                self.done = -1
                break
            if delim == b"\r":
                line = delim + line
                delim = b""
            if line.startswith(b"--") and last_line_lfend:
                strippedline = line.rstrip()
                if strippedline == next_boundary:
                    break
                if strippedline == last_boundary:
                    self.done = 1
                    break
            # The line terminator is held back in delim and only written
            # with the next line, so the terminator that precedes a
            # boundary is never stored as part of the value.
            odelim = delim
            if line.endswith(b"\r\n"):
                delim = b"\r\n"
                line = line[:-2]
                last_line_lfend = True
            elif line.endswith(b"\n"):
                delim = b"\n"
                line = line[:-1]
                last_line_lfend = True
            elif line.endswith(b"\r"):
                # We may interrupt \r\n sequences if they span the 2**16
                # byte boundary
                delim = b"\r"
                line = line[:-1]
                last_line_lfend = False
            else:
                delim = b""
                last_line_lfend = False
            self.__write(odelim + line)

    def skip_lines(self):
        """Internal: skip lines until outer boundary if defined.

        Stops after consuming the next boundary line (leaving done
        unchanged) or the closing boundary line (done = 1); sets
        done = -1 when EOF is reached first.
        """
        if not self.outerboundary or self.done:
            return
        next_boundary = b"--" + self.outerboundary
        last_boundary = next_boundary + b"--"
        last_line_lfend = True
        while True:
            line = self.fp.readline(1<<16)
            self.bytes_read += len(line)
            if not line:
                self.done = -1
                break
            # A boundary line *starts* with "--" and still carries its line
            # terminator, so testing endswith(b"--") would never recognise
            # it and everything up to EOF would be skipped.
            if line.startswith(b"--") and last_line_lfend:
                strippedline = line.rstrip()
                if strippedline == next_boundary:
                    break
                if strippedline == last_boundary:
                    self.done = 1
                    break
            last_line_lfend = line.endswith(b'\n')

    def make_file(self):
        """Overridable: return a readable & writable file.

        The file will be used as follows:
        - data is written to it
        - seek(0)
        - data is read from it

        The file is opened in binary mode for files, in text mode
        for other fields

        This version opens a temporary file for reading and writing,
        and immediately deletes (unlinks) it.  The trick (on Unix!) is
        that the file can still be used, but it can't be opened by
        another process, and it will automatically be deleted when it
        is closed or when the current process terminates.

        If you want a more permanent file, you derive a class which
        overrides this method.  If you want a visible temporary file
        that is nevertheless automatically deleted when the script
        terminates, try defining a __del__ method in a derived class
        which unlinks the temporary files you have created.

        """
        if self._binary_file:
            return tempfile.TemporaryFile("wb+")
        else:
            return tempfile.TemporaryFile("w+",
                encoding=self.encoding, newline = '\n')
# Test/debug code
# ===============

def test(environ=os.environ):
    """Robust test CGI script, usable as main program.

    Write minimal HTTP headers and dump all information provided to
    the script in HTML form.  The dump is produced twice: once as-is
    (followed by a deliberately provoked exception to exercise
    print_exception()), and once more after setting the module-level
    maxlen to 50.  sys.stderr is redirected to sys.stdout.
    """
    global maxlen

    print("Content-type: text/html", end="\n\n")
    sys.stderr = sys.stdout
    try:
        form = FieldStorage()   # Replace with other classes to test those
        print_directory()
        print_arguments()
        print_form(form)
        print_environ(environ)
        print_environ_usage()

        def f():
            # Not valid Python on purpose: provokes an exception.
            exec("testing print_exception() -- <I>italics?</I>")

        def g(f=f):
            f()

        print("<H3>What follows is a test, not an actual exception:</H3>")
        g()
    except BaseException:
        # Catch everything, as a bare except would: report it in the page.
        print_exception()

    print("<H1>Second try with a small maxlen...</H1>")

    maxlen = 50
    try:
        form = FieldStorage()   # Replace with other classes to test those
        print_directory()
        print_arguments()
        print_form(form)
        print_environ(environ)
    except BaseException:
        print_exception()
def print_exception(type=None, value=None, tb=None, limit=None):
    """Print a traceback as HTML.

    When type is None the exception currently being handled
    (sys.exc_info()) is used.  The stack entries are printed inside
    <PRE>, with the final "ExceptionType: message" line in bold; all
    text is HTML-escaped.  limit is passed to traceback.format_tb().
    """
    if type is None:
        type, value, tb = sys.exc_info()
    import traceback
    print()
    print("<H3>Traceback (most recent call last):</H3>")
    report = traceback.format_tb(tb, limit)
    report = report + traceback.format_exception_only(type, value)
    stack_text = html.escape("".join(report[:-1]))
    final_line = html.escape(report[-1])
    print(f"<PRE>{stack_text}<B>{final_line}</B></PRE>")
    del tb
def print_environ(environ=os.environ):
    """Print the given environment mapping as an HTML definition list.

    Entries are listed in sorted key order with keys and values
    HTML-escaped.
    """
    names = sorted(environ.keys())
    print()
    print("<H3>Shell Environment:</H3>")
    print("<DL>")
    for name in names:
        print(f"<DT> {html.escape(name)} <DD> {html.escape(environ[name])}")
    print("</DL>")
    print()
def print_form(form):
    """Print the fields of a form as an HTML definition list.

    form must provide keys() and item lookup.  For every key, in sorted
    order, the key, the type of its value and the repr of its value are
    printed, all HTML-escaped.
    """
    names = sorted(form.keys())
    print()
    print("<H3>Form Contents:</H3>")
    if not names:
        print("<P>No form fields.")
    print("<DL>")
    for name in names:
        print(f"<DT>{html.escape(name)}:", end=' ')
        field = form[name]
        print(f"<i>{html.escape(repr(type(field)))}</i>")
        print(f"<DD>{html.escape(repr(field))}")
    print("</DL>")
    print()
def print_directory():
    """Print the current working directory as HTML.

    If the directory cannot be determined, the OSError message is
    printed instead.  The printed text is HTML-escaped.
    """
    print()
    print("<H3>Current Working Directory:</H3>")
    try:
        shown = html.escape(os.getcwd())
    except OSError as err:
        shown = "OSError: " + html.escape(str(err))
    print(shown)
    print()
def print_arguments():
    """Print the command line arguments (sys.argv) as HTML.

    The list is HTML-escaped before printing, like the output of the
    other print_* helpers: argv may carry request-derived text and must
    not be emitted into the page as raw markup.
    """
    print()
    print("<H3>Command Line Arguments:</H3>")
    print()
    print(html.escape(str(sys.argv)))
    print()
def print_environ_usage():
    """Print, as HTML, the environment variable names a CGI script may see."""
    cgi_variables = (
        "AUTH_TYPE", "CONTENT_LENGTH", "CONTENT_TYPE", "DATE_GMT",
        "DATE_LOCAL", "DOCUMENT_NAME", "DOCUMENT_ROOT", "DOCUMENT_URI",
        "GATEWAY_INTERFACE", "LAST_MODIFIED", "PATH", "PATH_INFO",
        "PATH_TRANSLATED", "QUERY_STRING", "REMOTE_ADDR", "REMOTE_HOST",
        "REMOTE_IDENT", "REMOTE_USER", "REQUEST_METHOD", "SCRIPT_NAME",
        "SERVER_NAME", "SERVER_PORT", "SERVER_PROTOCOL", "SERVER_ROOT",
        "SERVER_SOFTWARE",
    )
    http_variables = (
        "HTTP_ACCEPT", "HTTP_CONNECTION", "HTTP_HOST", "HTTP_PRAGMA",
        "HTTP_REFERER", "HTTP_USER_AGENT",
    )

    def items(names):
        # One "<LI>NAME" line per variable name.
        return "".join("<LI>" + name + "\n" for name in names)

    print(
        "\n"
        "<H3>These environment variables could have been set:</H3>\n"
        "<UL>\n"
        + items(cgi_variables) +
        "</UL>\n"
        "In addition, HTTP headers sent by the server may be passed in the\n"
        "environment as well. Here are some common variable names:\n"
        "<UL>\n"
        + items(http_variables) +
        "</UL>\n"
    )
# Utilities
# =========

def valid_boundary(s):
    """Return a match object if s is an acceptable boundary, else None.

    s may be str or bytes.  It must consist of 1 to 201 characters in
    the range space..'~' and must not end with a space.
    """
    import re
    pattern = (b"^[ -~]{0,200}[!-~]$" if isinstance(s, bytes)
               else "^[ -~]{0,200}[!-~]$")
    return re.match(pattern, s)
# Invoke mainline
# ===============

# Call test() when this file is run as a script (not imported as a module)
if __name__ == "__main__":
    test()