Merge remote-tracking branch 'oakkitten/edit-dialog'

This commit is contained in:
Alex Yatskov 2022-04-25 19:48:26 -07:00
commit 8e9879f80f
21 changed files with 1886 additions and 880 deletions

25
.github/workflows/main.yml vendored Normal file
View File

@ -0,0 +1,25 @@
name: Tests
on: [push, pull_request, workflow_dispatch]
jobs:
run-tests:
name: Run tests
runs-on: ubuntu-latest
steps:
- name: Install dependencies
run: |
sudo apt-get update
sudo apt-get install -y pyqt5-dev-tools xvfb
- name: Setup Python
uses: actions/setup-python@v2
- name: Install tox
run: pip install tox
- name: Checkout repository
uses: actions/checkout@v2
- name: Run tests
run: tox -- --forked --verbose

137
README.md
View File

@ -238,6 +238,35 @@ corresponding to when the API was available for use.
}
```
* **setSpecificValueOfCard**
Sets specific value of a single card. Given the risk of wreaking havor in the database when changing some of the values of a card, some of the keys require the argument "warning_check" set to True.
This can be used to set a card's flag, change it's ease factor, change the review order in a filtered deck and change the column "data" (not currently used by anki apparantly), and many other values.
A list of values and explanation of their respective utility can be found at [AnkiDroid's wiki](https://github.com/ankidroid/Anki-Android/wiki/Database-Structure).
*Sample request*:
```json
{
"action": "setSpecificValueOfCard",
"version": 6,
"params": {
"card": 1483959291685,
"keys": ["flags", "odue"],
"newValues": ["1", "-100"]
}
}
```
*Sample result*:
```json
{
"result": [true, true],
"error": null
}
```
* **suspend**
Suspend cards by card ID; returns `true` if successful (at least one card wasn't already suspended) or `false`
@ -716,8 +745,8 @@ corresponding to when the API was available for use.
* **deleteDecks**
Deletes decks with the given names. If `cardsToo` is `true` (defaults to `false` if unspecified), the cards within
the deleted decks will also be deleted; otherwise they will be moved to the default deck.
Deletes decks with the given names.
The argument `cardsToo` *must* be specified and set to `true`.
*Sample request*:
```json
@ -931,76 +960,6 @@ corresponding to when the API was available for use.
}
```
* **updateCompleteDeck**
Pastes all transmitted data into the database and reloads the collection.
You can send a deckName and corresponding cards, notes and models.
All cards are assumed to belong to the given deck.
All notes referenced by given cards should be present.
All models referenced by given notes should be present.
*Sample request*:
```json
{
"action": "updateCompleteDeck",
"version": 6,
"params": {
"data": {
"deck": "test3",
"cards": {
"1485369472028": {
"id": 1485369472028,
"nid": 1485369340204,
"ord": 0,
"type": 0,
"queue": 0,
"due": 1186031,
"factor": 0,
"ivl": 0,
"reps": 0,
"lapses": 0,
"left": 0
}
},
"notes": {
"1485369340204": {
"id": 1485369340204,
"mid": 1375786181313,
"fields": [
"frontValue",
"backValue"
],
"tags": [
"aTag"
]
}
},
"models": {
"1375786181313": {
"id": 1375786181313,
"name": "anotherModel",
"fields": [
"Front",
"Back"
],
"templateNames": [
"Card 1"
]
}
}
}
}
}
```
*Sample result*:
```json
{
"result": null,
"error": null
}
```
#### Graphical Actions
* **guiBrowse**
@ -1056,9 +1015,6 @@ corresponding to when the API was available for use.
Audio, video, and picture files can be embedded into the fields via the `audio`, `video`, and `picture` keys, respectively.
Refer to the documentation of `addNote` and `storeMediaFile` for an explanation of these fields.
The `closeAfterAdding` member inside `options` group can be set to true to create a dialog that closes upon adding the note.
Invoking the action mutliple times with this option will create _multiple windows_.
The result is the ID of the note which would be added, if the user chose to confirm the *Add Cards* dialogue.
*Sample request*:
@ -1074,9 +1030,6 @@ corresponding to when the API was available for use.
"Text": "The capital of Romania is {{c1::Bucharest}}",
"Extra": "Romania is a country in Europe"
},
"options": {
"closeAfterAdding": true
},
"tags": [
"countries"
],
@ -1100,6 +1053,34 @@ corresponding to when the API was available for use.
}
```
* **guiEditNote**
Opens the *Edit* dialog with a note corresponding to given note ID.
The dialog is similar to the *Edit Current* dialog, but:
* has a Preview button to preview the cards for the note
* has a Browse button to open the browser with these cards
* has Previous/Back buttons to navigate the history of the dialog
* has no bar with the Close button
*Sample request*:
```json
{
"action": "guiEditNote",
"version": 6,
"params": {
"note": 1649198355435
}
}
```
*Sample result*:
```json
{
"result": null,
"error": null
}
```
* **guiCurrentCard**
Returns information about the current card or `null` if not in review mode.

View File

@ -40,7 +40,8 @@ from anki.consts import MODEL_CLOZE
from anki.exporting import AnkiPackageExporter
from anki.importing import AnkiPackageImporter
from anki.notes import Note
from anki.utils import joinFields, intTime, guid64, fieldChecksum
from .edit import Edit
try:
from anki.rsbackend import NotFoundError
@ -59,14 +60,19 @@ class AnkiConnect:
def __init__(self):
self.log = None
self.timer = None
self.server = web.WebServer(self.handler)
def initLogging(self):
logPath = util.setting('apiLogPath')
if logPath is not None:
self.log = open(logPath, 'w')
def startWebServer(self):
try:
self.server = web.WebServer(self.handler)
self.server.listen()
# only keep reference to prevent garbage collection
self.timer = QTimer()
self.timer.timeout.connect(self.advance)
self.timer.start(util.setting('apiPollInterval'))
@ -531,12 +537,22 @@ class AnkiConnect:
@util.api()
def deleteDecks(self, decks, cardsToo=False):
if not cardsToo:
# since f592672fa952260655881a75a2e3c921b2e23857 (2.1.28)
# (see anki$ git log "-Gassert cardsToo")
# you can't delete decks without deleting cards as well.
# however, since 62c23c6816adf912776b9378c008a52bb50b2e8d (2.1.45)
# passing cardsToo to `rem` (long deprecated) won't raise an error!
# this is dangerous, so let's raise our own exception
if self._anki21_version >= 28:
raise Exception("Since Anki 2.1.28 it's not possible "
"to delete decks without deleting cards as well")
try:
self.startEditing()
decks = filter(lambda d: d in self.deckNames(), decks)
for deck in decks:
did = self.decks().id(deck)
self.decks().rem(did, cardsToo)
self.decks().rem(did, cardsToo=cardsToo)
finally:
self.stopEditing()
@ -833,6 +849,38 @@ class AnkiConnect:
return couldSetEaseFactors
@util.api()
def setSpecificValueOfCard(self, card, keys,
newValues, warning_check=False):
if isinstance(card, list):
print("card has to be int, not list")
return False
if not isinstance(keys, list) or not isinstance(newValues, list):
print("keys and newValues have to be lists.")
return False
if len(newValues) != len(keys):
print("Invalid list lengths.")
return False
for key in keys:
if key in ["did", "id", "ivl", "lapses", "left", "mod", "nid",
"odid", "odue", "ord", "queue", "reps", "type", "usn"]:
if warning_check is False:
return False
result = []
try:
ankiCard = self.getCard(card)
for i, key in enumerate(keys):
setattr(ankiCard, key, newValues[i])
ankiCard.flush()
result.append(True)
except Exception as e:
result.append([False, str(e)])
return result
@util.api()
def getEaseFactors(self, cards):
@ -1095,7 +1143,7 @@ class AnkiConnect:
model = self.collection().models.byName(modelName)
if model is None:
raise Exception('model was not found: {}'.format(modelName))
ankiModel = [model]
ankiModel = [modelName]
updatedModels = 0
for model in ankiModel:
model = self.collection().models.byName(model)
@ -1244,45 +1292,6 @@ class AnkiConnect:
) or 0
@util.api()
def updateCompleteDeck(self, data):
self.startEditing()
did = self.decks().id(data['deck'])
self.decks().flush()
model_manager = self.collection().models
for _, card in data['cards'].items():
self.database().execute(
'replace into cards (id, nid, did, ord, type, queue, due, ivl, factor, reps, lapses, left, '
'mod, usn, odue, odid, flags, data) '
'values (' + '?,' * (12 + 6 - 1) + '?)',
card['id'], card['nid'], did, card['ord'], card['type'], card['queue'], card['due'],
card['ivl'], card['factor'], card['reps'], card['lapses'], card['left'],
intTime(), -1, 0, 0, 0, 0
)
note = data['notes'][str(card['nid'])]
tags = self.collection().tags.join(self.collection().tags.canonify(note['tags']))
self.database().execute(
'replace into notes(id, mid, tags, flds,'
'guid, mod, usn, flags, data, sfld, csum) values (' + '?,' * (4 + 7 - 1) + '?)',
note['id'], note['mid'], tags, joinFields(note['fields']),
guid64(), intTime(), -1, 0, 0, '', fieldChecksum(note['fields'][0])
)
model = data['models'][str(note['mid'])]
if not model_manager.get(model['id']):
model_o = model_manager.new(model['name'])
for field_name in model['fields']:
field = model_manager.newField(field_name)
model_manager.addField(model_o, field)
for template_name in model['templateNames']:
template = model_manager.newTemplate(template_name)
model_manager.addTemplate(model_o, template)
model_o['id'] = model['id']
model_manager.update(model_o)
model_manager.flush()
self.stopEditing()
@util.api()
def insertReviews(self, reviews):
if len(reviews) > 0:
@ -1359,6 +1368,12 @@ class AnkiConnect:
return self.findCards(query)
@util.api()
def guiEditNote(self, note):
Edit.open_dialog_and_show_note_with_id(note)
@util.api()
def guiSelectedNotes(self):
(creator, instance) = aqt.dialogs._dialogs['Browser']
@ -1385,91 +1400,6 @@ class AnkiConnect:
collection.models.setCurrent(model)
collection.models.update(model)
closeAfterAdding = False
if note is not None and 'options' in note:
if 'closeAfterAdding' in note['options']:
closeAfterAdding = note['options']['closeAfterAdding']
if type(closeAfterAdding) is not bool:
raise Exception('option parameter \'closeAfterAdding\' must be boolean')
addCards = None
if closeAfterAdding:
randomString = ''.join(random.choice(string.ascii_letters) for _ in range(10))
windowName = 'AddCardsAndClose' + randomString
class AddCardsAndClose(aqt.addcards.AddCards):
def __init__(self, mw):
# the window must only reset if
# * function `onModelChange` has been called prior
# * window was newly opened
self.modelHasChanged = True
super().__init__(mw)
self.addButton.setText('Add and Close')
self.addButton.setShortcut(aqt.qt.QKeySequence('Ctrl+Return'))
def _addCards(self):
super()._addCards()
# if adding was successful it must mean it was added to the history of the window
if len(self.history):
self.reject()
def onModelChange(self):
if self.isActiveWindow():
super().onModelChange()
self.modelHasChanged = True
def onReset(self, model=None, keep=False):
if self.isActiveWindow() or self.modelHasChanged:
super().onReset(model, keep)
self.modelHasChanged = False
else:
# modelchoosers text is changed by a reset hook
# therefore we need to change it back manually
self.modelChooser.models.setText(self.editor.note.model()['name'])
self.modelHasChanged = False
def _reject(self):
savedMarkClosed = aqt.dialogs.markClosed
aqt.dialogs.markClosed = lambda _: savedMarkClosed(windowName)
super()._reject()
aqt.dialogs.markClosed = savedMarkClosed
aqt.dialogs._dialogs[windowName] = [AddCardsAndClose, None]
addCards = aqt.dialogs.open(windowName, self.window())
if savedMid:
deck['mid'] = savedMid
editor = addCards.editor
ankiNote = editor.note
if 'fields' in note:
for name, value in note['fields'].items():
if name in ankiNote:
ankiNote[name] = value
self.addMediaFromNote(ankiNote, note)
editor.loadNote()
if 'tags' in note:
ankiNote.tags = note['tags']
editor.updateTags()
# if Anki does not Focus, the window will not notice that the
# fields are actually filled
aqt.dialogs.open(windowName, self.window())
addCards.setAndFocusNote(editor.note)
return ankiNote.id
elif note is not None:
collection = self.collection()
ankiNote = anki.notes.Note(collection, model)
# fill out card beforehand, so we can be sure of the note id
@ -1696,4 +1626,11 @@ class AnkiConnect:
# Entry
#
ac = AnkiConnect()
# when run inside Anki, `__name__` would be either numeric,
# or, if installed via `link.sh`, `AnkiConnectDev`
if __name__ != "plugin":
Edit.register_with_anki()
ac = AnkiConnect()
ac.initLogging()
ac.startWebServer()

