Convert all tests to pytest

Previously, tests were run against Anki launched by user.

Now,
 * most tests run against isolated Anki in current process;
 * tests in `test_server.py` launch another Anki in a separate process
   and run a few commands to test the server;
 * nearly all tests were preserved in the sense that
   what was being tested is tested still.
   A few tests in `test_graphical.py` are skipped due to
   a problem with the method tests, see the comments;
 * tests can be run:
   * In a single profile, using --no-tear-down-profile-after-each-test;
   * In a single app instance, but with the profile being torn down
     after each test--default;
   * In separate processes, using --forked.
This commit is contained in:
oakkitten 2022-03-30 20:16:39 +01:00
parent ddad426563
commit 8f1a2cc5fd
12 changed files with 1010 additions and 596 deletions

267
tests/conftest.py Normal file
View File

@ -0,0 +1,267 @@
import concurrent.futures
import time
from contextlib import contextmanager
from dataclasses import dataclass
import aqt.operations.note
import pytest
from PyQt5 import QtTest
from _pytest.monkeypatch import MonkeyPatch
from pytest_anki._launch import anki_running # noqa
from plugin import AnkiConnect
from plugin.edit import Edit
from plugin.util import DEFAULT_CONFIG
ac = AnkiConnect()
# wait for n seconds, while events are being processed
def wait(seconds):
milliseconds = int(seconds * 1000)
QtTest.QTest.qWait(milliseconds) # noqa
def wait_until(booleanish_function, at_most_seconds=30):
deadline = time.time() + at_most_seconds
while time.time() < deadline:
if booleanish_function():
return
wait(0.01)
raise Exception(f"Function {booleanish_function} never once returned "
f"a positive value in {at_most_seconds} seconds")
def delete_model(model_name):
model = ac.collection().models.byName(model_name)
ac.collection().models.remove(model["id"])
def close_all_dialogs_and_wait_for_them_to_run_closing_callbacks():
aqt.dialogs.closeAll(onsuccess=lambda: None)
wait_until(aqt.dialogs.allClosed)
# largely analogous to `aqt.mw.pm.remove`.
# by default, the profile is moved to trash. this is a problem for us,
# as on some systems trash folders may not exist.
# we can't delete folder and *then* call `aqt.mw.pm.remove`,
# as it calls `profileFolder` and that *creates* the folder!
def remove_current_profile():
import os
import shutil
def send2trash(profile_folder):
assert profile_folder.endswith("User 1")
if os.path.exists(profile_folder):
shutil.rmtree(profile_folder)
with MonkeyPatch().context() as monkey:
monkey.setattr(aqt.profiles, "send2trash", send2trash)
aqt.mw.pm.remove(aqt.mw.pm.name)
@contextmanager
def empty_anki_session_started():
with anki_running(
qtbot=None, # noqa
enable_web_debugging=False,
) as session:
yield session
# backups are run in a thread and can lead to warnings when the thread dies
# after trying to open collection after it's been deleted
@contextmanager
def profile_loaded(session):
with session.profile_loaded():
aqt.mw.pm.profile["numBackups"] = 0
yield session
@contextmanager
def anki_connect_config_loaded(session, web_bind_port):
with session.addon_config_created(
package_name="plugin",
default_config=DEFAULT_CONFIG,
user_config={**DEFAULT_CONFIG, "webBindPort": web_bind_port}
):
yield
@contextmanager
def current_decks_and_models_etc_preserved():
deck_names_before = ac.deckNames()
model_names_before = ac.modelNames()
try:
yield
finally:
deck_names_after = ac.deckNames()
model_names_after = ac.modelNames()
deck_names_to_delete = {*deck_names_after} - {*deck_names_before}
model_names_to_delete = {*model_names_after} - {*model_names_before}
ac.deleteDecks(decks=deck_names_to_delete, cardsToo=True)
for model_name in model_names_to_delete:
delete_model(model_name)
ac.guiDeckBrowser()
@dataclass
class Setup:
deck_id: int
note1_id: int
note2_id: int
card_ids: "list[int]"
def set_up_test_deck_and_test_model_and_two_notes():
ac.createModel(
modelName="test_model",
inOrderFields=["field1", "field2"],
cardTemplates=[
{"Front": "{{field1}}", "Back": "{{field2}}"},
{"Front": "{{field2}}", "Back": "{{field1}}"}
],
css="* {}",
)
deck_id = ac.createDeck("test_deck")
note1_id = ac.addNote(dict(
deckName="test_deck",
modelName="test_model",
fields={"field1": "note1 field1", "field2": "note1 field2"},
tags={"tag1"},
))
note2_id = ac.addNote(dict(
deckName="test_deck",
modelName="test_model",
fields={"field1": "note2 field1", "field2": "note2 field2"},
tags={"tag2"},
))
card_ids = ac.findCards(query="deck:test_deck")
return Setup(
deck_id=deck_id,
note1_id=note1_id,
note2_id=note2_id,
card_ids=card_ids,
)
#############################################################################
def pytest_addoption(parser):
parser.addoption("--tear-down-profile-after-each-test",
action="store_true",
default=True)
parser.addoption("--no-tear-down-profile-after-each-test", "-T",
action="store_false",
dest="tear_down_profile_after_each_test")
def pytest_report_header(config):
if config.option.forked:
return "test isolation: perfect; each test is run in a separate process"
if config.option.tear_down_profile_after_each_test:
return "test isolation: good; user profile is torn down after each test"
else:
return "test isolation: poor; only newly created decks and models " \
"are cleaned up between tests"
@pytest.fixture(autouse=True)
def run_background_tasks_on_main_thread(request, monkeypatch): # noqa
"""
Makes background operations such as card deletion execute on main thread
and execute the callback immediately
"""
def run_in_background(task, on_done=None, kwargs=None):
future = concurrent.futures.Future()
try:
future.set_result(task(**kwargs if kwargs is not None else {}))
except BaseException as e:
future.set_exception(e)
if on_done is not None:
on_done(future)
monkeypatch.setattr(aqt.mw.taskman, "run_in_background",
run_in_background)
# don't use run_background_tasks_on_main_thread for tests that don't run Anki
def pytest_generate_tests(metafunc):
if (
run_background_tasks_on_main_thread.__name__ in metafunc.fixturenames
and session_scope_empty_session.__name__ not in metafunc.fixturenames
):
metafunc.fixturenames.remove(run_background_tasks_on_main_thread.__name__)
@pytest.fixture(scope="session")
def session_scope_empty_session():
with empty_anki_session_started() as session:
yield session
@pytest.fixture(scope="session")
def session_scope_session_with_profile_loaded(session_scope_empty_session):
with profile_loaded(session_scope_empty_session):
yield session_scope_empty_session
@pytest.fixture
def session_with_profile_loaded(session_scope_empty_session, request):
"""
Like anki_session fixture from pytest-anki, but:
* Default profile is loaded
* It's relying on session-wide app instance so that
it can be used without forking every test;
this can be useful to speed up tests and also
to examine Anki's stdout/stderr, which is not visible with forking.
* If command line option --no-tear-down-profile-after-each-test is passed,
only the newly created decks and models are deleted.
Otherwise, the profile is completely torn down after each test.
Tearing down the profile is significantly slower.
"""
if request.config.option.tear_down_profile_after_each_test:
try:
with profile_loaded(session_scope_empty_session):
yield session_scope_empty_session
finally:
remove_current_profile()
else:
session = request.getfixturevalue(
session_scope_session_with_profile_loaded.__name__
)
with current_decks_and_models_etc_preserved():
yield session
@pytest.fixture
def setup(session_with_profile_loaded):
"""
Like session_with_profile_loaded, but also:
* Added are:
* A deck `test_deck`
* A model `test_model` with fields `filed1` and `field2`
and two cards per note
* Two notes with two valid cards each using the above deck and model
* Edit dialog is registered with dialog manager
* Any dialogs, if open, are safely closed on exit
"""
Edit.register_with_dialog_manager()
yield set_up_test_deck_and_test_model_and_two_notes()
close_all_dialogs_and_wait_for_them_to_run_closing_callbacks()

