# Copyright (C) 2019-2023 Zilliz. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
# in compliance with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software distributed under the License
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
# or implied. See the License for the specific language governing permissions and limitations under
# the License.

import json
import logging
from pathlib import Path
from typing import Optional

import numpy as np
import pandas as pd

from pymilvus.client.types import (
    DataType,
)
from pymilvus.exceptions import MilvusException
from pymilvus.orm.schema import (
    CollectionSchema,
    FieldSchema,
)

from .constants import (
    DYNAMIC_FIELD_NAME,
    MB,
    NUMPY_TYPE_CREATOR,
    BulkFileType,
)

logger = logging.getLogger("bulk_buffer")
logger.setLevel(logging.DEBUG)


class Buffer:
    def __init__(
        self,
        schema: CollectionSchema,
        file_type: BulkFileType = BulkFileType.NUMPY,
        config: Optional[dict] = None,
    ):
        self._buffer = {}
        self._fields = {}
        self._file_type = file_type
        self._config = config or {}

        for field in schema.fields:
            if field.is_primary and field.auto_id:
                continue
            if field.is_function_output:
                continue
            self._buffer[field.name] = []
            self._fields[field.name] = field

        if len(self._buffer) == 0:
            self._throw("Illegal collection schema: fields list is empty")

        # dynamic field, internal name is '$meta'
        if schema.enable_dynamic_field:
            self._buffer[DYNAMIC_FIELD_NAME] = []
            self._fields[DYNAMIC_FIELD_NAME] = FieldSchema(
                name=DYNAMIC_FIELD_NAME, dtype=DataType.JSON
            )

    @property
    def row_count(self) -> int:
        if len(self._buffer) == 0:
            return 0

        for k in self._buffer:
            return len(self._buffer[k])
        return None

    def _throw(self, msg: str):
        logger.error(msg)
        raise MilvusException(message=msg)

    def _raw_obj(self, x: object):
        if isinstance(x, np.ndarray):
            return x.tolist()
        if isinstance(x, np.generic):
            return x.item()
        return x

    def append_row(self, row: dict):
        dynamic_values = {}
        if DYNAMIC_FIELD_NAME in row and not isinstance(row[DYNAMIC_FIELD_NAME], dict):
            self._throw(f"Dynamic field '{DYNAMIC_FIELD_NAME}' value should be JSON format")

        for k, v in row.items():
            if k == DYNAMIC_FIELD_NAME:
                dynamic_values.update(v)
                continue

            if k not in self._buffer:
                dynamic_values[k] = self._raw_obj(v)
            else:
                self._buffer[k].append(v)

        if DYNAMIC_FIELD_NAME in self._buffer:
            self._buffer[DYNAMIC_FIELD_NAME].append(dynamic_values)

    def persist(self, local_path: str, **kwargs) -> list:
        # verify that the row counts of all fields are equal
        row_count = -1
        for k in self._buffer:
            if row_count < 0:
                row_count = len(self._buffer[k])
            elif row_count != len(self._buffer[k]):
                buffer_k_len = len(self._buffer[k])
                self._throw(
                    f"Column {k} row count {buffer_k_len} doesn't equal the first column row count {row_count}"
                )

        # output files
        if self._file_type == BulkFileType.NUMPY:
            return self._persist_npy(local_path, **kwargs)
        if self._file_type == BulkFileType.JSON:
            return self._persist_json_rows(local_path, **kwargs)
        if self._file_type == BulkFileType.PARQUET:
            return self._persist_parquet(local_path, **kwargs)
        if self._file_type == BulkFileType.CSV:
            return self._persist_csv(local_path, **kwargs)

        self._throw(f"Unsupported file type: {self._file_type}")
        return []

    def _persist_npy(self, local_path: str, **kwargs):
        file_list = []
        row_count = len(next(iter(self._buffer.values())))
        for k in self._buffer:
            full_file_name = Path(local_path).joinpath(k + ".npy")
            file_list.append(str(full_file_name))
            try:
                Path(local_path).mkdir(exist_ok=True)

                # determine the numpy data type
                dt = None
                field_schema = self._fields[k]
                if field_schema.dtype == DataType.ARRAY:
                    element_type = field_schema.element_type
                    dt = NUMPY_TYPE_CREATOR[element_type.name]
                elif field_schema.dtype.name in NUMPY_TYPE_CREATOR:
                    dt = NUMPY_TYPE_CREATOR[field_schema.dtype.name]

                # for JSON field, convert to string array
                if field_schema.dtype == DataType.JSON:
                    str_arr = []
                    for val in self._buffer[k]:
                        str_arr.append(json.dumps(val))
                    self._buffer[k] = str_arr

                # currently, the milvus server doesn't support numpy for sparse vectors
                if field_schema.dtype == DataType.SPARSE_FLOAT_VECTOR:
                    self._throw(
                        f"Failed to persist file {full_file_name},"
                        f" error: milvus doesn't support parsing sparse vectors from numpy file"
                    )

                # special handling for float16 vectors: self._buffer stores bytes for
                # float16 vectors, convert the bytes to a numpy array
                if field_schema.dtype in {DataType.FLOAT16_VECTOR, DataType.BFLOAT16_VECTOR}:
                    a = []
                    for b in self._buffer[k]:
                        a.append(np.frombuffer(b, dtype=dt).tolist())
                    arr = np.array(a, dtype=dt)
                else:
                    arr = np.array(self._buffer[k], dtype=dt)

                np.save(str(full_file_name), arr)
            except Exception as e:
                self._throw(f"Failed to persist file {full_file_name}, error: {e}")

            logger.info(f"Successfully persisted file {full_file_name}, row count: {row_count}")

        if len(file_list) != len(self._buffer):
            logger.error("Some of the fields were not persisted successfully, aborting the files")
            for f in file_list:
                Path(f).unlink()
            Path(local_path).rmdir()
            file_list.clear()
            self._throw("Some of the fields were not persisted successfully, aborting the files")

        return file_list

    def _persist_json_rows(self, local_path: str, **kwargs):
        rows = []
        row_count = len(next(iter(self._buffer.values())))
        row_index = 0
        while row_index < row_count:
            row = {}
            for k, v in self._buffer.items():
                # special handling for float16 vectors: self._buffer stores bytes for
                # float16 vectors, convert the bytes to a float list
                field_schema = self._fields[k]
                if field_schema.dtype in {DataType.FLOAT16_VECTOR, DataType.BFLOAT16_VECTOR}:
                    dt = (
                        np.dtype("bfloat16")
                        if (field_schema.dtype == DataType.BFLOAT16_VECTOR)
                        else np.float16
                    )
                    row[k] = np.frombuffer(v[row_index], dtype=dt).tolist()
                else:
                    row[k] = v[row_index]
            rows.append(row)
            row_index = row_index + 1

        data = {
            "rows": rows,
        }
        file_path = Path(local_path + ".json")
        try:
            with file_path.open("w") as json_file:
                json.dump(data, json_file, indent=2)
        except Exception as e:
            self._throw(f"Failed to persist file {file_path}, error: {e}")

        logger.info(f"Successfully persisted file {file_path}, row count: {len(rows)}")
        return [str(file_path)]

    def _persist_parquet(self, local_path: str, **kwargs):
        file_path = Path(local_path + ".parquet")

        data = {}
        for k in self._buffer:
            field_schema = self._fields[k]
            if field_schema.dtype in {DataType.JSON, DataType.SPARSE_FLOAT_VECTOR}:
                # JSON and SPARSE_FLOAT_VECTOR fields are stored as string arrays
                str_arr = []
                for val in self._buffer[k]:
                    str_arr.append(json.dumps(val))
                data[k] = pd.Series(str_arr, dtype=None)
            elif field_schema.dtype in {DataType.BINARY_VECTOR, DataType.FLOAT_VECTOR}:
                arr = []
                for val in self._buffer[k]:
                    arr.append(np.array(val, dtype=NUMPY_TYPE_CREATOR[field_schema.dtype.name]))
                data[k] = pd.Series(arr)
            elif field_schema.dtype in {DataType.FLOAT16_VECTOR, DataType.BFLOAT16_VECTOR}:
                # special handling for float16 vectors: self._buffer stores bytes for
                # float16 vectors, convert the bytes to a numpy array
                arr = []
                for val in self._buffer[k]:
                    arr.append(
                        np.frombuffer(val, dtype=NUMPY_TYPE_CREATOR[field_schema.dtype.name])
                    )
                data[k] = pd.Series(arr)
            elif field_schema.dtype == DataType.ARRAY:
                dt = NUMPY_TYPE_CREATOR[field_schema.element_type.name]
                arr = []
                for val in self._buffer[k]:
                    arr.append(np.array(val, dtype=dt))
                data[k] = pd.Series(arr)
            elif field_schema.dtype.name in NUMPY_TYPE_CREATOR:
                dt = NUMPY_TYPE_CREATOR[field_schema.dtype.name]
                data[k] = pd.Series(self._buffer[k], dtype=dt)
            else:
                # dtype is unknown, let pandas deduce the type, might not work
                data[k] = pd.Series(self._buffer[k])

        # calculate a proper row group size
        row_group_size_min = 1000
        row_group_size = 10000
        row_group_size_max = 1000000
        # fetch these outside the conditional so the final log statement always has values
        buffer_size = kwargs.get("buffer_size", 1)
        buffer_row_count = kwargs.get("buffer_row_count", 1)
        if "buffer_size" in kwargs and "buffer_row_count" in kwargs:
            row_group_bytes = kwargs.get(
                "row_group_bytes", 32 * MB
            )  # 32MB is an empirical value that avoids high memory usage of the parquet reader on the server side
            size_per_row = int(buffer_size / buffer_row_count) + 1
            row_group_size = int(row_group_bytes / size_per_row)
            row_group_size = max(row_group_size, row_group_size_min)
            row_group_size = min(row_group_size, row_group_size_max)

        # write to Parquet file
        data_frame = pd.DataFrame(data=data)
        data_frame.to_parquet(
            file_path, row_group_size=row_group_size, engine="pyarrow"
        )  # don't use fastparquet

        logger.info(
            f"Successfully persisted file {file_path}, total size: {buffer_size},"
            f" row count: {buffer_row_count}, row group size: {row_group_size}"
        )
        return [str(file_path)]

    def _persist_csv(self, local_path: str, **kwargs):
        sep = self._config.get("sep", ",")
        nullkey = self._config.get("nullkey", "")

        header = list(self._buffer.keys())
        data = pd.DataFrame(columns=header)
        for k, v in self._buffer.items():
            field_schema = self._fields[k]
            # When df.to_csv() writes non-scalar data, it uses repr() to convert the value
            # to a string, and the result depends on the value's type.
            # For a value of [1.0, 2.0]:
            #   list:     '[1.0, 2.0]'
            #   tuple:    '(1.0, 2.0)'
            #   np.array: '[1.0 2.0]'
            # We need the output to be '[1.0, 2.0]', consistent with the array format in JSON.
            # So either 1. make sure that values of type
            # (BINARY_VECTOR, FLOAT_VECTOR, FLOAT16_VECTOR, BFLOAT16_VECTOR) are stored as lists,
            # or 2. convert the value to a string with json.dumps() before adding it to the DataFrame.
            # Method 2 is used here.
            if field_schema.dtype in {
                DataType.SPARSE_FLOAT_VECTOR,
                DataType.BINARY_VECTOR,
                DataType.FLOAT_VECTOR,
            }:
                arr = []
                for val in v:
                    arr.append(json.dumps(val))
                data[k] = pd.Series(arr, dtype=np.dtype("str"))
            elif field_schema.dtype in {DataType.FLOAT16_VECTOR, DataType.BFLOAT16_VECTOR}:
                # special handling for float16 vectors: self._buffer stores bytes for
                # float16 vectors, convert the bytes to a float list
                dt = (
                    np.dtype("bfloat16")
                    if (field_schema.dtype == DataType.BFLOAT16_VECTOR)
                    else np.dtype("float16")
                )
                arr = []
                for val in v:
                    arr.append(json.dumps(np.frombuffer(val, dtype=dt).tolist()))
                data[k] = pd.Series(arr, dtype=np.dtype("str"))
            elif field_schema.dtype in {
                DataType.JSON,
                DataType.ARRAY,
            }:
                arr = []
                for val in v:
                    if val is None:
                        arr.append(nullkey)
                    else:
                        arr.append(json.dumps(val))
                data[k] = pd.Series(arr, dtype=np.dtype("str"))
            elif field_schema.dtype in {DataType.BOOL}:
                arr = []
                for val in v:
                    if val is not None:
                        arr.append("true" if val else "false")
                data[k] = pd.Series(arr, dtype=np.dtype("str"))
            else:
                data[k] = pd.Series(v, dtype=NUMPY_TYPE_CREATOR[field_schema.dtype.name])

        file_path = Path(local_path + ".csv")
        try:
            # pd.Series converts None to np.nan,
            # so we can use 'na_rep=nullkey' to replace NaN with nullkey
            data.to_csv(file_path, sep=sep, na_rep=nullkey, index=False)
        except Exception as e:
            self._throw(f"Failed to persist file {file_path}, error: {e}")

        logger.info("Successfully persisted file %s, row count: %s", file_path, len(data))
        return [str(file_path)]
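

# ----------------------------------------------------------------------------
# Usage sketch: Buffer is normally driven by pymilvus' bulk writer classes
# rather than used directly. The example below is illustrative only; the
# schema (auto-id INT64 primary key plus a 4-dim FLOAT_VECTOR), the
# "/tmp/bulk_demo" output prefix, and the import paths are assumptions, not
# part of this module.
#
#   from pymilvus import CollectionSchema, FieldSchema, DataType
#   from pymilvus.bulk_writer.constants import BulkFileType  # path assumed
#   from pymilvus.bulk_writer.buffer import Buffer           # path assumed
#
#   schema = CollectionSchema(
#       fields=[
#           FieldSchema(name="id", dtype=DataType.INT64, is_primary=True, auto_id=True),
#           FieldSchema(name="vector", dtype=DataType.FLOAT_VECTOR, dim=4),
#       ]
#   )
#   buf = Buffer(schema, file_type=BulkFileType.JSON)
#   buf.append_row({"vector": [0.1, 0.2, 0.3, 0.4]})
#   files = buf.persist("/tmp/bulk_demo")  # writes /tmp/bulk_demo.json
# ----------------------------------------------------------------------------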