diff --git a/code/config/config.py b/code/config/config.py
index c1c0f5685..91d5ab819 100644
--- a/code/config/config.py
+++ b/code/config/config.py
@@ -27,6 +27,11 @@ class LLMProviderConfig:
     endpoint: Optional[str] = None
     api_version: Optional[str] = None
 
+@dataclass
+class EmbeddingModeConfig:
+    default: str = "single"
+    document_types: Dict[str, str] = field(default_factory=dict)
+
 @dataclass
 class EmbeddingProviderConfig:
     api_key: Optional[str] = None
@@ -201,6 +206,13 @@ def load_embedding_config(self, path: str = "config_embedding.yaml"):
 
         self.preferred_embedding_provider: str = data["preferred_provider"]
         self.embedding_providers: Dict[str, EmbeddingProviderConfig] = {}
+
+        # Load embedding mode configuration
+        embedding_mode_data = data.get("embedding_mode", {})
+        self.embedding_mode = EmbeddingModeConfig(
+            default=embedding_mode_data.get("default", "single"),
+            document_types=embedding_mode_data.get("document_types", {})
+        )
 
         for name, cfg in data.get("providers", {}).items():
             # Extract configuration values from the YAML
@@ -517,6 +529,18 @@ def get_llm_provider(self, provider_name: Optional[str] = None) -> Optional[LLMProviderConfig]:
 
             return self.llm_endpoints[self.preferred_llm_endpoint]
         return None
+
+    def get_embedding_mode(self, document_type: str) -> str:
+        """Get the embedding mode for a specific document type."""
+        if not hasattr(self, 'embedding_mode'):
+            return "single"  # Default fallback
+
+        # Check if there's a specific mode for this document type
+        if document_type in self.embedding_mode.document_types:
+            return self.embedding_mode.document_types[document_type]
+
+        # Fall back to default mode
+        return self.embedding_mode.default
 
 # Global singleton
 CONFIG = AppConfig()
\ No newline at end of file
diff --git a/code/config/config_embedding.yaml b/code/config/config_embedding.yaml
index f35daa23d..aef0e4aa5 100644
--- a/code/config/config_embedding.yaml
+++ b/code/config/config_embedding.yaml
@@ -1,5 +1,16 @@
 preferred_provider: openai
 
+# Embedding generation mode configuration
+embedding_mode:
+  # Default mode for documents (single or multi)
+  default: single
+
+  # Per-document-type configuration
+  # Set to 'multi' to generate multiple specialized embeddings per document
+  # Set to 'single' to generate one embedding per document (legacy behavior)
+  document_types:
+    company: multi
+
 providers:
   openai:
     api_key_env: OPENAI_API_KEY
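Note: a minimal sketch of how the new config is consumed, assuming the YAML above has been loaded via load_embedding_config; the "person" document type is hypothetical and only illustrates the fallback path.

```python
# Sketch only, not part of the patch: resolving embedding modes from
# the config_embedding.yaml shown above (assumes the file is on the
# path load_embedding_config expects).
from config.config import CONFIG

CONFIG.load_embedding_config("config_embedding.yaml")

print(CONFIG.get_embedding_mode("company"))  # "multi"  (explicit per-type entry)
print(CONFIG.get_embedding_mode("person"))   # "single" (hypothetical type, falls back to default)
```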
diff --git a/code/scraping/incrementalCrawlAndLoad.py b/code/scraping/incrementalCrawlAndLoad.py
index 6947ed006..29f551692 100644
--- a/code/scraping/incrementalCrawlAndLoad.py
+++ b/code/scraping/incrementalCrawlAndLoad.py
@@ -323,9 +323,9 @@ async def _process_single_url(self, url: str) -> bool:
             else:
                 self.stats["schema_types"][schema_type] = 1
 
-            # Step 3: Prepare documents for database
+            # Step 3: Prepare documents for database with specialized embedding texts
             documents_to_upload = []
-            docs, _ = prepare_documents_from_json(final_url, schemas_str, self.db_name)
+            docs, embedding_texts = prepare_documents_from_json(final_url, schemas_str, self.db_name)
             documents_to_upload.extend(docs)
 
             # Step 4: Generate embeddings and upload
@@ -335,8 +335,13 @@ async def _process_single_url(self, url: str) -> bool:
             provider_config = CONFIG.get_embedding_provider(provider)
             model = provider_config.model if provider_config else None
 
-            # Extract texts for embedding
-            texts = [doc["schema_json"] for doc in documents_to_upload]
+            # Use specialized embedding texts if available, otherwise fall back to schema_json
+            if embedding_texts and len(embedding_texts) == len(documents_to_upload):
+                texts = embedding_texts
+                logger.debug(f"Using {len(texts)} specialized embedding texts from multi-embedding generator")
+            else:
+                texts = [doc["schema_json"] for doc in documents_to_upload]
+                logger.debug(f"Falling back to schema_json for {len(texts)} documents (multi-embedding texts not available)")
 
             # Generate embeddings
             embeddings = await batch_get_embeddings(texts, provider, model)
diff --git a/code/tools/db_load.py b/code/tools/db_load.py
index ee546623a..1b9388fae 100644
--- a/code/tools/db_load.py
+++ b/code/tools/db_load.py
@@ -821,6 +821,9 @@ async def loadJsonToDB(file_path: str, site: str, batch_size: int = 100, delete_
     if json_only_format:
         print("Detected JSON-only format. URLs will be extracted from within the JSON data.")
 
+    # Track both documents and their specialized embedding texts
+    all_embedding_texts = []
+
     # Process each line to extract documents
     for line in lines:
         try:
@@ -830,9 +833,10 @@ async def loadJsonToDB(file_path: str, site: str, batch_size: int = 100, delete_
             if url is None or json_data is None:
                 continue
 
-            # Prepare documents
-            documents, _ = prepare_documents_from_json(url, json_data, site)
+            # Prepare documents with specialized embedding texts
+            documents, embedding_texts = prepare_documents_from_json(url, json_data, site)
             all_documents.extend(documents)
+            all_embedding_texts.extend(embedding_texts)
         except Exception as e:
             print(f"Error processing line: {str(e)}")
             continue
@@ -844,8 +848,13 @@ async def loadJsonToDB(file_path: str, site: str, batch_size: int = 100, delete_
 
     # Open file to write documents with embeddings
     with open(embeddings_path, 'w', encoding='utf-8') as embed_file:
-        # Extract texts for embedding
-        texts = [doc["schema_json"] for doc in all_documents]
+        # Use specialized embedding texts if available, otherwise fall back to schema_json
+        if all_embedding_texts and len(all_embedding_texts) == len(all_documents):
+            texts = all_embedding_texts
+            print(f"Using {len(texts)} specialized embedding texts from multi-embedding generator")
+        else:
+            texts = [doc["schema_json"] for doc in all_documents]
+            print(f"Falling back to schema_json for {len(texts)} documents (multi-embedding texts not available)")
 
         # Process in batches
         total_documents = 0
@@ -877,8 +886,11 @@ async def loadJsonToDB(file_path: str, site: str, batch_size: int = 100, delete_
                 # Ensure JSON has no newlines
                 doc_json = doc['schema_json'].replace('\n', ' ')
 
-                # Write to embeddings file
-                embed_file.write(f"{doc['url']}\t{doc_json}\t{embedding_str}\n")
+                # Write to embeddings file in new multi-embedding format
+                # Format: URL \t JSON \t embedding \t base_doc_id \t embedding_type
+                base_doc_id = doc.get('base_doc_id', '')
+                embedding_type = doc.get('embedding_type', '')
+                embed_file.write(f"{doc['url']}\t{doc_json}\t{embedding_str}\t{base_doc_id}\t{embedding_type}\n")
 
                 docs_with_embeddings.append(doc)
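For clarity, an illustrative line in the new five-column embeddings-file format written above. All field values are invented for the example; note that single-mode documents leave the last two columns empty.

```python
# Illustrative only: URL \t JSON \t embedding \t base_doc_id \t embedding_type
multi_line = (
    "https://example.com/acme\t"
    '{"name": "Acme"}\t'
    "[0.12, -0.03, 0.88]\t"
    "9e107d9d372bb6826bd81d3542a419d6\t"
    "investor\n"
)
url, doc_json, embedding_str, base_doc_id, embedding_type = multi_line.rstrip("\n").split("\t")
```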
diff --git a/code/tools/db_load_utils.py b/code/tools/db_load_utils.py
index 4b13362ad..5d7986717 100644
--- a/code/tools/db_load_utils.py
+++ b/code/tools/db_load_utils.py
@@ -156,7 +156,11 @@ def get_item_name(item: Dict[str, Any]) -> str:
 
 def prepare_documents_from_json(url: str, json_data: str, site: str) -> Tuple[List[Dict[str, Any]], List[str]]:
     """
-    Prepare documents from URL and JSON data.
+    Prepare documents from URL and JSON data using a configurable embedding approach.
+
+    Uses configuration to determine the embedding mode per document type:
+    - Single mode: creates one embedding per document (legacy behavior)
+    - Multi mode: creates multiple specialized embeddings per document for better search
 
     Args:
         url: URL for the item
@@ -167,6 +171,10 @@ def prepare_documents_from_json(url: str, json_data: str, site: str) -> Tuple[Li
         Tuple of (documents, texts_for_embedding)
     """
     try:
+        # Import dependencies
+        from tools.multi_embedding_generator import generate_document_embeddings, detect_item_type
+        from config.config import CONFIG
+
         # Parse and trim the JSON
         json_obj = json.loads(json_data)
         trimmed_json = trim_schema_json(json_obj, site)
@@ -189,17 +197,40 @@ def prepare_documents_from_json(url: str, json_data: str, site: str) -> Tuple[Li
             item_url = url if i == 0 else f"{url}#{i}"
             item_json = json.dumps(item)
 
-            # Add document to batch
-            doc = {
-                "id": str(int64_hash(item_url)),
-                "schema_json": item_json,
-                "url": item_url,
-                "name": get_item_name(item),
-                "site": site
-            }
+            # Determine embedding mode based on configuration
+            item_type = detect_item_type(item)
+            embedding_mode = CONFIG.get_embedding_mode(item_type)
 
-            documents.append(doc)
-            texts.append(item_json)
+            if embedding_mode == "multi":
+                # Generate multiple specialized embeddings for this item
+                multi_embeddings = generate_document_embeddings(item, item_url, site)
+
+                # Create a document for each specialized embedding
+                for emb_data in multi_embeddings:
+                    doc = {
+                        "id": emb_data["id"],
+                        "base_doc_id": emb_data["base_doc_id"],
+                        "embedding_type": emb_data["embedding_type"],
+                        "schema_json": item_json,
+                        "url": item_url,
+                        "name": get_item_name(item),
+                        "site": site
+                    }
+
+                    documents.append(doc)
+                    texts.append(emb_data["embedding_text"])
+            else:
+                # Single embedding mode (legacy behavior)
+                doc = {
+                    "id": str(int64_hash(item_url)),
+                    "schema_json": item_json,
+                    "url": item_url,
+                    "name": get_item_name(item),
+                    "site": site
+                }
+
+                documents.append(doc)
+                texts.append(item_json)
 
         return documents, texts
     except Exception as e:
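A sketch of the fan-out this hunk produces in multi mode, using a hypothetical item and site. It assumes the embedding-mode config shown earlier has been loaded; otherwise get_embedding_mode falls back to single mode and each item yields one (doc, text) pair.

```python
# Sketch only: expected shape of the output. The item, URL, and site
# are invented; the ids come from generate_document_embeddings.
from tools.db_load_utils import prepare_documents_from_json

docs, texts = prepare_documents_from_json(
    "https://example.com/acme",
    '{"name": "Acme", "investors": ["Fund A"], "industries": ["Robotics"]}',
    "example-site",
)
# In multi mode: one (doc, text) pair per specialized embedding,
# all sharing the same base_doc_id.
for doc, text in zip(docs, texts):
    print(doc.get("embedding_type", "single"), doc["id"], text[:40])
```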
@@ -210,17 +241,38 @@ def documents_from_csv_line(line, site):
     """
     Parse a line with URL, JSON, and embedding into document objects.
 
+    Supports both the legacy format (single embedding) and the new multi-embedding format:
+    - Legacy: URL \t JSON \t embedding
+    - Multi: URL \t JSON \t embedding \t base_doc_id \t embedding_type
+
     Args:
-        line: Tab-separated line with URL, JSON, and embedding
+        line: Tab-separated line with URL, JSON, and embedding data
         site: Site identifier
 
     Returns:
         List of document objects
     """
     try:
-        url, json_data, embedding_str = line.strip().split('\t')
+        parts = line.strip().split('\t')
+
+        if len(parts) < 3:
+            print(f"Error: Line has insufficient columns ({len(parts)} < 3)")
+            return []
+
+        url = parts[0]
+        json_data = parts[1]
+        embedding_str = parts[2]
+
+        # Check for the new multi-embedding format; empty trailing fields mean single mode
+        is_multi_format = len(parts) >= 5 and bool(parts[3]) and bool(parts[4])
+        base_doc_id = parts[3] if is_multi_format else None
+        embedding_type = parts[4] if is_multi_format else None
+
+        # Parse embedding
         embedding_str = embedding_str.replace("[", "").replace("]", "")
         embedding = [float(x) for x in embedding_str.split(',')]
+
+        # Parse and trim JSON
         js = json.loads(json_data)
         js = trim_schema_json(js, site)
     except Exception as e:
@@ -240,19 +292,33 @@ def documents_from_csv_line(line, site):
         if item is None:
             continue
 
-        # No longer filtering by should_include_item - trimming already handles this
+        # URL handling
         item_url = url if i == 0 else f"{url}#{i}"
         name = get_item_name(item)
 
-        # Ensure no None values in the document
-        doc = {
-            "id": str(int64_hash(item_url)),
-            "embedding": embedding,
-            "schema_json": json.dumps(item),
-            "url": item_url or "",
-            "name": name or "Unnamed Item",
-            "site": site or "unknown"
-        }
+        # Create document based on format
+        if is_multi_format:
+            # New multi-embedding format
+            doc = {
+                "id": f"{base_doc_id}_{embedding_type}",  # Use the stored multi-embedding ID
+                "base_doc_id": base_doc_id,  # Link back to original document
+                "embedding_type": embedding_type,  # Type of this embedding
+                "embedding": embedding,
+                "schema_json": json.dumps(item),
+                "url": item_url or "",
+                "name": name or "Unnamed Item",
+                "site": site or "unknown"
+            }
+        else:
+            # Legacy single embedding format
+            doc = {
+                "id": str(int64_hash(item_url)),
+                "embedding": embedding,
+                "schema_json": json.dumps(item),
+                "url": item_url or "",
+                "name": name or "Unnamed Item",
+                "site": site or "unknown"
+            }
 
         # Additional validation to ensure no None values
         for key, value in doc.items():
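How the updated parser treats the two line formats, as a hedged sketch with invented values. Note the emptiness check on the trailing columns: the new writer always emits five fields, leaving base_doc_id and embedding_type blank for single-mode documents, so column count alone cannot distinguish the formats.

```python
# Sketch only: parser behavior on legacy vs. multi-format lines.
from tools.db_load_utils import documents_from_csv_line

legacy = 'https://example.com/a\t{"name": "A"}\t[0.1, 0.2]'
multi = 'https://example.com/a\t{"name": "A"}\t[0.1, 0.2]\tabc123\tidentity'

documents_from_csv_line(legacy, "example-site")  # id = int64_hash(url); no base_doc_id
documents_from_csv_line(multi, "example-site")   # id = "abc123_identity"; base_doc_id = "abc123"
```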
diff --git a/code/tools/multi_embedding_generator.py b/code/tools/multi_embedding_generator.py
new file mode 100644
index 000000000..e571b1f7f
--- /dev/null
+++ b/code/tools/multi_embedding_generator.py
@@ -0,0 +1,250 @@
+#!/usr/bin/env python3
+"""
+Multi-embedding generator for creating multiple focused embeddings per document.
+
+Instead of one diluted embedding, create multiple specialized embeddings that
+all point to the same document, allowing for precise search matching.
+"""
+
+import json
+from typing import Dict, Any, List
+import hashlib
+
+def generate_document_embeddings(item: Dict[str, Any], base_url: str, site: str) -> List[Dict[str, Any]]:
+    """
+    Generate multiple specialized embeddings for a single document.
+
+    For companies, creates specialized embeddings for:
+    - Identity embedding: Company names, stage, exact identifiers
+    - Investor embedding: Funding, investors, financing relationships
+    - Business embedding: Industries, descriptions, business context
+    - Founder embedding: Founders and founder-company relationships
+    - Metadata embedding: Locations, employee size, other details
+
+    Args:
+        item: The document data
+        base_url: Base URL for the document
+        site: Site identifier
+
+    Returns:
+        List of embedding documents, each with focused text and shared metadata
+    """
+
+    item_type = detect_item_type(item)
+    base_doc_id = generate_base_doc_id(base_url)
+
+    if item_type == "company":
+        embeddings = generate_company_embeddings(item, base_url, base_doc_id, site)
+    else:
+        embeddings = generate_generic_embeddings(item, base_url, base_doc_id, site)
+
+    # Validate embeddings before returning
+    validated_embeddings = []
+    for emb in embeddings:
+        if validate_embedding_document(emb):
+            validated_embeddings.append(emb)
+
+    return validated_embeddings if validated_embeddings else [create_fallback_embedding(item, base_url, base_doc_id, site)]
+
+def detect_item_type(item: Dict[str, Any]) -> str:
+    """Detect whether item is a company based on its fields."""
+    company_fields = {"investors", "founded_year", "stage", "industries", "short_description", "employee_size"}
+
+    company_score = sum(1 for field in company_fields if field in item)
+
+    # For this experiment, only generate company embeddings for items that look like companies
+    return "company" if company_score > 0 else "generic"
+
+def generate_base_doc_id(url: str) -> str:
+    """Generate a consistent base document ID."""
+    return hashlib.md5(url.encode()).hexdigest()
+
+def create_embedding_document(base_doc_id: str, embedding_type: str, text: str,
+                              original_item: Dict[str, Any], url: str, site: str) -> Dict[str, Any]:
+    """Create a single embedding document with shared metadata."""
+
+    return {
+        "id": f"{base_doc_id}_{embedding_type}",
+        "base_doc_id": base_doc_id,  # Link back to original document
+        "embedding_type": embedding_type,
+        "embedding_text": text,
+        "schema_json": json.dumps(original_item),  # Full original data
+        "url": url,
+        "name": original_item.get("name", ""),
+        "site": site
+    }
+
+def generate_company_embeddings(item: Dict[str, Any], base_url: str, base_doc_id: str, site: str) -> List[Dict[str, Any]]:
+    """Generate specialized embeddings for company documents."""
+
+    embeddings = []
+    name = (item.get("name") or "").strip()
+
+    # 1. IDENTITY EMBEDDING - for exact name/title matching
+    identity_parts = []
+    if name:
+        identity_parts.extend([name, name, name])  # Triple repetition
+
+    stage = (item.get("stage") or "").strip()
+    if stage:
+        identity_parts.extend([stage, f"{stage} company"])
+
+    if identity_parts:
+        identity_text = " ".join(identity_parts)
+        embeddings.append(create_embedding_document(
+            base_doc_id, "identity", identity_text, item, base_url, site
+        ))
+
+    # 2. INVESTOR EMBEDDING - for investor/funding searches
+    investors = item.get("investors", [])
+    if investors:
+        investor_parts = []
+        investor_names = " ".join(str(inv) for inv in investors)
+
+        investor_parts.extend([
+            f"{name} funded by {investor_names}",
+            f"{name} backed by {investor_names}",
+            f"{name} invested in by {investor_names}",
+            f"companies funded by {investor_names}",
+            f"companies backed by {investor_names}",
+            investor_names,  # Raw investor names
+            investor_names,  # Repetition for strength
+        ])
+
+        investor_text = " ".join(investor_parts)
+        embeddings.append(create_embedding_document(
+            base_doc_id, "investor", investor_text, item, base_url, site
+        ))
+
+    # 3. INDUSTRY/BUSINESS EMBEDDING - for domain/industry searches
+    industry_parts = []
+
+    industries = item.get("industries", [])
+    if industries:
+        industry_text = " ".join(str(ind) for ind in industries)
+        industry_parts.extend([
+            f"{name} {industry_text}",
+            f"{industry_text} company",
+            industry_text
+        ])
+
+    description = (item.get("short_description") or "").strip()
+    if description:
+        industry_parts.extend([
+            f"{name} {description}",
+            description
+        ])
+
+    if industry_parts:
+        business_text = " ".join(industry_parts)
+        embeddings.append(create_embedding_document(
+            base_doc_id, "business", business_text, item, base_url, site
+        ))
+
+    # 4. FOUNDER EMBEDDING - for people-company connections
+    founders = item.get("founders", [])
+    if founders:
+        founder_parts = []
+        founder_names = " ".join(str(founder) for founder in founders)
+
+        founder_parts.extend([
+            f"{name} founded by {founder_names}",
+            f"{founder_names} founded {name}",
+            f"{founder_names} founder of {name}",
+            f"companies founded by {founder_names}",
+            founder_names
+        ])
+
+        founder_text = " ".join(founder_parts)
+        embeddings.append(create_embedding_document(
+            base_doc_id, "founder", founder_text, item, base_url, site
+        ))
+
+    # 5. METADATA EMBEDDING - for location, size, year searches
+    metadata_parts = []
+
+    location = (item.get("location") or "").strip()
+    if location:
+        metadata_parts.extend([
+            f"{name} located in {location}",
+            f"companies in {location}",
+            location
+        ])
+
+    founded_year = item.get("founded_year")
+    if founded_year:
+        metadata_parts.extend([
+            f"{name} founded in {founded_year}",
+            f"companies founded in {founded_year}"
+        ])
+
+    employee_size = (item.get("employee_size") or "").strip()
+    if employee_size:
+        metadata_parts.extend([
+            f"{name} {employee_size} employees",
+            f"{employee_size} company"
+        ])
+
+    if metadata_parts:
+        metadata_text = " ".join(metadata_parts)
+        embeddings.append(create_embedding_document(
+            base_doc_id, "metadata", metadata_text, item, base_url, site
+        ))
+
+    return embeddings
+
+def generate_generic_embeddings(item: Dict[str, Any], base_url: str, base_doc_id: str, site: str) -> List[Dict[str, Any]]:
+    """Fallback for unknown item types."""
+
+    # For unknown types, create a single embedding with all content
+    all_text_parts = []
+
+    # Extract all string values
+    for key, value in item.items():
+        if isinstance(value, str) and value.strip():
+            all_text_parts.append(value.strip())
+        elif isinstance(value, list):
+            list_text = " ".join(str(v) for v in value if v)
+            if list_text:
+                all_text_parts.append(list_text)
+
+    if all_text_parts:
+        full_text = " ".join(all_text_parts)
+        return [create_embedding_document(
+            base_doc_id, "full", full_text, item, base_url, site
+        )]
+
+    return []
+
+def validate_embedding_document(emb_doc: Dict[str, Any]) -> bool:
+    """Validate that an embedding document has required fields and valid content."""
+    required_fields = {"id", "base_doc_id", "embedding_type", "embedding_text"}
+
+    # Check required fields exist
+    if not all(field in emb_doc for field in required_fields):
+        return False
+
+    # Check non-empty values
+    if not all(emb_doc[field] for field in required_fields):
+        return False
+
+    # Check embedding text is meaningful (more than just whitespace)
+    if not emb_doc["embedding_text"].strip():
+        return False
+
+    return True
+
+def create_fallback_embedding(item: Dict[str, Any], base_url: str, base_doc_id: str, site: str) -> Dict[str, Any]:
+    """Create a basic single embedding as a fallback when multi-embedding fails."""
+    item_json = json.dumps(item)
+
+    return {
+        "id": f"{base_doc_id}_fallback",
+        "base_doc_id": base_doc_id,
+        "embedding_type": "fallback",
+        "embedding_text": item_json,
+        "schema_json": item_json,
+        "url": base_url,
+        "name": item.get("name", ""),
+        "site": site
+    }
\ No newline at end of file
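A usage sketch for the new module with a hypothetical company record; the expected embedding types follow the five numbered sections above.

```python
# Sketch only: exercising the new generator with an invented company.
from tools.multi_embedding_generator import generate_document_embeddings

company = {
    "name": "Acme Robotics",
    "stage": "Series A",
    "investors": ["Fund A", "Fund B"],
    "industries": ["Robotics"],
    "short_description": "Warehouse automation robots",
    "founders": ["Jane Doe"],
    "location": "Austin, TX",
    "founded_year": 2020,
    "employee_size": "11-50",
}

for emb in generate_document_embeddings(company, "https://example.com/acme", "example-site"):
    print(emb["embedding_type"], "->", emb["embedding_text"][:60])
# Expected types: identity, investor, business, founder, metadata;
# all share base_doc_id = md5("https://example.com/acme").
```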