View File

@ -1,95 +1,78 @@
#!/usr/bin/env python
import pytest
from anki.errors import NotFoundError # noqa
import unittest
import util
from conftest import ac
class TestCards(unittest.TestCase):
def setUp(self):
util.invoke('createDeck', deck='test')
note = {
'deckName': 'test',
'modelName': 'Basic',
'fields': {'Front': 'front1', 'Back': 'back1'},
'tags': ['tag1'],
'options': {
'allowDuplicate': True
}
}
self.noteId = util.invoke('addNote', note=note)
def test_findCards(setup):
card_ids = ac.findCards(query="deck:test_deck")
assert len(card_ids) == 4
def tearDown(self):
util.invoke('deleteDecks', decks=['test'], cardsToo=True)
class TestEaseFactors:
def test_setEaseFactors(self, setup):
result = ac.setEaseFactors(cards=setup.card_ids, easeFactors=[4200] * 4)
assert result == [True] * 4
def test_setEaseFactors_with_invalid_card_id(self, setup):
result = ac.setEaseFactors(cards=[123], easeFactors=[4200])
assert result == [False]
def test_getEaseFactors(self, setup):
ac.setEaseFactors(cards=setup.card_ids, easeFactors=[4200] * 4)
result = ac.getEaseFactors(cards=setup.card_ids)
assert result == [4200] * 4
def test_getEaseFactors_with_invalid_card_id(self, setup):
assert ac.getEaseFactors(cards=[123]) == [None]
def runTest(self):
incorrectId = 1234
class TestSuspending:
def test_suspend(self, setup):
assert ac.suspend(cards=setup.card_ids) is True
# findCards
cardIds = util.invoke('findCards', query='deck:test')
self.assertEqual(len(cardIds), 1)
def test_suspend_fails_with_incorrect_id(self, setup):
with pytest.raises(NotFoundError):
assert ac.suspend(cards=[123])
# setEaseFactors
EASE_TO_TRY = 4200
easeFactors = [EASE_TO_TRY for card in cardIds]
couldGetEaseFactors = util.invoke('setEaseFactors', cards=cardIds, easeFactors=easeFactors)
self.assertEqual([True for card in cardIds], couldGetEaseFactors)
couldGetEaseFactors = util.invoke('setEaseFactors', cards=[incorrectId], easeFactors=[EASE_TO_TRY])
self.assertEqual([False], couldGetEaseFactors)
def test_areSuspended_returns_False_for_regular_cards(self, setup):
result = ac.areSuspended(cards=setup.card_ids)
assert result == [False] * 4
# getEaseFactors
easeFactorsFound = util.invoke('getEaseFactors', cards=cardIds)
self.assertEqual(easeFactors, easeFactorsFound)
easeFactorsFound = util.invoke('getEaseFactors', cards=[incorrectId])
self.assertEqual([None], easeFactorsFound)
def test_areSuspended_returns_True_for_suspended_cards(self, setup):
ac.suspend(setup.card_ids)
result = ac.areSuspended(cards=setup.card_ids)
assert result == [True] * 4
# suspend
util.invoke('suspend', cards=cardIds)
self.assertRaises(Exception, lambda: util.invoke('suspend', cards=[incorrectId]))
# areSuspended (part 1)
suspendedStates = util.invoke('areSuspended', cards=cardIds)
self.assertEqual(len(cardIds), len(suspendedStates))
self.assertNotIn(False, suspendedStates)
self.assertEqual([None], util.invoke('areSuspended', cards=[incorrectId]))
def test_areDue_returns_True_for_new_cards(setup):
result = ac.areDue(cards=setup.card_ids)
assert result == [True] * 4
# unsuspend
util.invoke('unsuspend', cards=cardIds)
# areSuspended (part 2)
suspendedStates = util.invoke('areSuspended', cards=cardIds)
self.assertEqual(len(cardIds), len(suspendedStates))
self.assertNotIn(True, suspendedStates)
def test_getIntervals(setup):
ac.getIntervals(cards=setup.card_ids, complete=False)
ac.getIntervals(cards=setup.card_ids, complete=True)
# areDue
dueStates = util.invoke('areDue', cards=cardIds)
self.assertEqual(len(cardIds), len(dueStates))
self.assertNotIn(False, dueStates)
# getIntervals
util.invoke('getIntervals', cards=cardIds, complete=True)
util.invoke('getIntervals', cards=cardIds, complete=False)
def test_cardsToNotes(setup):
result = ac.cardsToNotes(cards=setup.card_ids)
assert {*result} == {setup.note1_id, setup.note2_id}
# cardsToNotes
noteIds = util.invoke('cardsToNotes', cards=cardIds)
self.assertEqual(len(noteIds), len(cardIds))
self.assertIn(self.noteId, noteIds)
# cardsInfo
cardsInfo = util.invoke('cardsInfo', cards=cardIds)
self.assertEqual(len(cardsInfo), len(cardIds))
for i, cardInfo in enumerate(cardsInfo):
self.assertEqual(cardInfo['cardId'], cardIds[i])
cardsInfo = util.invoke('cardsInfo', cards=[incorrectId])
self.assertEqual(len(cardsInfo), 1)
self.assertDictEqual(cardsInfo[0], dict())
class TestCardInfo:
def test_with_valid_ids(self, setup):
result = ac.cardsInfo(cards=setup.card_ids)
assert [item["cardId"] for item in result] == setup.card_ids
# forgetCards
util.invoke('forgetCards', cards=cardIds)
def test_with_incorrect_id(self, setup):
result = ac.cardsInfo(cards=[123])
assert result == [{}]
# relearnCards
util.invoke('relearnCards', cards=cardIds)
if __name__ == '__main__':
unittest.main()
def test_forgetCards(setup):
ac.forgetCards(cards=setup.card_ids)
def test_relearnCards(setup):
ac.relearnCards(cards=setup.card_ids)

View File

