Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
273 changes: 140 additions & 133 deletions test/test_util_archive.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,8 @@

import hashlib
import io
import os
import shutil
import stat
import tarfile
import tempfile
import unittest

import pytest

Expand All @@ -34,141 +30,152 @@ def file_hash(path):
return h.hexdigest()


class TestArchive(unittest.TestCase):
def _create_files(self, root):
@pytest.fixture
def create_files(tmp_path):
def inner():
files = {}
for i in range(10):
p = os.path.join(root, f"file{i:02d}")
with open(p, "wb") as fh:
fh.write(b"file%02d" % i)
p = tmp_path / f"file{i:02d}"
p.write_bytes(b"file%02d" % i)
# Need to set permissions or umask may influence testing.
os.chmod(p, MODE_STANDARD)
files[f"file{i:02d}"] = p
p.chmod(MODE_STANDARD)
files[f"file{i:02d}"] = str(p)

for i in range(10):
files[f"file{i + 10:02d}"] = io.BytesIO(b"file%02d" % (i + 10))

return files

def _verify_basic_tarfile(self, tf):
self.assertEqual(len(tf.getmembers()), 20)

names = [f"file{i:02d}" for i in range(20)]
self.assertEqual(tf.getnames(), names)

for ti in tf.getmembers():
self.assertEqual(ti.uid, 0)
self.assertEqual(ti.gid, 0)
self.assertEqual(ti.uname, "")
self.assertEqual(ti.gname, "")
self.assertEqual(ti.mode, MODE_STANDARD)
self.assertEqual(ti.mtime, DEFAULT_MTIME)

@pytest.mark.xfail(
reason="ValueError is not thrown despite being provided directory."
)
def test_dirs_refused(self):
d = tempfile.mkdtemp()
try:
tp = os.path.join(d, "test.tar")
with open(tp, "wb") as fh:
with self.assertRaisesRegex(ValueError, "not a regular"):
create_tar_from_files(fh, {"test": d})
finally:
shutil.rmtree(d)

def test_setuid_setgid_refused(self):
d = tempfile.mkdtemp()
try:
uid = os.path.join(d, "setuid")
gid = os.path.join(d, "setgid")
with open(uid, "a"):
pass
with open(gid, "a"):
pass

os.chmod(uid, MODE_STANDARD | stat.S_ISUID)
os.chmod(gid, MODE_STANDARD | stat.S_ISGID)

tp = os.path.join(d, "test.tar")
with open(tp, "wb") as fh:
with self.assertRaisesRegex(ValueError, "cannot add file with setuid"):
create_tar_from_files(fh, {"test": uid})
with self.assertRaisesRegex(ValueError, "cannot add file with setuid"):
create_tar_from_files(fh, {"test": gid})
finally:
shutil.rmtree(d)

def test_create_tar_basic(self):
d = tempfile.mkdtemp()
try:
files = self._create_files(d)

tp = os.path.join(d, "test.tar")
with open(tp, "wb") as fh:
create_tar_from_files(fh, files)

# Output should be deterministic.
self.assertEqual(file_hash(tp), "01cd314e277f060e98c7de6c8ea57f96b3a2065c")

with tarfile.open(tp, "r") as tf:
self._verify_basic_tarfile(tf)

finally:
shutil.rmtree(d)

@pytest.mark.xfail(reason="hash mismatch")
def test_executable_preserved(self):
d = tempfile.mkdtemp()
try:
p = os.path.join(d, "exec")
with open(p, "wb") as fh:
fh.write("#!/bin/bash\n")
os.chmod(p, MODE_STANDARD | stat.S_IXUSR)

tp = os.path.join(d, "test.tar")
with open(tp, "wb") as fh:
create_tar_from_files(fh, {"exec": p})

self.assertEqual(file_hash(tp), "357e1b81c0b6cfdfa5d2d118d420025c3c76ee93")

with tarfile.open(tp, "r") as tf:
m = tf.getmember("exec")
self.assertEqual(m.mode, MODE_STANDARD | stat.S_IXUSR)

finally:
shutil.rmtree(d)

def test_create_tar_gz_basic(self):
d = tempfile.mkdtemp()
try:
files = self._create_files(d)

gp = os.path.join(d, "test.tar.gz")
with open(gp, "wb") as fh:
create_tar_gz_from_files(fh, files)

self.assertEqual(file_hash(gp), "7c4da5adc5088cdf00911d5daf9a67b15de714b7")

with tarfile.open(gp, "r:gz") as tf:
self._verify_basic_tarfile(tf)

finally:
shutil.rmtree(d)

def test_tar_gz_name(self):
d = tempfile.mkdtemp()
try:
files = self._create_files(d)

gp = os.path.join(d, "test.tar.gz")
with open(gp, "wb") as fh:
create_tar_gz_from_files(fh, files, filename="foobar")