387
plugin/edit.py Normal file
View File

@ -0,0 +1,387 @@
import aqt
import aqt.editor
from aqt import gui_hooks
from aqt.qt import QDialog, Qt, QKeySequence, QShortcut
from aqt.utils import disable_help_button, restoreGeom, saveGeom, tooltip
from anki.errors import NotFoundError
from anki.consts import QUEUE_TYPE_SUSPENDED
from anki.utils import ids2str
# Edit dialog. Like Edit Current, but:
# * has a Preview button to preview the cards for the note
# * has Previous/Back buttons to navigate the history of the dialog
# * has a Browse button to open the history in the Browser
# * has no bar with the Close button
#
# To register in Anki's dialog system:
# > from .edit import Edit
# > Edit.register_with_anki()
#
# To (re)open (note_id is an integer):
# > Edit.open_dialog_and_show_note_with_id(note_id)
DOMAIN_PREFIX = "foosoft.ankiconnect."
def get_note_by_note_id(note_id):
return aqt.mw.col.get_note(note_id)
def is_card_suspended(card):
return card.queue == QUEUE_TYPE_SUSPENDED
def filter_valid_note_ids(note_ids):
return aqt.mw.col.db.list(
"select id from notes where id in " + ids2str(note_ids)
)
##############################################################################
class DecentPreviewer(aqt.browser.previewer.MultiCardPreviewer):
class Adapter:
def get_current_card(self): raise NotImplementedError
def can_select_previous_card(self): raise NotImplementedError
def can_select_next_card(self): raise NotImplementedError
def select_previous_card(self): raise NotImplementedError
def select_next_card(self): raise NotImplementedError
def __init__(self, adapter: Adapter):
super().__init__(parent=None, mw=aqt.mw, on_close=lambda: None) # noqa
self.adapter = adapter
self.last_card_id = 0
def card(self):
return self.adapter.get_current_card()
def card_changed(self):
current_card_id = self.adapter.get_current_card().id
changed = self.last_card_id != current_card_id
self.last_card_id = current_card_id
return changed
# the check if we can select next/previous card is needed because
# the buttons sometimes get disabled a tad too late
# and can still be pressed by user.
# this is likely due to Anki sometimes delaying rendering of cards
# in order to avoid rendering them too fast?
def _on_prev_card(self):
if self.adapter.can_select_previous_card():
self.adapter.select_previous_card()
self.render_card()
def _on_next_card(self):
if self.adapter.can_select_next_card():
self.adapter.select_next_card()
self.render_card()
def _should_enable_prev(self):
return self.showing_answer_and_can_show_question() or \
self.adapter.can_select_previous_card()
def _should_enable_next(self):
return self.showing_question_and_can_show_answer() or \
self.adapter.can_select_next_card()
def _render_scheduled(self):
super()._render_scheduled() # noqa
self._updateButtons()
def showing_answer_and_can_show_question(self):
return self._state == "answer" and not self._show_both_sides
def showing_question_and_can_show_answer(self):
return self._state == "question"
class ReadyCardsAdapter(DecentPreviewer.Adapter):
def __init__(self, cards):
self.cards = cards
self.current = 0
def get_current_card(self):
return self.cards[self.current]
def can_select_previous_card(self):
return self.current > 0
def can_select_next_card(self):
return self.current < len(self.cards) - 1
def select_previous_card(self):
self.current -= 1
def select_next_card(self):
self.current += 1
##############################################################################
# store note ids instead of notes, as note objects don't implement __eq__ etc
class History:
number_of_notes_to_keep_in_history = 25
def __init__(self):
self.note_ids = []
def append(self, note):
if note.id in self.note_ids:
self.note_ids.remove(note.id)
self.note_ids.append(note.id)
self.note_ids = self.note_ids[-self.number_of_notes_to_keep_in_history:]
def has_note_to_left_of(self, note):
return note.id in self.note_ids and note.id != self.note_ids[0]
def has_note_to_right_of(self, note):
return note.id in self.note_ids and note.id != self.note_ids[-1]
def get_note_to_left_of(self, note):
note_id = self.note_ids[self.note_ids.index(note.id) - 1]
return get_note_by_note_id(note_id)
def get_note_to_right_of(self, note):
note_id = self.note_ids[self.note_ids.index(note.id) + 1]
return get_note_by_note_id(note_id)
def get_last_note(self): # throws IndexError if history empty
return get_note_by_note_id(self.note_ids[-1])
def remove_invalid_notes(self):
self.note_ids = filter_valid_note_ids(self.note_ids)
history = History()
# see method `find_cards` of `collection.py`
def trigger_search_for_dialog_history_notes(search_context, use_history_order):
search_context.search = " or ".join(
f"nid:{note_id}" for note_id in history.note_ids
)
if use_history_order:
search_context.order = f"""case c.nid {
" ".join(
f"when {note_id} then {n}"
for (n, note_id) in enumerate(reversed(history.note_ids))
)
} end asc"""
##############################################################################
# noinspection PyAttributeOutsideInit
class Edit(aqt.editcurrent.EditCurrent):
dialog_geometry_tag = DOMAIN_PREFIX + "edit"
dialog_registry_tag = DOMAIN_PREFIX + "Edit"
dialog_search_tag = DOMAIN_PREFIX + "edit.history"
# depending on whether the dialog already exists,
# upon a request to open the dialog via `aqt.dialogs.open()`,
# the manager will call either the constructor or the `reopen` method
def __init__(self, note):
QDialog.__init__(self, None, Qt.Window)
aqt.mw.garbage_collect_on_dialog_finish(self)
self.form = aqt.forms.editcurrent.Ui_Dialog()
self.form.setupUi(self)
self.setWindowTitle("Edit")
self.setMinimumWidth(250)
self.setMinimumHeight(400)
restoreGeom(self, self.dialog_geometry_tag)
disable_help_button(self)
self.form.buttonBox.setVisible(False) # hides the Close button bar
self.setup_editor_buttons()
history.remove_invalid_notes()
history.append(note)
self.show_note(note)
self.show()
gui_hooks.operation_did_execute.append(self.on_operation_did_execute)
gui_hooks.editor_did_load_note.append(self.editor_did_load_note)
def reopen(self, note):
history.append(note)
self.show_note(note)
def cleanup_and_close(self):
gui_hooks.editor_did_load_note.remove(self.editor_did_load_note)
gui_hooks.operation_did_execute.remove(self.on_operation_did_execute)
self.editor.cleanup()
saveGeom(self, self.dialog_geometry_tag)
aqt.dialogs.markClosed(self.dialog_registry_tag)
QDialog.reject(self)
#################################### hooks enabled during dialog lifecycle
def on_operation_did_execute(self, changes, handler):
if changes.note_text and handler is not self.editor:
self.reload_notes_after_user_action_elsewhere()
# adjusting buttons right after initializing doesn't have any effect;
# this seems to do the trick
def editor_did_load_note(self, _editor):
self.enable_disable_next_and_previous_buttons()
###################################################### load & reload notes
# setting editor.card is required for the "Cards…" button to work properly
def show_note(self, note):
self.note = note
cards = note.cards()
self.editor.set_note(note)
self.editor.card = cards[0] if cards else None
if any(is_card_suspended(card) for card in cards):
tooltip("Some of the cards associated with this note "
"have been suspended", parent=self)
def reload_notes_after_user_action_elsewhere(self):
history.remove_invalid_notes()
try:
self.note.load() # this also updates the fields
except NotFoundError:
try:
self.note = history.get_last_note()
except IndexError:
self.cleanup_and_close()
return
self.show_note(self.note)
################################################################## actions
# search two times, one is to select the current note or its cards,
# and another to show the whole history, while keeping the above selection
# set sort column to our search tag, which:
# * prevents the column sort indicator from being shown
# * serves as a hint for us to show notes or cards in history order
# (user can then click on any of the column names
# to show history cards in the order of their choosing)
def show_browser(self, *_):
def search_input_select_all(hook_browser, *_):
hook_browser.form.searchEdit.lineEdit().selectAll()
gui_hooks.browser_did_change_row.remove(search_input_select_all)
gui_hooks.browser_did_change_row.append(search_input_select_all)
browser = aqt.dialogs.open("Browser", aqt.mw)
browser.table._state.sort_column = self.dialog_search_tag # noqa
browser.table._set_sort_indicator() # noqa
browser.search_for(f"nid:{self.note.id}")
browser.table.select_all()
browser.search_for(self.dialog_search_tag)
def show_preview(self, *_):
if cards := self.note.cards():
previewer = DecentPreviewer(ReadyCardsAdapter(cards))
previewer.open()
return previewer
else:
tooltip("No cards found", parent=self)
return None
def show_previous(self, *_):
if history.has_note_to_left_of(self.note):
self.show_note(history.get_note_to_left_of(self.note))
def show_next(self, *_):
if history.has_note_to_right_of(self.note):
self.show_note(history.get_note_to_right_of(self.note))
################################################## button and hotkey setup
def setup_editor_buttons(self):
gui_hooks.editor_did_init.append(self.add_preview_button)
gui_hooks.editor_did_init_buttons.append(self.add_right_hand_side_buttons)
self.editor = aqt.editor.Editor(aqt.mw, self.form.fieldsArea, self)
gui_hooks.editor_did_init_buttons.remove(self.add_right_hand_side_buttons)
gui_hooks.editor_did_init.remove(self.add_preview_button)
# taken from `setupEditor` of browser.py
# PreviewButton calls pycmd `preview`, which is hardcoded.
# copying _links is needed so that opening Anki's browser does not
# screw them up as they are apparently shared between instances?!
def add_preview_button(self, editor):
QShortcut(QKeySequence("Ctrl+Shift+P"), self, self.show_preview)
editor._links = editor._links.copy()
editor._links["preview"] = self.show_preview
editor.web.eval("""
$editorToolbar.then(({notetypeButtons}) =>
notetypeButtons.appendButton(
{component: editorToolbar.PreviewButton, id: 'preview'}
)
);
""")
def add_right_hand_side_buttons(self, buttons, editor):
def add(cmd, function, label, tip, keys):
button_html = editor.addButton(
icon=None,
cmd=DOMAIN_PREFIX + cmd,
id=DOMAIN_PREFIX + cmd,
func=function,
label=f"&nbsp;&nbsp;{label}&nbsp;&nbsp;",
tip=f"{tip} ({keys})",
keys=keys,
)
# adding class `btn` properly styles buttons when disabled
button_html = button_html.replace('class="', 'class="btn ')
buttons.append(button_html)
add("browse", self.show_browser, "Browse", "Browse", "Ctrl+F")
add("previous", self.show_previous, "&lt;", "Previous", "Alt+Left")
add("next", self.show_next, "&gt;", "Next", "Alt+Right")
def enable_disable_next_and_previous_buttons(self):
def to_js(boolean):
return "true" if boolean else "false"
disable_previous = to_js(not(history.has_note_to_left_of(self.note)))
disable_next = to_js(not(history.has_note_to_right_of(self.note)))
self.editor.web.eval(f"""
$editorToolbar.then(({{ toolbar }}) => {{
setTimeout(function() {{
document.getElementById("{DOMAIN_PREFIX}previous")
.disabled = {disable_previous};
document.getElementById("{DOMAIN_PREFIX}next")
.disabled = {disable_next};
}}, 1);
}});
""")
##########################################################################
@classmethod
def browser_will_search(cls, search_context):
if search_context.search == cls.dialog_search_tag:
trigger_search_for_dialog_history_notes(
search_context=search_context,
use_history_order=cls.dialog_search_tag ==
search_context.browser.table._state.sort_column # noqa
)
@classmethod
def register_with_anki(cls):
if cls.dialog_registry_tag not in aqt.dialogs._dialogs: # noqa
aqt.dialogs.register_dialog(cls.dialog_registry_tag, cls)
gui_hooks.browser_will_search.append(cls.browser_will_search)
@classmethod
def open_dialog_and_show_note_with_id(cls, note_id): # raises NotFoundError
note = get_note_by_note_id(note_id)
return aqt.dialogs.open(cls.dialog_registry_tag, note)