@ -1,23 +0,0 @@
#!/usr/bin/env python
import unittest
import util
class TestNotes(unittest.TestCase):
def setUp(self):
util.invoke('createDeck', deck='test')
def tearDown(self):
util.invoke('deleteDecks', decks=['test'], cardsToo=True)
def test_bug164(self):
note = {'deckName': 'test', 'modelName': 'Basic', 'fields': {'Front': ' Whitespace\n', 'Back': ''}, 'options': { 'allowDuplicate': False, 'duplicateScope': 'deck'}}
util.invoke('addNote', note=note)
self.assertRaises(Exception, lambda: util.invoke('addNote', note=note))
if __name__ == '__main__':
unittest.main()

View File

@ -1,98 +1,73 @@
#!/usr/bin/env python
import pytest
import unittest
import util
from conftest import ac
class TestDecks(unittest.TestCase):
def runTest(self):
# deckNames (part 1)
deckNames = util.invoke('deckNames')
self.assertIn('Default', deckNames)
# deckNamesAndIds
result = util.invoke('deckNamesAndIds')
self.assertIn('Default', result)
self.assertEqual(result['Default'], 1)
# createDeck
util.invoke('createDeck', deck='test1')
# deckNames (part 2)
deckNames = util.invoke('deckNames')
self.assertIn('test1', deckNames)
# changeDeck
note = {'deckName': 'test1', 'modelName': 'Basic', 'fields': {'Front': 'front', 'Back': 'back'}, 'tags': ['tag']}
noteId = util.invoke('addNote', note=note)
cardIds = util.invoke('findCards', query='deck:test1')
util.invoke('changeDeck', cards=cardIds, deck='test2')
# deckNames (part 3)
deckNames = util.invoke('deckNames')
self.assertIn('test2', deckNames)
# deleteDecks
util.invoke('deleteDecks', decks=['test1', 'test2'], cardsToo=True)
# deckNames (part 4)
deckNames = util.invoke('deckNames')
self.assertNotIn('test1', deckNames)
self.assertNotIn('test2', deckNames)
# getDeckConfig
deckConfig = util.invoke('getDeckConfig', deck='Default')
self.assertEqual('Default', deckConfig['name'])
# saveDeckConfig
deckConfig = util.invoke('saveDeckConfig', config=deckConfig)
# setDeckConfigId
setDeckConfigId = util.invoke('setDeckConfigId', decks=['Default'], configId=1)
self.assertTrue(setDeckConfigId)
# cloneDeckConfigId (part 1)
deckConfigId = util.invoke('cloneDeckConfigId', cloneFrom=1, name='test')
self.assertTrue(deckConfigId)
# removeDeckConfigId (part 1)
removedDeckConfigId = util.invoke('removeDeckConfigId', configId=deckConfigId)
self.assertTrue(removedDeckConfigId)
# removeDeckConfigId (part 2)
removedDeckConfigId = util.invoke('removeDeckConfigId', configId=deckConfigId)
self.assertFalse(removedDeckConfigId)
# cloneDeckConfigId (part 2)
deckConfigId = util.invoke('cloneDeckConfigId', cloneFrom=deckConfigId, name='test')
self.assertFalse(deckConfigId)
# updateCompleteDeck
util.invoke('updateCompleteDeck', data={
'deck': 'test3',
'cards': {
'12': {
'id': 12, 'nid': 23, 'ord': 0, 'type': 0, 'queue': 0,
'due': 1186031, 'factor': 0, 'ivl': 0, 'reps': 0, 'lapses': 0, 'left': 0
}
},
'notes': {
'23': {
'id': 23, 'mid': 34, 'fields': ['frontValue', 'backValue'], 'tags': ['aTag']
}
},
'models': {
'34': {
'id': 34, 'fields': ['Front', 'Back'], 'templateNames': ['Card 1'], 'name': 'anotherModel',
}
}
})
deckNames = util.invoke('deckNames')
self.assertIn('test3', deckNames)
cardIDs = util.invoke('findCards', query='deck:test3')
self.assertEqual(len(cardIDs), 1)
self.assertEqual(cardIDs[0], 12)
def test_deckNames(session_with_profile_loaded):
result = ac.deckNames()
assert result == ["Default"]
if __name__ == '__main__':
unittest.main()
def test_deckNamesAndIds(session_with_profile_loaded):
result = ac.deckNamesAndIds()
assert result == {"Default": 1}
def test_createDeck(session_with_profile_loaded):
ac.createDeck("foo")
assert {*ac.deckNames()} == {"Default", "foo"}
def test_changeDeck(setup):
ac.changeDeck(cards=setup.card_ids, deck="bar")
assert "bar" in ac.deckNames()
def test_deleteDeck(setup):
before = ac.deckNames()
ac.deleteDecks(decks=["test_deck"], cardsToo=True)
after = ac.deckNames()
assert {*before} - {*after} == {"test_deck"}
@pytest.mark.skipif(
condition=ac._anki21_version < 28,
reason=f"Not applicable to Anki < 2.1.28"
)
def test_deleteDeck_must_be_called_with_cardsToo_set_to_True_on_later_api(setup):
with pytest.raises(Exception):
ac.deleteDecks(decks=["test_deck"])
with pytest.raises(Exception):
ac.deleteDecks(decks=["test_deck"], cardsToo=False)
def test_getDeckConfig(session_with_profile_loaded):
result = ac.getDeckConfig(deck="Default")
assert result["name"] == "Default"
def test_saveDeckConfig(session_with_profile_loaded):
config = ac.getDeckConfig(deck="Default")
result = ac.saveDeckConfig(config=config)
assert result is True
def test_setDeckConfigId(session_with_profile_loaded):
result = ac.setDeckConfigId(decks=["Default"], configId=1)
assert result is True
def test_cloneDeckConfigId(session_with_profile_loaded):
result = ac.cloneDeckConfigId(cloneFrom=1, name="test")
assert isinstance(result, int)
def test_removedDeckConfigId(session_with_profile_loaded):
new_config_id = ac.cloneDeckConfigId(cloneFrom=1, name="test")
assert ac.removeDeckConfigId(configId=new_config_id) is True
def test_removedDeckConfigId_fails_with_invalid_id(session_with_profile_loaded):
new_config_id = ac.cloneDeckConfigId(cloneFrom=1, name="test")
assert ac.removeDeckConfigId(configId=new_config_id) is True
assert ac.removeDeckConfigId(configId=new_config_id) is False

View File

