#!/usr/bin/env python3
"""
Anemone 1.76 (http://ssb22.user.srcf.net/anemone)
(c) 2023-24 Silas S. Brown. License: Apache 2
To use this module, either run it from the command
line, or import it and use the anemone() function.
"""
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Where to find history:
# on GitHub at https://github.com/ssb22/indexer
# and on GitLab at https://gitlab.com/ssb22/indexer
# and on BitBucket https://bitbucket.org/ssb22/indexer
# and at https://gitlab.developers.cam.ac.uk/ssb22/indexer
# and in China: https://gitee.com/ssb22/indexer
def anemone(*files,**options) -> list[str]:
    """This function can be called by scripts that
    import anemone: simply put the equivalent of
    the command line into 'files' and 'options'.
    You can also specify a JSON dictionary instead
    of the name of a JSON file, and/or an HTML
    string instead of the name of an HTML file
    (this can also be done on the command line
    with careful quoting).
    Additionally, you can set the options
    warning_callback, info_callback and/or
    progress_callback. These are Python callables
    to log an in-progress conversion (useful for
    multi-threaded UIs). warning_callback and
    info_callback each take a string, and
    progress_callback takes an integer percentage.
    If warning_callback or info_callback is set,
    the corresponding information is not written
    to standard error.
    If you do not give this function any arguments
    it will look at the system command line.
    Return value is a list of warnings, if any."""
    try:
        R=Run(*files,**options)
        R.check() ; R.import_libs(files)
        R.write_all(R.get_texts())
        return R.warnings
    except AnemoneError: raise
    except Exception as e: # make sure wrapped in AnemoneError
        if __name__=="__main__" or "ANEMONE_DEBUG" in os.environ:
            sys.stderr.write(traceback.format_exc())
        # walk to the innermost frame so the error message can
        # report the line that actually raised
        tb = e.__traceback__
        while tb.tb_next: tb = tb.tb_next
        raise AnemoneError(f"Unhandled {e.__class__.__name__}: {e} at line {tb.tb_lineno} (v{version})")
def populate_argument_parser(args) -> None:
    """Calls add_argument on args, with the names
    of all Anemone command-line options, which are
    also options for anemone(), and help text.
    This is also used for runtime module help:
    args can be any object with a compatible
    add_argument method, e.g. DocUpdater."""
    # Input files and basic publication metadata:
    args.add_argument("files",metavar="file",
                      nargs="+",help="""
file name of: an MP3 or WAV recording, a text file
containing its title (if no full text), an XHTML
file containing its full text, a JSON file
containing its time markers (or text plus time in
JSON transcript format), or the output ZIP file.
Only one output file may be specified, but any
number of the other files can be included; URLs
may be given if they are to be fetched. If only
sound files are given then titles are taken from
their filenames. You may also specify @filename
where filename contains a list of files one per
line.""")
    args.add_argument("--lang",default="en",
                      help="""
the ISO 639 language code of the publication (defaults to en for English)""")
    args.add_argument("--title",default="",help="the title of the publication")
    args.add_argument("--url",default="",help="the URL or ISBN of the publication")
    args.add_argument("--creator",default="",help="the creator name, if known")
    args.add_argument("--publisher",default="",help="the publisher name, if known")
    args.add_argument("--reader",default="",
                      help="""
the name of the reader who voiced the recordings, if known""")
    args.add_argument("--date",help="the publication date as YYYY-MM-DD, default is current date")
    # How markup attributes in the input HTML are interpreted:
    args.add_argument("--marker-attribute",
                      default="data-pid",help="""
the attribute used in the HTML to indicate a
segment number corresponding to a JSON time marker
entry, default is data-pid""")
    args.add_argument("--marker-attribute-prefix",
                      default="", help="""
When extracting all text for chapters that don't
have timings, ignore any marker attributes whose
values don't start with the given prefix""")
    args.add_argument("--page-attribute",
                      default="data-no",help="""
the attribute used in the HTML to indicate a page number, default is data-no""")
    args.add_argument("--image-attribute",
                      default="data-zoom",help="""
the attribute used in the HTML to indicate an
absolute image URL to be included in the DAISY
file, default is data-zoom""")
    # URL-fetching behaviour:
    args.add_argument("--refresh",
                      action="store_true",help="""
if images etc have already been fetched from URLs, ask the server if they should be fetched again (use If-Modified-Since)""")
    args.add_argument("--cache",
                      default="cache",help="""
path name for the URL-fetching cache (default
'cache' in the current directory; set to empty
string if you don't want to save anything); when
using anemone as a module, you can instead pass in
a requests_cache session object if you want that
to do it instead""")
    args.add_argument("--reload",dest="refetch",
                      action="store_true",help="""
if images etc have already been fetched from URLs,
fetch them again without If-Modified-Since""")
    args.add_argument("--delay",default=0,help="""
minimum number of seconds between URL fetches (default none)""")
    args.add_argument("--retries",default=0,help="""
number of times to retry URL fetches on timeouts
and unhandled exceptions (default no retries)""")
    args.add_argument("--user-agent",default=f"Mozilla/5.0 (compatible, {' '.join(generator.split()[:2])})",help="User-Agent string to send for URL fetches")
    # Output format and audio-encoding options:
    args.add_argument("--daisy3",
                      action="store_true",help="""
Use the Daisy 3 format (ANSI/NISO Z39.86) instead
of the Daisy 2.02 format. This may require more
modern reader software, and Anemone does not yet
support Daisy 3 only features like tables.""")
    args.add_argument("--mp3-recode",
                      action="store_true",help="""
re-code the MP3 files to ensure they are constant
bitrate and more likely to work with the more
limited DAISY-reading programs like FSReader 3
(this requires LAME or miniaudio/lameenc)""")
    args.add_argument("--squash",
                      action="store_true",help="""
re-code MP3 files to 16kHz, resulting in a smaller
DAISY but at the expense of reduced sound quality.
This requires LAME or miniaudio/lameenc.
If Pillow/PIL is available too, reduce the sizes
of any large images. Use this option only if
space is at a premium and you don't mind a severe
loss of audio and image quality.""")
    args.add_argument("--aac",
                      action="store_true",help="""
use AAC instead of MP3. This DOES NOT PLAY on all
DAISY readers, so use this only if you know your
reader supports it! It saves some space at normal
size and slightly increases quality at squash.
It requires fdkaac binary + miniaudio library,
or afconvert binary. It also requires the daisy3 setting.
The current implementation of this creates temporary files:
on Unix set TMPDIR if you want to put them somewhere other
than the default temporary directory, for example on a
RAMdisk.""")
    # (faac/pyfaac can't do CBR and Daisy 3 specs say players aren't required to be able to cope with VBR, so need fdkaac/afconvert for AAC, assuming ffmpeg native AAC coder isn't up to it)
    # Thorium can play AAC audio; Dolphin EasyReader 11 on Android cannot play it; others not tested
    # Navigation / heading-structure options:
    args.add_argument("--allow-jumps",
                      action="store_true",help="""
Allow jumps in heading levels e.g. h1 to h3 if the
input HTML does it. This seems OK on modern
readers but might cause older reading devices to
give an error. Without this option, headings are
promoted where necessary to ensure only
incremental depth increase.""") # might cause older reading devices to give an error: and is also flagged up by the validator
    args.add_argument("--strict-ncc-divs",
                      action="store_true",help="""
When generating Daisy 2, avoid using a heading in
the navigation control centre when there isn't a
heading in the text. This currently applies when
spans with verse numbering are detected. Turning
on this option will make the DAISY more conformant
to the specification, but some readers (EasyReader
10, Thorium) won't show these headings in the
navigation in Daisy 2 (but will show them anyway
in Daisy 3, so this option is applied
automatically in Daisy 3). On the other hand,
when using verse-numbered spans without this
option, EasyReader 10 may not show any text at all
in Daisy 2 (Anemone will warn if this is the
case). This setting cannot stop EasyReader
promoting all verses to headings (losing paragraph
formatting) in Daisy 3, which is the least bad
option if you want these navigation points to
work.""")
    args.add_argument("--merge-books",
                      default="",help="""
Combine multiple books into one, for saving media
on CD-based DAISY players that cannot handle more
than one book. The format of this option is
book1/N1,book2/N2,etc where book1 is the book
title and N1 is the number of MP3 files to group
into it (or if passing the option into the anemone
module, you may use a list of tuples). All
headings are pushed down one level and book name
headings are added at top level.""")
    args.add_argument("--chapter-titles",
                      default="",help="""
Comma-separated list of titles to use for chapters
that don't have titles, e.g. 'Chapter N' in the
language of the book (this can help for
search-based navigation). If passing this option
into the anemone module, you may use a list
instead of a comma-separated string, which might
be useful if there are commas in some chapter
titles. Use blank titles for chapters that
already have them in the markup.""")
    args.add_argument("--chapter-heading-level",default=1,help="Heading level to use for chapters that don't have titles")
    # Miscellaneous behaviour switches:
    args.add_argument("--warnings-are-errors",action="store_true",help="Treat warnings as errors")
    args.add_argument("--ignore-chapter-skips",action="store_true",help="Don't emit warnings or errors about chapter numbers being skipped")
    args.add_argument("--dry-run",action="store_true",help="Don't actually output DAISY, just check the input and parameters")
    args.add_argument("--version",action="store_true",help="Just print version number and exit (takes effect only if called from the command line)")