View File

@ -43,10 +43,9 @@ def download(url):
def api(*versions):
def decorator(func):
method = lambda *args, **kwargs: func(*args, **kwargs)
setattr(method, 'versions', versions)
setattr(method, 'api', True)
return method
setattr(func, 'versions', versions)
setattr(func, 'api', True)
return func
return decorator
@ -65,22 +64,22 @@ def cardAnswer(card):
return card.answer()
def setting(key):
defaults = {
'apiKey': None,
'apiLogPath': None,
'apiPollInterval': 25,
'apiVersion': 6,
'webBacklog': 5,
'webBindAddress': os.getenv('ANKICONNECT_BIND_ADDRESS', '127.0.0.1'),
'webBindPort': 8765,
'webCorsOrigin': os.getenv('ANKICONNECT_CORS_ORIGIN', None),
'webCorsOriginList': ['http://localhost'],
'ignoreOriginList': [],
'webTimeout': 10000,
}
DEFAULT_CONFIG = {
'apiKey': None,
'apiLogPath': None,
'apiPollInterval': 25,
'apiVersion': 6,
'webBacklog': 5,
'webBindAddress': os.getenv('ANKICONNECT_BIND_ADDRESS', '127.0.0.1'),
'webBindPort': 8765,
'webCorsOrigin': os.getenv('ANKICONNECT_CORS_ORIGIN', None),
'webCorsOriginList': ['http://localhost'],
'ignoreOriginList': [],
'webTimeout': 10000,
}
def setting(key):
try:
return aqt.mw.addonManager.getConfig(__name__).get(key, defaults[key])
return aqt.mw.addonManager.getConfig(__name__).get(key, DEFAULT_CONFIG[key])
except:
raise Exception('setting {} not found'.format(key))

View File

@ -24,7 +24,8 @@ from . import util
#
class WebRequest:
def __init__(self, headers, body):
def __init__(self, method, headers, body):
self.method = method
self.headers = headers
self.body = body
@ -95,8 +96,15 @@ class WebClient:
if len(parts) == 1:
return None, 0
lines = parts[0].split('\r\n'.encode('utf-8'))
method = None
if len(lines) > 0:
request_line_parts = lines[0].split(' '.encode('utf-8'))
method = request_line_parts[0].upper() if len(request_line_parts) > 0 else None
headers = {}
for line in parts[0].split('\r\n'.encode('utf-8')):
for line in lines[1:]:
pair = line.split(': '.encode('utf-8'))
headers[pair[0].lower()] = pair[1] if len(pair) > 1 else None
@ -108,8 +116,7 @@ class WebClient:
return None, 0
body = data[headerLength : totalLength]
return WebRequest(headers, body), totalLength
return WebRequest(method, headers, body), totalLength
#
# WebServer
@ -154,7 +161,54 @@ class WebServer:
def handlerWrapper(self, req):
allowed, corsOrigin = self.allowOrigin(req)
if req.method == b'OPTIONS':
body = ''.encode('utf-8')
headers = self.buildHeaders(corsOrigin, body)
if b'access-control-request-private-network' in req.headers and (
req.headers[b'access-control-request-private-network'] == b'true'):
# include this header so that if a public origin is included in the whitelist,
# then browsers won't fail requests due to the private network access check
headers.append(['Access-Control-Allow-Private-Network', 'true'])
return self.buildResponse(headers, body)
paramsError = False
try:
params = json.loads(req.body.decode('utf-8'))
except ValueError:
body = json.dumps(None).encode('utf-8')
paramsError = True
if allowed or not paramsError and params.get('action', '') == 'requestPermission':
if len(req.body) == 0:
body = 'AnkiConnect v.{}'.format(util.setting('apiVersion')).encode('utf-8')
else:
if params.get('action', '') == 'requestPermission':
params['params'] = params.get('params', {})
params['params']['allowed'] = allowed
params['params']['origin'] = b'origin' in req.headers and req.headers[b'origin'].decode() or ''
if not allowed :
corsOrigin = params['params']['origin']
body = json.dumps(self.handler(params)).encode('utf-8')
headers = self.buildHeaders(corsOrigin, body)
else :
headers = [
['HTTP/1.1 403 Forbidden', None],
['Access-Control-Allow-Origin', corsOrigin],
['Access-Control-Allow-Headers', '*']
]
body = ''.encode('utf-8')
return self.buildResponse(headers, body)
def allowOrigin(self, req):
# handle multiple cors origins by checking the 'origin'-header against the allowed origin list from the config
webCorsOriginList = util.setting('webCorsOriginList')
@ -183,43 +237,22 @@ class WebServer:
allowed = True
else:
allowed = True
return allowed, corsOrigin
def buildHeaders(self, corsOrigin, body):
return [
['HTTP/1.1 200 OK', None],
['Content-Type', 'text/json'],
['Access-Control-Allow-Origin', corsOrigin],
['Access-Control-Allow-Headers', '*'],
['Content-Length', str(len(body))]
]
def buildResponse(self, headers, body):
resp = bytes()
paramsError = False
try:
params = json.loads(req.body.decode('utf-8'))
except ValueError:
body = json.dumps(None).encode('utf-8')
paramsError = True
if allowed or not paramsError and params.get('action', '') == 'requestPermission':
if len(req.body) == 0:
body = 'AnkiConnect v.{}'.format(util.setting('apiVersion')).encode('utf-8')
else:
if params.get('action', '') == 'requestPermission':
params['params'] = params.get('params', {})
params['params']['allowed'] = allowed
params['params']['origin'] = b'origin' in req.headers and req.headers[b'origin'].decode() or ''
if not allowed :
corsOrigin = params['params']['origin']
body = json.dumps(self.handler(params)).encode('utf-8')
headers = [
['HTTP/1.1 200 OK', None],
['Content-Type', 'text/json'],
['Access-Control-Allow-Origin', corsOrigin],
['Access-Control-Allow-Headers', '*'],
['Content-Length', str(len(body))]
]
else :
headers = [
['HTTP/1.1 403 Forbidden', None],
['Access-Control-Allow-Origin', corsOrigin],
['Access-Control-Allow-Headers', '*']
]
body = ''.encode('utf-8')
for key, value in headers:
if value is None:
resp += '{}\r\n'.format(key).encode('utf-8')
@ -228,7 +261,6 @@ class WebServer:
resp += '\r\n'.encode('utf-8')
resp += body
return resp

View File

@ -1,2 +0,0 @@
#!/usr/bin/bash
python -m unittest discover -v tests

283
tests/conftest.py Normal file
View File