@ -1,76 +1,151 @@
#!/usr/bin/env python
import aqt
import pytest
import unittest
import util
from conftest import ac, wait, wait_until, \
close_all_dialogs_and_wait_for_them_to_run_closing_callbacks
class TestGui(unittest.TestCase):
def runTest(self):
# guiBrowse
util.invoke('guiBrowse', query='deck:Default')
def test_guiBrowse(setup):
ac.guiBrowse()
# guiSelectedNotes
util.invoke('guiSelectedNotes')
# guiAddCards
util.invoke('guiAddCards')
def test_guiDeckBrowser(setup):
ac.guiDeckBrowser()
# guiAddCards with preset
util.invoke('createDeck', deck='test')
# todo executing this test without running background tasks on main thread
# rarely causes media server (`aqt.mediasrv`) to fail:
# its `run` method raises OSError: invalid file descriptor.
# this can cause other tests to fail to tear down;
# particularly, any dialogs with editor may fail to close
# due to their trying to save the note first, which is done via web view,
# which fails to complete due to corrupt media server. investigate?
def test_guiCheckDatabase(setup, run_background_tasks_on_main_thread):
ac.guiCheckDatabase()
def test_guiDeckOverview(setup):
assert ac.guiDeckOverview(name="test_deck") is True
class TestAddCards:
note = {
'deckName': 'test',
'modelName': 'Basic',
'fields': {
'Front': 'front1',
'Back': 'back1'
},
'tags': ['tag1'],
"deckName": "test_deck",
"modelName": "Basic",
"fields": {"Front": "new front1", "Back": "new back1"},
"tags": ["tag1"]
}
util.invoke('guiAddCards', note=note)
# guiAddCards with preset and closeAfterAdding
util.invoke('guiAddCards', note={
**note,
'options': { 'closeAfterAdding': True },
})
options_closeAfterAdding = {
"options": {
"closeAfterAdding": True
}
}
util.invoke('guiAddCards', note={
**note,
'picture': [{
'url': 'https://via.placeholder.com/150.png',
'filename': 'placeholder.png',
'fields': ['Front'],
}]
})
# an actual small image, you can see it if you run the test with GUI
# noinspection SpellCheckingInspection
base64_gif = "R0lGODlhBQAEAHAAACwAAAAABQAEAIH///8AAAAAAAAAAAACB0QMqZcXDwoAOw=="
# guiCurrentCard
# util.invoke('guiCurrentCard')
picture = {
"picture": [
{
"data": base64_gif,
"filename": "smiley.gif",
"fields": ["Front"],
}
]
}
# guiStartCardTimer
util.invoke('guiStartCardTimer')
@staticmethod
def click_on_add_card_dialog_save_button(dialog_name="AddCards"):
dialog = aqt.dialogs._dialogs[dialog_name][1]
dialog.addButton.click()
# guiShowQuestion
util.invoke('guiShowQuestion')
# todo previously, these tests were verifying
# that the return value of `guiAddCards` is `int`.
# while it is indeed `int`, on modern Anki it is also always a `0`,
# so we consider it useless. update documentation?
def test_without_note(self, setup):
ac.guiAddCards()
# guiShowAnswer
util.invoke('guiShowAnswer')
def test_with_note(self, setup):
ac.guiAddCards(note=self.note)
self.click_on_add_card_dialog_save_button()
close_all_dialogs_and_wait_for_them_to_run_closing_callbacks()
# guiAnswerCard
util.invoke('guiAnswerCard', ease=1)
assert len(ac.findCards(query="new")) == 1
# guiDeckOverview
util.invoke('guiDeckOverview', name='Default')
def test_with_note_and_a_picture(self, setup):
ac.guiAddCards(note={**self.note, **self.picture})
self.click_on_add_card_dialog_save_button()
close_all_dialogs_and_wait_for_them_to_run_closing_callbacks()
# guiDeckBrowser
util.invoke('guiDeckBrowser')
assert len(ac.findCards(query="new")) == 1
assert ac.retrieveMediaFile(filename="smiley.gif") == self.base64_gif
# guiDatabaseCheck
util.invoke('guiDatabaseCheck')
# todo the tested method, when called with option `closeAfterAdding=True`,
# is broken for the following reasons:
# * it uses the note that comes with dialog's Editor.
# this note might be of a different model than the proposed note,
# and field values from the proposed note can't be put into it.
# * most crucially, `AddCardsAndClose` is trying to override the method
# `_addCards` that is no longer present or called by the superclass.
# also, it creates and registers a new class each time it is called.
# todo fix the method, or ignore/disallow the option `closeAfterAdding`?
@pytest.mark.skip("API method `guiAddCards` is broken "
"when called with note option `closeAfterAdding=True`")
def test_with_note_and_closeAfterAdding(self, setup):
def find_AddCardsAndClose_dialog_registered_name():
for name in aqt.dialogs._dialogs.keys():
if name.startswith("AddCardsAndClose"):
return name
# guiExitAnki
# util.invoke('guiExitAnki')
def dialog_is_open(name):
return aqt.dialogs._dialogs[name][1] is not None
ac.guiAddCards(note={**self.note, **self.options_closeAfterAdding})
dialog_name = find_AddCardsAndClose_dialog_registered_name()
assert dialog_is_open(dialog_name)
self.click_on_add_card_dialog_save_button(dialog_name)
wait_until(aqt.dialogs.allClosed)
if __name__ == '__main__':
unittest.main()
class TestReviewActions:
@pytest.fixture
def reviewing_started(self, setup):
assert ac.guiDeckReview(name="test_deck") is True
def test_startCardTimer(self, reviewing_started):
assert ac.guiStartCardTimer() is True
def test_guiShowQuestion(self, reviewing_started):
assert ac.guiShowQuestion() is True
assert ac.reviewer().state == "question"
def test_guiShowAnswer(self, reviewing_started):
assert ac.guiShowAnswer() is True
assert ac.reviewer().state == "answer"
def test_guiAnswerCard(self, reviewing_started):
ac.guiShowAnswer()
reviews_before = ac.cardReviews(deck="test_deck", startID=0)
assert ac.guiAnswerCard(ease=4) is True
reviews_after = ac.cardReviews(deck="test_deck", startID=0)
assert len(reviews_after) == len(reviews_before) + 1
class TestSelectedNotes:
def test_with_valid_deck_query(self, setup):
ac.guiBrowse(query="deck:test_deck")
wait_until(ac.guiSelectedNotes)
assert ac.guiSelectedNotes()[0] in {setup.note1_id, setup.note2_id}
def test_with_invalid_deck_query(self, setup):
ac.guiBrowse(query="deck:test_deck")
wait_until(ac.guiSelectedNotes)
ac.guiBrowse(query="deck:invalid")
wait_until(lambda: not ac.guiSelectedNotes())

View File

