Skip to content
279 changes: 279 additions & 0 deletions tornado/httputil.py
Original file line number Diff line number Diff line change
Expand Up @@ -849,6 +849,285 @@ def parse_multipart_form_data(
arguments.setdefault(name, []).append(value)


_BOUNDARY_REGEX = re.compile(r'boundary="?(?P<boundary>[^"]+)"?')
"""Regex to match the boundary option."""

_1MB = 1048576
"""Number of bytes in 1 Megabyte."""


class AbstractFileDelegate:
def start_file(self, name: str, headers: HTTPHeaders) -> Optional[Awaitable[None]]:
pass

def file_data_received(self, name: str, data: bytes) -> Optional[Awaitable[None]]:
pass

def finish_file(self, name: str) -> Optional[Awaitable[None]]:
pass


class ParserState:

PARSE_BOUNDARY_LINE = 1
"""State that parses the initial boundary."""

PARSE_FILE_HEADERS = 2
"""State that parses the 'headers' for the next file/object."""

PARSE_BODY = 3
"""State that parses some body text."""

PARSING_DONE = 4
"""State that denotes the parser is finished."""


class StreamingMultipartFormDataParser(object):
"""Basic parser that accepts data and parses it into distinct files.

This parser handles 'multipart/form-data' Content-Type uploads, which
permits multiple file uploads in a single request.
"""

@classmethod
def from_content_type_header(
cls, delegate, header
) -> "StreamingMultipartFormDataParser":
if isinstance(header, bytes):
header = header.decode("utf-8")
boundary = None
# Make sure the header is the multipart/form-data.
parts = [part.strip() for part in header.split(";")]
if parts[0].lower() != "multipart/form-data":
raise ValueError("Invalid Content-Type: {}".format(parts[0]))

# Search for 'boundary='
for part in parts:
m = _BOUNDARY_REGEX.match(part)
if m:
boundary = m.group("boundary")
return cls(delegate, boundary)
raise ValueError("Required 'boundary' option not found in header!")

def __init__(
self, delegate: AbstractFileDelegate, boundary: str, max_buffer_size=_1MB
):
"""Create a StreamingMultipartFormDataParser.

This parser (asynchronously) receives data, parses it, and invokes the
given delegate appropriately.
"""
# Be nice and decode the boundary if it is a bytes object.
if isinstance(boundary, bytes):
boundary = boundary.decode("utf-8")
# Store the delegate to write out the data.
self._delegate = delegate
self._boundary = boundary
self._max_buffer_size = max_buffer_size
self._name = None

# Variables to store the current state of the parser.
self._state = ParserState.PARSE_BOUNDARY_LINE
self._buffer = bytearray()

# Variables to hold the boundary matches.
self._boundary_next = "--{}\r\n".format(self._boundary).encode()
self._boundary_end = "--{}--\r\n".format(self._boundary).encode()
self._boundary_base = self._boundary_next[:-2]

# Variables for caching boundary matching.
self._last_idx = 0
self._boundary_idx = 0

@property
def boundary(self) -> str:
"""Return the boundary text that denotes the end of a file."""
return self._boundary

def _change_state(self, state: ParserState, name: Optional[str] = None):
"""Helper to change the state of the parser.

This also clears some variables used in different states.
"""
self._state = state
self._last_idx = 0
self._boundary_idx = 0
self._name = name

async def data_received(self, chunk: bytes) -> None:
# Process the data received, based on the current state.
#
# It is possible for 'chunk' here to be larger than the maximum buffer
# size. Initially, this is okay because we still need to process the
# chunk. However, when the buffer _remains_ this size after going
# through the rest of this call, then the input is bad since each state
# should incrementally consume data from the buffer contain its size.
if len(self._buffer) > self._max_buffer_size:
raise ValueError(
"Buffer is growing larger than: {} bytes!".format(self._buffer)
)
# Ignore incrementing the buffer when in the DONE state altogether.
if self._state != ParserState.PARSING_DONE:
self._buffer.extend(chunk)

# Iterate over and over while there is sufficient data in the buffer.
# Each loop should either consume data, or move to a state where not
# enough data is available, in which case this should exit to await
# more data.
while True:
# PARSE_BODY state --> Expecting to parse the file contents.
if self._state == ParserState.PARSE_BODY:
# Search for the boundary characters.
idx = self._buffer.find(b"-")
if idx < 0:
# No match against any boundary character. Write out the
# whole buffer.
data = self._buffer
self._buffer = bytearray()
fut = self._delegate.file_data_received(self._name, data)
if fut is not None:
await fut

# Return because the whole buffer was written out.
return

# If 'idx > 0', write the data _up to_ this boundary point,
# then proceed in the same manner as 'idx == 0'.
if idx > 0:
# Write out all of the data, _up to_ this boundary point,
# then cycle around to check whether we are at the bounary
# or not. This simplifies the logic for checking against
# the boundary cases.
data = self._buffer[:idx]
self._buffer = self._buffer[idx:]
fut = self._delegate.file_data_received(self._name, data)
if fut is not None:
await fut

# Not enough data (technically) to check against. Wait for
# more data to be certain whether the boundary was parsed.
if len(self._buffer) < len(self._boundary_next):
return

# If the buffer starts with the same contents as
# 'self._boundary_base', switch states and let that state
# handle this case more cleanly.
if self._buffer.startswith(self._boundary_next):
# Mark the current file as finished.
fut = self._delegate.finish_file(self._name)
if fut is not None:
await fut
self._change_state(ParserState.PARSE_BOUNDARY_LINE)
continue

# Check the end boundary as well. The end boundary _might_
# match if the 'self._boundary_base' matches, but the
# 'self._boundary_next' does not. Wait for more data if the
# buffer does not have enough data to be sure.
if len(self._buffer) < len(self._boundary_end):
return

if self._buffer.startswith(self._boundary_end):
fut = self._delegate.finish_file(self._name)
if fut is not None:
await fut
self._change_state(ParserState.PARSE_BOUNDARY_LINE)
continue

# No match so far, so write out the data up to the next
# boundary delimiter.
next_idx = self._buffer.find(b"-", 1)
if next_idx < 0:
data = self._buffer
self._buffer = bytearray()
else:
data = self._buffer[:next_idx]
self._buffer = self._buffer[next_idx:]
fut = self._delegate.file_data_received(self._name, data)
if fut is not None:
await fut

# Continue and run the check after this update.
continue

# PARSE_BOUNDARY_LINE state --> Expecting to parse either:
# - self._boundary_next (for the next file)
# - self._boundary_end (for the end of the request)
if self._state == ParserState.PARSE_BOUNDARY_LINE:
# Parse the first boundary chunk.
if len(self._buffer) < len(self._boundary_next):
# Not enough data, so exit.
return
# This implies we are parsing another file, so transition to
# the 'PARSE_HEADER' state. Also, continue to run through the
# loop again with the new state.
if self._buffer.startswith(self._boundary_next):
self._buffer = self._buffer[len(self._boundary_next) :]
self._change_state(ParserState.PARSE_FILE_HEADERS)
continue
# Check against 'self._boundary_end' as well. There is a slim
# chance that we are at the self._boundary_end case, but still
# do not have enough data, so handle that here.
if len(self._buffer) < len(self._boundary_end):
# Hope we get more data to confirm the boundary end case.
return
elif self._buffer.startswith(self._boundary_end):
# Done parsing. We should probably sanity-check that all
# data was consumed.
self._buffer = self._buffer[len(self._boundary_end) :]
self._change_state(ParserState.PARSING_DONE)
continue
else:
gen_log.warning("Invalid boundary parsed!")

# PARSE_HEADERS state --> Expecting to parse headers with CRLF.
if self._state == ParserState.PARSE_FILE_HEADERS:
idx = self._buffer.find(b"\r\n\r\n", self._last_idx)
# Implies no match. Update the next index to search to be:
# max(0, len(buffer) - 3)
# as an optimization to speed up future comparisons. This
# should work; if there is no match, then the buffer could
# (in the worst case) have '\r\n\r', but not the final '\n'
# so we might need to rescan the previous 3 characters, but
# not 4. (Cap at 0 in case the buffer is too small for some
# reason.)
#
# In any case, there is not enough data, so just exit.
if idx < 0:
self._last_idx = max(0, len(self._buffer) - 3)
return
# Otherwise, we have a match. Parse this into a dictionary of
# headers and pass the result to create a new file.
data = self._buffer[: idx + 4].decode("utf-8")
self._buffer = self._buffer[idx + 4 :]
headers = HTTPHeaders.parse(data)
_, plist = _parse_header(headers.get("Content-Disposition", ""))
name = plist.get("name")

# Call the delegate with the new file.
fut = self._delegate.start_file(name, headers=headers)
if fut is not None:
await fut

# Update the buffer and the state.
self._change_state(ParserState.PARSE_BODY, name=name)
continue

# PARSE_DONE state --> Expect no more data, but break the loop.
if self._state == ParserState.PARSING_DONE:
if len(self._buffer) > 0:
# WARNING: Data is left in the buffer when we should be
# finished...
gen_log.warning(
"Finished with non-empty buffer (%s bytes remaining).",
len(self._buffer),
)
self._buffer.clear()

# Even if there is data remaining, we should exit the loop.
return


def format_timestamp(
ts: Union[int, float, tuple, time.struct_time, datetime.datetime]
) -> str:
Expand Down
Loading