diff --git a/AnkiConnect.py b/AnkiConnect.py index 5699e22..5a2a4b4 100644 --- a/AnkiConnect.py +++ b/AnkiConnect.py @@ -27,9 +27,10 @@ import re import select import socket import sys + +from operator import itemgetter from time import time from unicodedata import normalize -from operator import itemgetter # @@ -37,16 +38,16 @@ from operator import itemgetter # API_VERSION = 5 -TICK_INTERVAL = 25 -URL_TIMEOUT = 10 -URL_UPGRADE = 'https://raw.githubusercontent.com/FooSoft/anki-connect/master/AnkiConnect.py' NET_ADDRESS = os.getenv('ANKICONNECT_BIND_ADDRESS', '127.0.0.1') NET_BACKLOG = 5 NET_PORT = 8765 +TICK_INTERVAL = 25 +URL_TIMEOUT = 10 +URL_UPGRADE = 'https://raw.githubusercontent.com/FooSoft/anki-connect/master/AnkiConnect.py' # -# General helpers +# Helpers # if sys.version_info[0] < 3: @@ -65,19 +66,6 @@ else: from PyQt5.QtWidgets import QMessageBox -# -# Helpers -# - -def webApi(*versions): - def decorator(func): - method = lambda *args, **kwargs: func(*args, **kwargs) - setattr(method, 'versions', versions) - setattr(method, 'api', True) - return method - return decorator - - def makeBytes(data): return data.encode('utf-8') @@ -86,52 +74,31 @@ def makeStr(data): return data.decode('utf-8') -def download(url): - try: - resp = web.urlopen(url, timeout=URL_TIMEOUT) - except web.URLError as e: - raise Exception('A urlError has occurred for url ' + url + '. Error messages was: ' + e.message) +def api(*versions): + def decorator(func): + method = lambda *args, **kwargs: func(*args, **kwargs) + setattr(method, 'versions', versions) + setattr(method, 'api', True) + return method - if resp.code != 200: - raise Exception('Return code for url request' + url + 'was not 200. Error code: ' + resp.code) - - return resp.read() - - -def audioInject(note, fields, filename): - for field in fields: - if field in note: - note[field] += u'[sound:{}]'.format(filename) - - -def verifyString(string): - t = type(string) - return t == str or t == unicode - - -def verifyStringList(strings): - for s in strings: - if not verifyString(s): - return False - - return True + return decorator # -# AjaxRequest +# WebRequest # -class AjaxRequest: +class WebRequest: def __init__(self, headers, body): self.headers = headers self.body = body # -# AjaxClient +# WebClient # -class AjaxClient: +class WebClient: def __init__(self, sock, handler): self.sock = sock self.handler = handler @@ -195,14 +162,14 @@ class AjaxClient: return None, 0 body = data[headerLength : totalLength] - return AjaxRequest(headers, body), totalLength + return WebRequest(headers, body), totalLength # -# AjaxServer +# WebServer # -class AjaxServer: +class WebServer: def __init__(self, handler): self.handler = handler self.clients = [] @@ -211,7 +178,7 @@ class AjaxServer: def setHeader(self, name, value): - self.extraHeaders[name] = value + self.headersOpt[name] = value def resetHeaders(self): @@ -220,13 +187,14 @@ class AjaxServer: ['Content-Type', 'text/json'], ['Access-Control-Allow-Origin', '*'] ] - self.extraHeaders = {} + self.headersOpt = {} def getHeaders(self): headers = self.headers[:] - for name in self.extraHeaders: - headers.append([name, self.extraHeaders[name]]) + for name in self.headersOpt: + headers.append([name, self.headersOpt[name]]) + return headers @@ -244,7 +212,7 @@ class AjaxServer: clientSock = self.sock.accept()[0] if clientSock is not None: clientSock.setblocking(False) - self.clients.append(AjaxClient(clientSock, self.handlerWrapper)) + self.clients.append(WebClient(clientSock, self.handlerWrapper)) def advanceClients(self): @@ -299,697 +267,13 @@ class AjaxServer: self.clients = [] -# -# AnkiNoteParams -# - -class AnkiNoteParams: - def __init__(self, params): - self.deckName = params.get('deckName') - self.modelName = params.get('modelName') - self.fields = params.get('fields', {}) - self.tags = params.get('tags', []) - - class Audio: - def __init__(self, params): - self.url = params.get('url') - self.filename = params.get('filename') - self.skipHash = params.get('skipHash') - self.fields = params.get('fields', []) - - def validate(self): - return ( - verifyString(self.url) and - verifyString(self.filename) and os.path.dirname(self.filename) == '' and - verifyStringList(self.fields) and - (verifyString(self.skipHash) or self.skipHash is None) - ) - - audio = Audio(params.get('audio', {})) - self.audio = audio if audio.validate() else None - - - def validate(self): - return ( - verifyString(self.deckName) and - verifyString(self.modelName) and - type(self.fields) == dict and verifyStringList(list(self.fields.keys())) and verifyStringList(list(self.fields.values())) and - type(self.tags) == list and verifyStringList(self.tags) - ) - - - def __str__(self): - return 'DeckName: ' + self.deckName + '. ModelName: ' + self.modelName + '. Fields: ' + str(self.fields) + '. Tags: ' + str(self.tags) + '.' - -# -# AnkiBridge -# - -class AnkiBridge: - def storeMediaFile(self, filename, data): - self.deleteMediaFile(filename) - self.media().writeData(filename, base64.b64decode(data)) - - - def retrieveMediaFile(self, filename): - # based on writeData from anki/media.py - filename = os.path.basename(filename) - filename = normalize('NFC', filename) - filename = self.media().stripIllegal(filename) - - path = os.path.join(self.media().dir(), filename) - if os.path.exists(path): - with open(path, 'rb') as file: - return base64.b64encode(file.read()).decode('ascii') - - return False - - - def deleteMediaFile(self, filename): - self.media().syncDelete(filename) - - - def addNote(self, params): - collection = self.collection() - if collection is None: - raise Exception('Collection was not found.') - - note = self.createNote(params) - if note is None: - raise Exception('Failed to create note from params: ' + str(params)) - - if params.audio is not None and len(params.audio.fields) > 0: - data = download(params.audio.url) - if data is not None: - if params.audio.skipHash is None: - skip = False - else: - m = hashlib.md5() - m.update(data) - skip = params.audio.skipHash == m.hexdigest() - - if not skip: - audioInject(note, params.audio.fields, params.audio.filename) - self.media().writeData(params.audio.filename, data) - - self.startEditing() - collection.addNote(note) - collection.autosave() - self.stopEditing() - - return note.id - - - def canAddNote(self, note): - try: - return bool(self.createNote(note)) - except: - return False - - - def createNote(self, params): - collection = self.collection() - if collection is None: - raise Exception('Collection was not found.') - - model = collection.models.byName(params.modelName) - if model is None: - raise Exception('Model was not found for model: ' + params.modelName) - - deck = collection.decks.byName(params.deckName) - if deck is None: - raise Exception('Deck was not found for deck: ' + params.deckName) - - note = anki.notes.Note(collection, model) - note.model()['did'] = deck['id'] - note.tags = params.tags - - for name, value in params.fields.items(): - if name in note: - note[name] = value - - # Returns 1 if empty. 2 if duplicate. Otherwise returns False - duplicateOrEmpty = note.dupeOrEmpty() - if duplicateOrEmpty == 1: - raise Exception('Note was empty. Param were: ' + str(params)) - elif duplicateOrEmpty == 2: - raise Exception('Note is duplicate of existing note. Params were: ' + str(params)) - elif duplicateOrEmpty == False: - return note - - - def updateNoteFields(self, params): - collection = self.collection() - if collection is None: - raise Exception('Collection was not found.') - - note = collection.getNote(params['id']) - if note is None: - raise Exception('Failed to get note:{}'.format(params['id'])) - for name, value in params['fields'].items(): - if name in note: - note[name] = value - note.flush() - - - def addTags(self, notes, tags, add=True): - self.startEditing() - self.collection().tags.bulkAdd(notes, tags, add) - self.stopEditing() - - - def getTags(self): - return self.collection().tags.all() - - - def suspend(self, cards, suspend=True): - for card in cards: - isSuspended = self.isSuspended(card) - if suspend and isSuspended: - cards.remove(card) - elif not suspend and not isSuspended: - cards.remove(card) - - if cards: - self.startEditing() - if suspend: - self.collection().sched.suspendCards(cards) - else: - self.collection().sched.unsuspendCards(cards) - self.stopEditing() - return True - - return False - - - def isSuspended(self, card): - card = self.collection().getCard(card) - if card.queue == -1: - return True - else: - return False - - - def areSuspended(self, cards): - suspended = [] - for card in cards: - suspended.append(self.isSuspended(card)) - return suspended - - - def areDue(self, cards): - due = [] - for card in cards: - if self.findCards('cid:%s is:new' % card): - due.append(True) - continue - - date, ivl = self.collection().db.all('select id/1000.0, ivl from revlog where cid = ?', card)[-1] - if (ivl >= -1200): - if self.findCards('cid:%s is:due' % card): - due.append(True) - else: - due.append(False) - else: - if date - ivl <= time(): - due.append(True) - else: - due.append(False) - - return due - - - def getIntervals(self, cards, complete=False): - intervals = [] - for card in cards: - if self.findCards('cid:%s is:new' % card): - intervals.append(0) - continue - - interval = self.collection().db.list('select ivl from revlog where cid = ?', card) - if not complete: - interval = interval[-1] - intervals.append(interval) - return intervals - - - def startEditing(self): - self.window().requireReset() - - - def stopEditing(self): - if self.collection() is not None: - self.window().maybeReset() - - - def window(self): - return aqt.mw - - - def reviewer(self): - return self.window().reviewer - - - def collection(self): - return self.window().col - - - def scheduler(self): - return self.collection().sched - - - def multi(self, actions): - response = [] - for item in actions: - response.append(AnkiConnect.handler(ac, item)) - return response - - - def media(self): - collection = self.collection() - if collection is not None: - return collection.media - - - def modelNames(self): - collection = self.collection() - if collection is not None: - return collection.models.allNames() - - - def modelNamesAndIds(self): - models = {} - - modelNames = self.modelNames() - for model in modelNames: - mid = self.collection().models.byName(model)['id'] - mid = int(mid) # sometimes Anki stores the ID as a string - models[model] = mid - - return models - - - def modelNameFromId(self, modelId): - collection = self.collection() - if collection is not None: - model = collection.models.get(modelId) - if model is not None: - return model['name'] - - - def modelFieldNames(self, modelName): - collection = self.collection() - if collection is not None: - model = collection.models.byName(modelName) - if model is not None: - return [field['name'] for field in model['flds']] - - - def modelFieldsOnTemplates(self, modelName): - model = self.collection().models.byName(modelName) - - if model is not None: - templates = {} - for template in model['tmpls']: - fields = [] - - for side in ['qfmt', 'afmt']: - fieldsForSide = [] - - # based on _fieldsOnTemplate from aqt/clayout.py - matches = re.findall('{{[^#/}]+?}}', template[side]) - for match in matches: - # remove braces and modifiers - match = re.sub(r'[{}]', '', match) - match = match.split(':')[-1] - - # for the answer side, ignore fields present on the question side + the FrontSide field - if match == 'FrontSide' or side == 'afmt' and match in fields[0]: - continue - fieldsForSide.append(match) - - - fields.append(fieldsForSide) - - templates[template['name']] = fields - - return templates - - - def getDeckConfig(self, deck): - if not deck in self.deckNames(): - return False - - did = self.collection().decks.id(deck) - return self.collection().decks.confForDid(did) - - - def saveDeckConfig(self, config): - configId = str(config['id']) - if not configId in self.collection().decks.dconf: - return False - - mod = anki.utils.intTime() - usn = self.collection().usn() - - config['mod'] = mod - config['usn'] = usn - - self.collection().decks.dconf[configId] = config - self.collection().decks.changed = True - return True - - - def setDeckConfigId(self, decks, configId): - for deck in decks: - if not deck in self.deckNames(): - return False - - if not str(configId) in self.collection().decks.dconf: - return False - - for deck in decks: - did = str(self.collection().decks.id(deck)) - aqt.mw.col.decks.decks[did]['conf'] = configId - - return True - - - def cloneDeckConfigId(self, name, cloneFrom=1): - if not str(cloneFrom) in self.collection().decks.dconf: - return False - - cloneFrom = self.collection().decks.getConf(cloneFrom) - return self.collection().decks.confId(name, cloneFrom) - - - def removeDeckConfigId(self, configId): - if configId == 1 or not str(configId) in self.collection().decks.dconf: - return False - - self.collection().decks.remConf(configId) - return True - - - def deckNames(self): - collection = self.collection() - if collection is not None: - return collection.decks.allNames() - - - def deckNamesAndIds(self): - decks = {} - - deckNames = self.deckNames() - for deck in deckNames: - did = self.collection().decks.id(deck) - decks[deck] = did - - return decks - - - def deckNameFromId(self, deckId): - collection = self.collection() - if collection is not None: - deck = collection.decks.get(deckId) - if deck is not None: - return deck['name'] - - - def findNotes(self, query=None): - if query is not None: - return self.collection().findNotes(query) - else: - return [] - - - def findCards(self, query=None): - if query is not None: - return self.collection().findCards(query) - else: - return [] - - - def cardsInfo(self,cards): - result = [] - for cid in cards: - try: - card = self.collection().getCard(cid) - model = card.model() - note = card.note() - fields = {} - for info in model['flds']: - order = info['ord'] - name = info['name'] - fields[name] = {'value': note.fields[order], 'order': order} - - result.append({ - 'cardId': card.id, - 'fields': fields, - 'fieldOrder': card.ord, - 'question': card._getQA()['q'], - 'answer': card._getQA()['a'], - 'modelName': model['name'], - 'deckName': self.deckNameFromId(card.did), - 'css': model['css'], - 'factor': card.factor, - #This factor is 10 times the ease percentage, - # so an ease of 310% would be reported as 3100 - 'interval': card.ivl, - 'note': card.nid - }) - except TypeError as e: - # Anki will give a TypeError if the card ID does not exist. - # Best behavior is probably to add an 'empty card' to the - # returned result, so that the items of the input and return - # lists correspond. - result.append({}) - - return result - - - def notesInfo(self,notes): - result = [] - for nid in notes: - try: - note = self.collection().getNote(nid) - model = note.model() - - fields = {} - for info in model['flds']: - order = info['ord'] - name = info['name'] - fields[name] = {'value': note.fields[order], 'order': order} - - result.append({ - 'noteId': note.id, - 'tags' : note.tags, - 'fields': fields, - 'modelName': model['name'], - 'cards': self.collection().db.list( - 'select id from cards where nid = ? order by ord', note.id) - }) - except TypeError as e: - # Anki will give a TypeError if the note ID does not exist. - # Best behavior is probably to add an 'empty card' to the - # returned result, so that the items of the input and return - # lists correspond. - result.append({}) - return result - - - def getDecks(self, cards): - decks = {} - for card in cards: - did = self.collection().db.scalar('select did from cards where id = ?', card) - deck = self.collection().decks.get(did)['name'] - - if deck in decks: - decks[deck].append(card) - else: - decks[deck] = [card] - - return decks - - - def createDeck(self, deck): - self.startEditing() - deckId = self.collection().decks.id(deck) - self.stopEditing() - - return deckId - - - def changeDeck(self, cards, deck): - self.startEditing() - - did = self.collection().decks.id(deck) - mod = anki.utils.intTime() - usn = self.collection().usn() - - # normal cards - scids = anki.utils.ids2str(cards) - # remove any cards from filtered deck first - self.collection().sched.remFromDyn(cards) - - # then move into new deck - self.collection().db.execute('update cards set usn=?, mod=?, did=? where id in ' + scids, usn, mod, did) - self.stopEditing() - - - def deleteDecks(self, decks, cardsToo=False): - self.startEditing() - for deck in decks: - did = self.collection().decks.id(deck) - self.collection().decks.rem(did, cardsToo) - self.stopEditing() - - - def cardsToNotes(self, cards): - return self.collection().db.list('select distinct nid from cards where id in ' + anki.utils.ids2str(cards)) - - - def guiBrowse(self, query=None): - browser = aqt.dialogs.open('Browser', self.window()) - browser.activateWindow() - - if query is not None: - browser.form.searchEdit.lineEdit().setText(query) - if hasattr(browser, 'onSearch'): - browser.onSearch() - else: - browser.onSearchActivated() - - return browser.model.cards - - - def guiAddCards(self): - addCards = aqt.dialogs.open('AddCards', self.window()) - addCards.activateWindow() - - - def guiReviewActive(self): - return self.reviewer().card is not None and self.window().state == 'review' - - - def guiCurrentCard(self): - if not self.guiReviewActive(): - raise Exception('Gui review is not currently active.') - - reviewer = self.reviewer() - card = reviewer.card - model = card.model() - note = card.note() - - fields = {} - for info in model['flds']: - order = info['ord'] - name = info['name'] - fields[name] = {'value': note.fields[order], 'order': order} - - if card is not None: - return { - 'cardId': card.id, - 'fields': fields, - 'fieldOrder': card.ord, - 'question': card._getQA()['q'], - 'answer': card._getQA()['a'], - 'buttons': [b[0] for b in reviewer._answerButtonList()], - 'modelName': model['name'], - 'deckName': self.deckNameFromId(card.did), - 'css': model['css'] - } - - - def guiStartCardTimer(self): - if not self.guiReviewActive(): - return False - - card = self.reviewer().card - - if card is not None: - card.startTimer() - return True - else: - return False - - - def guiShowQuestion(self): - if self.guiReviewActive(): - self.reviewer()._showQuestion() - return True - else: - return False - - - def guiShowAnswer(self): - if self.guiReviewActive(): - self.window().reviewer._showAnswer() - return True - else: - return False - - - def guiAnswerCard(self, ease): - if not self.guiReviewActive(): - return False - - reviewer = self.reviewer() - if reviewer.state != 'answer': - return False - if ease <= 0 or ease > self.scheduler().answerButtons(reviewer.card): - return False - - reviewer._answerCard(ease) - return True - - - def guiDeckOverview(self, name): - collection = self.collection() - if collection is not None: - deck = collection.decks.byName(name) - if deck is not None: - collection.decks.select(deck['id']) - self.window().onOverview() - return True - - return False - - - def guiDeckBrowser(self): - self.window().moveToState('deckBrowser') - - - def guiDeckReview(self, name): - if self.guiDeckOverview(name): - self.window().moveToState('review') - return True - else: - return False - - - def guiExitAnki(self): - timer = QTimer() - def exitAnki(): - timer.stop() - self.window().close() - timer.timeout.connect(exitAnki) - timer.start(1000) # 1s should be enough to allow the response to be sent. - - - def sync(self): - self.window().onSync() - - # # AnkiConnect # - class AnkiConnect: def __init__(self): - self.anki = AnkiBridge() - self.server = AjaxServer(self.handler) + self.server = WebServer(self.handler) try: self.server.listen() @@ -999,7 +283,7 @@ class AnkiConnect: self.timer.start(TICK_INTERVAL) except: QMessageBox.critical( - self.anki.window(), + self.window(), 'AnkiConnect', 'Failed to listen on port {}.\nMake sure it is available and is not in use.'.format(NET_PORT) ) @@ -1048,294 +332,786 @@ class AnkiConnect: return reply['result'] - @webApi() - def multi(self, actions): - return self.anki.multi(actions) + def download(self, url): + resp = web.urlopen(url, timeout=URL_TIMEOUT) + if resp.code == 200: + return resp.read() + else: + raise Exception('return code for download of {} was {}'.format(url, resp.code)) - @webApi() - def storeMediaFile(self, filename, data): - return self.anki.storeMediaFile(filename, data) + def window(self): + return aqt.mw - @webApi() - def retrieveMediaFile(self, filename): - return self.anki.retrieveMediaFile(filename) + def reviewer(self): + reviewer = self.window().reviewer + if reviewer is None: + raise Exception('reviewer is not available') + else: + return reviewer - @webApi() - def deleteMediaFile(self, filename): - return self.anki.deleteMediaFile(filename) + def collection(self): + collection = self.window().col + if collection is None: + raise Exception('collection is not available') + else: + return collection - @webApi() - def deckNames(self): - return self.anki.deckNames() + def decks(self): + decks = self.collection().decks + if decks is None: + raise Exception('decks are not available') + else: + return decks - @webApi() - def deckNamesAndIds(self): - return self.anki.deckNamesAndIds() + def scheduler(self): + scheduler = self.collection().sched + if scheduler is None: + raise Exception('scheduler is not available') + else: + return scheduler - @webApi() - def modelNames(self): - return self.anki.modelNames() + def database(self): + database = self.collection().db + if database is None: + raise Exception('database is not available') + else: + return database - @webApi() - def modelNamesAndIds(self): - return self.anki.modelNamesAndIds() + def media(self): + media = self.collection().media + if media is None: + raise Exception('media is not available') + else: + return media - @webApi() - def modelFieldNames(self, modelName): - return self.anki.modelFieldNames(modelName) + def startEditing(self): + self.window().requireReset() - @webApi() - def modelFieldsOnTemplates(self, modelName): - return self.anki.modelFieldsOnTemplates(modelName) + def stopEditing(self): + if self.collection() is not None: + self.window().maybeReset() - @webApi() - def getDeckConfig(self, deck): - return self.anki.getDeckConfig(deck) + def createNote(self, note): + collection = self.collection() + + model = collection.models.byName(note['modelName']) + if model is None: + raise Exception('model was not found: {}'.format(note['modelName'])) + + deck = collection.decks.byName(note['deckName']) + if deck is None: + raise Exception('deck was not found: {}'.format(note['deckName'])) + + ankiNote = anki.notes.Note(collection, model) + ankiNote.model()['did'] = deck['id'] + ankiNote.tags = note['tags'] + + for name, value in note['fields'].items(): + if name in ankiNote: + ankiNote[name] = value + + duplicateOrEmpty = ankiNote.dupeOrEmpty() + if duplicateOrEmpty == 1: + raise Exception('cannot create note because it is empty') + elif duplicateOrEmpty == 2: + raise Exception('cannot create note because it is a duplicte') + elif duplicateOrEmpty == False: + return ankiNote + else: + raise Exception('cannot create note for unknown reason') - @webApi() - def saveDeckConfig(self, config): - return self.anki.saveDeckConfig(config) + # + # Miscellaneous + # + + @api() + def version(self): + return API_VERSION - @webApi() - def setDeckConfigId(self, decks, configId): - return self.anki.setDeckConfigId(decks, configId) - - - @webApi() - def cloneDeckConfigId(self, name, cloneFrom=1): - return self.anki.cloneDeckConfigId(name, cloneFrom) - - - @webApi() - def removeDeckConfigId(self, configId): - return self.anki.removeDeckConfigId(configId) - - - @webApi() - def addNote(self, note): - params = AnkiNoteParams(note) - if params.validate(): - return self.anki.addNote(params) - - - @webApi() - def addNotes(self, notes): - results = [] - for note in notes: - try: - params = AnkiNoteParams(note) - if params.validate(): - results.append(self.anki.addNote(params)) - else: - results.append(None) - except Exception: - results.append(None) - - return results - - - @webApi() - def updateNoteFields(self, note): - return self.anki.updateNoteFields(note) - - - @webApi() - def canAddNotes(self, notes): - results = [] - for note in notes: - params = AnkiNoteParams(note) - results.append(params.validate() and self.anki.canAddNote(params)) - - return results - - - @webApi() - def addTags(self, notes, tags, add=True): - return self.anki.addTags(notes, tags, add) - - - @webApi() - def removeTags(self, notes, tags): - return self.anki.addTags(notes, tags, False) - - - @webApi() - def getTags(self): - return self.anki.getTags() - - - @webApi() - def suspend(self, cards, suspend=True): - return self.anki.suspend(cards, suspend) - - - @webApi() - def unsuspend(self, cards): - return self.anki.suspend(cards, False) - - - @webApi() - def areSuspended(self, cards): - return self.anki.areSuspended(cards) - - - @webApi() - def areDue(self, cards): - return self.anki.areDue(cards) - - - @webApi() - def getIntervals(self, cards, complete=False): - return self.anki.getIntervals(cards, complete) - - - @webApi() + @api() def upgrade(self): response = QMessageBox.question( - self.anki.window(), + self.window(), 'AnkiConnect', 'Upgrade to the latest version?', QMessageBox.Yes | QMessageBox.No ) if response == QMessageBox.Yes: - data = download(URL_UPGRADE) - if data is None: - QMessageBox.critical(self.anki.window(), 'AnkiConnect', 'Failed to download latest version.') - else: + try: + data = self.download(URL_UPGRADE) path = os.path.splitext(__file__)[0] + '.py' with open(path, 'w') as fp: fp.write(makeStr(data)) - QMessageBox.information(self.anki.window(), 'AnkiConnect', 'Upgraded to the latest version, please restart Anki.') + QMessageBox.information( + self.window(), + 'AnkiConnect', + 'Upgraded to the latest version, please restart Anki.' + ) + return True + except Exception as e: + QMessageBox.critical(self.window(), 'AnkiConnect', 'Failed to download latest version.') + raise e + + return False + + + @api() + def sync(self): + self.window().onSync() + + + @api() + def multi(self, actions): + return map(self.handler, actions) + + + # + # Decks + # + + @api() + def deckNames(self): + return self.decks().allNames() + + + @api() + def deckNamesAndIds(self): + decks = {} + for deck in self.deckNames(): + decks[deck] = self.decks().id(deck) + + return decks + + + @api() + def getDecks(self, cards): + decks = {} + for card in cards: + did = self.database().scalar('select did from cards where id=?', card) + deck = self.decks().get(did)['name'] + if deck in decks: + decks[deck].append(card) + else: + decks[deck] = [card] + + return decks + + + @api() + def createDeck(self, deck): + try: + self.startEditing() + did = self.decks().id(deck) + finally: + self.stopEditing() + + return did + + + @api() + def changeDeck(self, cards, deck): + self.startEditing() + + did = self.collection().decks.id(deck) + mod = anki.utils.intTime() + usn = self.collection().usn() + + # normal cards + scids = anki.utils.ids2str(cards) + # remove any cards from filtered deck first + self.collection().sched.remFromDyn(cards) + + # then move into new deck + self.collection().db.execute('update cards set usn=?, mod=?, did=? where id in ' + scids, usn, mod, did) + self.stopEditing() + + + @api() + def deleteDecks(self, decks, cardsToo=False): + try: + self.startEditing() + decks = filter(lambda d: d in self.deckNames(), decks) + for deck in decks: + did = self.decks().id(deck) + self.decks().rem(did, cardsToo) + finally: + self.stopEditing() + + + @api() + def getDeckConfig(self, deck): + if not deck in self.deckNames(): + return False + + collection = self.collection() + did = collection.decks.id(deck) + return collection.decks.confForDid(did) + + + @api() + def saveDeckConfig(self, config): + collection = self.collection() + + config['id'] = str(config['id']) + config['mod'] = anki.utils.intTime() + config['usn'] = collection.usn() + + if not config['id'] in collection.decks.dconf: + return False + + collection.decks.dconf[config['id']] = config + collection.decks.changed = True + return True + + + @api() + def setDeckConfigId(self, decks, configId): + configId = str(configId) + for deck in decks: + if not deck in self.deckNames(): + return False + + collection = self.collection() + if not configId in collection.decks.dconf: + return False + + for deck in decks: + did = str(collection.decks.id(deck)) + aqt.mw.col.decks.decks[did]['conf'] = configId + + return True + + + @api() + def cloneDeckConfigId(self, name, cloneFrom='1'): + configId = str(cloneFrom) + if not configId in self.collection().decks.dconf: + return False + + config = self.collection().decks.getConf(configId) + return self.collection().decks.confId(name, config) + + + @api() + def removeDeckConfigId(self, configId): + configId = str(configId) + collection = self.collection() + if configId == 1 or not configId in collection.decks.dconf: + return False + + collection.decks.remConf(configId) + return True + + + @api() + def storeMediaFile(self, filename, data): + self.deleteMediaFile(filename) + self.media().writeData(filename, base64.b64decode(data)) + + + @api() + def retrieveMediaFile(self, filename): + filename = os.path.basename(filename) + filename = normalize('NFC', filename) + filename = self.media().stripIllegal(filename) + + path = os.path.join(self.media().dir(), filename) + if os.path.exists(path): + with open(path, 'rb') as file: + return base64.b64encode(file.read()).decode('ascii') + + return False + + + @api() + def deleteMediaFile(self, filename): + self.media().syncDelete(filename) + + + @api() + def addNote(self, note): + ankiNote = self.createNote(note) + + audio = note.get('audio') + if audio is not None and len(audio['fields']) > 0: + data = download(audio['url']) + if data is not None: + if audio['skipHash'] is None: + skip = False + else: + m = hashlib.md5() + m.update(data) + skip = audio['skipHash'] == m.hexdigest() + + if not skip: + for field in audio['fields']: + if field in ankiNote: + ankiNote[field] += u'[sound:{}]'.format(audio['filename']) + + self.media().writeData(audio['filename'], data) + + collection = self.collection() + self.startEditing() + collection.addNote(ankiNote) + collection.autosave() + self.stopEditing() + + return ankiNote.id + + + @api() + def canAddNote(self, note): + try: + return bool(self.createNote(note)) + except: + return False + + + @api() + def updateNoteFields(self, note): + ankiNote = self.collection().getNote(note['id']) + if ankiNote is None: + raise Exception('note was not found: {}'.format(note['id'])) + + for name, value in note['fields'].items(): + if name in ankiNote: + ankiNote[name] = value + + ankiNote.flush() + + + @api() + def addTags(self, notes, tags, add=True): + self.startEditing() + self.collection().tags.bulkAdd(notes, tags, add) + self.stopEditing() + + + @api() + def removeTags(self, notes, tags): + return self.addTags(notes, tags, False) + + + @api() + def getTags(self): + return self.collection().tags.all() + + + @api() + def suspend(self, cards, suspend=True): + for card in cards: + if self.suspended(card) == suspend: + cards.remove(card) + + if len(cards) == 0: + return False + + scheduler = self.scheduler() + self.startEditing() + if suspend: + scheduler.suspendCards(cards) + else: + scheduler.unsuspendCards(cards) + self.stopEditing() + + return True + + + @api() + def unsuspend(self, cards): + self.suspend(cards, False) + + + @api() + def suspended(self, card): + card = self.collection().getCard(card) + return card.queue == -1 + + + @api() + def areSuspended(self, cards): + suspended = [] + for card in cards: + suspended.append(self.suspended(card)) + + return suspended + + + @api() + def areDue(self, cards): + due = [] + for card in cards: + if self.findCards('cid:{} is:new'.format(card)): + due.append(True) + else: + date, ivl = self.collection().db.all('select id/1000.0, ivl from revlog where cid = ?', card)[-1] + if ivl >= -1200: + duo.append(bool(self.findCards('cid:{} is:due'.format(card)))) + else: + due.append(date - ivl <= time()) + + return due + + + @api() + def getIntervals(self, cards, complete=False): + intervals = [] + for card in cards: + if self.findCards('cid:{} is:new'.format(card)): + intervals.append(0) + else: + interval = self.collection().db.list('select ivl from revlog where cid = ?', card) + if not complete: + interval = interval[-1] + intervals.append(interval) + + return intervals + + + + @api() + def modelNames(self): + return self.collection().models.allNames() + + + @api() + def modelNamesAndIds(self): + models = {} + for model in self.modelNames(): + models[model] = int(self.collection().models.byName(model)['id']) + + return models + + + @api() + def modelNameFromId(self, modelId): + model = self.collection().models.get(modelId) + if model is None: + raise Exception('model was not found: {}'.format(modelId)) + else: + return model['name'] + + + @api() + def modelFieldNames(self, modelName): + model = self.collection().models.byName(modelName) + if model is None: + raise Exception('model was not found: {}'.format(modelName)) + else: + return [field['name'] for field in model['flds']] + + + @api() + def modelFieldsOnTemplates(self, modelName): + model = self.collection().models.byName(modelName) + if model is None: + raise Exception('model was not found: {}'.format(modelName)) + + templates = {} + for template in model['tmpls']: + fields = [] + for side in ['qfmt', 'afmt']: + fieldsForSide = [] + + # based on _fieldsOnTemplate from aqt/clayout.py + matches = re.findall('{{[^#/}]+?}}', template[side]) + for match in matches: + # remove braces and modifiers + match = re.sub(r'[{}]', '', match) + match = match.split(':')[-1] + + # for the answer side, ignore fields present on the question side + the FrontSide field + if match == 'FrontSide' or side == 'afmt' and match in fields[0]: + continue + fieldsForSide.append(match) + + fields.append(fieldsForSide) + + templates[template['name']] = fields + + return templates + + + @api() + def deckNameFromId(self, deckId): + deck = self.collection().decks.get(deckId) + if deck is None: + raise Exception('deck was not found: {}'.format(deckId)) + else: + return deck['name'] + + + @api() + def findNotes(self, query=None): + if query is None: + return [] + else: + return self.collection().findNotes(query) + + + @api() + def findCards(self, query=None): + if query is None: + return [] + else: + return self.collection().findCards(query) + + + @api() + def cardsInfo(self, cards): + result = [] + for cid in cards: + try: + card = self.collection().getCard(cid) + model = card.model() + note = card.note() + fields = {} + for info in model['flds']: + order = info['ord'] + name = info['name'] + fields[name] = {'value': note.fields[order], 'order': order} + + result.append({ + 'cardId': card.id, + 'fields': fields, + 'fieldOrder': card.ord, + 'question': card._getQA()['q'], + 'answer': card._getQA()['a'], + 'modelName': model['name'], + 'deckName': self.deckNameFromId(card.did), + 'css': model['css'], + 'factor': card.factor, + #This factor is 10 times the ease percentage, + # so an ease of 310% would be reported as 3100 + 'interval': card.ivl, + 'note': card.nid + }) + except TypeError as e: + # Anki will give a TypeError if the card ID does not exist. + # Best behavior is probably to add an 'empty card' to the + # returned result, so that the items of the input and return + # lists correspond. + result.append({}) + + return result + + + @api() + def notesInfo(self, notes): + result = [] + for nid in notes: + try: + note = self.collection().getNote(nid) + model = note.model() + + fields = {} + for info in model['flds']: + order = info['ord'] + name = info['name'] + fields[name] = {'value': note.fields[order], 'order': order} + + result.append({ + 'noteId': note.id, + 'tags' : note.tags, + 'fields': fields, + 'modelName': model['name'], + 'cards': self.collection().db.list('select id from cards where nid = ? order by ord', note.id) + }) + except TypeError as e: + # Anki will give a TypeError if the note ID does not exist. + # Best behavior is probably to add an 'empty card' to the + # returned result, so that the items of the input and return + # lists correspond. + result.append({}) + + return result + + + + + + @api() + def cardsToNotes(self, cards): + return self.collection().db.list('select distinct nid from cards where id in ' + anki.utils.ids2str(cards)) + + + @api() + def guiBrowse(self, query=None): + browser = aqt.dialogs.open('Browser', self.window()) + browser.activateWindow() + + if query is not None: + browser.form.searchEdit.lineEdit().setText(query) + if hasattr(browser, 'onSearch'): + browser.onSearch() + else: + browser.onSearchActivated() + + return browser.model.cards + + + @api() + def guiAddCards(self): + addCards = aqt.dialogs.open('AddCards', self.window()) + addCards.activateWindow() + + + @api() + def guiReviewActive(self): + return self.reviewer().card is not None and self.window().state == 'review' + + + @api() + def guiCurrentCard(self): + if not self.guiReviewActive(): + raise Exception('Gui review is not currently active.') + + reviewer = self.reviewer() + card = reviewer.card + model = card.model() + note = card.note() + + fields = {} + for info in model['flds']: + order = info['ord'] + name = info['name'] + fields[name] = {'value': note.fields[order], 'order': order} + + if card is not None: + return { + 'cardId': card.id, + 'fields': fields, + 'fieldOrder': card.ord, + 'question': card._getQA()['q'], + 'answer': card._getQA()['a'], + 'buttons': [b[0] for b in reviewer._answerButtonList()], + 'modelName': model['name'], + 'deckName': self.deckNameFromId(card.did), + 'css': model['css'] + } + + + @api() + def guiStartCardTimer(self): + if not self.guiReviewActive(): + return False + + card = self.reviewer().card + + if card is not None: + card.startTimer() + return True + else: + return False + + + @api() + def guiShowQuestion(self): + if self.guiReviewActive(): + self.reviewer()._showQuestion() + return True + else: + return False + + + @api() + def guiShowAnswer(self): + if self.guiReviewActive(): + self.window().reviewer._showAnswer() + return True + else: + return False + + + @api() + def guiAnswerCard(self, ease): + if not self.guiReviewActive(): + return False + + reviewer = self.reviewer() + if reviewer.state != 'answer': + return False + if ease <= 0 or ease > self.scheduler().answerButtons(reviewer.card): + return False + + reviewer._answerCard(ease) + return True + + + @api() + def guiDeckOverview(self, name): + collection = self.collection() + if collection is not None: + deck = collection.decks.byName(name) + if deck is not None: + collection.decks.select(deck['id']) + self.window().onOverview() return True return False - @webApi() - def version(self): - return API_VERSION - - - @webApi() - def findNotes(self, query=None): - return self.anki.findNotes(query) - - - @webApi() - def findCards(self, query=None): - return self.anki.findCards(query) - - - @webApi() - def getDecks(self, cards): - return self.anki.getDecks(cards) - - - @webApi() - def createDeck(self, deck): - return self.anki.createDeck(deck) - - - @webApi() - def changeDeck(self, cards, deck): - return self.anki.changeDeck(cards, deck) - - - @webApi() - def deleteDecks(self, decks, cardsToo=False): - return self.anki.deleteDecks(decks, cardsToo) - - - @webApi() - def cardsToNotes(self, cards): - return self.anki.cardsToNotes(cards) - - - @webApi() - def guiBrowse(self, query=None): - return self.anki.guiBrowse(query) - - - @webApi() - def guiAddCards(self): - return self.anki.guiAddCards() - - - @webApi() - def guiCurrentCard(self): - return self.anki.guiCurrentCard() - - - @webApi() - def guiStartCardTimer(self): - return self.anki.guiStartCardTimer() - - - @webApi() - def guiAnswerCard(self, ease): - return self.anki.guiAnswerCard(ease) - - - @webApi() - def guiShowQuestion(self): - return self.anki.guiShowQuestion() - - - @webApi() - def guiShowAnswer(self): - return self.anki.guiShowAnswer() - - - @webApi() - def guiDeckOverview(self, name): - return self.anki.guiDeckOverview(name) - - - @webApi() + @api() def guiDeckBrowser(self): - return self.anki.guiDeckBrowser() + self.window().moveToState('deckBrowser') - @webApi() + @api() def guiDeckReview(self, name): - return self.anki.guiDeckReview(name) + if self.guiDeckOverview(name): + self.window().moveToState('review') + return True + else: + return False - @webApi() + @api() def guiExitAnki(self): - return self.anki.guiExitAnki() + timer = QTimer() + def exitAnki(): + timer.stop() + self.window().close() + timer.timeout.connect(exitAnki) + timer.start(1000) # 1s should be enough to allow the response to be sent. - @webApi() - def cardsInfo(self, cards): - return self.anki.cardsInfo(cards) + + @api() + def addNotes(self, notes): + results = [] + for note in notes: + try: + results.append(self.addNote(note)) + except Exception: + results.append(None) + + return results - @webApi() - def notesInfo(self, notes): - return self.anki.notesInfo(notes) + @api() + def canAddNotes(self, notes): + results = [] + for note in notes: + results.append(self.canAddNote(note)) - - @webApi() - def sync(self): - return self.anki.sync() + return results # -# Entry +# Entry # ac = AnkiConnect() diff --git a/test.sh b/test.sh new file mode 100755 index 0000000..1fa881b --- /dev/null +++ b/test.sh @@ -0,0 +1,3 @@ +#!/usr/bin/sh + +python -m unittest discover -v tests diff --git a/tests/test_cards.py b/tests/test_cards.py new file mode 100755 index 0000000..372002b --- /dev/null +++ b/tests/test_cards.py @@ -0,0 +1,61 @@ +#!/usr/bin/env python + +import unittest +import util + + +class TestCards(unittest.TestCase): + def setUp(self): + util.invoke('createDeck', deck='test') + note = {'deckName': 'test', 'modelName': 'Basic', 'fields': {'Front': 'front1', 'Back': 'back1'}, 'tags': ['tag1']} + self.noteId = util.invoke('addNote', note=note) + + + def tearDown(self): + util.invoke('deleteDecks', decks=['test'], cardsToo=True) + + + def runTest(self): + # findCards + cardIds = util.invoke('findCards', query='deck:test') + self.assertEqual(len(cardIds), 1) + + # suspend + util.invoke('suspend', cards=cardIds) + + # areSuspended (part 1) + suspendedStates = util.invoke('areSuspended', cards=cardIds) + self.assertEqual(len(cardIds), len(suspendedStates)) + self.assertNotIn(False, suspendedStates) + + # unsuspend + util.invoke('unsuspend', cards=cardIds) + + # areSuspended (part 2) + suspendedStates = util.invoke('areSuspended', cards=cardIds) + self.assertEqual(len(cardIds), len(suspendedStates)) + self.assertNotIn(True, suspendedStates) + + # areDue + dueStates = util.invoke('areDue', cards=cardIds) + self.assertEqual(len(cardIds), len(dueStates)) + self.assertNotIn(False, dueStates) + + # getIntervals + util.invoke('getIntervals', cards=cardIds, complete=True) + util.invoke('getIntervals', cards=cardIds, complete=False) + + # cardsToNotes + noteIds = util.invoke('cardsToNotes', cards=cardIds) + self.assertEqual(len(noteIds), len(cardIds)) + self.assertIn(self.noteId, noteIds) + + # cardsInfo + cardsInfo = util.invoke('cardsInfo', cards=cardIds) + self.assertEqual(len(cardsInfo), len(cardIds)) + for i, cardInfo in enumerate(cardsInfo): + self.assertEqual(cardInfo['cardId'], cardIds[i]) + + +if __name__ == '__main__': + unittest.main() diff --git a/tests/test_decks.py b/tests/test_decks.py old mode 100644 new mode 100755 index f4a79e8..7edd95a --- a/tests/test_decks.py +++ b/tests/test_decks.py @@ -1,16 +1,71 @@ -# -*- coding: utf-8 -*- +#!/usr/bin/env python + import unittest -from unittest import TestCase -from util import callAnkiConnectEndpoint +import util -class TestDeckNames(TestCase): - def test_deckNames(self): - response = callAnkiConnectEndpoint({'action': 'deckNames'}) - self.assertEqual(['Default'], response) +class TestDecks(unittest.TestCase): + def runTest(self): + # deckNames (part 1) + deckNames = util.invoke('deckNames') + self.assertIn('Default', deckNames) -class TestGetDeckConfig(TestCase): - - def test_getDeckConfig(self): - response = callAnkiConnectEndpoint({'action': 'getDeckConfig', 'params': {'deck': 'Default'}}) - self.assertDictContainsSubset({'name': 'Default', 'replayq': True}, response) \ No newline at end of file + # deckNamesAndIds + result = util.invoke('deckNamesAndIds') + self.assertIn('Default', result) + self.assertEqual(result['Default'], 1) + + # createDeck + util.invoke('createDeck', deck='test1') + + # deckNames (part 2) + deckNames = util.invoke('deckNames') + self.assertIn('test1', deckNames) + + # changeDeck + note = {'deckName': 'test1', 'modelName': 'Basic', 'fields': {'Front': 'front', 'Back': 'back'}, 'tags': ['tag']} + noteId = util.invoke('addNote', note=note) + cardIds = util.invoke('findCards', query='deck:test1') + util.invoke('changeDeck', cards=cardIds, deck='test2') + + # deckNames (part 3) + deckNames = util.invoke('deckNames') + self.assertIn('test2', deckNames) + + # deleteDecks + util.invoke('deleteDecks', decks=['test1', 'test2'], cardsToo=True) + + # deckNames (part 4) + deckNames = util.invoke('deckNames') + self.assertNotIn('test1', deckNames) + self.assertNotIn('test2', deckNames) + + # getDeckConfig + deckConfig = util.invoke('getDeckConfig', deck='Default') + self.assertEqual('Default', deckConfig['name']) + + # saveDeckConfig + deckConfig = util.invoke('saveDeckConfig', config=deckConfig) + + # setDeckConfigId + setDeckConfigId = util.invoke('setDeckConfigId', decks=['Default'], configId=1) + self.assertTrue(setDeckConfigId) + + # cloneDeckConfigId (part 1) + deckConfigId = util.invoke('cloneDeckConfigId', cloneFrom=1, name='test') + self.assertTrue(deckConfigId) + + # removeDeckConfigId (part 1) + removedDeckConfigId = util.invoke('removeDeckConfigId', configId=deckConfigId) + self.assertTrue(removedDeckConfigId) + + # removeDeckConfigId (part 2) + removedDeckConfigId = util.invoke('removeDeckConfigId', configId=deckConfigId) + self.assertFalse(removedDeckConfigId) + + # cloneDeckConfigId (part 2) + deckConfigId = util.invoke('cloneDeckConfigId', cloneFrom=deckConfigId, name='test') + self.assertFalse(deckConfigId) + +if __name__ == '__main__': + unittest.main() diff --git a/tests/test_graphical.py b/tests/test_graphical.py new file mode 100755 index 0000000..97a68c8 --- /dev/null +++ b/tests/test_graphical.py @@ -0,0 +1,41 @@ +#!/usr/bin/env python + +import unittest +import util + + +class TestGui(unittest.TestCase): + def runTest(self): + # guiBrowse + util.invoke('guiBrowse', query='deck:Default') + + # guiAddCards + util.invoke('guiAddCards') + + # guiCurrentCard + # util.invoke('guiCurrentCard') + + # guiStartCardTimer + util.invoke('guiStartCardTimer') + + # guiShowQuestion + util.invoke('guiShowQuestion') + + # guiShowAnswer + util.invoke('guiShowAnswer') + + # guiAnswerCard + util.invoke('guiAnswerCard', ease=1) + + # guiDeckOverview + util.invoke('guiDeckOverview', name='Default') + + # guiDeckBrowser + util.invoke('guiDeckBrowser') + + # guiExitAnki + # util.invoke('guiExitAnki') + + +if __name__ == '__main__': + unittest.main() diff --git a/tests/test_media.py b/tests/test_media.py new file mode 100755 index 0000000..7870fbc --- /dev/null +++ b/tests/test_media.py @@ -0,0 +1,28 @@ +#!/usr/bin/env python + +import unittest +import util + + +class TestMedia(unittest.TestCase): + def runTest(self): + filename = '_test.txt' + data = 'test' + + # storeMediaFile + util.invoke('storeMediaFile', filename='_test.txt', data=data) + + # retrieveMediaFile (part 1) + media = util.invoke('retrieveMediaFile', filename=filename) + self.assertEqual(media, data) + + # deleteMediaFile + util.invoke('deleteMediaFile', filename=filename) + + # retrieveMediaFile (part 2) + media = util.invoke('retrieveMediaFile', filename=filename) + self.assertFalse(media) + + +if __name__ == '__main__': + unittest.main() diff --git a/tests/test_misc.py b/tests/test_misc.py old mode 100644 new mode 100755 index 11c8651..e1b29a8 --- a/tests/test_misc.py +++ b/tests/test_misc.py @@ -1,10 +1,28 @@ -# -*- coding: utf-8 -*- +#!/usr/bin/env python + import unittest -from unittest import TestCase -from util import callAnkiConnectEndpoint +import util -class TestVersion(TestCase): - def test_version(self): - response = callAnkiConnectEndpoint({'action': 'version'}) - self.assertEqual(5, response) +class TestMisc(unittest.TestCase): + def runTest(self): + # version + self.assertEqual(util.invoke('version'), 5) + + # upgrade + util.invoke('upgrade') + + # sync + util.invoke('sync') + + # multi + actions = [util.request('version'), util.request('version'), util.request('version')] + results = util.invoke('multi', actions=actions) + self.assertEqual(len(results), len(actions)) + for result in results: + self.assertIsNone(result['error']) + self.assertEqual(result['result'], 5) + + +if __name__ == '__main__': + unittest.main() diff --git a/tests/test_models.py b/tests/test_models.py new file mode 100755 index 0000000..7f3c07c --- /dev/null +++ b/tests/test_models.py @@ -0,0 +1,25 @@ +#!/usr/bin/env python + +import unittest +import util + + +class TestModels(unittest.TestCase): + def runTest(self): + # modelNames + modelNames = util.invoke('modelNames') + self.assertGreater(len(modelNames), 0) + + # modelNamesAndIds + modelNamesAndIds = util.invoke('modelNamesAndIds') + self.assertGreater(len(modelNames), 0) + + # modelFieldNames + modelFields = util.invoke('modelFieldNames', modelName=modelNames[0]) + + # modelFieldsOnTemplates + modelFieldsOnTemplates = util.invoke('modelFieldsOnTemplates', modelName=modelNames[0]) + + +if __name__ == '__main__': + unittest.main() diff --git a/tests/test_notes.py b/tests/test_notes.py new file mode 100755 index 0000000..eddac05 --- /dev/null +++ b/tests/test_notes.py @@ -0,0 +1,89 @@ +#!/usr/bin/env python + +import unittest +import util + + +class TestNotes(unittest.TestCase): + def setUp(self): + util.invoke('createDeck', deck='test') + + + def tearDown(self): + util.invoke('deleteDecks', decks=['test'], cardsToo=True) + + + def runTest(self): + # addNote + note = {'deckName': 'test', 'modelName': 'Basic', 'fields': {'Front': 'front1', 'Back': 'back1'}, 'tags': ['tag1']} + noteId = util.invoke('addNote', note=note) + self.assertRaises(Exception, lambda: util.invoke('addNote', note=note)) + + # addTags + util.invoke('addTags', notes=[noteId], tags='tag2') + + # notesInfo (part 1) + noteInfos = util.invoke('notesInfo', notes=[noteId]) + self.assertEqual(len(noteInfos), 1) + noteInfo = noteInfos[0] + self.assertEqual(noteInfo['noteId'], noteId) + self.assertSetEqual(set(noteInfo['tags']), {'tag1', 'tag2'}) + self.assertEqual(noteInfo['fields']['Front']['value'], 'front1') + self.assertEqual(noteInfo['fields']['Back']['value'], 'back1') + + # getTags + allTags = util.invoke('getTags') + self.assertIn('tag1', allTags) + self.assertIn('tag2', allTags) + + # removeTags + util.invoke('removeTags', notes=[noteId], tags='tag2') + + # updateNoteFields + noteUpdate = {'id': noteId, 'fields': {'Front': 'front2', 'Back': 'back2'}} + util.invoke('updateNoteFields', note=noteUpdate) + + # notesInfo (part 2) + noteInfos = util.invoke('notesInfo', notes=[noteId]) + self.assertEqual(len(noteInfos), 1) + noteInfo = noteInfos[0] + self.assertSetEqual(set(noteInfo['tags']), {'tag1'}) + self.assertIn('tag1', noteInfo['tags']) + self.assertNotIn('tag2', noteInfo['tags']) + self.assertEqual(noteInfo['fields']['Front']['value'], 'front2') + self.assertEqual(noteInfo['fields']['Back']['value'], 'back2') + + notes = [ + {'deckName': 'test', 'modelName': 'Basic', 'fields': {'Front': 'front3', 'Back': 'back3'}, 'tags': ['tag']}, + {'deckName': 'test', 'modelName': 'Basic', 'fields': {'Front': 'front4', 'Back': 'back4'}, 'tags': ['tag']} + ] + + # canAddNotes (part 1) + noteStates = util.invoke('canAddNotes', notes=notes) + self.assertEqual(len(noteStates), len(notes)) + self.assertNotIn(False, noteStates) + + # addNotes (part 1) + noteIds = util.invoke('addNotes', notes=notes) + self.assertEqual(len(noteIds), len(notes)) + for noteId in noteIds: + self.assertNotEqual(noteId, None) + + # canAddNotes (part 2) + noteStates = util.invoke('canAddNotes', notes=notes) + self.assertNotIn(True, noteStates) + self.assertEqual(len(noteStates), len(notes)) + + # addNotes (part 2) + noteIds = util.invoke('addNotes', notes=notes) + self.assertEqual(len(noteIds), len(notes)) + for noteId in noteIds: + self.assertEqual(noteId, None) + + # findNotes + noteIds = util.invoke('findNotes', query='deck:test') + self.assertEqual(len(noteIds), len(notes) + 1) + + +if __name__ == '__main__': + unittest.main() diff --git a/tests/util.py b/tests/util.py index bf121a0..3435b97 100644 --- a/tests/util.py +++ b/tests/util.py @@ -1,11 +1,23 @@ import json -import urllib import urllib2 -def callAnkiConnectEndpoint(data): - url = 'http://docker:8888' - dumpedData = json.dumps(data) - req = urllib2.Request(url, dumpedData) - response = urllib2.urlopen(req).read() - responseData = json.loads(response) - return responseData \ No newline at end of file +API_VERSION = 5 +API_URL = 'http://localhost:8765' + + +def request(action, **params): + return {'action': action, 'params': params, 'version': API_VERSION} + + +def invoke(action, **params): + requestJson = json.dumps(request(action, **params)) + response = json.load(urllib2.urlopen(urllib2.Request(API_URL, requestJson))) + if len(response) != 2: + raise Exception('response has an unexpected number of fields') + if 'error' not in response: + raise Exception('response is missing required error field') + if 'result' not in response: + raise Exception('response is missing required result field') + if response['error'] is not None: + raise Exception(response['error']) + return response['result'] diff --git a/build_zip.sh b/zip.sh similarity index 100% rename from build_zip.sh rename to zip.sh