Commit a2d3b08

✨(backend) add preprocess support for indexer
New processors mechanism in IndexerTaskService: after the loading and conversion steps, a list of functions can be chained to transform the document content (like Django middlewares). Signed-off-by: Fabre Florian <[email protected]>
1 parent 55d9779 commit a2d3b08
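
The processors mechanism is middleware-style: each processor is a callable that receives the document and returns it, possibly transformed, and the service applies them in declaration order. A minimal sketch of what one could look like; the prepare_document_for_indexing name is taken from the placeholder comment in the diff below, and its body plus the registration line are purely illustrative:

    # Illustrative processor: any callable taking and returning the document.
    def prepare_document_for_indexing(document):
        """Example: collapse runs of whitespace before indexing."""
        if document.content:
            document.content = " ".join(document.content.split())
        return document

    # Hypothetical registration on the class attribute added by this commit:
    # IndexerTaskService.processors = [prepare_document_for_indexing]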

File tree: 2 files changed, +195 −64 lines

src/backend/core/services/indexer_services.py

Lines changed: 66 additions & 27 deletions
@@ -224,6 +224,9 @@ class IndexerTaskService:
 
     # V2 : Use settings to define the list of converters
     converters = {"application/pdf": converters.pdf_to_markdown}
+    processors = [
+        # Put prepare_document_for_indexing() here !
+    ]
 
     def __init__(
         self, service: Service, batch_size=100, client=None, force_refresh=None
@@ -249,7 +252,7 @@ def get_converter(self, mimetype):
                 f"No such converter for the unindexable mimetype {mimetype}"
             ) from e
 
-    def process_content(self, document, content):
+    def convert_content(self, content, document):
         """Transforms the document file data into an indexable format"""
         try:
             converter = self.get_converter(document.mimetype)
@@ -274,6 +277,25 @@ def process_content(self, document, content):
                 _id=document.id,
             ) from e
 
+    def process_document(self, document):
+        """Transforms the document file data into an indexable format"""
+        try:
+            processors = list(self.processors)
+            processors.reverse()
+
+            while len(processors) > 0:
+                document = processors.pop()(document)
+
+            return document
+        except IndexContentError as e:
+            e.id = document.id
+            raise e
+        except Exception as e:
+            raise IndexContentError(
+                f"Unable to process content : {e}",
+                _id=document.id,
+            ) from e
+
     def stream_content(self, document):
         """Open download stream containing the file data"""
         try:
@@ -346,13 +368,13 @@ def process_all(self, batch_size=None):
             for doc in docs:
                 try:
                     # V2 : Use asyncio loop to parallelize conversion
-                    content = self.process_content(doc, doc.content)
+                    doc = self.process_document(doc)  # noqa : PLW2901
 
                     actions.update(
                         doc.id,
                         data={
                             "content_status": enums.ContentStatusEnum.READY.value,
-                            "content": content,
+                            "content": doc.content,
                         },
                         # if_seq_no and if_primary_term ensure we only update indexes
                         # if the document hasn't changed
@@ -398,22 +420,32 @@ def load_n_process_all(self, batch_size=None):
             for doc in docs:
                 try:
                     # V2 : Use asyncio loop to parallelize downloads
-                    content = self.process_content(doc, self.stream_content(doc))
+                    content = self.stream_content(doc)
+                    doc.content = self.convert_content(content, doc)
+                    doc.content_status = enums.ContentStatusEnum.LOADED
+                except IndexerError as e:
+                    # V2 ; handle retry here and remove the entry if not working
+                    errors.append(e)
+                    continue
 
-                    actions.update(
-                        doc.id,
-                        data={
-                            "content_status": enums.ContentStatusEnum.READY.value,
-                            "content": content,
-                        },
-                        # if_seq_no and if_primary_term ensure we only update indexes
-                        # if the document hasn't changed
-                        if_seq_no=doc.hit["_seq_no"],
-                        if_primary_term=doc.hit["_primary_term"],
-                    )
+                try:
+                    doc = self.process_document(doc)  # noqa : PLW2901
+                    doc.content_status = enums.ContentStatusEnum.READY
                 except IndexerError as e:
                     errors.append(e)
 
+                actions.update(
+                    doc.id,
+                    data={
+                        "content_status": doc.content_status.value,
+                        "content": doc.content,
+                    },
+                    # if_seq_no and if_primary_term ensure we only update indexes
+                    # if the document hasn't changed
+                    if_seq_no=doc.hit["_seq_no"],
+                    if_primary_term=doc.hit["_primary_term"],
+                )
+
             errors.extend(actions.errors())
 
             return errors
@@ -505,23 +537,30 @@ def index(self, documents):
             self.service.index_name, client=self.client, refresh=self.force_refresh
         ) as actions:
             for doc in documents:
+                # Without content and a dowload uri : set WAIT status
                 if doc.content_uri and not doc.content:
-                    # Without content and a dowload uri : set WAIT status
                     doc.content_status = enums.ContentStatusEnum.WAIT
-                elif not is_allowed_mimetype(doc.mimetype, INDEXABLE_MIMETYPES):
-                    # A content but not directly indexable (e.g xml or html content) : process them
+                    actions.index_document(doc)
+                    continue
+
+                # A content but not directly indexable (e.g xml or html content) : convert them
+                if not is_allowed_mimetype(doc.mimetype, INDEXABLE_MIMETYPES):
                     try:
-                        doc.content = self.process_content(doc, doc.content)
-                        doc.content_status = enums.ContentStatusEnum.READY
+                        doc.content = self.convert_content(doc.content, doc)
                     except IndexContentError as err:
-                        # If process has failed, set LOADED status for a retry.
-                        # V2 : Add retry mechanism ?
-                        doc.content_status = enums.ContentStatusEnum.LOADED
                         errors.append(IndexBulkError(str(err), _id=doc.id))
-                else:
-                    # The content exists and is indexable : set READY
-                    doc.content_status = enums.ContentStatusEnum.READY
-
+                        continue
+
+                # Preprocess the content of the document
+                try:
+                    doc = self.process_document(doc)  # noqa: PLW2901
+                except IndexContentError as err:
+                    # V2 : Add retry mechanism with LOADED status ?
+                    errors.append(IndexBulkError(str(err), _id=doc.id))
+                    continue
+
+                # The content exists and is indexable : set READY
+                doc.content_status = enums.ContentStatusEnum.READY
                 actions.index_document(doc)
 
         errors.extend(actions.errors())