@ -0,0 +1,283 @@
import concurrent.futures
import time
from contextlib import contextmanager
from dataclasses import dataclass
import aqt.operations.note
import pytest
from PyQt5 import QtTest
from _pytest.monkeypatch import MonkeyPatch # noqa
from pytest_anki._launch import anki_running, temporary_user # noqa
from waitress import wasyncore
from plugin import AnkiConnect
from plugin.edit import Edit
from plugin.util import DEFAULT_CONFIG
ac = AnkiConnect()
# wait for n seconds, while events are being processed
def wait(seconds):
milliseconds = int(seconds * 1000)
QtTest.QTest.qWait(milliseconds) # noqa
def wait_until(booleanish_function, at_most_seconds=30):
deadline = time.time() + at_most_seconds
while time.time() < deadline:
if booleanish_function():
return
wait(0.01)
raise Exception(f"Function {booleanish_function} never once returned "
f"a positive value in {at_most_seconds} seconds")
def delete_model(model_name):
model = ac.collection().models.byName(model_name)
ac.collection().models.remove(model["id"])
def close_all_dialogs_and_wait_for_them_to_run_closing_callbacks():
aqt.dialogs.closeAll(onsuccess=lambda: None)
wait_until(aqt.dialogs.allClosed)
def get_dialog_instance(name):
return aqt.dialogs._dialogs[name][1] # noqa
# waitress is a WSGI server that Anki starts to serve css etc to its web views.
# it seems to have a race condition issue;
# the main loop thread is trying to `select.select` the sockets
# which a worker thread is closing because of a dead connection.
# this is especially pronounced in tests,
# as we open and close windows rapidly--and so web views and their connections.
# this small patch makes waitress skip actually closing the sockets
# (unless the server is shutting down--if it is, loop exceptions are ignored).
# while the unclosed sockets might accumulate,
# this should not pose an issue in test environment.
# see https://github.com/Pylons/waitress/issues/374
@contextmanager
def waitress_patched_to_prevent_it_from_dying():
original_close = wasyncore.dispatcher.close
sockets_that_must_not_be_garbage_collected = [] # lists are thread-safe
def close(self):
if not aqt.mw.mediaServer.is_shutdown:
sockets_that_must_not_be_garbage_collected.append(self.socket)
self.socket = None
original_close(self)
with MonkeyPatch().context() as monkey:
monkey.setattr(wasyncore.dispatcher, "close", close)
yield
@contextmanager
def empty_anki_session_started():
with waitress_patched_to_prevent_it_from_dying():
with anki_running(
qtbot=None, # noqa
enable_web_debugging=False,
profile_name="test_user",
) as session:
yield session
@contextmanager
def profile_created_and_loaded(session):
with temporary_user(session.base, "test_user", "en_US"):
with session.profile_loaded():
aqt.mw.pm.profile["numBackups"] = 0 # don't try to do backups
yield session
@contextmanager
def anki_connect_config_loaded(session, web_bind_port):
with session.addon_config_created(
package_name="plugin",
default_config=DEFAULT_CONFIG,
user_config={**DEFAULT_CONFIG, "webBindPort": web_bind_port}
):
yield
@contextmanager
def current_decks_and_models_etc_preserved():
deck_names_before = ac.deckNames()
model_names_before = ac.modelNames()
try:
yield
finally:
deck_names_after = ac.deckNames()
model_names_after = ac.modelNames()
deck_names_to_delete = {*deck_names_after} - {*deck_names_before}
model_names_to_delete = {*model_names_after} - {*model_names_before}
ac.deleteDecks(decks=deck_names_to_delete, cardsToo=True)
for model_name in model_names_to_delete:
delete_model(model_name)
ac.guiDeckBrowser()
@dataclass
class Setup:
deck_id: int
note1_id: int
note2_id: int
note1_card_ids: "list[int]"
note2_card_ids: "list[int]"
card_ids: "list[int]"
def set_up_test_deck_and_test_model_and_two_notes():
ac.createModel(
modelName="test_model",
inOrderFields=["field1", "field2"],
cardTemplates=[
{"Front": "{{field1}}", "Back": "{{field2}}"},
{"Front": "{{field2}}", "Back": "{{field1}}"}
],
css="* {}",
)
deck_id = ac.createDeck("test_deck")
note1_id = ac.addNote(dict(
deckName="test_deck",
modelName="test_model",
fields={"field1": "note1 field1", "field2": "note1 field2"},
tags={"tag1"},
))
note2_id = ac.addNote(dict(
deckName="test_deck",
modelName="test_model",
fields={"field1": "note2 field1", "field2": "note2 field2"},
tags={"tag2"},
))
note1_card_ids = ac.findCards(query=f"nid:{note1_id}")
note2_card_ids = ac.findCards(query=f"nid:{note2_id}")
card_ids = ac.findCards(query="deck:test_deck")
return Setup(
deck_id=deck_id,
note1_id=note1_id,
note2_id=note2_id,
note1_card_ids=note1_card_ids,
note2_card_ids=note2_card_ids,
card_ids=card_ids,
)
#############################################################################
def pytest_addoption(parser):
parser.addoption("--tear-down-profile-after-each-test",
action="store_true",
default=True)
parser.addoption("--no-tear-down-profile-after-each-test", "-T",
action="store_false",
dest="tear_down_profile_after_each_test")
def pytest_report_header(config):
if config.option.forked:
return "test isolation: perfect; each test is run in a separate process"
if config.option.tear_down_profile_after_each_test:
return "test isolation: good; user profile is torn down after each test"
else:
return "test isolation: poor; only newly created decks and models " \
"are cleaned up between tests"
@pytest.fixture(autouse=True)
def run_background_tasks_on_main_thread(request, monkeypatch): # noqa
"""
Makes background operations such as card deletion execute on main thread
and execute the callback immediately
"""
def run_in_background(task, on_done=None, kwargs=None):
future = concurrent.futures.Future()
try:
future.set_result(task(**kwargs if kwargs is not None else {}))
except BaseException as e:
future.set_exception(e)
if on_done is not None:
on_done(future)
monkeypatch.setattr(aqt.mw.taskman, "run_in_background", run_in_background)
# don't use run_background_tasks_on_main_thread for tests that don't run Anki
def pytest_generate_tests(metafunc):
if (
run_background_tasks_on_main_thread.__name__ in metafunc.fixturenames
and session_scope_empty_session.__name__ not in metafunc.fixturenames
):
metafunc.fixturenames.remove(run_background_tasks_on_main_thread.__name__)
@pytest.fixture(scope="session")
def session_scope_empty_session():
with empty_anki_session_started() as session:
yield session
@pytest.fixture(scope="session")
def session_scope_session_with_profile_loaded(session_scope_empty_session):
with profile_created_and_loaded(session_scope_empty_session):
yield session_scope_empty_session
@pytest.fixture
def session_with_profile_loaded(session_scope_empty_session, request):
"""
Like anki_session fixture from pytest-anki, but:
* Default profile is loaded
* It's relying on session-wide app instance so that
it can be used without forking every test;
this can be useful to speed up tests and also
to examine Anki's stdout/stderr, which is not visible with forking.
* If command line option --no-tear-down-profile-after-each-test is passed,
only the newly created decks and models are deleted.
Otherwise, the profile is completely torn down after each test.
Tearing down the profile is significantly slower.
"""
if request.config.option.tear_down_profile_after_each_test:
with profile_created_and_loaded(session_scope_empty_session):
yield session_scope_empty_session
else:
session = request.getfixturevalue(
session_scope_session_with_profile_loaded.__name__
)
with current_decks_and_models_etc_preserved():
yield session
@pytest.fixture
def setup(session_with_profile_loaded):
"""
Like session_with_profile_loaded, but also:
* Added are:
* A deck `test_deck`
* A model `test_model` with fields `filed1` and `field2`
and two cards per note
* Two notes with two valid cards each using the above deck and model
* Edit dialog is registered with dialog manager
* Any dialogs, if open, are safely closed on exit
"""
Edit.register_with_anki()
yield set_up_test_deck_and_test_model_and_two_notes()
close_all_dialogs_and_wait_for_them_to_run_closing_callbacks()

View File

@ -1,95 +1,78 @@
#!/usr/bin/env python
import pytest
from anki.errors import NotFoundError # noqa
import unittest
import util
from conftest import ac
class TestCards(unittest.TestCase):
def setUp(self):
util.invoke('createDeck', deck='test')
note = {
'deckName': 'test',
'modelName': 'Basic',
'fields': {'Front': 'front1', 'Back': 'back1'},
'tags': ['tag1'],
'options': {
'allowDuplicate': True
}
}
self.noteId = util.invoke('addNote', note=note)
def test_findCards(setup):
card_ids = ac.findCards(query="deck:test_deck")
assert len(card_ids) == 4
def tearDown(self):
util.invoke('deleteDecks', decks=['test'], cardsToo=True)
class TestEaseFactors:
def test_setEaseFactors(self, setup):
result = ac.setEaseFactors(cards=setup.card_ids, easeFactors=[4200] * 4)
assert result == [True] * 4
def test_setEaseFactors_with_invalid_card_id(self, setup):
result = ac.setEaseFactors(cards=[123], easeFactors=[4200])
assert result == [False]
def test_getEaseFactors(self, setup):
ac.setEaseFactors(cards=setup.card_ids, easeFactors=[4200] * 4)
result = ac.getEaseFactors(cards=setup.card_ids)
assert result == [4200] * 4
def test_getEaseFactors_with_invalid_card_id(self, setup):
assert ac.getEaseFactors(cards=[123]) == [None]
def runTest(self):
incorrectId = 1234
class TestSuspending:
def test_suspend(self, setup):
assert ac.suspend(cards=setup.card_ids) is True
# findCards
cardIds = util.invoke('findCards', query='deck:test')
self.assertEqual(len(cardIds), 1)
def test_suspend_fails_with_incorrect_id(self, setup):
with pytest.raises(NotFoundError):
assert ac.suspend(cards=[123])
# setEaseFactors
EASE_TO_TRY = 4200
easeFactors = [EASE_TO_TRY for card in cardIds]
couldGetEaseFactors = util.invoke('setEaseFactors', cards=cardIds, easeFactors=easeFactors)
self.assertEqual([True for card in cardIds], couldGetEaseFactors)
couldGetEaseFactors = util.invoke('setEaseFactors', cards=[incorrectId], easeFactors=[EASE_TO_TRY])
self.assertEqual([False], couldGetEaseFactors)
def test_areSuspended_returns_False_for_regular_cards(self, setup):
result = ac.areSuspended(cards=setup.card_ids)
assert result == [False] * 4
# getEaseFactors
easeFactorsFound = util.invoke('getEaseFactors', cards=cardIds)
self.assertEqual(easeFactors, easeFactorsFound)
easeFactorsFound = util.invoke('getEaseFactors', cards=[incorrectId])
self.assertEqual([None], easeFactorsFound)
def test_areSuspended_returns_True_for_suspended_cards(self, setup):
ac.suspend(setup.card_ids)
result = ac.areSuspended(cards=setup.card_ids)
assert result == [True] * 4
# suspend
util.invoke('suspend', cards=cardIds)
self.assertRaises(Exception, lambda: util.invoke('suspend', cards=[incorrectId]))
# areSuspended (part 1)
suspendedStates = util.invoke('areSuspended', cards=cardIds)
self.assertEqual(len(cardIds), len(suspendedStates))
self.assertNotIn(False, suspendedStates)
self.assertEqual([None], util.invoke('areSuspended', cards=[incorrectId]))
def test_areDue_returns_True_for_new_cards(setup):
result = ac.areDue(cards=setup.card_ids)
assert result == [True] * 4
# unsuspend
util.invoke('unsuspend', cards=cardIds)
# areSuspended (part 2)
suspendedStates = util.invoke('areSuspended', cards=cardIds)
self.assertEqual(len(cardIds), len(suspendedStates))
self.assertNotIn(True, suspendedStates)
def test_getIntervals(setup):
ac.getIntervals(cards=setup.card_ids, complete=False)
ac.getIntervals(cards=setup.card_ids, complete=True)
# areDue
dueStates = util.invoke('areDue', cards=cardIds)
self.assertEqual(len(cardIds), len(dueStates))
self.assertNotIn(False, dueStates)
# getIntervals
util.invoke('getIntervals', cards=cardIds, complete=True)
util.invoke('getIntervals', cards=cardIds, complete=False)
def test_cardsToNotes(setup):
result = ac.cardsToNotes(cards=setup.card_ids)
assert {*result} == {setup.note1_id, setup.note2_id}
# cardsToNotes
noteIds = util.invoke('cardsToNotes', cards=cardIds)
self.assertEqual(len(noteIds), len(cardIds))
self.assertIn(self.noteId, noteIds)
# cardsInfo
cardsInfo = util.invoke('cardsInfo', cards=cardIds)
self.assertEqual(len(cardsInfo), len(cardIds))
for i, cardInfo in enumerate(cardsInfo):
self.assertEqual(cardInfo['cardId'], cardIds[i])
cardsInfo = util.invoke('cardsInfo', cards=[incorrectId])
self.assertEqual(len(cardsInfo), 1)
self.assertDictEqual(cardsInfo[0], dict())
class TestCardInfo:
def test_with_valid_ids(self, setup):
result = ac.cardsInfo(cards=setup.card_ids)
assert [item["cardId"] for item in result] == setup.card_ids
# forgetCards
util.invoke('forgetCards', cards=cardIds)
def test_with_incorrect_id(self, setup):
result = ac.cardsInfo(cards=[123])
assert result == [{}]
# relearnCards
util.invoke('relearnCards', cards=cardIds)
if __name__ == '__main__':
unittest.main()
def test_forgetCards(setup):
ac.forgetCards(cards=setup.card_ids)
def test_relearnCards(setup):
ac.relearnCards(cards=setup.card_ids)

View File

@ -1,23 +0,0 @@
#!/usr/bin/env python
import unittest
import util
class TestNotes(unittest.TestCase):
def setUp(self):
util.invoke('createDeck', deck='test')
def tearDown(self):
util.invoke('deleteDecks', decks=['test'], cardsToo=True)
def test_bug164(self):
note = {'deckName': 'test', 'modelName': 'Basic', 'fields': {'Front': ' Whitespace\n', 'Back': ''}, 'options': { 'allowDuplicate': False, 'duplicateScope': 'deck'}}
util.invoke('addNote', note=note)
self.assertRaises(Exception, lambda: util.invoke('addNote', note=note))
if __name__ == '__main__':
unittest.main()

View File

