# Copyright (C) 2019-2024 Zilliz. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
# in compliance with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software distributed under the License
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
# or implied. See the License for the specific language governing permissions and limitations under
# the License.
from typing import List, Optional
import tempfile
import os
import subprocess
import pathlib
import logging
import fcntl
import re
BIN_PATH = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'lib')
logger = logging.getLogger()
class Server:
"""
The milvus-lite server
"""
MILVUS_BIN = 'milvus'
def __init__(self, db_file: str, address: Optional[str] = None):
"""
Args:
db_file (str):
The local file to store data.
address (address, optional):
grpc server address, example: localhost:19530,
if not set, the MilvusLite service will use UDS.
"""
if os.environ.get('BIN_PATH') is not None:
self._bin_path = pathlib.Path(os.environ['BIN_PATH']).absolute()
else:
self._bin_path = pathlib.Path(BIN_PATH).absolute()
self._db_file = pathlib.Path(db_file).absolute()
if not re.match(r'^[a-zA-Z0-9.\-_]+$', self._db_file.name):
raise RuntimeError(f"Unsupport db name {self._db_file.name}, the name must match ^[a-zA-Z0-9.\\-_]+$")
if len(self._db_file.name) > 36:
raise RuntimeError(f"Db name {self._db_file.name} is too long, should be less than 36")
self._work_dir = self._db_file.parent
self._address= address
self._p = None
self._uds_path = f"{tempfile.mktemp()}_{self._db_file.name}.sock"
self._lock_path = str(self._db_file.parent / f'.{self._db_file.name}.lock')
self._lock_fd = None
def init(self) -> bool:
if not self._bin_path.exists():
logger.error("Bin path not exists")
return False
if not self._work_dir.exists():
logger.error("Dir %s not exist", self._work_dir)
return True
@property
def milvus_bin(self):
return str(self._bin_path / 'milvus')
@property
def log_level(self):
return os.environ.get("LOG_LEVEL", "ERROR")
@property
def uds_path(self):
return f'unix:{self._uds_path}'
@property
def args(self):
if self._address is not None:
return [self.milvus_bin, self._db_file, self._address, self.log_level]
return [self.milvus_bin, self._db_file, self.uds_path, self.log_level, self._lock_path]
def start(self) -> bool:
assert self._p is None, "Server already started"
self._lock_fd = open(self._lock_path, 'a')
try:
fcntl.lockf(self._lock_fd, fcntl.LOCK_EX | fcntl.LOCK_NB)
self._p = subprocess.Popen(
args=self.args,
env={
"LD_LIBRARY_PATH": str(self._bin_path),
"DYLD_LIBRARY_PATH": str(self._bin_path)
},
cwd=str(self._work_dir),
)
try:
# Wait for 0.5 second to ensure successful startup
self._p.wait(0.5)
logger.error("Start milvus-lite failed")
return False
except subprocess.TimeoutExpired:
return True
except BlockingIOError:
logger.error("Open %s failed, the file has been opened by another program", self._db_file)
return False
def stop(self):
if self._lock_fd:
# When the file lock is released, the milvus-lite service will automatically stop.
fcntl.flock(self._lock_fd, fcntl.LOCK_UN)
self._lock_fd.close()
self._lock_fd = None
if self._p is not None:
self._p = None
try:
os.unlink(self._uds_path)
except FileNotFoundError:
pass
try:
os.unlink(self._lock_path)
except FileNotFoundError:
pass
def __del__(self):
self.stop()