@ -1,33 +1,51 @@
#!/usr/bin/env python
import base64
import unittest
import util
from conftest import ac
class TestMedia(unittest.TestCase):
def runTest(self):
filename = '_test.txt'
data = 'test'
# storeMediaFile
util.invoke('storeMediaFile', filename=filename, data=data)
filename2 = util.invoke('storeMediaFile', filename=filename, data='testtest', deleteExisting=False)
self.assertNotEqual(filename2, filename)
# retrieveMediaFile (part 1)
media = util.invoke('retrieveMediaFile', filename=filename)
self.assertEqual(media, data)
names = util.invoke('getMediaFilesNames', pattern='_tes*.txt')
self.assertEqual(set(names), set([filename, filename2]))
# deleteMediaFile
util.invoke('deleteMediaFile', filename=filename)
# retrieveMediaFile (part 2)
media = util.invoke('retrieveMediaFile', filename=filename)
self.assertFalse(media)
FILENAME = "_test.txt"
BASE64_DATA_1 = base64.b64encode(b"test 1").decode("ascii")
BASE64_DATA_2 = base64.b64encode(b"test 2").decode("ascii")
if __name__ == '__main__':
unittest.main()
def store_one_media_file():
return ac.storeMediaFile(filename=FILENAME, data=BASE64_DATA_1)
def store_two_media_files():
filename_1 = ac.storeMediaFile(filename=FILENAME, data=BASE64_DATA_1)
filename_2 = ac.storeMediaFile(filename=FILENAME, data=BASE64_DATA_2,
deleteExisting=False)
return filename_1, filename_2
##############################################################################
def test_storeMediaFile_one_file(session_with_profile_loaded):
filename_1 = store_one_media_file()
assert FILENAME == filename_1
def test_storeMediaFile_two_files_with_the_same_name(session_with_profile_loaded):
filename_1, filename_2 = store_two_media_files()
assert FILENAME == filename_1 != filename_2
def test_retrieveMediaFile(session_with_profile_loaded):
store_one_media_file()
result = ac.retrieveMediaFile(filename=FILENAME)
assert result == BASE64_DATA_1
def test_getMediaFilesNames(session_with_profile_loaded):
filenames = store_two_media_files()
result = ac.getMediaFilesNames(pattern="_tes*.txt")
assert {*filenames} == {*result}
def test_deleteMediaFile(session_with_profile_loaded):
filename_1, filename_2 = store_two_media_files()
ac.deleteMediaFile(filename=filename_1)
assert ac.retrieveMediaFile(filename=filename_1) is False
assert ac.getMediaFilesNames(pattern="_tes*.txt") == [filename_2]

View File

@ -1,68 +1,50 @@
#!/usr/bin/env python
import aqt
import os
import tempfile
import unittest
import util
from conftest import ac, anki_connect_config_loaded, \
set_up_test_deck_and_test_model_and_two_notes, \
current_decks_and_models_etc_preserved, wait
class TestMisc(unittest.TestCase):
def runTest(self):
# version
self.assertEqual(util.invoke('version'), 6)
# sync
util.invoke('sync')
# getProfiles
profiles = util.invoke('getProfiles')
self.assertIsInstance(profiles, list)
self.assertGreater(len(profiles), 0)
# loadProfile
util.invoke('loadProfile', name=profiles[0])
# multi
actions = [util.request('version'), util.request('version'), util.request('version')]
results = util.invoke('multi', actions=actions)
self.assertEqual(len(results), len(actions))
for result in results:
self.assertIsNone(result['error'])
self.assertEqual(result['result'], 6)
# exportPackage
fd, newname = tempfile.mkstemp(prefix='testexport', suffix='.apkg')
os.close(fd)
os.unlink(newname)
result = util.invoke('exportPackage', deck='Default', path=newname)
self.assertTrue(result)
self.assertTrue(os.path.exists(newname))
# importPackage
deckName = 'importTest'
fd, newname = tempfile.mkstemp(prefix='testimport', suffix='.apkg')
os.close(fd)
os.unlink(newname)
util.invoke('createDeck', deck=deckName)
note = {
'deckName': deckName,
'modelName': 'Basic',
'fields': {'Front': 'front1', 'Back': 'back1'},
'tags': '',
'options': {
'allowDuplicate': True
}
}
noteId = util.invoke('addNote', note=note)
util.invoke('exportPackage', deck=deckName, path=newname)
util.invoke('deleteDecks', decks=[deckName], cardsToo=True)
util.invoke('importPackage', path=newname)
deckNames = util.invoke('deckNames')
self.assertIn(deckName, deckNames)
# reloadCollection
util.invoke('reloadCollection')
# version is retrieved from config
def test_version(session_with_profile_loaded):
with anki_connect_config_loaded(
session=session_with_profile_loaded,
web_bind_port=0,
):
assert ac.version() == 6
if __name__ == '__main__':
unittest.main()
def test_reloadCollection(setup):
ac.reloadCollection()
class TestProfiles:
def test_getProfiles(self, session_with_profile_loaded):
result = ac.getProfiles()
assert result == ["User 1"]
# waiting a little while gets rid of the cryptic warning:
# Qt warning: QXcbConnection: XCB error: 8 (BadMatch), sequence: 658,
# resource id: 2097216, major code: 42 (SetInputFocus), minor code: 0
def test_loadProfile(self, session_with_profile_loaded):
aqt.mw.unloadProfileAndShowProfileManager()
wait(0.1)
ac.loadProfile(name="User 1")
class TestExportImport:
def test_exportPackage(self, session_with_profile_loaded, setup):
filename = session_with_profile_loaded.base + "/export.apkg"
ac.exportPackage(deck="test_deck", path=filename)
def test_importPackage(self, session_with_profile_loaded):
filename = session_with_profile_loaded.base + "/export.apkg"
with current_decks_and_models_etc_preserved():
set_up_test_deck_and_test_model_and_two_notes()
ac.exportPackage(deck="test_deck", path=filename)
with current_decks_and_models_etc_preserved():
assert "test_deck" not in ac.deckNames()
ac.importPackage(path=filename)
assert "test_deck" in ac.deckNames()

View File