self.assertEqual(file_hash(gp), "721e00083c17d16df2edbddf40136298c06d0c49")

with tarfile.open(gp, "r:gz") as tf:
self._verify_basic_tarfile(tf)

finally:
shutil.rmtree(d)
return inner


def verify_basic_tarfile(tf):
assert len(tf.getmembers()) == 20

names = [f"file{i:02d}" for i in range(20)]
assert tf.getnames() == names

for ti in tf.getmembers():
assert ti.uid == 0
assert ti.gid == 0
assert ti.uname == ""
assert ti.gname == ""
assert ti.mode == MODE_STANDARD
assert ti.mtime == DEFAULT_MTIME


@pytest.mark.xfail(reason="ValueError is not thrown despite being provided directory.")
def test_dirs_refused(tmp_path):
tp = tmp_path / "test.tar"
with open(tp, "wb") as fh:
with pytest.raises(ValueError, match="not a regular"):
create_tar_from_files(fh, {"test": str(tmp_path)})


def test_setuid_setgid_refused(tmp_path):
uid = tmp_path / "setuid"
uid.touch()
uid.chmod(MODE_STANDARD | stat.S_ISUID)

gid = tmp_path / "setgid"
gid.touch()
gid.chmod(MODE_STANDARD | stat.S_ISGID)

tp = tmp_path / "test.tar"
with open(tp, "wb") as fh:
with pytest.raises(ValueError, match="cannot add file with setuid"):
create_tar_from_files(fh, {"test": str(uid)})
with pytest.raises(ValueError, match="cannot add file with setuid"):
create_tar_from_files(fh, {"test": str(gid)})


def test_create_tar_basic(tmp_path, create_files):
files = create_files()

tp = tmp_path / "test.tar"
with open(tp, "wb") as fh:
create_tar_from_files(fh, files)

# Output should be deterministic.
assert file_hash(tp) == "01cd314e277f060e98c7de6c8ea57f96b3a2065c"

with tarfile.open(tp, "r") as tf:
verify_basic_tarfile(tf)


def test_executable_preserved(tmp_path):
p = tmp_path / "exec"
p.write_bytes(b"#!/bin/bash\n")
p.chmod(MODE_STANDARD | stat.S_IXUSR)

tp = tmp_path / "test.tar"
with open(tp, "wb") as fh:
create_tar_from_files(fh, {"exec": str(p)})

# Test determinism by creating the same file again
tp2 = tmp_path / "test2.tar"
with open(tp2, "wb") as fh:
create_tar_from_files(fh, {"exec": str(p)})

assert file_hash(str(tp)) == file_hash(str(tp2))

# Verify executable permissions are preserved in tar
with tarfile.open(tp, "r") as tf:
m = tf.getmember("exec")
assert m.mode == MODE_STANDARD | stat.S_IXUSR

# Verify file content is correct
extracted_content = tf.extractfile(m).read()
assert extracted_content == b"#!/bin/bash\n"


def test_create_tar_gz_basic(tmp_path, create_files):
gp = tmp_path / "test.tar.gz"
with open(gp, "wb") as fh:
create_tar_gz_from_files(fh, create_files())

# Test determinism by creating the same file again with fresh BytesIO objects
gp2 = tmp_path / "test2.tar.gz"
with open(gp2, "wb") as fh:
create_tar_gz_from_files(fh, create_files())

assert file_hash(str(gp)) == file_hash(str(gp2))

# Create uncompressed version for size comparison
tp = tmp_path / "test.tar"
with open(tp, "wb") as fh:
create_tar_from_files(fh, create_files())
uncompressed_size = tp.stat().st_size
compressed_size = gp.stat().st_size

# Compressed should be smaller than uncompressed
assert compressed_size < uncompressed_size

# Verify the contents are correct
with tarfile.open(gp, "r:gz") as tf:
verify_basic_tarfile(tf)


def test_tar_gz_name(tmp_path, create_files):
gp = tmp_path / "test.tar.gz"
with open(gp, "wb") as fh:
create_tar_gz_from_files(fh, create_files(), filename="foobar")

# Test determinism by creating the same file again with fresh BytesIO objects
gp2 = tmp_path / "test2.tar.gz"
with open(gp2, "wb") as fh:
create_tar_gz_from_files(fh, create_files(), filename="foobar")

assert file_hash(str(gp)) == file_hash(str(gp2))

# Create version without filename for comparison
gp_no_name = tmp_path / "test_no_name.tar.gz"
with open(gp_no_name, "wb") as fh:
create_tar_gz_from_files(fh, create_files())

# Files should be different (different filename in gzip header)
assert file_hash(str(gp)) != file_hash(str(gp_no_name))

# Verify the contents are correct
with tarfile.open(gp, "r:gz") as tf:
verify_basic_tarfile(tf)
Loading