diff options
Diffstat (limited to 'parts/django/tests/regressiontests/file_uploads')
6 files changed, 476 insertions, 0 deletions
diff --git a/parts/django/tests/regressiontests/file_uploads/__init__.py b/parts/django/tests/regressiontests/file_uploads/__init__.py new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/parts/django/tests/regressiontests/file_uploads/__init__.py diff --git a/parts/django/tests/regressiontests/file_uploads/models.py b/parts/django/tests/regressiontests/file_uploads/models.py new file mode 100644 index 0000000..9d02050 --- /dev/null +++ b/parts/django/tests/regressiontests/file_uploads/models.py @@ -0,0 +1,10 @@ +import tempfile +import os +from django.db import models +from django.core.files.storage import FileSystemStorage + +temp_storage = FileSystemStorage(tempfile.mkdtemp()) +UPLOAD_TO = os.path.join(temp_storage.location, 'test_upload') + +class FileModel(models.Model): + testfile = models.FileField(storage=temp_storage, upload_to='test_upload') diff --git a/parts/django/tests/regressiontests/file_uploads/tests.py b/parts/django/tests/regressiontests/file_uploads/tests.py new file mode 100644 index 0000000..99d0b6b --- /dev/null +++ b/parts/django/tests/regressiontests/file_uploads/tests.py @@ -0,0 +1,305 @@ +#! -*- coding: utf-8 -*- +import errno +import os +import shutil +import unittest +from StringIO import StringIO + +from django.core.files import temp as tempfile +from django.core.files.uploadedfile import SimpleUploadedFile +from django.test import TestCase, client +from django.utils import simplejson +from django.utils.hashcompat import sha_constructor +from django.http.multipartparser import MultiPartParser + +from models import FileModel, temp_storage, UPLOAD_TO +import uploadhandler + + +UNICODE_FILENAME = u'test-0123456789_中文_Orléans.jpg' + +class FileUploadTests(TestCase): + def test_simple_upload(self): + post_data = { + 'name': 'Ringo', + 'file_field': open(__file__), + } + response = self.client.post('/file_uploads/upload/', post_data) + self.assertEqual(response.status_code, 200) + + def test_large_upload(self): + tdir = tempfile.gettempdir() + + file1 = tempfile.NamedTemporaryFile(suffix=".file1", dir=tdir) + file1.write('a' * (2 ** 21)) + file1.seek(0) + + file2 = tempfile.NamedTemporaryFile(suffix=".file2", dir=tdir) + file2.write('a' * (10 * 2 ** 20)) + file2.seek(0) + + post_data = { + 'name': 'Ringo', + 'file_field1': file1, + 'file_field2': file2, + } + + for key in post_data.keys(): + try: + post_data[key + '_hash'] = sha_constructor(post_data[key].read()).hexdigest() + post_data[key].seek(0) + except AttributeError: + post_data[key + '_hash'] = sha_constructor(post_data[key]).hexdigest() + + response = self.client.post('/file_uploads/verify/', post_data) + + self.assertEqual(response.status_code, 200) + + def test_unicode_file_name(self): + tdir = tempfile.gettempdir() + + # This file contains chinese symbols and an accented char in the name. + file1 = open(os.path.join(tdir, UNICODE_FILENAME.encode('utf-8')), 'w+b') + file1.write('b' * (2 ** 10)) + file1.seek(0) + + post_data = { + 'file_unicode': file1, + } + + response = self.client.post('/file_uploads/unicode_name/', post_data) + + file1.close() + try: + os.unlink(file1.name) + except: + pass + + self.assertEqual(response.status_code, 200) + + def test_dangerous_file_names(self): + """Uploaded file names should be sanitized before ever reaching the view.""" + # This test simulates possible directory traversal attacks by a + # malicious uploader We have to do some monkeybusiness here to construct + # a malicious payload with an invalid file name (containing os.sep or + # os.pardir). This similar to what an attacker would need to do when + # trying such an attack. + scary_file_names = [ + "/tmp/hax0rd.txt", # Absolute path, *nix-style. + "C:\\Windows\\hax0rd.txt", # Absolute path, win-syle. + "C:/Windows/hax0rd.txt", # Absolute path, broken-style. + "\\tmp\\hax0rd.txt", # Absolute path, broken in a different way. + "/tmp\\hax0rd.txt", # Absolute path, broken by mixing. + "subdir/hax0rd.txt", # Descendant path, *nix-style. + "subdir\\hax0rd.txt", # Descendant path, win-style. + "sub/dir\\hax0rd.txt", # Descendant path, mixed. + "../../hax0rd.txt", # Relative path, *nix-style. + "..\\..\\hax0rd.txt", # Relative path, win-style. + "../..\\hax0rd.txt" # Relative path, mixed. + ] + + payload = [] + for i, name in enumerate(scary_file_names): + payload.extend([ + '--' + client.BOUNDARY, + 'Content-Disposition: form-data; name="file%s"; filename="%s"' % (i, name), + 'Content-Type: application/octet-stream', + '', + 'You got pwnd.' + ]) + payload.extend([ + '--' + client.BOUNDARY + '--', + '', + ]) + + payload = "\r\n".join(payload) + r = { + 'CONTENT_LENGTH': len(payload), + 'CONTENT_TYPE': client.MULTIPART_CONTENT, + 'PATH_INFO': "/file_uploads/echo/", + 'REQUEST_METHOD': 'POST', + 'wsgi.input': client.FakePayload(payload), + } + response = self.client.request(**r) + + # The filenames should have been sanitized by the time it got to the view. + recieved = simplejson.loads(response.content) + for i, name in enumerate(scary_file_names): + got = recieved["file%s" % i] + self.assertEqual(got, "hax0rd.txt") + + def test_filename_overflow(self): + """File names over 256 characters (dangerous on some platforms) get fixed up.""" + name = "%s.txt" % ("f"*500) + payload = "\r\n".join([ + '--' + client.BOUNDARY, + 'Content-Disposition: form-data; name="file"; filename="%s"' % name, + 'Content-Type: application/octet-stream', + '', + 'Oops.' + '--' + client.BOUNDARY + '--', + '', + ]) + r = { + 'CONTENT_LENGTH': len(payload), + 'CONTENT_TYPE': client.MULTIPART_CONTENT, + 'PATH_INFO': "/file_uploads/echo/", + 'REQUEST_METHOD': 'POST', + 'wsgi.input': client.FakePayload(payload), + } + got = simplejson.loads(self.client.request(**r).content) + self.assert_(len(got['file']) < 256, "Got a long file name (%s characters)." % len(got['file'])) + + def test_custom_upload_handler(self): + # A small file (under the 5M quota) + smallfile = tempfile.NamedTemporaryFile() + smallfile.write('a' * (2 ** 21)) + smallfile.seek(0) + + # A big file (over the quota) + bigfile = tempfile.NamedTemporaryFile() + bigfile.write('a' * (10 * 2 ** 20)) + bigfile.seek(0) + + # Small file posting should work. + response = self.client.post('/file_uploads/quota/', {'f': smallfile}) + got = simplejson.loads(response.content) + self.assert_('f' in got) + + # Large files don't go through. + response = self.client.post("/file_uploads/quota/", {'f': bigfile}) + got = simplejson.loads(response.content) + self.assert_('f' not in got) + + def test_broken_custom_upload_handler(self): + f = tempfile.NamedTemporaryFile() + f.write('a' * (2 ** 21)) + f.seek(0) + + # AttributeError: You cannot alter upload handlers after the upload has been processed. + self.assertRaises( + AttributeError, + self.client.post, + '/file_uploads/quota/broken/', + {'f': f} + ) + + def test_fileupload_getlist(self): + file1 = tempfile.NamedTemporaryFile() + file1.write('a' * (2 ** 23)) + file1.seek(0) + + file2 = tempfile.NamedTemporaryFile() + file2.write('a' * (2 * 2 ** 18)) + file2.seek(0) + + file2a = tempfile.NamedTemporaryFile() + file2a.write('a' * (5 * 2 ** 20)) + file2a.seek(0) + + response = self.client.post('/file_uploads/getlist_count/', { + 'file1': file1, + 'field1': u'test', + 'field2': u'test3', + 'field3': u'test5', + 'field4': u'test6', + 'field5': u'test7', + 'file2': (file2, file2a) + }) + got = simplejson.loads(response.content) + + self.assertEqual(got.get('file1'), 1) + self.assertEqual(got.get('file2'), 2) + + def test_file_error_blocking(self): + """ + The server should not block when there are upload errors (bug #8622). + This can happen if something -- i.e. an exception handler -- tries to + access POST while handling an error in parsing POST. This shouldn't + cause an infinite loop! + """ + class POSTAccessingHandler(client.ClientHandler): + """A handler that'll access POST during an exception.""" + def handle_uncaught_exception(self, request, resolver, exc_info): + ret = super(POSTAccessingHandler, self).handle_uncaught_exception(request, resolver, exc_info) + p = request.POST + return ret + + post_data = { + 'name': 'Ringo', + 'file_field': open(__file__), + } + # Maybe this is a little more complicated that it needs to be; but if + # the django.test.client.FakePayload.read() implementation changes then + # this test would fail. So we need to know exactly what kind of error + # it raises when there is an attempt to read more than the available bytes: + try: + client.FakePayload('a').read(2) + except Exception, reference_error: + pass + + # install the custom handler that tries to access request.POST + self.client.handler = POSTAccessingHandler() + + try: + response = self.client.post('/file_uploads/upload_errors/', post_data) + except reference_error.__class__, err: + self.failIf( + str(err) == str(reference_error), + "Caught a repeated exception that'll cause an infinite loop in file uploads." + ) + except Exception, err: + # CustomUploadError is the error that should have been raised + self.assertEqual(err.__class__, uploadhandler.CustomUploadError) + +class DirectoryCreationTests(unittest.TestCase): + """ + Tests for error handling during directory creation + via _save_FIELD_file (ticket #6450) + """ + def setUp(self): + self.obj = FileModel() + if not os.path.isdir(temp_storage.location): + os.makedirs(temp_storage.location) + if os.path.isdir(UPLOAD_TO): + os.chmod(UPLOAD_TO, 0700) + shutil.rmtree(UPLOAD_TO) + + def tearDown(self): + os.chmod(temp_storage.location, 0700) + shutil.rmtree(temp_storage.location) + + def test_readonly_root(self): + """Permission errors are not swallowed""" + os.chmod(temp_storage.location, 0500) + try: + self.obj.testfile.save('foo.txt', SimpleUploadedFile('foo.txt', 'x')) + except OSError, err: + self.assertEquals(err.errno, errno.EACCES) + except Exception, err: + self.fail("OSError [Errno %s] not raised." % errno.EACCES) + + def test_not_a_directory(self): + """The correct IOError is raised when the upload directory name exists but isn't a directory""" + # Create a file with the upload directory name + fd = open(UPLOAD_TO, 'w') + fd.close() + try: + self.obj.testfile.save('foo.txt', SimpleUploadedFile('foo.txt', 'x')) + except IOError, err: + # The test needs to be done on a specific string as IOError + # is raised even without the patch (just not early enough) + self.assertEquals(err.args[0], + "%s exists and is not a directory." % UPLOAD_TO) + except: + self.fail("IOError not raised") + +class MultiParserTests(unittest.TestCase): + + def test_empty_upload_handlers(self): + # We're not actually parsing here; just checking if the parser properly + # instantiates with empty upload handlers. + parser = MultiPartParser({ + 'CONTENT_TYPE': 'multipart/form-data; boundary=_foo', + 'CONTENT_LENGTH': '1' + }, StringIO('x'), [], 'utf-8') diff --git a/parts/django/tests/regressiontests/file_uploads/uploadhandler.py b/parts/django/tests/regressiontests/file_uploads/uploadhandler.py new file mode 100644 index 0000000..9f3960a --- /dev/null +++ b/parts/django/tests/regressiontests/file_uploads/uploadhandler.py @@ -0,0 +1,34 @@ +""" +Upload handlers to test the upload API. +""" + +from django.core.files.uploadhandler import FileUploadHandler, StopUpload + +class QuotaUploadHandler(FileUploadHandler): + """ + This test upload handler terminates the connection if more than a quota + (5MB) is uploaded. + """ + + QUOTA = 5 * 2**20 # 5 MB + + def __init__(self, request=None): + super(QuotaUploadHandler, self).__init__(request) + self.total_upload = 0 + + def receive_data_chunk(self, raw_data, start): + self.total_upload += len(raw_data) + if self.total_upload >= self.QUOTA: + raise StopUpload(connection_reset=True) + return raw_data + + def file_complete(self, file_size): + return None + +class CustomUploadError(Exception): + pass + +class ErroringUploadHandler(FileUploadHandler): + """A handler that raises an exception.""" + def receive_data_chunk(self, raw_data, start): + raise CustomUploadError("Oops!") diff --git a/parts/django/tests/regressiontests/file_uploads/urls.py b/parts/django/tests/regressiontests/file_uploads/urls.py new file mode 100644 index 0000000..413080e --- /dev/null +++ b/parts/django/tests/regressiontests/file_uploads/urls.py @@ -0,0 +1,13 @@ +from django.conf.urls.defaults import * +import views + +urlpatterns = patterns('', + (r'^upload/$', views.file_upload_view), + (r'^verify/$', views.file_upload_view_verify), + (r'^unicode_name/$', views.file_upload_unicode_name), + (r'^echo/$', views.file_upload_echo), + (r'^quota/$', views.file_upload_quota), + (r'^quota/broken/$', views.file_upload_quota_broken), + (r'^getlist_count/$', views.file_upload_getlist_count), + (r'^upload_errors/$', views.file_upload_errors), +) diff --git a/parts/django/tests/regressiontests/file_uploads/views.py b/parts/django/tests/regressiontests/file_uploads/views.py new file mode 100644 index 0000000..50bc3f8 --- /dev/null +++ b/parts/django/tests/regressiontests/file_uploads/views.py @@ -0,0 +1,114 @@ +import os +from django.core.files.uploadedfile import UploadedFile +from django.http import HttpResponse, HttpResponseServerError +from django.utils import simplejson +from models import FileModel, UPLOAD_TO +from uploadhandler import QuotaUploadHandler, ErroringUploadHandler +from django.utils.hashcompat import sha_constructor +from tests import UNICODE_FILENAME + +def file_upload_view(request): + """ + Check that a file upload can be updated into the POST dictionary without + going pear-shaped. + """ + form_data = request.POST.copy() + form_data.update(request.FILES) + if isinstance(form_data.get('file_field'), UploadedFile) and isinstance(form_data['name'], unicode): + # If a file is posted, the dummy client should only post the file name, + # not the full path. + if os.path.dirname(form_data['file_field'].name) != '': + return HttpResponseServerError() + return HttpResponse('') + else: + return HttpResponseServerError() + +def file_upload_view_verify(request): + """ + Use the sha digest hash to verify the uploaded contents. + """ + form_data = request.POST.copy() + form_data.update(request.FILES) + + for key, value in form_data.items(): + if key.endswith('_hash'): + continue + if key + '_hash' not in form_data: + continue + submitted_hash = form_data[key + '_hash'] + if isinstance(value, UploadedFile): + new_hash = sha_constructor(value.read()).hexdigest() + else: + new_hash = sha_constructor(value).hexdigest() + if new_hash != submitted_hash: + return HttpResponseServerError() + + # Adding large file to the database should succeed + largefile = request.FILES['file_field2'] + obj = FileModel() + obj.testfile.save(largefile.name, largefile) + + return HttpResponse('') + +def file_upload_unicode_name(request): + + # Check to see if unicode name came through properly. + if not request.FILES['file_unicode'].name.endswith(UNICODE_FILENAME): + return HttpResponseServerError() + + response = None + + # Check to make sure the exotic characters are preserved even + # through file save. + uni_named_file = request.FILES['file_unicode'] + obj = FileModel.objects.create(testfile=uni_named_file) + full_name = u'%s/%s' % (UPLOAD_TO, uni_named_file.name) + if not os.path.exists(full_name): + response = HttpResponseServerError() + + # Cleanup the object with its exotic file name immediately. + # (shutil.rmtree used elsewhere in the tests to clean up the + # upload directory has been seen to choke on unicode + # filenames on Windows.) + obj.delete() + + if response: + return response + else: + return HttpResponse('') + +def file_upload_echo(request): + """ + Simple view to echo back info about uploaded files for tests. + """ + r = dict([(k, f.name) for k, f in request.FILES.items()]) + return HttpResponse(simplejson.dumps(r)) + +def file_upload_quota(request): + """ + Dynamically add in an upload handler. + """ + request.upload_handlers.insert(0, QuotaUploadHandler()) + return file_upload_echo(request) + +def file_upload_quota_broken(request): + """ + You can't change handlers after reading FILES; this view shouldn't work. + """ + response = file_upload_echo(request) + request.upload_handlers.insert(0, QuotaUploadHandler()) + return response + +def file_upload_getlist_count(request): + """ + Check the .getlist() function to ensure we receive the correct number of files. + """ + file_counts = {} + + for key in request.FILES.keys(): + file_counts[key] = len(request.FILES.getlist(key)) + return HttpResponse(simplejson.dumps(file_counts)) + +def file_upload_errors(request): + request.upload_handlers.insert(0, ErroringUploadHandler()) + return file_upload_echo(request) |