@ -1,66 +1,112 @@
#!/usr/bin/env python
import unittest
import util
import uuid
from conftest import ac
MODEL_1_NAME = str(uuid.uuid4())
MODEL_2_NAME = str(uuid.uuid4())
def test_modelNames(setup):
result = ac.modelNames()
assert "test_model" in result
CSS = 'some random css'
NEW_CSS = 'new random css'
CARD_1_TEMPLATE = {'Front': 'field1', 'Back': 'field2'}
NEW_CARD_1_TEMPLATE = {'Front': 'question: field1', 'Back': 'answer: field2'}
def test_modelNamesAndIds(setup):
result = ac.modelNamesAndIds()
assert isinstance(result["test_model"], int)
TEXT_TO_REPLACE = "new random css"
REPLACE_WITH_TEXT = "new updated css"
class TestModels(unittest.TestCase):
def runTest(self):
# modelNames
modelNames = util.invoke('modelNames')
self.assertGreater(len(modelNames), 0)
def test_modelFieldNames(setup):
result = ac.modelFieldNames(modelName="test_model")
assert result == ["field1", "field2"]
# modelNamesAndIds
modelNamesAndIds = util.invoke('modelNamesAndIds')
self.assertGreater(len(modelNames), 0)
# modelFieldNames
modelFields = util.invoke('modelFieldNames', modelName=modelNames[0])
def test_modelFieldsOnTemplates(setup):
result = ac.modelFieldsOnTemplates(modelName="test_model")
assert result == {
"Card 1": [["field1"], ["field2"]],
"Card 2": [["field2"], ["field1"]],
}
# modelFieldsOnTemplates
modelFieldsOnTemplates = util.invoke('modelFieldsOnTemplates', modelName=modelNames[0])
# createModel with css
newModel = util.invoke('createModel', modelName=MODEL_1_NAME, inOrderFields=['field1', 'field2'], cardTemplates=[CARD_1_TEMPLATE], css=CSS)
class TestCreateModel:
createModel_kwargs = {
"modelName": "test_model_foo",
"inOrderFields": ["field1", "field2"],
"cardTemplates": [{"Front": "{{field1}}", "Back": "{{field2}}"}],
}
# createModel without css
newModel = util.invoke('createModel', modelName=MODEL_2_NAME, inOrderFields=['field1', 'field2'], cardTemplates=[CARD_1_TEMPLATE])
def test_createModel_without_css(self, session_with_profile_loaded):
ac.createModel(**self.createModel_kwargs)
# modelStyling: get model 1 css
css = util.invoke('modelStyling', modelName=MODEL_1_NAME)
self.assertEqual({'css': CSS}, css)
def test_createModel_with_css(self, session_with_profile_loaded):
ac.createModel(**self.createModel_kwargs, css="* {}")
# modelTemplates: get model 1 templates
templates = util.invoke('modelTemplates', modelName=MODEL_1_NAME)
self.assertEqual({'Card 1': CARD_1_TEMPLATE}, templates)
# updateModelStyling: change and verify model css
util.invoke('updateModelStyling', model={'name': MODEL_1_NAME, 'css': NEW_CSS})
new_css = util.invoke('modelStyling', modelName=MODEL_1_NAME)
self.assertEqual({'css': NEW_CSS}, new_css)
class TestStyling:
def test_modelStyling(self, setup):
result = ac.modelStyling(modelName="test_model")
assert result == {"css": "* {}"}
# updateModelTemplates: change and verify model 1 templates
util.invoke('updateModelTemplates', model={'name': MODEL_1_NAME, 'templates': {'Card 1': NEW_CARD_1_TEMPLATE}})
templates = util.invoke('modelTemplates', modelName=MODEL_1_NAME)
self.assertEqual({'Card 1': NEW_CARD_1_TEMPLATE}, templates)
def test_updateModelStyling(self, setup):
ac.updateModelStyling(model={
"name": "test_model",
"css": "* {color: red;}"
})
# findAndReplaceInModels: find and replace text in all models or model by name
util.invoke('findAndReplaceInModels', modelName=MODEL_1_NAME, findText=TEXT_TO_REPLACE, replaceText=REPLACE_WITH_TEXT, front=True, back=True, css=True)
new_css = util.invoke('modelStyling', modelName=MODEL_1_NAME)
self.assertEqual({'css': REPLACE_WITH_TEXT}, new_css)
assert ac.modelStyling(modelName="test_model") == {
"css": "* {color: red;}"
}
if __name__ == '__main__':
unittest.main()
class TestModelTemplates:
def test_modelTemplates(self, setup):
result = ac.modelTemplates(modelName="test_model")
assert result == {
"Card 1": {"Front": "{{field1}}", "Back": "{{field2}}"},
"Card 2": {"Front": "{{field2}}", "Back": "{{field1}}"}
}
def test_updateModelTemplates(self, setup):
ac.updateModelTemplates(model={
"name": "test_model",
"templates": {"Card 1": {"Front": "{{field1}}", "Back": "foo"}}
})
assert ac.modelTemplates(modelName="test_model") == {
"Card 1": {"Front": "{{field1}}", "Back": "foo"},
"Card 2": {"Front": "{{field2}}", "Back": "{{field1}}"}
}
def test_findAndReplaceInModels(setup):
ac.findAndReplaceInModels(
modelName="test_model",
findText="}}",
replaceText="}}!",
front=True,
back=False,
css=False,
)
ac.findAndReplaceInModels(
modelName="test_model",
findText="}}",
replaceText="}}?",
front=True,
back=True,
css=False,
)
ac.findAndReplaceInModels(
modelName="test_model",
findText="}",
replaceText="color: blue;}",
front=False,
back=False,
css=True,
)
assert ac.modelTemplates(modelName="test_model") == {
"Card 1": {"Front": "{{field1}}?!", "Back": "{{field2}}?"},
"Card 2": {"Front": "{{field2}}?!", "Back": "{{field1}}?"}
}
assert ac.modelStyling(modelName="test_model") == {
"css": "* {color: blue;}"
}

View File