generator=__doc__.strip().split('\n')[0] # string we use to identify ourselves in HTTP requests and in Daisy files
def get_argument_parser():
    """Builds and populates the ArgumentParser used
    both for the command line and for anemone()."""
    from argparse import ArgumentParser
    if __name__=="__main__":
        parserClass = ArgumentParser
    else:
        # Running as a module: argparse's default error
        # handling would sys.exit(); raise instead so the
        # calling code can catch bad options.
        class ModuleParser(ArgumentParser):
            def error(self, message):
                raise AnemoneError(message)
        parserClass = ModuleParser
    parser = parserClass(
        prog="anemone",
        description=generator,
        fromfile_prefix_chars='@')
    populate_argument_parser(parser)
    return parser
import time, sys, os, re, json, traceback, tempfile
if __name__ == "__main__" and "--version" in sys.argv:
    # Respond to --version early, before the heavier imports below
    print (generator)
    raise SystemExit
import textwrap, inspect
from collections import namedtuple as NT
from functools import reduce
from subprocess import run, PIPE
from zipfile import ZipFile, ZIP_DEFLATED
from concurrent.futures import ThreadPoolExecutor
from multiprocessing import cpu_count
from urllib.request import urlopen,Request
from urllib.error import HTTPError
from urllib.parse import unquote
from pathlib import Path # Python 3.5+
from shutil import which
from io import BytesIO
class DocUpdater:
    """Stand-in for an ArgumentParser that appends each
    option's help text to a module's docstring, so that
    'import anemone; help(anemone)' shows the options."""
    def __init__(self,p):
        self.p = p
        self.p.__doc__ += "\nOptions when run as a command-line utility:\n"
    def add_argument(self,*args,**kwargs):
        # Collapse the help text's newlines (and following
        # indentation) to single spaces, prefix the option
        # name for --options, wrap at 50 columns, and append
        # the result as an indented bullet point.
        helpText = re.sub(chr(10)+' *',' ',kwargs['help']).strip()
        if args[0].startswith('--'):
            helpText = args[0]+': '+helpText
        bullet = (chr(10)+' ').join(textwrap.wrap(helpText,50))
        self.p.__doc__ += f"\n* {bullet}\n"
# Copy all option help into this module's own docstring
# so that help(anemone) documents the options too:
populate_argument_parser(DocUpdater(
    sys.modules[__name__]))
def error(m) -> None:
    """Anemone error handler. If running as an
    application, print the message and error-exit.
    If running as a module, raise an AnemoneError
    so the caller can handle it."""
    if not __name__=="__main__":
        raise AnemoneError(str(m))
    sys.stderr.write(f"Error: {m}\n")
    sys.exit(1)
class AnemoneError(Exception):
    # "About the beach are a broken spar, A pale anemone's torn sea-star
    # And scattered scum of the waves' old war, as the tide comes tumbling in." - Cale Young Rice
    """This exception type is used by Anemone to
    signal parameter errors etc to its caller when
    it is being called as a module. If the
    environment variable ANEMONE_DEBUG is set,
    it will also report its construction on the
    standard error stream regardless of how the
    calling code handles it (might be useful to
    diagnose misbehaving calling environments)"""
    def __init__(self, message):
        Exception.__init__(self,message)
        # Fix: the previous defensive local import covered only os,
        # but sys is used below too and would NameError in exactly
        # the "not imported yet" situation the import guards against
        import os, sys # might not have been imported yet
        if "ANEMONE_DEBUG" in os.environ:
            sys.stderr.write(f"Got AnemoneError: {message}\n")
