Browse Source

refactor tests and add start to span/subscribe tests

pull/1/head
Brett Langdon 5 years ago
parent
commit
6b76790ce2
Signed by: brettlangdon GPG Key ID: A70042D88B95AA2B
8 changed files with 300 additions and 41 deletions
  1. +4
    -0
      sysaudit/__init__.py
  2. +0
    -0
      tests/__init__.py
  3. +45
    -0
      tests/base.py
  4. +150
    -0
      tests/span-tests.py
  5. +45
    -0
      tests/subscribe-tests.py
  6. +5
    -41
      tests/test_audit.py
  7. +36
    -0
      tests/test_spans.py
  8. +15
    -0
      tests/test_subscribe.py

+ 4
- 0
sysaudit/__init__.py View File

@ -102,6 +102,10 @@ class Span:
return self
def end(self, data=None):
if not self.started:
raise RuntimeError(
"Attempting to end span {} before it was started".format(self)
)
if not self.ended:
self.message("end", data)
self.ended = True


+ 0
- 0
tests/__init__.py View File


+ 45
- 0
tests/base.py View File

@ -0,0 +1,45 @@
import subprocess
import sys
import unittest
class BaseTest(unittest.TestCase):
TEST_FILE_PY = None
def do_test(self, *args):
popen_kwargs = dict(
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
)
if sys.version_info >= (3, 6):
popen_kwargs["encoding"] = "utf-8"
p = subprocess.Popen(
[sys.executable, self.TEST_FILE_PY] + list(args), **popen_kwargs
)
p.wait()
sys.stdout.writelines(p.stdout)
sys.stderr.writelines(p.stderr)
if p.returncode:
self.fail("".join(p.stderr))
def run_python(self, *args):
events = []
popen_kwargs = dict(
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
)
if sys.version_info >= (3, 6):
popen_kwargs["encoding"] = "utf-8"
p = subprocess.Popen(
[sys.executable, self.TEST_FILE_PY] + list(args), **popen_kwargs
)
p.wait()
sys.stderr.writelines(p.stderr)
return (
p.returncode,
[line.strip().partition(" ") for line in p.stdout],
"".join(p.stderr),
)

+ 150
- 0
tests/span-tests.py View File

@ -0,0 +1,150 @@
import mock
import pytest
import sysaudit
def assert_span_message(span, call_args, type, data=None):
(args,) = call_args
(message,) = args
assert isinstance(message, sysaudit.Span.Message)
assert message.span == span
assert message.type == type
assert message.data == data
def assert_start_message(span, call_args, data=None):
assert_span_message(span, call_args, "start", data=data)
def assert_end_message(span, call_args, data=None):
assert_span_message(span, call_args, "end", data=data)
def assert_annotate_message(span, call_args, data=None):
assert_span_message(span, call_args, "annotate", data=data)
def test_properties():
# Name only
span = sysaudit.Span("span.name")
assert span.started is False
assert span.ended is False
assert span.data is None
assert span.name == "span.name"
assert span.id == id(span)
# Name and data
span = sysaudit.Span("span.name", data="some-data")
assert span.started is False
assert span.ended is False
assert span.data == "some-data"
assert span.name == "span.name"
assert span.id == id(span)
def test_started_ended():
span = sysaudit.Span("span.name")
span.start()
assert span.started is True
assert span.ended is False
span.end()
assert span.started is True
assert span.ended is True
def test_started_ended_contextmanager():
span = sysaudit.Span("span.name")
with span:
assert span.started is True
assert span.ended is False
assert span.started is True
assert span.ended is True
def test_lifecycle_events():
hook = mock.Mock()
sysaudit.subscribe("span.name", hook)
span = sysaudit.Span("span.name")
hook.assert_not_called()
span.start()
assert hook.call_count == 1
assert_start_message(span, hook.call_args[0])
# Call a second time
span.start()
# We did not emit a second start message
assert hook.call_count == 1
span.end()
assert hook.call_count == 2
assert_end_message(span, hook.call_args[0])
# Call a second time
span.end()
# We did not emit a second end message
assert hook.call_count == 2
def test_lifecycle_events_contextmanager():
hook = mock.Mock()
sysaudit.subscribe("span.name", hook)
with sysaudit.Span("span.name") as span:
assert hook.call_count == 1
assert_start_message(span, hook.call_args[0])
assert hook.call_count == 2
assert_end_message(
span, hook.call_args[0], data=dict(exc_tb=None, exc_type=None, exc_val=None)
)
def test_lifecycle_events_data():
hook = mock.Mock()
sysaudit.subscribe("span.name", hook)
span = sysaudit.Span("span.name")
span.start("start-data")
assert hook.call_count == 1
assert_start_message(span, hook.call_args[0], data="start-data")
span.end("end-data")
assert hook.call_count == 2
assert_end_message(span, hook.call_args[0], data="end-data")
def test_end_before_start():
span = sysaudit.Span("span.name")
with pytest.raises(RuntimeError):
span.end()
def test_custom_message():
hook = mock.Mock()
sysaudit.subscribe("span.name", hook)
span = sysaudit.Span("span.name")
span.message("custom", "message")
assert hook.call_count == 1
assert_span_message(span, hook.call_args[0], type="custom", data="message")
def test_annotate_message():
hook = mock.Mock()
sysaudit.subscribe("span.name", hook)
span = sysaudit.Span("span.name")
span.annotate("extra-data")
assert hook.call_count == 1
assert_annotate_message(span, hook.call_args[0], data="extra-data")