@ -1,155 +1,142 @@
#!/usr/bin/env python
import pytest
from anki.errors import NotFoundError # noqa
import unittest
import util
from conftest import ac
class TestNotes(unittest.TestCase):
def setUp(self):
util.invoke('createDeck', deck='test')
def tearDown(self):
util.invoke('deleteDecks', decks=['test'], cardsToo=True)
def runTest(self):
options = {
'allowDuplicate': True
def make_note(*, front="front1", allow_duplicates=False):
note = {
"deckName": "test_deck",
"modelName": "Basic",
"fields": {"Front": front, "Back": "back1"},
"tags": ["tag1"],
}
note1 = {
'deckName': 'test',
'modelName': 'Basic',
'fields': {'Front': 'front1', 'Back': 'back1'},
'tags': ['tag1'],
'options': options
if allow_duplicates:
return {**note, "options": {"allowDuplicate": True}}
else:
return note
##############################################################################
class TestNoteAddition:
def test_addNote(self, setup):
result = ac.addNote(note=make_note())
assert isinstance(result, int)
def test_addNote_will_not_allow_duplicates_by_default(self, setup):
ac.addNote(make_note())
with pytest.raises(Exception, match="it is a duplicate"):
ac.addNote(make_note())
def test_addNote_will_allow_duplicates_if_options_say_aye(self, setup):
ac.addNote(make_note())
ac.addNote(make_note(allow_duplicates=True))
def test_addNotes(self, setup):
result = ac.addNotes(notes=[
make_note(front="foo"),
make_note(front="bar"),
make_note(front="foo"),
])
assert len(result) == 3
assert isinstance(result[0], int)
assert isinstance(result[1], int)
assert result[2] is None
def test_bug164(self, setup):
note = {
"deckName": "test_deck",
"modelName": "Basic",
"fields": {"Front": " Whitespace\n", "Back": ""},
"options": {"allowDuplicate": False, "duplicateScope": "deck"}
}
note2 = {
'deckName': 'test',
'modelName': 'Basic',
'fields': {'Front': 'front1', 'Back': 'back1'},
'tags': ['tag1']
}
ac.addNote(note=note)
with pytest.raises(Exception, match="it is a duplicate"):
ac.addNote(note=note)
notes1 = [
{
'deckName': 'test',
'modelName': 'Basic',
'fields': {'Front': 'front3', 'Back': 'back3'},
'tags': ['tag'],
'options': options
},
{
'deckName': 'test',
'modelName': 'Basic',
'fields': {'Front': 'front4', 'Back': 'back4'},
'tags': ['tag'],
'options': options
}
]
notes2 = [
{
'deckName': 'test',
'modelName': 'Basic',
'fields': {'Front': 'front3', 'Back': 'back3'},
'tags': ['tag']
},
{
'deckName': 'test',
'modelName': 'Basic',
'fields': {'Front': 'front4', 'Back': 'back4'},
'tags': ['tag']
}
def test_notesInfo(setup):
result = ac.notesInfo(notes=[setup.note1_id])
assert len(result) == 1
assert result[0]["noteId"] == setup.note1_id
assert result[0]["tags"] == ["tag1"]
assert result[0]["fields"]["field1"]["value"] == "note1 field1"
class TestTags:
def test_addTags(self, setup):
ac.addTags(notes=[setup.note1_id], tags="tag2")
tags = ac.notesInfo(notes=[setup.note1_id])[0]["tags"]
assert {*tags} == {"tag1", "tag2"}
def test_getTags(self, setup):
result = ac.getTags()
assert {*result} == {"tag1", "tag2"}
def test_removeTags(self, setup):
ac.removeTags(notes=[setup.note2_id], tags="tag2")
assert ac.notesInfo(notes=[setup.note2_id])[0]["tags"] == []
def test_replaceTags(self, setup):
ac.replaceTags(notes=[setup.note1_id, 123],
tag_to_replace="tag1", replace_with_tag="foo")
notes_info = ac.notesInfo(notes=[setup.note1_id])
assert notes_info[0]["tags"] == ["foo"]
def test_replaceTagsInAllNotes(self, setup):
ac.replaceTagsInAllNotes(tag_to_replace="tag1", replace_with_tag="foo")
notes_info = ac.notesInfo(notes=[setup.note1_id])
assert notes_info[0]["tags"] == ["foo"]
def test_clearUnusedTags(self, setup):
ac.removeTags(notes=[setup.note2_id], tags="tag2")
ac.clearUnusedTags()
assert ac.getTags() == ["tag1"]
class TestUpdateNoteFields:
def test_updateNoteFields(self, setup):
new_fields = {"field1": "foo", "field2": "bar"}
good_note = {"id": setup.note1_id, "fields": new_fields}
ac.updateNoteFields(note=good_note)
notes_info = ac.notesInfo(notes=[setup.note1_id])
assert notes_info[0]["fields"]["field2"]["value"] == "bar"
def test_updateNoteFields_will_note_update_invalid_notes(self, setup):
bad_note = {"id": 123, "fields": make_note()["fields"]}
with pytest.raises(NotFoundError):
ac.updateNoteFields(note=bad_note)
class TestCanAddNotes:
foo_bar_notes = [make_note(front="foo"), make_note(front="bar")]
def test_canAddNotes(self, setup):
result = ac.canAddNotes(notes=self.foo_bar_notes)
assert result == [True, True]
def test_canAddNotes_will_not_add_duplicates_if_options_do_not_say_aye(self, setup):
ac.addNotes(notes=self.foo_bar_notes)
notes = [
make_note(front="foo"),
make_note(front="baz"),
make_note(front="foo", allow_duplicates=True)
]
result = ac.canAddNotes(notes=notes)
assert result == [False, True, True]
# addNote
noteId = util.invoke('addNote', note=note1)
self.assertRaises(Exception, lambda: util.invoke('addNote', note=note2))
def test_findNotes(setup):
result = ac.findNotes(query="deck:test_deck")
assert {*result} == {setup.note1_id, setup.note2_id}
# addTags
util.invoke('addTags', notes=[noteId], tags='tag2')
# notesInfo (part 1)
noteInfos = util.invoke('notesInfo', notes=[noteId])
self.assertEqual(len(noteInfos), 1)
noteInfo = noteInfos[0]
self.assertEqual(noteInfo['noteId'], noteId)
self.assertSetEqual(set(noteInfo['tags']), {'tag1', 'tag2'})
self.assertEqual(noteInfo['fields']['Front']['value'], 'front1')
self.assertEqual(noteInfo['fields']['Back']['value'], 'back1')
# getTags
allTags = util.invoke('getTags')
self.assertIn('tag1', allTags)
self.assertIn('tag2', allTags)
# removeTags
util.invoke('removeTags', notes=[noteId], tags='tag2')
# updateNoteFields
incorrectId = 1234
noteUpdateIncorrectId = {'id': incorrectId, 'fields': {'Front': 'front2', 'Back': 'back2'}}
self.assertRaises(Exception, lambda: util.invoke('updateNoteFields', note=noteUpdateIncorrectId))
noteUpdate = {'id': noteId, 'fields': {'Front': 'front2', 'Back': 'back2'}}
util.invoke('updateNoteFields', note=noteUpdate)
# replaceTags
util.invoke('replaceTags', notes=[noteId, incorrectId], tag_to_replace='tag1', replace_with_tag='new_tag')
# notesInfo (part 2)
noteInfos = util.invoke('notesInfo', notes=[noteId, incorrectId])
self.assertEqual(len(noteInfos), 2)
self.assertDictEqual(noteInfos[1], dict()) # Test that returns empty dict if incorrect id was passed
noteInfo = noteInfos[0]
self.assertSetEqual(set(noteInfo['tags']), {'new_tag'})
self.assertIn('new_tag', noteInfo['tags'])
self.assertNotIn('tag2', noteInfo['tags'])
self.assertEqual(noteInfo['fields']['Front']['value'], 'front2')
self.assertEqual(noteInfo['fields']['Back']['value'], 'back2')
# canAddNotes (part 1)
noteStates = util.invoke('canAddNotes', notes=notes1)
self.assertEqual(len(noteStates), len(notes1))
self.assertNotIn(False, noteStates)
# addNotes (part 1)
noteIds = util.invoke('addNotes', notes=notes1)
self.assertEqual(len(noteIds), len(notes1))
for noteId in noteIds:
self.assertNotEqual(noteId, None)
# replaceTagsInAllNotes
currentTag = notes1[0]['tags'][0]
new_tag = 'new_tag'
util.invoke('replaceTagsInAllNotes', tag_to_replace=currentTag, replace_with_tag=new_tag)
noteInfos = util.invoke('notesInfo', notes=noteIds)
for noteInfo in noteInfos:
self.assertIn(new_tag, noteInfo['tags'])
self.assertNotIn(currentTag, noteInfo['tags'])
# canAddNotes (part 2)
noteStates = util.invoke('canAddNotes', notes=notes2)
self.assertNotIn(True, noteStates)
self.assertEqual(len(noteStates), len(notes2))
# addNotes (part 2)
noteIds = util.invoke('addNotes', notes=notes2)
self.assertEqual(len(noteIds), len(notes2))
for noteId in noteIds:
self.assertEqual(noteId, None)
# findNotes
noteIds = util.invoke('findNotes', query='deck:test')
self.assertEqual(len(noteIds), len(notes1) + 1)
# deleteNotes
util.invoke('deleteNotes', notes=noteIds)
noteIds = util.invoke('findNotes', query='deck:test')
self.assertEqual(len(noteIds), 0)
if __name__ == '__main__':
unittest.main()
def test_deleteNotes(setup):
ac.deleteNotes(notes=[setup.note1_id, setup.note2_id])
result = ac.findNotes(query="deck:test_deck")
assert result == []

149
tests/test_server.py Normal file
View File

@ -0,0 +1,149 @@
import json
import multiprocessing
import time
import urllib.error
import urllib.request
from contextlib import contextmanager
from dataclasses import dataclass
from functools import partial
import pytest
from pytest_anki._launch import anki_running # noqa
from pytest_anki._util import find_free_port # noqa
from plugin import AnkiConnect
from tests.conftest import wait_until, \
empty_anki_session_started, \
anki_connect_config_loaded, \
profile_loaded
@contextmanager
def function_running_in_a_process(context, function):
process = context.Process(target=function)
process.start()
try:
yield process
finally:
process.join()
# todo stop the server?
@contextmanager
def anki_connect_web_server_started():
plugin = AnkiConnect()
plugin.startWebServer()
yield plugin
@dataclass
class Client:
port: int
@staticmethod
def make_request(action, **params):
return {"action": action, "params": params, "version": 6}
def send_request(self, action, **params):
request_url = f"http://localhost:{self.port}"
request_data = self.make_request(action, **params)
request_json = json.dumps(request_data).encode("utf-8")
request = urllib.request.Request(request_url, request_json)
response = json.load(urllib.request.urlopen(request))
return response
def wait_for_web_server_to_come_live(self, at_most_seconds=30):
deadline = time.time() + at_most_seconds
while time.time() < deadline:
try:
self.send_request("version")
return
except urllib.error.URLError:
time.sleep(0.01)
raise Exception(f"Anki-Connect web server did not come live "
f"in {at_most_seconds} seconds")
# spawning requires a top-level function for pickling
def external_anki_entry_function(web_bind_port, exit_event):
with empty_anki_session_started() as session:
with anki_connect_config_loaded(session, web_bind_port):
with anki_connect_web_server_started():
with profile_loaded(session):
wait_until(exit_event.is_set)
@contextmanager
def external_anki_running(process_run_method):
context = multiprocessing.get_context(process_run_method)
exit_event = context.Event()
web_bind_port = find_free_port()
function = partial(external_anki_entry_function, web_bind_port, exit_event)
with function_running_in_a_process(context, function) as process:
client = Client(port=web_bind_port)
client.wait_for_web_server_to_come_live()
try:
yield client
finally:
exit_event.set()
assert process.exitcode == 0
# if a Qt app was already launched in current process,
# launching a new Qt app, even from grounds up, fails or hangs.
# of course, this includes forked processes. therefore,
# * if launching without --forked, use the `spawn` process run method;
# * otherwise, use the `fork` method, as it is significantly faster.
# with --forked, each test has its fixtures assembled inside the fork,
# which means that when the test begins, Qt was never started in the fork.
@pytest.fixture(scope="module")
def external_anki(request):
"""
Runs Anki in an external process, with the plugin loaded and started.
On exit, neatly ends the process and makes sure its exit code is 0.
Yields a client that can send web request to the external process.
"""
with external_anki_running(
"fork" if request.config.option.forked else "spawn"
) as client:
yield client
##############################################################################
def test_successful_request(external_anki):
response = external_anki.send_request("version")
assert response == {"error": None, "result": 6}
def test_can_handle_multiple_requests(external_anki):
assert external_anki.send_request("version") == {"error": None, "result": 6}
assert external_anki.send_request("version") == {"error": None, "result": 6}
def test_multi_request(external_anki):
version_request = Client.make_request("version")
response = external_anki.send_request("multi", actions=[version_request] * 3)
assert response == {
"error": None,
"result": [{"error": None, "result": 6}] * 3
}
def test_failing_request_due_to_bad_arguments(external_anki):
response = external_anki.send_request("addNote", bad="request")
assert response["result"] is None
assert "unexpected keyword argument" in response["error"]
def test_failing_request_due_to_anki_raising_exception(external_anki):
response = external_anki.send_request("suspend", cards=[-123])
assert response["result"] is None
assert "Card was not found" in response["error"]

View File

@ -1,55 +1,31 @@
#!/usr/bin/env python
import os
import tempfile
import unittest
import util
from conftest import ac
class TestStats(unittest.TestCase):
def setUp(self):
util.invoke('createDeck', deck='test')
note = {
'deckName': 'test',
'modelName': 'Basic',
'fields': {'Front': 'front1', 'Back': 'back1'},
'tags': ['tag1'],
'options': {
'allowDuplicate': True
}
}
self.noteId = util.invoke('addNote', note=note)
def tearDown(self):
util.invoke('deleteDecks', decks=['test'], cardsToo=True)
def runTest(self):
# getNumCardsReviewedToday
result = util.invoke('getNumCardsReviewedToday')
self.assertIsInstance(result, int)
# getNumCardsReviewedByDay
result = util.invoke('getNumCardsReviewedByDay')
self.assertIsInstance(result, list)
# collectionStats
result = util.invoke('getCollectionStatsHTML')
self.assertIsInstance(result, str)
# no reviews for new deck
self.assertEqual(len(util.invoke('cardReviews', deck='test', startID=0)), 0)
self.assertEqual(util.invoke('getLatestReviewID', deck='test'), 0)
# # add reviews
# cardId = int(util.invoke('findCards', query='deck:test')[0])
# latestID = 123456 # small enough to not interfere with existing reviews
# util.invoke('insertReviews', reviews=[
# [latestID-1, cardId, -1, 3, 4, -60, 2500, 6157, 0],
# [latestID, cardId, -1, 1, -60, -60, 0, 4846, 0]
# ])
# self.assertEqual(len(util.invoke('cardReviews', deck='test', startID=0)), 2)
# self.assertEqual(util.invoke('getLatestReviewID', deck='test'), latestID)
def test_getNumCardsReviewedToday(setup):
result = ac.getNumCardsReviewedToday()
assert isinstance(result, int)
if __name__ == '__main__':
unittest.main()
def test_getNumCardsReviewedByDay(setup):
result = ac.getNumCardsReviewedByDay()
assert isinstance(result, list)
def test_getCollectionStatsHTML(setup):
result = ac.getCollectionStatsHTML()
assert isinstance(result, str)
class TestReviews:
def test_zero_reviews_for_a_new_deck(self, setup):
assert ac.cardReviews(deck="test_deck", startID=0) == []
assert ac.getLatestReviewID(deck="test_deck") == 0
def test_some_reviews_for_a_reviewed_deck(self, setup):
ac.insertReviews(reviews=[
(456, setup.card_ids[0], -1, 3, 4, -60, 2500, 6157, 0),
(789, setup.card_ids[1], -1, 1, -60, -60, 0, 4846, 0)
])
assert len(ac.cardReviews(deck="test_deck", startID=0)) == 2
assert ac.getLatestReviewID(deck="test_deck") == 789

View File

@ -1,21 +0,0 @@
import json
import urllib.request
def request(action, **params):
return {'action': action, 'params': params, 'version': 6}
def invoke(action, **params):
requestJson = json.dumps(request(action, **params)).encode('utf-8')
response = json.load(urllib.request.urlopen(urllib.request.Request('http://localhost:8765', requestJson)))
if len(response) != 2:
raise Exception('response has an unexpected number of fields')
if 'error' not in response:
raise Exception('response is missing required error field')
if 'result' not in response:
raise Exception('response is missing required result field')
if response['error'] is not None:
raise Exception(response['error'])
return response['result']