@ -1,98 +1,73 @@
#!/usr/bin/env python
import pytest
import unittest
import util
from conftest import ac
class TestDecks(unittest.TestCase):
def runTest(self):
# deckNames (part 1)
deckNames = util.invoke('deckNames')
self.assertIn('Default', deckNames)
# deckNamesAndIds
result = util.invoke('deckNamesAndIds')
self.assertIn('Default', result)
self.assertEqual(result['Default'], 1)
# createDeck
util.invoke('createDeck', deck='test1')
# deckNames (part 2)
deckNames = util.invoke('deckNames')
self.assertIn('test1', deckNames)
# changeDeck
note = {'deckName': 'test1', 'modelName': 'Basic', 'fields': {'Front': 'front', 'Back': 'back'}, 'tags': ['tag']}
noteId = util.invoke('addNote', note=note)
cardIds = util.invoke('findCards', query='deck:test1')
util.invoke('changeDeck', cards=cardIds, deck='test2')
# deckNames (part 3)
deckNames = util.invoke('deckNames')
self.assertIn('test2', deckNames)
# deleteDecks
util.invoke('deleteDecks', decks=['test1', 'test2'], cardsToo=True)
# deckNames (part 4)
deckNames = util.invoke('deckNames')
self.assertNotIn('test1', deckNames)
self.assertNotIn('test2', deckNames)
# getDeckConfig
deckConfig = util.invoke('getDeckConfig', deck='Default')
self.assertEqual('Default', deckConfig['name'])
# saveDeckConfig
deckConfig = util.invoke('saveDeckConfig', config=deckConfig)
# setDeckConfigId
setDeckConfigId = util.invoke('setDeckConfigId', decks=['Default'], configId=1)
self.assertTrue(setDeckConfigId)
# cloneDeckConfigId (part 1)
deckConfigId = util.invoke('cloneDeckConfigId', cloneFrom=1, name='test')
self.assertTrue(deckConfigId)
# removeDeckConfigId (part 1)
removedDeckConfigId = util.invoke('removeDeckConfigId', configId=deckConfigId)
self.assertTrue(removedDeckConfigId)
# removeDeckConfigId (part 2)
removedDeckConfigId = util.invoke('removeDeckConfigId', configId=deckConfigId)
self.assertFalse(removedDeckConfigId)
# cloneDeckConfigId (part 2)
deckConfigId = util.invoke('cloneDeckConfigId', cloneFrom=deckConfigId, name='test')
self.assertFalse(deckConfigId)
# updateCompleteDeck
util.invoke('updateCompleteDeck', data={
'deck': 'test3',
'cards': {
'12': {
'id': 12, 'nid': 23, 'ord': 0, 'type': 0, 'queue': 0,
'due': 1186031, 'factor': 0, 'ivl': 0, 'reps': 0, 'lapses': 0, 'left': 0
}
},
'notes': {
'23': {
'id': 23, 'mid': 34, 'fields': ['frontValue', 'backValue'], 'tags': ['aTag']
}
},
'models': {
'34': {
'id': 34, 'fields': ['Front', 'Back'], 'templateNames': ['Card 1'], 'name': 'anotherModel',
}
}
})
deckNames = util.invoke('deckNames')
self.assertIn('test3', deckNames)
cardIDs = util.invoke('findCards', query='deck:test3')
self.assertEqual(len(cardIDs), 1)
self.assertEqual(cardIDs[0], 12)
def test_deckNames(session_with_profile_loaded):
result = ac.deckNames()
assert result == ["Default"]
if __name__ == '__main__':
unittest.main()
def test_deckNamesAndIds(session_with_profile_loaded):
result = ac.deckNamesAndIds()
assert result == {"Default": 1}
def test_createDeck(session_with_profile_loaded):
ac.createDeck("foo")
assert {*ac.deckNames()} == {"Default", "foo"}
def test_changeDeck(setup):
ac.changeDeck(cards=setup.card_ids, deck="bar")
assert "bar" in ac.deckNames()
def test_deleteDeck(setup):
before = ac.deckNames()
ac.deleteDecks(decks=["test_deck"], cardsToo=True)
after = ac.deckNames()
assert {*before} - {*after} == {"test_deck"}
@pytest.mark.skipif(
condition=ac._anki21_version < 28,
reason=f"Not applicable to Anki < 2.1.28"
)
def test_deleteDeck_must_be_called_with_cardsToo_set_to_True_on_later_api(setup):
with pytest.raises(Exception):
ac.deleteDecks(decks=["test_deck"])
with pytest.raises(Exception):
ac.deleteDecks(decks=["test_deck"], cardsToo=False)
def test_getDeckConfig(session_with_profile_loaded):
result = ac.getDeckConfig(deck="Default")
assert result["name"] == "Default"
def test_saveDeckConfig(session_with_profile_loaded):
config = ac.getDeckConfig(deck="Default")
result = ac.saveDeckConfig(config=config)
assert result is True
def test_setDeckConfigId(session_with_profile_loaded):
result = ac.setDeckConfigId(decks=["Default"], configId=1)
assert result is True
def test_cloneDeckConfigId(session_with_profile_loaded):
result = ac.cloneDeckConfigId(cloneFrom=1, name="test")
assert isinstance(result, int)
def test_removedDeckConfigId(session_with_profile_loaded):
new_config_id = ac.cloneDeckConfigId(cloneFrom=1, name="test")
assert ac.removeDeckConfigId(configId=new_config_id) is True
def test_removedDeckConfigId_fails_with_invalid_id(session_with_profile_loaded):
new_config_id = ac.cloneDeckConfigId(cloneFrom=1, name="test")
assert ac.removeDeckConfigId(configId=new_config_id) is True
assert ac.removeDeckConfigId(configId=new_config_id) is False

174
tests/test_edit.py Normal file
View File

@ -0,0 +1,174 @@
import aqt.operations.note
import pytest
from conftest import get_dialog_instance
from plugin.edit import Edit, DecentPreviewer, history
def test_edit_dialog_opens(setup):
Edit.open_dialog_and_show_note_with_id(setup.note1_id)
def test_edit_dialog_opens_only_once(setup):
dialog1 = Edit.open_dialog_and_show_note_with_id(setup.note1_id)
dialog2 = Edit.open_dialog_and_show_note_with_id(setup.note1_id)
assert dialog1 is dialog2
def test_edit_dialog_fails_to_open_with_invalid_note(setup):
with pytest.raises(Exception):
Edit.open_dialog_and_show_note_with_id(123)
class TestBrowser:
@staticmethod
def get_selected_card_ids():
return get_dialog_instance("Browser").table.get_selected_card_ids()
def test_dialog_opens(self, setup):
dialog = Edit.open_dialog_and_show_note_with_id(setup.note1_id)
dialog.show_browser()
def test_selects_cards_of_last_note(self, setup):
Edit.open_dialog_and_show_note_with_id(setup.note1_id)
Edit.open_dialog_and_show_note_with_id(setup.note2_id).show_browser()
assert {*self.get_selected_card_ids()} == {*setup.note2_card_ids}
def test_selects_cards_of_note_before_last_after_previous_button_pressed(self, setup):
Edit.open_dialog_and_show_note_with_id(setup.note1_id)
dialog = Edit.open_dialog_and_show_note_with_id(setup.note2_id)
def verify_that_the_table_shows_note2_cards_then_note1_cards():
get_dialog_instance("Browser").table.select_all()
assert {*self.get_selected_card_ids()[:2]} == {*setup.note2_card_ids}
assert {*self.get_selected_card_ids()[2:]} == {*setup.note1_card_ids}
dialog.show_previous()
dialog.show_browser()
assert {*self.get_selected_card_ids()} == {*setup.note1_card_ids}
verify_that_the_table_shows_note2_cards_then_note1_cards()
dialog.show_next()
dialog.show_browser()
assert {*self.get_selected_card_ids()} == {*setup.note2_card_ids}
verify_that_the_table_shows_note2_cards_then_note1_cards()
class TestPreviewDialog:
def test_opens(self, setup):
edit_dialog = Edit.open_dialog_and_show_note_with_id(setup.note1_id)
edit_dialog.show_preview()
@pytest.fixture
def dialog(self, setup):
edit_dialog = Edit.open_dialog_and_show_note_with_id(setup.note1_id)
preview_dialog: DecentPreviewer = edit_dialog.show_preview()
def press_next_button(times=0):
for _ in range(times):
preview_dialog._last_render = 0 # render without delay
preview_dialog._on_next()
preview_dialog.press_next_button = press_next_button
yield preview_dialog
@pytest.mark.parametrize(
"next_button_presses, current_card, "
"showing_question_only, previous_enabled, next_enabled",
[
pytest.param(0, 0, True, False, True,
id="next button pressed 0 times; first card, question"),
pytest.param(1, 0, False, True, True,
id="next button pressed 1 time; first card, answer"),
pytest.param(2, 1, True, True, True,
id="next button pressed 2 times; second card, question"),
pytest.param(3, 1, False, True, False,
id="next button pressed 3 times; second card, answer"),
pytest.param(4, 1, False, True, False,
id="next button pressed 4 times; second card still, answer"),
]
)
def test_navigation(self, dialog, next_button_presses, current_card,
showing_question_only, previous_enabled, next_enabled):
dialog.press_next_button(times=next_button_presses)
assert dialog.adapter.current == current_card
assert dialog.showing_question_and_can_show_answer() is showing_question_only
assert dialog._should_enable_prev() is previous_enabled
assert dialog._should_enable_next() is next_enabled
class TestHistory:
@pytest.fixture(autouse=True)
def cleanup(self):
history.note_ids = []
def test_single_note(self, setup):
assert history.note_ids == []
Edit.open_dialog_and_show_note_with_id(setup.note1_id)
assert history.note_ids == [setup.note1_id]
def test_two_notes(self, setup):
Edit.open_dialog_and_show_note_with_id(setup.note1_id)
Edit.open_dialog_and_show_note_with_id(setup.note2_id)
assert history.note_ids == [setup.note1_id, setup.note2_id]
def test_old_note_reopened(self, setup):
Edit.open_dialog_and_show_note_with_id(setup.note1_id)
Edit.open_dialog_and_show_note_with_id(setup.note2_id)
Edit.open_dialog_and_show_note_with_id(setup.note1_id)
assert history.note_ids == [setup.note2_id, setup.note1_id]
def test_navigation(self, setup):
dialog = Edit.open_dialog_and_show_note_with_id(setup.note1_id)
Edit.open_dialog_and_show_note_with_id(setup.note2_id)
dialog.show_previous()
assert dialog.note.id == setup.note1_id
dialog.show_previous()
assert dialog.note.id == setup.note1_id
dialog.show_next()
assert dialog.note.id == setup.note2_id
dialog.show_next()
assert dialog.note.id == setup.note2_id
class TestNoteDeletionElsewhere:
@pytest.fixture
def delete_note(self, run_background_tasks_on_main_thread):
"""
Yields a function that accepts a single note id and deletes the note,
running the required hooks in sync
"""
return (
lambda note_id: aqt.operations.note
.remove_notes(parent=None, note_ids=[note_id]) # noqa
.run_in_background()
)
@staticmethod
def edit_dialog_is_open():
return aqt.dialogs._dialogs[Edit.dialog_registry_tag][1] is not None # noqa
@pytest.fixture
def dialog(self, setup):
Edit.open_dialog_and_show_note_with_id(setup.note1_id)
yield Edit.open_dialog_and_show_note_with_id(setup.note2_id)
def test_one_of_the_history_notes_is_deleted_and_dialog_stays(self,
setup, dialog, delete_note):
assert dialog.note.id == setup.note2_id
delete_note(setup.note2_id)
assert self.edit_dialog_is_open()
assert dialog.note.id == setup.note1_id
def test_all_of_the_history_notes_are_deleted_and_dialog_closes(self,
setup, dialog, delete_note):
delete_note(setup.note1_id)
delete_note(setup.note2_id)
assert not self.edit_dialog_is_open()

View File