# These must be defined before Run for type hints:
PageInfo = NT('PageInfo',['duringId','pageNo']) # a page break: the marker ID it occurs during, and its page number
TagAndText = NT('TagAndText',['tag','text']) # one extracted block of text: its HTML tag name plus the processed text content
TextsAndTimesWithPages = NT('TextsAndTimesWithPages',['textsAndTimes','pageInfos']) # per-recording full text: a list alternating times and TagAndText items, plus a list of PageInfo
ChapterTOCInfo = NT('ChapterTOCInfo',['hTag','hLine','itemNo']) # a chapter's table-of-contents entry (hTag = heading tag; hLine/itemNo presumably locate the heading -- used by code later in the file, not visible in this chunk)
BookTOCInfo = NT('BookTOCInfo',['hTag','hLine','recNo','itemNo']) # as ChapterTOCInfo but with the recording number, for the whole book
del NT # the namedtuple alias isn't needed after this point
class Run():
"""The parameters we need for an Anemone run.
Constructor can either parse args from the
command line, or from anemone() caller."""
    def __init__(self,*inFiles,**kwargs):
        """Parses the options and classifies every
        input file (fetching URLs and reading local
        files as needed).  Non-empty inFiles means
        we're being called as a module with options
        in kwargs; empty inFiles means parse the
        system command line instead."""
        R = self
        R.audioData,R.filenameTitles = [],[]
        R.jsonData = []
        R.textData,R.htmlData = [],[]
        R.imageFiles,R.outputFile = [],None
        R.warnings = []
        if inFiles: # being called as a module
            # Re-use the command-line parser for str/int options
            # (the "placeholder" satisfies the required 'files'
            # positional; real files are kept in R.files below)
            R.__dict__.update(
                get_argument_parser().parse_args(
                    ["placeholder"] +
                    [a for k,v in kwargs.items()
                     for a in
                     ['--'+k.replace('_','-'),str(v)]
                     if type(v) in [str,int]])
                .__dict__)
            R.files = inFiles # may mix dict + string, even in same category especially if using "skip", so don't separate types
        else: # being called from the command line
            R.__dict__.update(get_argument_parser().parse_args().__dict__)
        # Options that can't go through the parser (callbacks,
        # lists, cache session objects etc) are applied directly:
        R.__dict__.update((k,v)
                          for k,v in kwargs.items()
                          if type(v) not in [str,int,type(None)]) # (None means keep the default from parse_args; boolean and bytes we might as well handle directly; list e.g. merge_books should bypass parser; ditto session object for cache, a type we can't even name here if requests_cache is not installed)
        for k in ['merge_books','chapter_titles']:
            if not isinstance(R.__dict__[k],list):
                if not isinstance(R.__dict__[k],str): error(f"{k} must be Python list or comma-separated string")
                R.__dict__[k]=R.__dict__[k].split(',') # comma-separate if coming from the command line, but allow lists to be passed in to the module
                if R.__dict__[k]==['']:
                    R.__dict__[k] = []
        # merge_books entries are "title/N" strings (or tuples
        # when passed in as a module option):
        try: R.bookTitlesAndNumChaps = [
                (n,int(v))
                for n,v in [
                    (b if isinstance(b,tuple)
                     else b.split('/'))
                    for b in R.merge_books if b]]
        except: error(f"Unable to parse merge-books={R.merge_books}") # noqa: E722
        R.progress_loopStart(len(R.files),15)
        for i,f in enumerate(R.files):
            R.progress(i)
            fOrig = f # keep original name for messages/extensions
            if isinstance(f,dict):
                # support direct JSON pass-in as dict
                R.jsonData.append(f)
                R.check_for_JSON_transcript()
                continue
            elif f=="skip": # we're 'full text some audio' and we're skipping audio for a chapter
                R.jsonData.append(None) ; continue
            elif isinstance(f,str) and f.lower().endswith(f"{os.extsep}zip"):
                if R.outputFile: error(f"Only one {os.extsep}zip output file may be specified")
                R.outputFile = f ; continue
            elif isinstance(f,str) and re.match("https?://",f):
                try: f=fetch(f,R.cache,R.refresh,R.refetch,R.delay,R.user_agent,R.retries,R)
                except HTTPError as e:
                    error(f"Unable to fetch {f}: {e}")
            elif delimited(f,'{','}'): pass # JSON literal, handled below
            elif delimited(f,'<','>'): pass # HTML literal, handled below
            elif not os.path.isfile(f): error(f"File not found: {f}")
            else: f = open(f,"rb").read()
            # At this point f is content (bytes or literal string):
            if delimited(f,'{','}'):
                try: f = json.loads(f)
                except: error(f"Could not parse JSON {fOrig}") # noqa: E722
                R.jsonData.append(f)
                R.check_for_JSON_transcript()
            elif delimited(f,'<','>'):
                R.htmlData.append(f)
            elif fOrig.lower().endswith(f"{os.extsep}mp3") or fOrig.lower().endswith(f"{os.extsep}wav"):
                R.audioData.append(f)
                if R.has_old_mutagen() and fOrig.lower().endswith(f"{os.extsep}mp3"):
                    # Mutagen 1.46 and below can't always auto-detect MP3 files, so we trust the extension if we're not on 1.47+
                    R.old_mutagen.add(f)
                R.filenameTitles.append(fOrig[:fOrig.rindex(os.extsep)])
            elif fOrig.lower().endswith(f"{os.extsep}txt"):
                try: f = f.decode('utf-8').strip()
                except UnicodeDecodeError: error(f"Couldn't decode {fOrig} as UTF-8")
                R.textData.append(f)
            else: error(f"Format of '{fOrig}' has not been recognised")
        R.progress(len(R.files))
        if R.htmlData: # check for text-only DAISY:
            if not R.jsonData:
                R.jsonData=[None]*len(R.htmlData)
            if not R.audioData:
                R.audioData=[None]*len(R.htmlData)
            # and check for full-text part-audio DAISY:
            if len(R.jsonData) > len(R.audioData):
                for c,i in enumerate(R.jsonData):
                    if i is None:
                        R.audioData.insert(c,None)
        if not R.outputFile:
            R.outputFile=f"output_daisy{os.extsep}zip"
        # Default title comes from the output filename minus any
        # trailing "daisy" marker:
        if not R.title: R.title=re.sub("(?i)[ _-]daisy[0-9]?$","",R.outputFile.replace(f"{os.extsep}zip",""))
        if R.squash or R.aac: R.mp3_recode = True