+ 45
- 0
tests/subscribe-tests.py View File

@ -0,0 +1,45 @@
import mock
import sysaudit
def test_basic():
test_hook = mock.Mock()
event_hook = mock.Mock()
sysaudit.subscribe("test", test_hook)
sysaudit.subscribe("event", event_hook)
assert sysaudit._subscriptions == dict(test=[test_hook], event=[event_hook])
sysaudit.audit("test", 1)
sysaudit.audit("test", 2)
sysaudit.audit("event", 3)
sysaudit.audit("event", 4)
sysaudit.audit("event", 5)
assert test_hook.mock_calls == [
mock.call((1,)),
mock.call((2,)),
]
assert event_hook.mock_calls == [
mock.call((3,)),
mock.call((4,)),
mock.call((5,)),
]
def test_multiple_hooks():
test_hook_1 = mock.Mock()
test_hook_2 = mock.Mock()
test_hook_3 = mock.Mock()
sysaudit.subscribe("test", test_hook_1)
sysaudit.subscribe("test", test_hook_2)
sysaudit.subscribe("test", test_hook_3)
sysaudit.audit("test", 1)
test_hook_1.assert_called_once_with(((1,)))
test_hook_2.assert_called_once_with(((1,)))
test_hook_3.assert_called_once_with(((1,)))

+ 5
- 41
tests/test_audit.py View File

@ -8,54 +8,18 @@ import subprocess
import sys
import unittest
from .base import BaseTest
AUDIT_TESTS_PY = os.path.abspath(
os.path.join(os.path.dirname(__file__), "audit-tests.py")
)
skip_old_py = unittest.skipIf(
sys.version_info < (3, 8), "Skipping tests testing built-in events"
)
class AuditTest(unittest.TestCase):
def do_test(self, *args):
popen_kwargs = dict(
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
)
if sys.version_info >= (3, 6):
popen_kwargs["encoding"] = "utf-8"
p = subprocess.Popen(
[sys.executable, AUDIT_TESTS_PY] + list(args), **popen_kwargs
)
p.wait()
sys.stdout.writelines(p.stdout)
sys.stderr.writelines(p.stderr)
if p.returncode:
self.fail("".join(p.stderr))
def run_python(self, *args):
events = []
popen_kwargs = dict(
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
)
if sys.version_info >= (3, 6):
popen_kwargs["encoding"] = "utf-8"
p = subprocess.Popen(
[sys.executable, AUDIT_TESTS_PY] + list(args), **popen_kwargs
)
p.wait()
sys.stderr.writelines(p.stderr)
return (
p.returncode,
[line.strip().partition(" ") for line in p.stdout],
"".join(p.stderr),
)
class AuditTest(BaseTest):
TEST_FILE_PY = os.path.abspath(
os.path.join(os.path.dirname(__file__), "audit-tests.py")
)
def test_basic(self):
self.do_test("test_basic")


+ 36
- 0
tests/test_spans.py View File

@ -0,0 +1,36 @@
import os
from .base import BaseTest
class SubscribeTest(BaseTest):
TEST_FILE_PY = os.path.abspath(
os.path.join(os.path.dirname(__file__), "span-tests.py")
)
def test_properties(self):
self.do_test("test_properties")
def test_started_ended(self):
self.do_test("test_started_ended")
def test_started_ended_contextmanager(self):
self.do_test("test_started_ended_contextmanager")
def test_lifecycle_events(self):
self.do_test("test_lifecycle_events")
def test_lifecycle_events_contextmanager(self):
self.do_test("test_lifecycle_events_contextmanager")
def test_lifecycle_events_data(self):
self.do_test("test_lifecycle_events_data")
def test_end_before_start(self):
self.do_test("test_end_before_start")
def test_custom_message(self):
self.do_test("test_custom_message")
def test_annotate_message(self):
self.do_test("test_custom_message")

+ 15
- 0
tests/test_subscribe.py View File

@ -0,0 +1,15 @@
import os
from .base import BaseTest
class SubscribeTest(BaseTest):
TEST_FILE_PY = os.path.abspath(
os.path.join(os.path.dirname(__file__), "subscribe-tests.py")
)
def test_basic(self):
self.do_test("test_basic")
def test_multiple_hooks(self):
self.do_test("test_multiple_hooks")

Loading…
Cancel
Save