@ -1,76 +1,118 @@
#!/usr/bin/env python
import pytest
import unittest
import util
from conftest import ac, wait_until, \
close_all_dialogs_and_wait_for_them_to_run_closing_callbacks, \
get_dialog_instance
class TestGui(unittest.TestCase):
def runTest(self):
# guiBrowse
util.invoke('guiBrowse', query='deck:Default')
# guiSelectedNotes
util.invoke('guiSelectedNotes')
# guiAddCards
util.invoke('guiAddCards')
# guiAddCards with preset
util.invoke('createDeck', deck='test')
note = {
'deckName': 'test',
'modelName': 'Basic',
'fields': {
'Front': 'front1',
'Back': 'back1'
},
'tags': ['tag1'],
}
util.invoke('guiAddCards', note=note)
# guiAddCards with preset and closeAfterAdding
util.invoke('guiAddCards', note={
**note,
'options': { 'closeAfterAdding': True },
})
util.invoke('guiAddCards', note={
**note,
'picture': [{
'url': 'https://via.placeholder.com/150.png',
'filename': 'placeholder.png',
'fields': ['Front'],
}]
})
# guiCurrentCard
# util.invoke('guiCurrentCard')
# guiStartCardTimer
util.invoke('guiStartCardTimer')
# guiShowQuestion
util.invoke('guiShowQuestion')
# guiShowAnswer
util.invoke('guiShowAnswer')
# guiAnswerCard
util.invoke('guiAnswerCard', ease=1)
# guiDeckOverview
util.invoke('guiDeckOverview', name='Default')
# guiDeckBrowser
util.invoke('guiDeckBrowser')
# guiDatabaseCheck
util.invoke('guiDatabaseCheck')
# guiExitAnki
# util.invoke('guiExitAnki')
def test_guiBrowse(setup):
ac.guiBrowse()
if __name__ == '__main__':
unittest.main()
def test_guiDeckBrowser(setup):
ac.guiDeckBrowser()
# todo executing this test without running background tasks on main thread
# rarely causes media server (`aqt.mediasrv`) to fail:
# its `run` method raises OSError: invalid file descriptor.
# this can cause other tests to fail to tear down;
# particularly, any dialogs with editor may fail to close
# due to their trying to save the note first, which is done via web view,
# which fails to complete due to corrupt media server. investigate?
def test_guiCheckDatabase(setup, run_background_tasks_on_main_thread):
ac.guiCheckDatabase()
def test_guiDeckOverview(setup):
assert ac.guiDeckOverview(name="test_deck") is True
class TestAddCards:
note = {
"deckName": "test_deck",
"modelName": "Basic",
"fields": {"Front": "new front1", "Back": "new back1"},
"tags": ["tag1"]
}
# an actual small image, you can see it if you run the test with GUI
# noinspection SpellCheckingInspection
base64_gif = "R0lGODlhBQAEAHAAACwAAAAABQAEAIH///8AAAAAAAAAAAACB0QMqZcXDwoAOw=="
picture = {
"picture": [
{
"data": base64_gif,
"filename": "smiley.gif",
"fields": ["Front"],
}
]
}
@staticmethod
def click_on_add_card_dialog_save_button():
dialog = get_dialog_instance("AddCards")
dialog.addButton.click()
# todo previously, these tests were verifying
# that the return value of `guiAddCards` is `int`.
# while it is indeed `int`, on modern Anki it is also always a `0`,
# so we consider it useless. update documentation?
def test_without_note(self, setup):
ac.guiAddCards()
def test_with_note(self, setup):
ac.guiAddCards(note=self.note)
self.click_on_add_card_dialog_save_button()
close_all_dialogs_and_wait_for_them_to_run_closing_callbacks()
assert len(ac.findCards(query="new")) == 1
def test_with_note_and_a_picture(self, setup):
ac.guiAddCards(note={**self.note, **self.picture})
self.click_on_add_card_dialog_save_button()
close_all_dialogs_and_wait_for_them_to_run_closing_callbacks()
assert len(ac.findCards(query="new")) == 1
assert ac.retrieveMediaFile(filename="smiley.gif") == self.base64_gif
class TestReviewActions:
@pytest.fixture
def reviewing_started(self, setup):
assert ac.guiDeckReview(name="test_deck") is True
def test_startCardTimer(self, reviewing_started):
assert ac.guiStartCardTimer() is True
def test_guiShowQuestion(self, reviewing_started):
assert ac.guiShowQuestion() is True
assert ac.reviewer().state == "question"
def test_guiShowAnswer(self, reviewing_started):
assert ac.guiShowAnswer() is True
assert ac.reviewer().state == "answer"
def test_guiAnswerCard(self, reviewing_started):
ac.guiShowAnswer()
reviews_before = ac.cardReviews(deck="test_deck", startID=0)
assert ac.guiAnswerCard(ease=4) is True
reviews_after = ac.cardReviews(deck="test_deck", startID=0)
assert len(reviews_after) == len(reviews_before) + 1
class TestSelectedNotes:
def test_with_valid_deck_query(self, setup):
ac.guiBrowse(query="deck:test_deck")
wait_until(ac.guiSelectedNotes)
assert ac.guiSelectedNotes()[0] in {setup.note1_id, setup.note2_id}
def test_with_invalid_deck_query(self, setup):
ac.guiBrowse(query="deck:test_deck")
wait_until(ac.guiSelectedNotes)
ac.guiBrowse(query="deck:invalid")
wait_until(lambda: not ac.guiSelectedNotes())

View File

@ -1,33 +1,51 @@
#!/usr/bin/env python
import base64
import unittest
import util
from conftest import ac
class TestMedia(unittest.TestCase):
def runTest(self):
filename = '_test.txt'
data = 'test'
# storeMediaFile
util.invoke('storeMediaFile', filename=filename, data=data)
filename2 = util.invoke('storeMediaFile', filename=filename, data='testtest', deleteExisting=False)
self.assertNotEqual(filename2, filename)
# retrieveMediaFile (part 1)
media = util.invoke('retrieveMediaFile', filename=filename)
self.assertEqual(media, data)
names = util.invoke('getMediaFilesNames', pattern='_tes*.txt')
self.assertEqual(set(names), set([filename, filename2]))
# deleteMediaFile
util.invoke('deleteMediaFile', filename=filename)
# retrieveMediaFile (part 2)
media = util.invoke('retrieveMediaFile', filename=filename)
self.assertFalse(media)
FILENAME = "_test.txt"
BASE64_DATA_1 = base64.b64encode(b"test 1").decode("ascii")
BASE64_DATA_2 = base64.b64encode(b"test 2").decode("ascii")
if __name__ == '__main__':
unittest.main()
def store_one_media_file():
return ac.storeMediaFile(filename=FILENAME, data=BASE64_DATA_1)
def store_two_media_files():
filename_1 = ac.storeMediaFile(filename=FILENAME, data=BASE64_DATA_1)
filename_2 = ac.storeMediaFile(filename=FILENAME, data=BASE64_DATA_2,
deleteExisting=False)
return filename_1, filename_2
##############################################################################
def test_storeMediaFile_one_file(session_with_profile_loaded):
filename_1 = store_one_media_file()
assert FILENAME == filename_1
def test_storeMediaFile_two_files_with_the_same_name(session_with_profile_loaded):
filename_1, filename_2 = store_two_media_files()
assert FILENAME == filename_1 != filename_2
def test_retrieveMediaFile(session_with_profile_loaded):
store_one_media_file()
result = ac.retrieveMediaFile(filename=FILENAME)
assert result == BASE64_DATA_1
def test_getMediaFilesNames(session_with_profile_loaded):
filenames = store_two_media_files()
result = ac.getMediaFilesNames(pattern="_tes*.txt")
assert {*filenames} == {*result}
def test_deleteMediaFile(session_with_profile_loaded):
filename_1, filename_2 = store_two_media_files()
ac.deleteMediaFile(filename=filename_1)
assert ac.retrieveMediaFile(filename=filename_1) is False
assert ac.getMediaFilesNames(pattern="_tes*.txt") == [filename_2]

View File

@ -1,68 +1,50 @@
#!/usr/bin/env python
import aqt
import os
import tempfile
import unittest
import util
from conftest import ac, anki_connect_config_loaded, \
set_up_test_deck_and_test_model_and_two_notes, \
current_decks_and_models_etc_preserved, wait
class TestMisc(unittest.TestCase):
def runTest(self):
# version
self.assertEqual(util.invoke('version'), 6)
# sync
util.invoke('sync')
# getProfiles
profiles = util.invoke('getProfiles')
self.assertIsInstance(profiles, list)
self.assertGreater(len(profiles), 0)
# loadProfile
util.invoke('loadProfile', name=profiles[0])
# multi
actions = [util.request('version'), util.request('version'), util.request('version')]
results = util.invoke('multi', actions=actions)
self.assertEqual(len(results), len(actions))
for result in results:
self.assertIsNone(result['error'])
self.assertEqual(result['result'], 6)
# exportPackage
fd, newname = tempfile.mkstemp(prefix='testexport', suffix='.apkg')
os.close(fd)
os.unlink(newname)
result = util.invoke('exportPackage', deck='Default', path=newname)
self.assertTrue(result)
self.assertTrue(os.path.exists(newname))
# importPackage
deckName = 'importTest'
fd, newname = tempfile.mkstemp(prefix='testimport', suffix='.apkg')
os.close(fd)
os.unlink(newname)
util.invoke('createDeck', deck=deckName)
note = {
'deckName': deckName,
'modelName': 'Basic',
'fields': {'Front': 'front1', 'Back': 'back1'},
'tags': '',
'options': {
'allowDuplicate': True
}
}
noteId = util.invoke('addNote', note=note)
util.invoke('exportPackage', deck=deckName, path=newname)
util.invoke('deleteDecks', decks=[deckName], cardsToo=True)
util.invoke('importPackage', path=newname)
deckNames = util.invoke('deckNames')
self.assertIn(deckName, deckNames)
# reloadCollection
util.invoke('reloadCollection')
# version is retrieved from config
def test_version(session_with_profile_loaded):
with anki_connect_config_loaded(
session=session_with_profile_loaded,
web_bind_port=0,
):
assert ac.version() == 6
if __name__ == '__main__':
unittest.main()
def test_reloadCollection(setup):
ac.reloadCollection()
class TestProfiles:
def test_getProfiles(self, session_with_profile_loaded):
result = ac.getProfiles()
assert result == ["test_user"]
# waiting a little while gets rid of the cryptic warning:
# Qt warning: QXcbConnection: XCB error: 8 (BadMatch), sequence: 658,
# resource id: 2097216, major code: 42 (SetInputFocus), minor code: 0
def test_loadProfile(self, session_with_profile_loaded):
aqt.mw.unloadProfileAndShowProfileManager()
wait(0.1)
ac.loadProfile(name="test_user")
class TestExportImport:
def test_exportPackage(self, session_with_profile_loaded, setup):
filename = session_with_profile_loaded.base + "/export.apkg"
ac.exportPackage(deck="test_deck", path=filename)
def test_importPackage(self, session_with_profile_loaded):
filename = session_with_profile_loaded.base + "/export.apkg"
with current_decks_and_models_etc_preserved():
set_up_test_deck_and_test_model_and_two_notes()
ac.exportPackage(deck="test_deck", path=filename)
with current_decks_and_models_etc_preserved():
assert "test_deck" not in ac.deckNames()
ac.importPackage(path=filename)
assert "test_deck" in ac.deckNames()

View File