def check(self) -> None:
"""Checks we've got everything.
You may omit calling this if you're creating
a temporary Run just to call something like
check_for_JSON_transcript and get its HTML."""
R = self
if R.htmlData and any(a and not j for a,j in zip(R.audioData,R.jsonData)): error("Full text and audio without time markers is not yet implemented (but you can give an empty markers list if you want to combine a whole chapter into one navigation point)")
if R.jsonData and not R.htmlData: error("Time markers without full text is not implemented")
if R.htmlData and R.textData: error("Combining full text with title-only text files is not yet implemented. Please specify full text for everything or just titles for everything, not both.")
if R.jsonData and not len(R.audioData)==len(R.jsonData): error(f"If JSON marker files are specified, there must be exactly one JSON file for each recording file. We got f{len(R.jsonData)} JSON files and f{len(R.audioData)} recording files.")
if R.textData and not len(R.audioData)==len(R.textData): error(f"If text files are specified, there must be exactly one text file for each recording file. We got f{len(R.textData)} text files and f{len(R.audioData)} recording files.")
if R.htmlData and not len(R.audioData)==len(R.htmlData): error(f"If HTML documents with audio are specified, there must be exactly one HTML document for each recording. We got f{len(R.htmlData)} HTML documents and f{len(R.audioData)} recordings.")
if R.htmlData and not len(R.htmlData)==len(dict.fromkeys(R.htmlData).keys()): error("Duplicate HTML input documents")
if not R.htmlData and not R.textData and not R.audioData: error("No input given")
if not re.match("[a-z]{2,3}($|-)",R.lang): R.warning(f"lang '{R.lang}' doesn't look like a valid ISO-639 language code") # this should perhaps be an error
if R.date and not re.match("([+-][0-9]*)?[0-9]{4}-[01][0-9]-[0-3][0-9]$",R.date): error("date (if set) should be in ISO 8601's YYYY-MM-DD format")
if R.aac and not R.daisy3: error("aac requires daisy3")
s = set()
for t in ['marker_attribute',
'page_attribute',
'image_attribute']:
v=R.__dict__[t] ; s.add(v)
if not re.match(r"[^"+chr(0)+"- /=>'"+'"'+"]+$",v) or '\n' in v:
if __name__=="__main__":
t="--"+t.replace("_","-")
error(f"{t} must be a valid HTML attribute name")
if not len(s)==3: error("marker_attribute, page_attribute and image_attribute must be different")
# Run constructor guarantees R.outputFile ends with ".zip", so we don't need to check that here
if "daisy" not in R.outputFile[:-4]: R.warning(f"Output filename {repr(R.outputFile)} does not contain 'daisy'")
if R.outputFile==f"output_daisy{os.extsep}zip": R.warning(f"Outputting to default filename {repr(R.outputFile)}. It's better to set an output filename that identifies the publication.")
if not re.sub("[._-]","",R.outputFile[:-4].replace("daisy","")): R.warning(f"Output filename {repr(R.outputFile)} does not seem to contain any meaningful publication identifier")
if re.search('[%&?@*#{}<>!:+`=|$]',R.outputFile): R.warning(f"Output filename {repr(R.outputFile)} contains characters not allowed on Microsoft Windows")
if re.search('[ "'+"']",R.outputFile): R.warning(f"Space or quote in output filename may complicate things for command-line users: {repr(R.outputFile)}")
    def import_libs(self,files) -> None:
        """Checks availability of, and imports, the
        libraries necessary for our run. Not all
        of them are needed on every run: for
        example, if we're making a DAISY book with
        no audio, we won't need Mutagen.
        'files' is the original input list, used to
        spot WAV/MP3 filenames."""
        R = self
        global mutagen, BeautifulSoup
        if R.audioData and any(R.audioData):
            try: import mutagen
            except ImportError: error('Anemone needs the Mutagen library to determine play lengths.\nPlease do: pip install mutagen\nIf you are unable to use pip, it may also work to download mutagen source and move its "mutagen" directory to the current directory.')
        if R.htmlData:
            try: from bs4 import BeautifulSoup
            except ImportError: error('Anemone needs the beautifulsoup4 library to parse HTML.\nPlease do: pip install beautifulsoup4\nIf you are unable to use pip, it may also work to download beautifulsoup4 source and move its "bs4" directory to the current directory.')
        # An encoder is needed only when re-coding was requested
        # or when an uncompressed WAV input must be encoded:
        if R.mp3_recode or any(f.strip().lower().
                               endswith(
                                   f"{os.extsep}wav")
                               for f in files
                               if isinstance(f,str)):
            if R.aac: check_we_got_AAC(any(f.strip().lower().endswith(f"{os.extsep}mp3") for f in files))
            else: check_we_got_LAME()
    def has_old_mutagen(self) -> "set | None":
        """Detect Mutagen 1.46 or earlier. This is supplied with
        Ubuntu 24.04 LTS (non-pip) and cannot detect MP3 files
        unless they have ID3 tags, so we must be more careful if
        that old version is on the system.
        Returns a set (possibly empty, hence falsy): non-empty
        means 'old Mutagen present'; callers later add MP3 byte
        strings to it so MFile can override autodetection for
        them. Returns None (also falsy) if Mutagen is not
        installed at all."""
        try: return self.old_mutagen # cached from a previous call
        except AttributeError: pass # not yet determined
        global mutagen
        try: import mutagen
        except ImportError: return # we'll error later (above)
        if mutagen.version < (1,47,0):
            self.old_mutagen = set([True])
        else: self.old_mutagen = set()
        return self.old_mutagen
