Конвертер принимает уже спарсенные товары из receiver, нормализует поля и готовит запись для catalog.
- Общий
BaseParserHandler(мастер-класс) с единым контрактом нормализации. - Реестр обработчиков
HandlerRegistryдля выбора модуля поparser_name. - Отдельные модули
parsers/fixprice,parsers/chizhik,parsers/perekrestokс parser-specific title обработчиками. - Пайплайн
ConverterPipeline:- обработчик парсера,
- резолв canonical product id (
plu/sku/source_id + parser, fallback по normalized title только если нетplu/sku), - persistent image dedup,
- backfill
NULLполей по ближайшей версии товара во времени.
converter/
core/
base.py # мастер-класс обработчика
models.py # raw/normalized dataclass-модели
ports.py # интерфейсы receiver/catalog/storage
registry.py # реестр обработчиков
services.py # identity, image dedup, null-backfill
parsers/
fixprice/
handler.py # обработчик Fix Price
title_parser.py
normalizers.py
patterns.py
chizhik/
handler.py # обработчик Чижик
title_parser.py
patterns.py
perekrestok/
handler.py # обработчик Перекрёсток
title_parser.py
sync.py # сервис batch-sync receiver -> catalog
daemon.py # polling daemon (receiver -> catalog)
pipeline.py # title/category/geo/composition normalization
catalog теперь хранит данные не только в projection-таблице, а в нормализованной структуре с историей:
catalog_product_snapshots- append-only снапшоты по волатильным полям:price/discount_price/loyal_price/price_unit,available_count, плюс event-контракт (source_event_uid,content_fingerprint,valid_from_at/valid_to_at,observed_at,created_at).catalog_product_sources- состояние источника(parser_name, source_id)и ссылка на последний snapshot.catalog_settlements- справочник населенных пунктов/регионов/стран.catalog_categories- справочник категорий (uid/title/depth/parent + adult/icon/banner).catalog_products- текущая проекция (read-model) для быстрых чтений.catalog_product_assets- массивные поля текущей проекции товара (image urls, duplicates, fingerprints) в нормализованном виде.
Для title в БД хранится единое поле title_normalized_no_stopwords; поля
title_normalized и title_original_no_stopwords в catalog_products и
catalog_product_snapshots не сохраняются.
Converter сохраняет расширенный product-контракт в catalog_products (current projection),
а snapshot-историю ведет через единую таблицу catalog_product_snapshots.
Legacy snapshot-схема не поддерживается: миграция one-way удаляет устаревшие snapshot-таблицы и лишние snapshot-поля.
Автоматической миграции в CatalogRepository больше нет: запуск migration выполняется вручную отдельной командой.
Целевая production-СУБД: PostgreSQL (postgresql+psycopg://...).
SQLite оставлен только для локальных тестов/фикстур.
Политика обновления:
- история не удаляется и не перезаписывается (
append-only snapshots); - справочники (
settlements/categories) пополняются и дополняются; catalog_productsобновляется неразрушительно:NULL/пустыеновые значения не затирают заполненные старые.
Поддержан паттерн вида:
Название, Бренд(опц), floatXfloat[ Xfloat ] см ИЛИ float (г/кг/мл/л), int(кол-во, опц), в ассортименте
Из title формируются:
title_originaltitle_normalizedtitle_original_no_stopwordstitle_normalized_no_stopwordsunit,available_count,package_quantity,package_unitdimension_height_m,dimension_width_m,dimension_depth_m(в метрах)
Unit guide:
Chocolate 200 g->unit=PCE,available_count=15,package_quantity=0.2,package_unit=KGMMilk 1 L->unit=PCE,available_count=10,package_quantity=1,package_unit=LTRPotatoes by weight->unit=KGM,available_count=None,package_quantity=NoneWater vending->unit=LTR,available_count=None,package_quantity=None
python3 example_fixprice_title_parser.pypython3 -m unittest discover -s tests -p 'test_*.py' -vЕсть адаптер под SQLite-базу receiver:
converter.adapters.ReceiverSQLiteRepository- поддерживает только актуальную схему
receiver(run_artifacts.parser_nameобязателен). - если обязательной колонки нет, адаптер падает с ошибкой несовместимой схемы.
Есть sink под SQLite-базу catalog:
converter.adapters.CatalogSQLiteRepository- выполняет
upsertнормализованных товаров; - хранит persistent
canonical_product_idmap и image fingerprints.
Полный sync receiver -> catalog (SQLite):
python3 sync_receiver_to_catalog.py \
--receiver-db ../receiver/data/receiver.db \
--catalog-db ./data/catalog.db \
--parser-name fixprice \
--receiver-fetch-size 2000 \
--write-chunk-size 1000 \
--sync-version v2Полный sync receiver -> catalog (PostgreSQL):
pip install -r requirements.txt
python3 sync_receiver_to_catalog.py \
--receiver-db 'postgresql+psycopg://user:pass@127.0.0.1:5432/receiver' \
--catalog-db 'postgresql+psycopg://user:pass@127.0.0.1:5432/catalog' \
--parser-name fixprice \
--receiver-fetch-size 2000 \
--write-chunk-size 1000 \
--sync-version v2Конвертер удаляет duplicate image URLs через async outbox:
CONVERTER_STORAGE_BASE_URL(илиSTORAGE_BASE_URL) — базовый URL storage.CONVERTER_STORAGE_API_TOKEN(илиSTORAGE_API_TOKEN) — токенBearer.CONVERTER_STORAGE_DELETE_TIMEOUT_SEC— timeoutDELETEзапроса (по умолчанию10).- ошибка удаления больше не прерывает
apply_chunk, обработка идет через retry в outbox worker. - перед dedup конвертер вызывает
POST /api/images/{image_name}/persist(best effort) и сохраняет новыйLocationURL.
Удаление выполняется только для URL текущего storage origin и путей /images/<name> / /images_permanent/<name>.
Запуск daemon-процесса в режиме циклического опроса receiver:
python3 converter_daemon.py \
--receiver-db ../receiver/data/receiver.db \
--catalog-db ./data/catalog.db \
--parser-name fixprice \
--receiver-fetch-size 2000 \
--write-chunk-size 1000 \
--sync-version v2 \
--writer-mode pg_v2 \
--poll-interval-sec 5write_chunk_size задаёт атомарный размер apply_chunk.
Демон не использует HTTP trigger и работает в режиме consume-delete: после успешной записи chunk в catalog удаляет обработанные run_artifact_products из receiver.
- Создать папку
converter/parsers/<parser_name>/. - Реализовать
<ParserName>Handler(BaseParserHandler). - Зарегистрировать в
converter/parsers/__init__.py. - При необходимости добавить parser-specific normalizers/patterns.
../dataclass../storage../receiver