@ -1,66 +1,112 @@
#!/usr/bin/env python
import unittest
import util
import uuid
from conftest import ac
MODEL_1_NAME = str(uuid.uuid4())
MODEL_2_NAME = str(uuid.uuid4())
def test_modelNames(setup):
result = ac.modelNames()
assert "test_model" in result
CSS = 'some random css'
NEW_CSS = 'new random css'
CARD_1_TEMPLATE = {'Front': 'field1', 'Back': 'field2'}
NEW_CARD_1_TEMPLATE = {'Front': 'question: field1', 'Back': 'answer: field2'}
def test_modelNamesAndIds(setup):
result = ac.modelNamesAndIds()
assert isinstance(result["test_model"], int)
TEXT_TO_REPLACE = "new random css"
REPLACE_WITH_TEXT = "new updated css"
class TestModels(unittest.TestCase):
def runTest(self):
# modelNames
modelNames = util.invoke('modelNames')
self.assertGreater(len(modelNames), 0)
def test_modelFieldNames(setup):
result = ac.modelFieldNames(modelName="test_model")
assert result == ["field1", "field2"]
# modelNamesAndIds
modelNamesAndIds = util.invoke('modelNamesAndIds')
self.assertGreater(len(modelNames), 0)
# modelFieldNames
modelFields = util.invoke('modelFieldNames', modelName=modelNames[0])
def test_modelFieldsOnTemplates(setup):
result = ac.modelFieldsOnTemplates(modelName="test_model")
assert result == {
"Card 1": [["field1"], ["field2"]],
"Card 2": [["field2"], ["field1"]],
}
# modelFieldsOnTemplates
modelFieldsOnTemplates = util.invoke('modelFieldsOnTemplates', modelName=modelNames[0])
# createModel with css
newModel = util.invoke('createModel', modelName=MODEL_1_NAME, inOrderFields=['field1', 'field2'], cardTemplates=[CARD_1_TEMPLATE], css=CSS)
class TestCreateModel:
createModel_kwargs = {
"modelName": "test_model_foo",
"inOrderFields": ["field1", "field2"],
"cardTemplates": [{"Front": "{{field1}}", "Back": "{{field2}}"}],
}
# createModel without css
newModel = util.invoke('createModel', modelName=MODEL_2_NAME, inOrderFields=['field1', 'field2'], cardTemplates=[CARD_1_TEMPLATE])
def test_createModel_without_css(self, session_with_profile_loaded):
ac.createModel(**self.createModel_kwargs)
# modelStyling: get model 1 css
css = util.invoke('modelStyling', modelName=MODEL_1_NAME)
self.assertEqual({'css': CSS}, css)
def test_createModel_with_css(self, session_with_profile_loaded):
ac.createModel(**self.createModel_kwargs, css="* {}")
# modelTemplates: get model 1 templates
templates = util.invoke('modelTemplates', modelName=MODEL_1_NAME)
self.assertEqual({'Card 1': CARD_1_TEMPLATE}, templates)
# updateModelStyling: change and verify model css
util.invoke('updateModelStyling', model={'name': MODEL_1_NAME, 'css': NEW_CSS})
new_css = util.invoke('modelStyling', modelName=MODEL_1_NAME)
self.assertEqual({'css': NEW_CSS}, new_css)
class TestStyling:
def test_modelStyling(self, setup):
result = ac.modelStyling(modelName="test_model")
assert result == {"css": "* {}"}
# updateModelTemplates: change and verify model 1 templates
util.invoke('updateModelTemplates', model={'name': MODEL_1_NAME, 'templates': {'Card 1': NEW_CARD_1_TEMPLATE}})
templates = util.invoke('modelTemplates', modelName=MODEL_1_NAME)
self.assertEqual({'Card 1': NEW_CARD_1_TEMPLATE}, templates)
def test_updateModelStyling(self, setup):
ac.updateModelStyling(model={
"name": "test_model",
"css": "* {color: red;}"
})
# findAndReplaceInModels: find and replace text in all models or model by name
util.invoke('findAndReplaceInModels', modelName=MODEL_1_NAME, findText=TEXT_TO_REPLACE, replaceText=REPLACE_WITH_TEXT, front=True, back=True, css=True)
new_css = util.invoke('modelStyling', modelName=MODEL_1_NAME)
self.assertEqual({'css': REPLACE_WITH_TEXT}, new_css)
assert ac.modelStyling(modelName="test_model") == {
"css": "* {color: red;}"
}
if __name__ == '__main__':
unittest.main()
class TestModelTemplates:
def test_modelTemplates(self, setup):
result = ac.modelTemplates(modelName="test_model")
assert result == {
"Card 1": {"Front": "{{field1}}", "Back": "{{field2}}"},
"Card 2": {"Front": "{{field2}}", "Back": "{{field1}}"}
}
def test_updateModelTemplates(self, setup):
ac.updateModelTemplates(model={
"name": "test_model",
"templates": {"Card 1": {"Front": "{{field1}}", "Back": "foo"}}
})
assert ac.modelTemplates(modelName="test_model") == {
"Card 1": {"Front": "{{field1}}", "Back": "foo"},
"Card 2": {"Front": "{{field2}}", "Back": "{{field1}}"}
}
def test_findAndReplaceInModels(setup):
ac.findAndReplaceInModels(
modelName="test_model",
findText="}}",
replaceText="}}!",
front=True,
back=False,
css=False,
)
ac.findAndReplaceInModels(
modelName="test_model",
findText="}}",
replaceText="}}?",
front=True,
back=True,
css=False,
)
ac.findAndReplaceInModels(
modelName="test_model",
findText="}",
replaceText="color: blue;}",
front=False,
back=False,
css=True,
)
assert ac.modelTemplates(modelName="test_model") == {
"Card 1": {"Front": "{{field1}}?!", "Back": "{{field2}}?"},
"Card 2": {"Front": "{{field2}}?!", "Back": "{{field1}}?"}
}
assert ac.modelStyling(modelName="test_model") == {
"css": "* {color: blue;}"
}

View File