def MFile(self,dat):
if dat in self.old_mutagen:
from mutagen import mp3
return mp3.MP3(BytesIO(dat)) # override autodetect
else: return mutagen.File(BytesIO(dat))
def warning(self,warningText:str) -> None:
"""Logs a warning (or an error if
warnings_are_errors is set)"""
if self.warnings_are_errors:error(warningText)
self.warnings.append(warningText)
if not __name__=="__main__" and "ANEMONE_DEBUG" in os.environ:
sys.stderr.write(f"Anemone-Warning: {warningText}\n") # even if warning_callback overridden, do this as well
self.warning_callback(warningText)
def info(self,text:str,newline:bool=True)->None:
if "info_callback" in self.__dict__:
if text: self.info_callback(text)
if "ANEMONE_DEBUG" not in os.environ: return
sys.stderr.write(f"{text}{chr(10) if newline else ''}")
sys.stderr.flush()
def warning_callback(self,text:str) -> None:
"overridden by passing in a callable"
sys.stderr.write(f"WARNING: {text}\n")
def progress_loopStart(self,i:int,frac:int) -> None:
"""Helps with progress logging when
progress_callback is set. Prepares for a
loop of 'i' iterations to take 'frac' %
of the total progress."""
if "next_base_percent" in self.__dict__:
self.base_percent = self.next_base_percent
else: self.base_percent = 0
self.prog_N,self.prog_D = frac,max(1,i)
if not i: self.progress(1) # empty loop: just say all 'done'
self.next_base_percent = self.base_percent + frac
assert self.next_base_percent <= 100
def progress(self,i:int) -> None:
"logs progress if progress_callback is set"
if "progress_callback" not in self.__dict__:
return
if "old_progress" not in self.__dict__:
self.old_progress = 0
percentage = self.base_percent + int(
i * self.prog_N / self.prog_D)
if percentage <= self.old_progress: return
self.progress_callback(percentage)
self.old_progress = percentage
def check_for_JSON_transcript(self) -> None:
"""Checks to see if the last thing added to
the Run object is a JSON podcast transcript,
and converts it to HTML + time markers"""
R = self
if isinstance(R.jsonData[-1].get(
"segments",None),list) and all(
isinstance(s,dict) and
"startTime" in s and "body" in s
for s in R.jsonData[-1]["segments"]): # looks like JSON transcript format instead of markers format
curSpeaker=None ; bodyList = []
for s in R.jsonData[-1]["segments"]:
bodyList.append(s["body"])
s=s.get("speaker",curSpeaker)
if not s==curSpeaker:
curSpeaker,bodyList[-1] = s,f"[{s}] {bodyList[-1]}"
if len(bodyList)>1:
bodyList[-2] += " "
R.htmlData.append(' '.join(
f'{c}' for i,c in enumerate(bodyList) if c))
R.jsonData[-1]={"markers":[
{"id":f"{i}","time":t}
for i,t in enumerate(
s["startTime"] for s in R.jsonData[-1]["segments"])
if bodyList[i]]}
def get_null_jsonData(self,h) -> dict:
"""Generate no-audio JSON for internal use
when making text-only DAISY files from HTML,
or no-timings JSON for an audio+text chapter.
In this case we assume any element with any
marker-attribute should be extracted."""
if self.marker_attribute == 'id' and not self.marker_attribute_prefix and not hasattr(self,"warned_about_prefix"):
self.warning("You've set marker attribute to 'id' and have chapters without audio-timing data. This results in ALL text with ANY id being extracted for those chapters. To avoid including site 'furniture' you might want to set marker_attribute_prefix.")
self.warned_about_prefix = True
return {'markers':
[{'id':i[self.marker_attribute],'time':0}
for i in BeautifulSoup(h,'html.parser')
.find_all(**{self.marker_attribute:
True})
if i[self.marker_attribute].startswith(
self.marker_attribute_prefix)]}
    def get_texts(self) -> list:
        """Gets the text markup required for the run,
        extracting it from HTML (guided by JSON IDs)
        if we need to do that.
        Returns one entry per recording: either a
        plain title string, or a
        TextsAndTimesWithPages for full text."""
        R = self
        if R.textData: return R.textData # section titles only, from text files
        elif not R.htmlData: return R.filenameTitles # section titles only, from sound filenames
        recordingTexts = []
        for h,j in zip(R.htmlData,R.jsonData):
            if j is None: # skip audio this chapter
                j = R.get_null_jsonData(h)
                include_alt_tags_in_text = True
            else: include_alt_tags_in_text = False # because we won't be able to match it up to the audio
            markers = j['markers']
            if not markers: # empty markers list
                markers = R.get_null_jsonData(h)['markers']
                # - rely on merge0lenSpans to merge whole chapter
            want_pids = [jsonAttr(m,"id") for m in markers]
            extractor = PidsExtractor(R,want_pids)
            extractor.handle_soup(
                BeautifulSoup(h, 'html.parser'),
                include_alt=include_alt_tags_in_text)
            # Build the alternating [time, TagAndText, time, ...]
            # list, one pair per marker:
            rTxt = []
            for i in range(len(markers)):
                rTxt.append(parseTime(jsonAttr(markers[i],"time")))
                if want_pids[i] in extractor.id_to_content:
                    tag,content = extractor.id_to_content[want_pids[i]]
                    content = ''.join(content).strip()
                    rTxt.append(TagAndText(tag,content_fixes(content)))
                else:
                    R.warning(f"JSON {len(recordingTexts)+1} marker {i+1} marks paragraph ID {want_pids[i]} which is not present in HTML {len(recordingTexts)+1}. Anemone will make this a blank paragraph.")
                    rTxt.append(TagAndText('p',''))
            recordingTexts.append(
                TextsAndTimesWithPages(rTxt,extractor.pageNos))
        return recordingTexts
    def write_all(self,recordingTexts) -> None:
        """Writes the DAISY zip and everything in it.
        Each item of recordingTexts is either 1 text
        for section title of whole recording, or a
        TextsAndTimesWithPages i.e. ([TagAndText,time,
        TagAndText,time,TagAndText],[PageInfo,...])"""
        R = self
        assert len(R.audioData) == len(recordingTexts)
        assert recordingTexts
        headings = R.getHeadings(recordingTexts)
        if R.dry_run:
            return R.info(
                f"""Dry run: {len(R.warnings) if
                R.warnings else 'no'} warning{'' if
                len(R.warnings)==1 else 's'
                } for {R.outputFile}\n""")
        merge0lenSpans(recordingTexts,headings,R.audioData)
        # Start re-coding tasks in parallel when requested, or
        # when any input is not already MP3.
        # NOTE(review): the condition groups as
        # (mp3_recode and any(audioData)) or any(non-MP3 input)
        # due to and/or precedence - presumably intended; confirm.
        if R.mp3_recode and any(R.audioData) or any(
                'audio/mp3' not in R.MFile(dat).mime for dat in R.audioData if dat): # parallelise lame if possible
            if not __name__=="__main__":
                R.info(
                    f"Making {R.outputFile}...") # especially if repeatedly called as a module, better print which outputFile we're working on BEFORE the mp3s as well as after
            executor = ThreadPoolExecutor(
                max_workers=cpu_count())
            recordingTasks=[(executor.submit(
                (recodeAAC if R.aac else recodeMP3 if
                 R.mp3_recode or
                 'audio/mp3' not in R.MFile(dat).mime
                 else lambda x,r:x),
                dat, R) if dat else None)
                            for dat in R.audioData]
        else: executor,recordingTasks = None,None
        try: R.write_all0(recordingTexts,headings,recordingTasks)
        except: # unhandled exception: clean up
            try: executor.shutdown(wait=False,cancel_futures=False) # (cancel_futures is Python 3.9+)
            except: pass # (no executor / can't do it) # noqa: E722
            try: os.remove(R.outputFile) # incomplete
            except: pass # noqa: E722
            raise
