from qcloud_cos import CosConfig from qcloud_cos import CosS3Client import tcvectordb from tcvectordb.model.ai_database import AIDatabase from tcvectordb.model.collection_view import CollectionView, Embedding, SplitterProcess, ParsingProcess from tcvectordb.model.index import Index from tcvectordb.exceptions import ServerInternalError from rich.console import Console from settings import settings console = Console() cos = CosS3Client( CosConfig( Region=settings.tencent_cloud.region, SecretId=settings.tencent_cloud.secret_id, SecretKey=settings.tencent_cloud.secret_key, ), ) vdb = tcvectordb.RPCVectorDBClient( url=str(settings.VDB.url), username=settings.VDB.username, key=settings.VDB.key, ) def create_ai_database_if_not_exists(database_name: str) -> AIDatabase: if vdb.exists_db(database_name): console.log(f'[green]Database "{database_name}" already exists.[/green]') return vdb.database(database_name) else: console.log(f'[green]Creating database "{database_name}"...[/green]') return vdb.create_ai_database(database_name) def create_collection_view_if_not_exists(database: AIDatabase, collection_name: str) -> CollectionView: try: console.log(f'[green]Checking if collection view "{collection_name}" exists...[/green]') return database.describe_collection_view(collection_name) except ServerInternalError as e: if not e.message.startswith(f'CollectionView not exist: {collection_name}'): raise e console.log(f'[green]Creating collection view "{collection_name}"...[/green]') return database.create_collection_view( collection_name, embedding=Embedding( language='multi', enable_words_embedding=True, ), splitter_process=SplitterProcess( append_title_to_chunk=False, append_keywords_to_chunk=True, chunk_splitter=None, ), parsing_process=ParsingProcess('VisionModelParsing'), index=Index(), ) def create_db_collection_view_if_not_exists(database: str, collection: str) -> CollectionView: db = create_ai_database_if_not_exists(database) return db, create_collection_view_if_not_exists(db, collection) database, collection_view = create_db_collection_view_if_not_exists( settings.VDB.database, settings.VDB.collection, ) console.log(f'[green]Collection view "{settings.VDB.collection}" is ready.[/green]')