@ -1,155 +1,142 @@
#!/usr/bin/env python
import pytest
from anki.errors import NotFoundError # noqa
import unittest
import util
from conftest import ac
class TestNotes(unittest.TestCase):
def setUp(self):
util.invoke('createDeck', deck='test')
def make_note(*, front="front1", allow_duplicates=False):
note = {
"deckName": "test_deck",
"modelName": "Basic",
"fields": {"Front": front, "Back": "back1"},
"tags": ["tag1"],
}
if allow_duplicates:
return {**note, "options": {"allowDuplicate": True}}
else:
return note
def tearDown(self):
util.invoke('deleteDecks', decks=['test'], cardsToo=True)
##############################################################################
def runTest(self):
options = {
'allowDuplicate': True
class TestNoteAddition:
def test_addNote(self, setup):
result = ac.addNote(note=make_note())
assert isinstance(result, int)
def test_addNote_will_not_allow_duplicates_by_default(self, setup):
ac.addNote(make_note())
with pytest.raises(Exception, match="it is a duplicate"):
ac.addNote(make_note())
def test_addNote_will_allow_duplicates_if_options_say_aye(self, setup):
ac.addNote(make_note())
ac.addNote(make_note(allow_duplicates=True))
def test_addNotes(self, setup):
result = ac.addNotes(notes=[
make_note(front="foo"),
make_note(front="bar"),
make_note(front="foo"),
])
assert len(result) == 3
assert isinstance(result[0], int)
assert isinstance(result[1], int)
assert result[2] is None
def test_bug164(self, setup):
note = {
"deckName": "test_deck",
"modelName": "Basic",
"fields": {"Front": " Whitespace\n", "Back": ""},
"options": {"allowDuplicate": False, "duplicateScope": "deck"}
}
note1 = {
'deckName': 'test',
'modelName': 'Basic',
'fields': {'Front': 'front1', 'Back': 'back1'},
'tags': ['tag1'],
'options': options
}
ac.addNote(note=note)
with pytest.raises(Exception, match="it is a duplicate"):
ac.addNote(note=note)
note2 = {
'deckName': 'test',
'modelName': 'Basic',
'fields': {'Front': 'front1', 'Back': 'back1'},
'tags': ['tag1']
}
notes1 = [
{
'deckName': 'test',
'modelName': 'Basic',
'fields': {'Front': 'front3', 'Back': 'back3'},
'tags': ['tag'],
'options': options
},
{
'deckName': 'test',
'modelName': 'Basic',
'fields': {'Front': 'front4', 'Back': 'back4'},
'tags': ['tag'],
'options': options
}
]
notes2 = [
{
'deckName': 'test',
'modelName': 'Basic',
'fields': {'Front': 'front3', 'Back': 'back3'},
'tags': ['tag']
},
{
'deckName': 'test',
'modelName': 'Basic',
'fields': {'Front': 'front4', 'Back': 'back4'},
'tags': ['tag']
}
def test_notesInfo(setup):
result = ac.notesInfo(notes=[setup.note1_id])
assert len(result) == 1
assert result[0]["noteId"] == setup.note1_id
assert result[0]["tags"] == ["tag1"]
assert result[0]["fields"]["field1"]["value"] == "note1 field1"
class TestTags:
def test_addTags(self, setup):
ac.addTags(notes=[setup.note1_id], tags="tag2")
tags = ac.notesInfo(notes=[setup.note1_id])[0]["tags"]
assert {*tags} == {"tag1", "tag2"}
def test_getTags(self, setup):
result = ac.getTags()
assert {*result} == {"tag1", "tag2"}
def test_removeTags(self, setup):
ac.removeTags(notes=[setup.note2_id], tags="tag2")
assert ac.notesInfo(notes=[setup.note2_id])[0]["tags"] == []
def test_replaceTags(self, setup):
ac.replaceTags(notes=[setup.note1_id, 123],
tag_to_replace="tag1", replace_with_tag="foo")
notes_info = ac.notesInfo(notes=[setup.note1_id])
assert notes_info[0]["tags"] == ["foo"]
def test_replaceTagsInAllNotes(self, setup):
ac.replaceTagsInAllNotes(tag_to_replace="tag1", replace_with_tag="foo")
notes_info = ac.notesInfo(notes=[setup.note1_id])
assert notes_info[0]["tags"] == ["foo"]
def test_clearUnusedTags(self, setup):
ac.removeTags(notes=[setup.note2_id], tags="tag2")
ac.clearUnusedTags()
assert ac.getTags() == ["tag1"]
class TestUpdateNoteFields:
def test_updateNoteFields(self, setup):
new_fields = {"field1": "foo", "field2": "bar"}
good_note = {"id": setup.note1_id, "fields": new_fields}
ac.updateNoteFields(note=good_note)
notes_info = ac.notesInfo(notes=[setup.note1_id])
assert notes_info[0]["fields"]["field2"]["value"] == "bar"
def test_updateNoteFields_will_note_update_invalid_notes(self, setup):
bad_note = {"id": 123, "fields": make_note()["fields"]}
with pytest.raises(NotFoundError):
ac.updateNoteFields(note=bad_note)
class TestCanAddNotes:
foo_bar_notes = [make_note(front="foo"), make_note(front="bar")]
def test_canAddNotes(self, setup):
result = ac.canAddNotes(notes=self.foo_bar_notes)
assert result == [True, True]
def test_canAddNotes_will_not_add_duplicates_if_options_do_not_say_aye(self, setup):
ac.addNotes(notes=self.foo_bar_notes)
notes = [
make_note(front="foo"),
make_note(front="baz"),
make_note(front="foo", allow_duplicates=True)
]
result = ac.canAddNotes(notes=notes)
assert result == [False, True, True]
# addNote
noteId = util.invoke('addNote', note=note1)
self.assertRaises(Exception, lambda: util.invoke('addNote', note=note2))
def test_findNotes(setup):
result = ac.findNotes(query="deck:test_deck")
assert {*result} == {setup.note1_id, setup.note2_id}
# addTags
util.invoke('addTags', notes=[noteId], tags='tag2')
# notesInfo (part 1)
noteInfos = util.invoke('notesInfo', notes=[noteId])
self.assertEqual(len(noteInfos), 1)
noteInfo = noteInfos[0]
self.assertEqual(noteInfo['noteId'], noteId)
self.assertSetEqual(set(noteInfo['tags']), {'tag1', 'tag2'})
self.assertEqual(noteInfo['fields']['Front']['value'], 'front1')
self.assertEqual(noteInfo['fields']['Back']['value'], 'back1')
# getTags
allTags = util.invoke('getTags')
self.assertIn('tag1', allTags)
self.assertIn('tag2', allTags)
# removeTags
util.invoke('removeTags', notes=[noteId], tags='tag2')
# updateNoteFields
incorrectId = 1234
noteUpdateIncorrectId = {'id': incorrectId, 'fields': {'Front': 'front2', 'Back': 'back2'}}
self.assertRaises(Exception, lambda: util.invoke('updateNoteFields', note=noteUpdateIncorrectId))
noteUpdate = {'id': noteId, 'fields': {'Front': 'front2', 'Back': 'back2'}}
util.invoke('updateNoteFields', note=noteUpdate)
# replaceTags
util.invoke('replaceTags', notes=[noteId, incorrectId], tag_to_replace='tag1', replace_with_tag='new_tag')
# notesInfo (part 2)
noteInfos = util.invoke('notesInfo', notes=[noteId, incorrectId])
self.assertEqual(len(noteInfos), 2)
self.assertDictEqual(noteInfos[1], dict()) # Test that returns empty dict if incorrect id was passed
noteInfo = noteInfos[0]
self.assertSetEqual(set(noteInfo['tags']), {'new_tag'})
self.assertIn('new_tag', noteInfo['tags'])
self.assertNotIn('tag2', noteInfo['tags'])
self.assertEqual(noteInfo['fields']['Front']['value'], 'front2')
self.assertEqual(noteInfo['fields']['Back']['value'], 'back2')
# canAddNotes (part 1)
noteStates = util.invoke('canAddNotes', notes=notes1)
self.assertEqual(len(noteStates), len(notes1))
self.assertNotIn(False, noteStates)
# addNotes (part 1)
noteIds = util.invoke('addNotes', notes=notes1)
self.assertEqual(len(noteIds), len(notes1))
for noteId in noteIds:
self.assertNotEqual(noteId, None)
# replaceTagsInAllNotes
currentTag = notes1[0]['tags'][0]
new_tag = 'new_tag'
util.invoke('replaceTagsInAllNotes', tag_to_replace=currentTag, replace_with_tag=new_tag)
noteInfos = util.invoke('notesInfo', notes=noteIds)
for noteInfo in noteInfos:
self.assertIn(new_tag, noteInfo['tags'])
self.assertNotIn(currentTag, noteInfo['tags'])
# canAddNotes (part 2)
noteStates = util.invoke('canAddNotes', notes=notes2)
self.assertNotIn(True, noteStates)
self.assertEqual(len(noteStates), len(notes2))
# addNotes (part 2)
noteIds = util.invoke('addNotes', notes=notes2)
self.assertEqual(len(noteIds), len(notes2))
for noteId in noteIds:
self.assertEqual(noteId, None)
# findNotes
noteIds = util.invoke('findNotes', query='deck:test')
self.assertEqual(len(noteIds), len(notes1) + 1)
# deleteNotes
util.invoke('deleteNotes', notes=noteIds)
noteIds = util.invoke('findNotes', query='deck:test')
self.assertEqual(len(noteIds), 0)
if __name__ == '__main__':
unittest.main()
def test_deleteNotes(setup):
ac.deleteNotes(notes=[setup.note1_id, setup.note2_id])
result = ac.findNotes(query="deck:test_deck")
assert result == []

149
tests/test_server.py Normal file
View File

@ -0,0 +1,149 @@
import json
import multiprocessing
import time
import urllib.error
import urllib.request
from contextlib import contextmanager
from dataclasses import dataclass
from functools import partial
import pytest
from pytest_anki._launch import anki_running # noqa
from pytest_anki._util import find_free_port # noqa
from plugin import AnkiConnect
from tests.conftest import wait_until, \
empty_anki_session_started, \
anki_connect_config_loaded, \
profile_created_and_loaded
@contextmanager
def function_running_in_a_process(context, function):
process = context.Process(target=function)
process.start()
try:
yield process
finally:
process.join()
# todo stop the server?
@contextmanager
def anki_connect_web_server_started():
plugin = AnkiConnect()
plugin.startWebServer()
yield plugin
@dataclass
class Client:
port: int
@staticmethod
def make_request(action, **params):
return {"action": action, "params": params, "version": 6}
def send_request(self, action, **params):
request_url = f"http://localhost:{self.port}"
request_data = self.make_request(action, **params)
request_json = json.dumps(request_data).encode("utf-8")
request = urllib.request.Request(request_url, request_json)
response = json.load(urllib.request.urlopen(request))
return response
def wait_for_web_server_to_come_live(self, at_most_seconds=30):
deadline = time.time() + at_most_seconds
while time.time() < deadline:
try:
self.send_request("version")
return
except urllib.error.URLError:
time.sleep(0.01)
raise Exception(f"Anki-Connect web server did not come live "
f"in {at_most_seconds} seconds")
# spawning requires a top-level function for pickling
def external_anki_entry_function(web_bind_port, exit_event):
with empty_anki_session_started() as session:
with anki_connect_config_loaded(session, web_bind_port):
with anki_connect_web_server_started():
with profile_created_and_loaded(session):
wait_until(exit_event.is_set)
@contextmanager
def external_anki_running(process_run_method):
context = multiprocessing.get_context(process_run_method)
exit_event = context.Event()
web_bind_port = find_free_port()
function = partial(external_anki_entry_function, web_bind_port, exit_event)
with function_running_in_a_process(context, function) as process:
client = Client(port=web_bind_port)
client.wait_for_web_server_to_come_live()
try:
yield client
finally:
exit_event.set()
assert process.exitcode == 0
# if a Qt app was already launched in current process,
# launching a new Qt app, even from grounds up, fails or hangs.
# of course, this includes forked processes. therefore,
# * if launching without --forked, use the `spawn` process run method;
# * otherwise, use the `fork` method, as it is significantly faster.
# with --forked, each test has its fixtures assembled inside the fork,
# which means that when the test begins, Qt was never started in the fork.
@pytest.fixture(scope="module")
def external_anki(request):
"""
Runs Anki in an external process, with the plugin loaded and started.
On exit, neatly ends the process and makes sure its exit code is 0.
Yields a client that can send web request to the external process.
"""
with external_anki_running(
"fork" if request.config.option.forked else "spawn"
) as client:
yield client
##############################################################################
def test_successful_request(external_anki):
response = external_anki.send_request("version")
assert response == {"error": None, "result": 6}
def test_can_handle_multiple_requests(external_anki):
assert external_anki.send_request("version") == {"error": None, "result": 6}
assert external_anki.send_request("version") == {"error": None, "result": 6}
def test_multi_request(external_anki):
version_request = Client.make_request("version")
response = external_anki.send_request("multi", actions=[version_request] * 3)
assert response == {
"error": None,
"result": [{"error": None, "result": 6}] * 3
}
def test_failing_request_due_to_bad_arguments(external_anki):
response = external_anki.send_request("addNote", bad="request")
assert response["result"] is None
assert "unexpected keyword argument" in response["error"]
def test_failing_request_due_to_anki_raising_exception(external_anki):
response = external_anki.send_request("suspend", cards=[-123])
assert response["result"] is None
assert "Card was not found" in response["error"]

View File

@ -1,55 +1,31 @@
#!/usr/bin/env python
import os
import tempfile
import unittest
import util
from conftest import ac
class TestStats(unittest.TestCase):
def setUp(self):
util.invoke('createDeck', deck='test')
note = {
'deckName': 'test',
'modelName': 'Basic',
'fields': {'Front': 'front1', 'Back': 'back1'},
'tags': ['tag1'],
'options': {
'allowDuplicate': True
}
}
self.noteId = util.invoke('addNote', note=note)
def tearDown(self):
util.invoke('deleteDecks', decks=['test'], cardsToo=True)
def runTest(self):
# getNumCardsReviewedToday
result = util.invoke('getNumCardsReviewedToday')
self.assertIsInstance(result, int)
# getNumCardsReviewedByDay
result = util.invoke('getNumCardsReviewedByDay')
self.assertIsInstance(result, list)
# collectionStats
result = util.invoke('getCollectionStatsHTML')
self.assertIsInstance(result, str)
# no reviews for new deck
self.assertEqual(len(util.invoke('cardReviews', deck='test', startID=0)), 0)
self.assertEqual(util.invoke('getLatestReviewID', deck='test'), 0)
# # add reviews
# cardId = int(util.invoke('findCards', query='deck:test')[0])
# latestID = 123456 # small enough to not interfere with existing reviews
# util.invoke('insertReviews', reviews=[
# [latestID-1, cardId, -1, 3, 4, -60, 2500, 6157, 0],
# [latestID, cardId, -1, 1, -60, -60, 0, 4846, 0]
# ])
# self.assertEqual(len(util.invoke('cardReviews', deck='test', startID=0)), 2)
# self.assertEqual(util.invoke('getLatestReviewID', deck='test'), latestID)
def test_getNumCardsReviewedToday(setup):
result = ac.getNumCardsReviewedToday()
assert isinstance(result, int)
if __name__ == '__main__':
unittest.main()
def test_getNumCardsReviewedByDay(setup):
result = ac.getNumCardsReviewedByDay()
assert isinstance(result, list)
def test_getCollectionStatsHTML(setup):
result = ac.getCollectionStatsHTML()
assert isinstance(result, str)
class TestReviews:
def test_zero_reviews_for_a_new_deck(self, setup):
assert ac.cardReviews(deck="test_deck", startID=0) == []
assert ac.getLatestReviewID(deck="test_deck") == 0
def test_some_reviews_for_a_reviewed_deck(self, setup):
ac.insertReviews(reviews=[
(456, setup.card_ids[0], -1, 3, 4, -60, 2500, 6157, 0),
(789, setup.card_ids[1], -1, 1, -60, -60, 0, 4846, 0)
])
assert len(ac.cardReviews(deck="test_deck", startID=0)) == 2
assert ac.getLatestReviewID(deck="test_deck") == 789

View File

@ -1,21 +0,0 @@
import json
import urllib.request
def request(action, **params):
return {'action': action, 'params': params, 'version': 6}
def invoke(action, **params):
requestJson = json.dumps(request(action, **params)).encode('utf-8')
response = json.load(urllib.request.urlopen(urllib.request.Request('http://localhost:8765', requestJson)))
if len(response) != 2:
raise Exception('response has an unexpected number of fields')
if 'error' not in response:
raise Exception('response is missing required error field')
if 'result' not in response:
raise Exception('response is missing required result field')
if response['error'] is not None:
raise Exception(response['error'])
return response['result']

76
tox.ini Normal file
View File

@ -0,0 +1,76 @@
# For testing, you will need:
# * PyQt5 dev tools
# * tox
# * X virtual framebuffer--to test without GUI
#
# Install these by running:
# $ sudo apt install pyqt5-dev-tools xvfb
# $ python3 -m pip install --user --upgrade tox
#
# Then, to run tests against multiple anki versions:
# $ tox
#
# To run tests slightly less safely, but faster:
# $ tox -- --no-tear-down-profile-after-each-test
#
# To run tests more safely, but *much* slower:
# $ tox -- --forked
# Test tool cheat sheet:
# * Test several environments in parallel:
# $ tox -p auto
#
# * To activate one of the test environments:
# $ source .tox/py38-anki49/bin/activate
#
# * Stop on first failure:
# $ xvfb-run python -m pytest -x
#
# * See stdout/stderr (doesn't work with --forked!):
# $ xvfb-run python -m pytest -s
#
# * Run some specific tests:
# $ xvfb-run python -m pytest -k "test_cards.py or test_guiBrowse"
#
# * To run with visible GUI in WSL2
# (Make sure to disable access control in your X server, such as VcXsrv):
# $ DISPLAY=$(ip route list default | awk '{print $3}'):0 python -m pytest
#
# * Environmental variables of interest:
# LIBGL_ALWAYS_INDIRECT=1
# QTWEBENGINE_CHROMIUM_FLAGS="--disable-gpu"
# QT_DEBUG_PLUGINS=1
# ANKIDEV=true
[tox]
minversion = 3.24
skipsdist = true
skip_install = true
envlist = py38-anki{45,46,47,48,49}
[testenv]
commands =
xvfb-run python -m pytest {posargs}
setenv =
HOME={envdir}/home
allowlist_externals =
xvfb-run
deps =
pytest==7.1.1
pytest-forked==1.4.0
pytest-anki @ git+https://github.com/oakkitten/pytest-anki.git@17d19043
anki45: anki==2.1.45
anki45: aqt==2.1.45
anki46: anki==2.1.46
anki46: aqt==2.1.46
anki47: anki==2.1.47
anki47: aqt==2.1.47
anki48: anki==2.1.48
anki48: aqt==2.1.48
anki49: anki==2.1.49
anki49: aqt==2.1.49