def write_all0(self,recordingTexts,headings,recordingTasks) -> None:
"""Service method for write_all: writes the complete
DAISY publication as a ZIP to R.outputFile --
audio (NNNN.mp3/mp4), per-recording SMIL files,
text (NNNN.xml or NNNN.htm), images, and the
navigation/package files (ncc.html, or
navigation.ncx + package.opf for DAISY 3).

recordingTexts: list whose items are either
  TextsAndTimesWithPages (full-text chapter) or a
  plain title string (title-only chapter)
headings: per-chapter TOC info (passed through to
  master_smil / ncc_html)
recordingTasks: list of futures (one per recording)
  whose .result() yields the encoded audio bytes,
  or None when R.audioData is to be written as-is
"""
R = self
# Create the output directory first if the output
# filename includes a path component
if os.sep in R.outputFile:
Path(R.outputFile[:R.outputFile.rindex(
os.sep)]).mkdir(parents=True,
exist_ok=True)
z = ZipFile(R.outputFile,"w",ZIP_DEFLATED,
True)
# Running estimate of the unpacked publication's
# footprint on an ISO-9660 CD-ROM (data sectors +
# directory catalogue entries), used to warn once
# if it would overflow a 650M disc
R.dataSectors = R.catalogueEntries = 0
def writestr(n,s):
# Wrapper around z.writestr that also accumulates
# the CD-ROM size estimate above
if isinstance(s,str):
L = len(s.encode('utf-8'))
else: L = len(s) # bytes or bytearray
R.dataSectors += (L+2047)//2048 # ISO 9660 sectors on a CD-ROM
R.catalogueEntries += 1
# Although 50+ files per catalogue sector is possible on 8.3, RockRidge can reduce this to 8, and Joliet adds a separate directory so we'd better double the expected number of cat sectors
# Also 16 sectors are unused before start
# 333,000 sectors on original 650M CD-ROM,
# we can probably increase that if 650M CDs are not in use, but some non-CD readers can still go wrong when files greatly exceed this size
if 2*((R.catalogueEntries+7)//8) + R.dataSectors + 16 > 333000 \
and not hasattr(R,"warnedFull"):
R.warnedFull = True
R.warning(f"{R.outputFile} is too big for some DAISY readers")
z.writestr(n,s)
def D(s): return s.replace("\n","\r\n") # in case old readers require DOS line endings
hasFullText = any(
isinstance(t,TextsAndTimesWithPages)
for t in recordingTexts)
# Explanatory note for users whose OS auto-unpacks
# the ZIP; named 0000.txt so it sorts first
if hasFullText: writestr("0000.txt",D(f"""
If you're reading this, it likely means your
operating system has unpacked the ZIP file
and is showing you its contents. While it
is possible to extract recordings and text
this way, it is better to send the whole ZIP
to a DAISY reader so that its recordings and
text can be connected with each other. If
you are using EasyReader on a mobile device,
close this file and navigate up a level to
find the original ZIP file so it can be sent
to EasyReader as a whole. Some other DAISY
readers need to be pointed at the {'OPF' if R.daisy3 else 'NCC'} file
instead, or at the whole directory/folder.
- This message was added by the DAISY tool
{generator}
not by the producers of the DAISY publication.
""")) # (it's iOS users that need this apparently, as Apple systems see the zip extension and automatically unpack it. But we do need to manually unpack if writing to a CD-ROM for old devices.)
# Have asked on https://apple.stackexchange.com/questions/474687/can-i-modify-my-zip-so-ios-wont-auto-unpack-it-on-download
secsSoFar = 0
# durations: per-recording lengths in seconds;
# curP: running paragraph/SMIL item number across
# the whole publication
durations = [] ; curP = 1
R.progress_loopStart(len(recordingTexts),70)
for recNo in range(1,len(recordingTexts)+1):
rTxt = recordingTexts[recNo-1]
# Audio duration of this recording (0 if no audio)
secsThisRecording = R.MFile(R.audioData[recNo-1]
).info.length if R.audioData[recNo-1] else 0
if secsThisRecording > 3600:
R.warning(f"""Recording {recNo
} is long enough to cause ~{
secsThisRecording*.0001:.1f
}sec synchronisation error on some readers""") # seems lame v3.100 can result in timestamps being effectively multiplied by ~1.0001 on some players but not all, causing slight de-sync on 1h+ recordings (bladeenc may avoid this but be lower quality overall; better to keep the recordings shorter if possible)
durations.append(secsThisRecording)
if R.audioData[recNo-1]:
# When recoding, the encoded audio comes from
# the corresponding future; blocking on
# .result() here keeps output order stable
if recordingTasks is not None:
R.info(f"""Adding {
recNo:04d}.{'mp4' if R.aac else 'mp3'}...""",False)
writestr(f"{recNo:04d}.{'mp4' if R.aac else 'mp3'}",
R.audioData[recNo-1]
if recordingTasks is None
else recordingTasks[recNo-1].result())
if recordingTasks is not None:
R.info(" done")
# Per-recording SMIL synchronising audio with text
writestr(f'{recNo:04d}.smil',D(
R.section_smil(recNo,secsSoFar,
secsThisRecording,curP,
rTxt.textsAndTimes if
isinstance(rTxt,
TextsAndTimesWithPages)
else rTxt)))
# Text content: textsAndTimes alternates time
# floats with TagAndText items, so take every 2nd
# item starting at the first TagAndText; a
# title-only chapter becomes a single h1
writestr(f'{recNo:04d}.{"xml" if R.daisy3 else "htm"}',
D(R.text_htm(
(rTxt.textsAndTimes[
(1 if
isinstance(
rTxt.textsAndTimes
[0],float) else 0)
::2]
if isinstance(rTxt,TextsAndTimesWithPages)
else [TagAndText('h1',rTxt)]),
curP)))
secsSoFar += secsThisRecording
curP += (1+len(rTxt.textsAndTimes)//2
if isinstance(rTxt,TextsAndTimesWithPages) else 1)
R.progress(recNo)
# Images: numbered 1..N keeping each original
# extension; http(s) URLs are fetched (with cache),
# local files read directly; optionally squashed
R.progress_loopStart(len(R.imageFiles),15)
for n,u in enumerate(R.imageFiles):
writestr(f'{n+1}{u[u.rindex("."):]}',
(squash_image if R.squash else lambda x,y:y)(u,
fetch(u,R.cache,R.refresh,
R.refetch,R.delay,
R.user_agent,R.retries,R)
if re.match("https?://",u)
else open(u,'rb').read()))
R.progress(n+1)
# Default publication date = today (local time)
if not R.date:
R.date = "%d-%02d-%02d" % time.localtime(
)[:3]
if R.daisy3:
# DAISY 3 package: CSS + OPF + resource file
writestr('dtbook.2005.basic.css',D(d3css))
writestr('package.opf',D(R.package_opf(
hasFullText, len(recordingTexts),
secsSoFar)))
writestr('text.res',D(textres))
else: writestr('master.smil',D(
R.master_smil(headings,secsSoFar)))
# Main navigation file (NCX for DAISY 3, NCC for
# DAISY 2); timeAdjust reconciles text timings
# with the measured recording durations
writestr(
'navigation.ncx' if R.daisy3
else 'ncc.html',
D(R.ncc_html(
headings,hasFullText,secsSoFar,
[timeAdjust(
t.textsAndTimes if isinstance(t,TextsAndTimesWithPages) else t,
durations[i])
for i,t in enumerate(
recordingTexts)],
[(t.pageInfos if isinstance(
t,TextsAndTimesWithPages) else [])
for t in recordingTexts])))
if not R.daisy3: writestr('er_book_info.xml',D(er_book_info(durations))) # not DAISY standard but EasyReader can use this
z.close()
R.info(f"Wrote {R.outputFile}\n")
def getHeadings(self,recordingTexts) -> list:
"""Gets headings from recordingTexts for the
DAISY's NCC / OPF data.

recordingTexts: list whose items are either
  TextsAndTimesWithPages (full-text chapter) or a
  plain title string (title-only chapter).
May mutate each chapter's textsAndTimes in place
(inserting a synthetic start-of-chapter heading
when the markup has none) and consumes entries
from R.chapter_titles / R.bookTitlesAndNumChaps.
Returns a list with one entry per chapter
(ChapterTOCInfo lists, or the plain title for
title-only chapters)."""
R = self
# ret: per-chapter results; cvChaps: chapters with
# verse indexing (see warnings at end -- NOTE(review):
# nothing in this visible block appends to cvChaps;
# presumably the populating code is elsewhere/not
# shown -- verify); chapNo: chapter count within the
# current book (resets on merge-books boundaries)
ret = [] ; cvChaps = [] ; chapNo = 0
for t in recordingTexts:
chapNo += 1
# merge-books bookkeeping: bookTitlesAndNumChaps
# entries look like (title, numChapters); when we
# pass the current book's chapter count, move on
# to the next book and restart chapter numbering
if R.bookTitlesAndNumChaps and chapNo==R.bookTitlesAndNumChaps[0][1]+1:
del R.bookTitlesAndNumChaps[0]
if not R.bookTitlesAndNumChaps:
error("merge-books did not account for all files (check the counts)")
chapNo = 1
# Title-only chapter: pass the title through as-is
if not isinstance(t,
TextsAndTimesWithPages):
if R.bookTitlesAndNumChaps and chapNo==1: error("merge-books with non-HTML not yet implemented")
ret.append(t) ; continue # title only
# textsAndTimes alternates time floats with
# TagAndText items; 'first' = index of the first
# TagAndText item in the chapter
textsAndTimes,pages = t ; first = None
chapHeadings = []
for v,u in enumerate(textsAndTimes):
if isinstance(u,float): continue #time
tag,text = u
if first is None: first = v
if not tag.startswith('h'):
continue
if v//2 - 1 == first//2 and not textsAndTimes[first].tag.startswith('h'): # chapter starts with non-heading followed by heading: check the non-heading for "Chapter N" etc
nums=re.findall("[1-9][0-9]*",
textsAndTimes[first].text)
if len(nums)==1:
text=f"{nums[0]}: {text}" # for TOC
textsAndTimes[v-1] = (
textsAndTimes[first-1] if
first else 0) + 0.001 # for audio jump-navigation to include the "Chapter N". Could have an option to merge the in-chapter text instead, so "Chapter N" appears as part of the heading, not scrolled past quickly: merge0lenSpans will now do this if the chapter paragraph is promoted to heading, but beware we might not want the whole of the 'chapter N' text to be part of the TOC, just the number. Thorium actually stops playing when it hits the 0-length paragraph before the heading, so promoting it might be better; trying the +0.001 for now to make timestamps not exactly equal.
chapHeadings.append(ChapterTOCInfo(
tag, re.sub('<[^>]*>','',text),
v//2))
if chapHeadings:
# Markup already has headings: a chapter_titles
# override (if any) is consumed but ignored
if R.chapter_titles:
if len(R.chapter_titles)>1: cTitle,R.chapter_titles = R.chapter_titles[0],R.chapter_titles[1:]
else: cTitle,R.chapter_titles = R.chapter_titles[0], []
if cTitle.strip():
R.warning(f"Title override for chapter {chapNo} is {cTitle} but there's already a title in the markup. Ignoring override.")
else: # not chapHeadings
# This'll be a problem, as master_smil and ncc_html need headings to refer to the chapter at all. (Well, ncc_html can also do it by page number if we have them, but we haven't tested all DAISY readers with page number only navigation, and what if we don't even have page numbers?)
# So let's see if we can at least get a chapter number.
if first is not None: nums=re.findall(
"[1-9][0-9]*",
textsAndTimes[first].text)
else:
# Completely empty chapter: warn and give it a
# single blank paragraph so the inserts below
# have something to work with
R.warning(f"Chapter {chapNo} is completely blank! (Is {'--marker-attribute' if __name__=='__main__' else 'marker_attribute'} set correctly?)")
nums = [] ; first = 0 ; textsAndTimes.append(TagAndText('p',''))
# Chapter number: the single number found in the
# first paragraph (unless it's "1"), else fall
# back to our running count
chapterNumberTextFull = chapterNumberText = nums[0] if len(nums)==1 and not nums[0]=="1" else str(chapNo)
if int(chapterNumberText) > chapNo:
if not R.ignore_chapter_skips:
R.warning(f"""Skipping chapter{
f' {chapNo}'
if int(chapterNumberText) == chapNo + 1
else f's {chapNo}-{int(chapterNumberText)-1}'}""")
chapNo = int(chapterNumberText) # so it's in sync, in case there's more than one number in the 1st para later and we have to fall back on automatic count
# chapter_titles override takes precedence for
# the displayed heading text (but should still
# contain the chapter number)
if R.chapter_titles:
if len(R.chapter_titles)>1: chapterNumberTextFull,R.chapter_titles = R.chapter_titles[0],R.chapter_titles[1:]
else: chapterNumberTextFull,R.chapter_titles = R.chapter_titles[0], []
if not chapterNumberTextFull:
R.warning(f"Title override for chapter {chapNo} is blank. Setting to {chapterNumberText}")
chapterNumberTextFull = chapterNumberText
elif chapterNumberText not in chapterNumberTextFull:
R.warning(f"Title for chapter {chapNo} is '{chapterNumberTextFull}' which does not contain the expected '{chapterNumberText}' ({'' if len(nums)==1 and not nums[0]=='1' else 'from automatic numbering as nothing was '}extracted from '{textsAndTimes[first].text.replace(chr(10),' / ')}')")
# In EasyReader 10 on Android, unless there is at least one HEADING (not just div), navigation display is non-functional. And every heading must point to a 'real' heading in the text, otherwise EasyReader 10 will delete all the text in Daisy 2, or promote something to a heading in Daisy 3 (this is not done by Thorium Reader)
# (EasyReader 10 on Android also inserts a newline after every span class=sentence if it's a SMIL item, even if there's no navigation pointing to it)
# So let's add a "real" start-of-chapter heading before the text, with time 0.001 second if we don't know the time from the first time marker (don't set it to 0 or Thorium can have issues)
if first==1 and textsAndTimes[0]:
first = 0 # for the insert below: put it before the non-zero opening time marker
else: textsAndTimes.insert(first,(textsAndTimes[first-1] if first else 0)+0.001)
textsAndTimes.insert(first,TagAndText(
f'h{R.chapter_heading_level}',
chapterNumberTextFull)) # we'll ref this
chapHeadings=[ChapterTOCInfo(
f'h{R.chapter_heading_level}',
chapterNumberTextFull,
first//2)] # points to our extra heading
if textsAndTimes[first+2].text.startswith(chapterNumberText):
textsAndTimes[first+2]=TagAndText(
textsAndTimes[first+2].tag,
textsAndTimes[first+2].text[
len(chapterNumberText):].strip()) # because we just had the number as a heading, so we don't also need it repeated as 1st thing in text
first += 2 # past the heading we added
# NOTE(review): the condition below looks garbled or
# truncated -- "first+21" is always truthy (any
# nonzero int), making the merge-books warning fire
# on every iteration regardless of state, and no
# ret.append(chapHeadings) is visible in this loop;
# compare against the upstream source to restore the
# intended condition and append.
if first+21 or R.bookTitlesAndNumChaps and not chapNo==R.bookTitlesAndNumChaps[0][1]: R.warning("merge-books specified more files than given")
if len(cvChaps) not in [0,len(ret)]: R.warning(f"Verse-indexed only {len(cvChaps)} of {len(ret)} chapters. Missing: {', '.join(str(i) for i in range(1,len(ret)+1) if i not in cvChaps)}")
if cvChaps and not R.daisy3 and not R.strict_ncc_divs: R.warning("Verse-indexing in Daisy 2 can prevent EasyReader 10 from displaying the text: try Daisy 3 instead") # (and with strict_ncc_divs, verses are not shown in Book navigation in Daisy 2)
return ret
def ncc_html(self, headings = [],
hasFullText:bool = False,
totalSecs = 0,
recTimeTxts = [],
pageNos=[]) -> str:
"""Returns the Navigation Control Centre (NCC)
recTimeTxts includes 0 and total
pageNos is [[PageInfo,...],...]"""
R = self
numPages = sum(len(L) for L in pageNos)
maxPageNo = max((
max(
(int(i.pageNo) for i in PNs),
default=0)
for PNs in pageNos),default=0)
headingsR = R.normaliseDepth(hReduce(headings)) # (hType,hText,recNo,textNo)
return deBlank(f"""
{'' if R.daisy3 else ''}
<{'ncx xmlns="http://www.daisy.org/z3986/2005/ncx/" version="2005-1"'
if R.daisy3
else f'html lang="{R.lang}" xmlns="http://www.w3.org/1999/xhtml"'} xml:lang="{R.lang}">
{'' if R.daisy3 else ''}
{f'' if R.daisy3 else ''}
{f'' if R.daisy3 else ''}
{'' if R.daisy3 else f'{R.title}'}
{f'{R.title}'
if R.daisy3 else ''}
{f'{R.creator}'
if R.daisy3 else ''}
<{'navMap id="navMap"' if R.daisy3 else 'body'}>"""+''.join((f"""
{t.hLine}{'' if recTimeTxts[t.recNo][2*t.itemNo]==recTimeTxts[t.recNo][2*t.itemNo+2] else f''''''}
{''*numDaisy3NavpointsToClose(s,headingsR)}""" if R.daisy3 else ''.join(f"""
{N
}""" for r,PNs in enumerate(
pageNos) for (PO,(after,N)) in enumerate(
PNs) if (r,after)<=t[2:4] and (
not s or (r,after) >
headingsR[s-1][2:4]))+f"""
<{t.hTag} class="{'section' if s or
R.allow_jumps else 'title'
}" id="s{s+1}">
{t.hLine}
{t.hTag}>""") for s,t in enumerate(
headingsR))+(''+''.join(f"""
{N}""" for r,PNs in enumerate(pageNos) for (PO,(after,N)) in enumerate(PNs))+"""
""" if R.daisy3 else """