// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

#include "arrow/python/serialize.h"

#include "arrow/python/numpy_interop.h"

#include <cstdint>
#include <limits>
#include <memory>
#include <sstream>
#include <string>
#include <vector>

#include <numpy/arrayobject.h>
#include <numpy/arrayscalars.h>

#include "arrow/array.h"
#include "arrow/array/builder_binary.h"
#include "arrow/array/builder_nested.h"
#include "arrow/array/builder_primitive.h"
#include "arrow/array/builder_union.h"
#include "arrow/io/interfaces.h"
#include "arrow/io/memory.h"
#include "arrow/ipc/util.h"
#include "arrow/ipc/writer.h"
#include "arrow/record_batch.h"
#include "arrow/result.h"
#include "arrow/tensor.h"
#include "arrow/util/logging.h"

#include "arrow/python/common.h"
#include "arrow/python/datetime.h"
#include "arrow/python/helpers.h"
#include "arrow/python/iterators.h"
#include "arrow/python/numpy_convert.h"
#include "arrow/python/platform.h"
#include "arrow/python/pyarrow.h"

constexpr int32_t kMaxRecursionDepth = 100;

namespace arrow {

using internal::checked_cast;

namespace py {

class SequenceBuilder;
class DictBuilder;

Status Append(PyObject* context, PyObject* elem, SequenceBuilder* builder,
              int32_t recursion_depth, SerializedPyObject* blobs_out);
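// Typical usage (a sketch; SerializeObject further below is the public entry
// point, and `obj` is assumed to be an iterable wrapping the value to
// serialize):
//
//   SerializedPyObject serialized;
//   RETURN_NOT_OK(SerializeObject(context, obj, &serialized));
//   RETURN_NOT_OK(serialized.WriteTo(stream));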
// A Sequence is a heterogeneous collection of elements. It can contain
// scalar Python types, lists, tuples, dictionaries, tensors and sparse
// tensors.
class SequenceBuilder {
 public:
  explicit SequenceBuilder(MemoryPool* pool = default_memory_pool())
      : pool_(pool),
        types_(::arrow::int8(), pool),
        offsets_(::arrow::int32(), pool),
        type_map_(PythonType::NUM_PYTHON_TYPES, -1) {
    auto null_builder = std::make_shared<NullBuilder>(pool);
    auto initial_ty = dense_union({field("0", null())});
    builder_.reset(new DenseUnionBuilder(pool, {null_builder}, initial_ty));
  }

  // Appending a none to the sequence
  Status AppendNone() { return builder_->AppendNull(); }

  // Create the child builder for the given type tag on first use, register it
  // with the union builder, then append the tag for the current element.
  template <typename BuilderType, typename MakeBuilderFn>
  Status CreateAndUpdate(std::shared_ptr<BuilderType>* child_builder, int8_t tag,
                         MakeBuilderFn make_builder) {
    if (!*child_builder) {
      child_builder->reset(make_builder());
      std::ostringstream convert;
      convert.imbue(std::locale::classic());
      convert << static_cast<int>(tag);
      type_map_[tag] = builder_->AppendChild(*child_builder, convert.str());
    }
    return builder_->Append(type_map_[tag]);
  }

  template <typename BuilderType, typename T>
  Status AppendPrimitive(std::shared_ptr<BuilderType>* child_builder, const T val,
                         int8_t tag) {
    RETURN_NOT_OK(
        CreateAndUpdate(child_builder, tag, [this]() { return new BuilderType(pool_); }));
    return (*child_builder)->Append(val);
  }

  // Appending a boolean to the sequence
  Status AppendBool(const bool data) {
    return AppendPrimitive(&bools_, data, PythonType::BOOL);
  }

  // Appending an int64_t to the sequence
  Status AppendInt64(const int64_t data) {
    return AppendPrimitive(&ints_, data, PythonType::INT);
  }

  // Append a list of bytes to the sequence
  Status AppendBytes(const uint8_t* data, int32_t length) {
    RETURN_NOT_OK(CreateAndUpdate(&bytes_, PythonType::BYTES,
                                  [this]() { return new BinaryBuilder(pool_); }));
    return bytes_->Append(data, length);
  }

  // Appending a string to the sequence
  Status AppendString(const char* data, int32_t length) {
    RETURN_NOT_OK(CreateAndUpdate(&strings_, PythonType::STRING,
                                  [this]() { return new StringBuilder(pool_); }));
    return strings_->Append(data, length);
  }

  // Appending a half_float to the sequence
  Status AppendHalfFloat(const npy_half data) {
    return AppendPrimitive(&half_floats_, data, PythonType::HALF_FLOAT);
  }

  // Appending a float to the sequence
  Status AppendFloat(const float data) {
    return AppendPrimitive(&floats_, data, PythonType::FLOAT);
  }

  // Appending a double to the sequence
  Status AppendDouble(const double data) {
    return AppendPrimitive(&doubles_, data, PythonType::DOUBLE);
  }

  // Appending a Date64 timestamp to the sequence
  Status AppendDate64(const int64_t timestamp) {
    return AppendPrimitive(&date64s_, timestamp, PythonType::DATE64);
  }
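  // The Append*Tensor/Matrix/Ndarray/Buffer methods below only record an
  // int32 index into the corresponding SerializedPyObject vector; the actual
  // payloads are kept in SerializedPyObject and written out-of-band (see
  // SerializedPyObject::WriteTo).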
  // Appending a tensor to the sequence
  //
  // \param tensor_index Index of the tensor in the object.
  Status AppendTensor(const int32_t tensor_index) {
    RETURN_NOT_OK(CreateAndUpdate(&tensor_indices_, PythonType::TENSOR,
                                  [this]() { return new Int32Builder(pool_); }));
    return tensor_indices_->Append(tensor_index);
  }

  // Appending a sparse coo tensor to the sequence
  //
  // \param sparse_coo_tensor_index Index of the sparse coo tensor in the object.
  Status AppendSparseCOOTensor(const int32_t sparse_coo_tensor_index) {
    RETURN_NOT_OK(CreateAndUpdate(&sparse_coo_tensor_indices_,
                                  PythonType::SPARSECOOTENSOR,
                                  [this]() { return new Int32Builder(pool_); }));
    return sparse_coo_tensor_indices_->Append(sparse_coo_tensor_index);
  }

  // Appending a sparse csr matrix to the sequence
  //
  // \param sparse_csr_matrix_index Index of the sparse csr matrix in the object.
  Status AppendSparseCSRMatrix(const int32_t sparse_csr_matrix_index) {
    RETURN_NOT_OK(CreateAndUpdate(&sparse_csr_matrix_indices_,
                                  PythonType::SPARSECSRMATRIX,
                                  [this]() { return new Int32Builder(pool_); }));
    return sparse_csr_matrix_indices_->Append(sparse_csr_matrix_index);
  }

  // Appending a sparse csc matrix to the sequence
  //
  // \param sparse_csc_matrix_index Index of the sparse csc matrix in the object.
  Status AppendSparseCSCMatrix(const int32_t sparse_csc_matrix_index) {
    RETURN_NOT_OK(CreateAndUpdate(&sparse_csc_matrix_indices_,
                                  PythonType::SPARSECSCMATRIX,
                                  [this]() { return new Int32Builder(pool_); }));
    return sparse_csc_matrix_indices_->Append(sparse_csc_matrix_index);
  }

  // Appending a sparse csf tensor to the sequence
  //
  // \param sparse_csf_tensor_index Index of the sparse csf tensor in the object.
  Status AppendSparseCSFTensor(const int32_t sparse_csf_tensor_index) {
    RETURN_NOT_OK(CreateAndUpdate(&sparse_csf_tensor_indices_,
                                  PythonType::SPARSECSFTENSOR,
                                  [this]() { return new Int32Builder(pool_); }));
    return sparse_csf_tensor_indices_->Append(sparse_csf_tensor_index);
  }

  // Appending a numpy ndarray to the sequence
  //
  // \param ndarray_index Index of the ndarray in the object.
  Status AppendNdarray(const int32_t ndarray_index) {
    RETURN_NOT_OK(CreateAndUpdate(&ndarray_indices_, PythonType::NDARRAY,
                                  [this]() { return new Int32Builder(pool_); }));
    return ndarray_indices_->Append(ndarray_index);
  }

  // Appending a buffer to the sequence
  //
  // \param buffer_index Index of the buffer in the object.
  Status AppendBuffer(const int32_t buffer_index) {
    RETURN_NOT_OK(CreateAndUpdate(&buffer_indices_, PythonType::BUFFER,
                                  [this]() { return new Int32Builder(pool_); }));
    return buffer_indices_->Append(buffer_index);
  }

  Status AppendSequence(PyObject* context, PyObject* sequence, int8_t tag,
                        std::shared_ptr<ListBuilder>& target_sequence,
                        std::unique_ptr<SequenceBuilder>& values, int32_t recursion_depth,
                        SerializedPyObject* blobs_out) {
    if (recursion_depth >= kMaxRecursionDepth) {
      return Status::NotImplemented(
          "This object exceeds the maximum recursion depth. It may contain itself "
          "recursively.");
    }
    RETURN_NOT_OK(CreateAndUpdate(&target_sequence, tag, [this, &values]() {
      values.reset(new SequenceBuilder(pool_));
      return new ListBuilder(pool_, values->builder());
    }));
    RETURN_NOT_OK(target_sequence->Append());
    return internal::VisitIterable(
        sequence, [&](PyObject* obj, bool* keep_going /* unused */) {
          return Append(context, obj, values.get(), recursion_depth, blobs_out);
        });
  }

  Status AppendList(PyObject* context, PyObject* list, int32_t recursion_depth,
                    SerializedPyObject* blobs_out) {
    return AppendSequence(context, list, PythonType::LIST, lists_, list_values_,
                          recursion_depth + 1, blobs_out);
  }

  Status AppendTuple(PyObject* context, PyObject* tuple, int32_t recursion_depth,
                     SerializedPyObject* blobs_out) {
    return AppendSequence(context, tuple, PythonType::TUPLE, tuples_, tuple_values_,
                          recursion_depth + 1, blobs_out);
  }

  Status AppendSet(PyObject* context, PyObject* set, int32_t recursion_depth,
                   SerializedPyObject* blobs_out) {
    return AppendSequence(context, set, PythonType::SET, sets_, set_values_,
                          recursion_depth + 1, blobs_out);
  }

  Status AppendDict(PyObject* context, PyObject* dict, int32_t recursion_depth,
                    SerializedPyObject* blobs_out);
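  // Illustrative: AppendList/AppendTuple/AppendSet/AppendDict all recurse
  // through Append with recursion_depth + 1, so a value nested more than
  // kMaxRecursionDepth (100) levels deep (e.g. a list containing itself)
  // fails with NotImplemented instead of overflowing the C stack.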
  // Finish building the sequence and return the result.
  // Input arrays may be nullptr
  Status Finish(std::shared_ptr<Array>* out) { return builder_->Finish(out); }

  std::shared_ptr<DenseUnionBuilder> builder() { return builder_; }

 private:
  MemoryPool* pool_;

  Int8Builder types_;
  Int32Builder offsets_;

  /// Mapping from PythonType to child index
  std::vector<int8_t> type_map_;

  std::shared_ptr<BooleanBuilder> bools_;
  std::shared_ptr<Int64Builder> ints_;
  std::shared_ptr<BinaryBuilder> bytes_;
  std::shared_ptr<StringBuilder> strings_;
  std::shared_ptr<HalfFloatBuilder> half_floats_;
  std::shared_ptr<FloatBuilder> floats_;
  std::shared_ptr<DoubleBuilder> doubles_;
  std::shared_ptr<Date64Builder> date64s_;

  std::unique_ptr<SequenceBuilder> list_values_;
  std::shared_ptr<ListBuilder> lists_;
  std::unique_ptr<DictBuilder> dict_values_;
  std::shared_ptr<ListBuilder> dicts_;
  std::unique_ptr<SequenceBuilder> tuple_values_;
  std::shared_ptr<ListBuilder> tuples_;
  std::unique_ptr<SequenceBuilder> set_values_;
  std::shared_ptr<ListBuilder> sets_;

  std::shared_ptr<Int32Builder> tensor_indices_;
  std::shared_ptr<Int32Builder> sparse_coo_tensor_indices_;
  std::shared_ptr<Int32Builder> sparse_csr_matrix_indices_;
  std::shared_ptr<Int32Builder> sparse_csc_matrix_indices_;
  std::shared_ptr<Int32Builder> sparse_csf_tensor_indices_;
  std::shared_ptr<Int32Builder> ndarray_indices_;
  std::shared_ptr<Int32Builder> buffer_indices_;

  std::shared_ptr<DenseUnionBuilder> builder_;
};

// Constructing dictionaries of key/value pairs. Sequences of
// keys and values are built separately using a pair of
// SequenceBuilders. The resulting Arrow representation
// can be obtained via the Finish method.
class DictBuilder {
 public:
  explicit DictBuilder(MemoryPool* pool = nullptr) : keys_(pool), vals_(pool) {
    builder_.reset(
        new StructBuilder(struct_({field("keys", dense_union(FieldVector{})),
                                   field("vals", dense_union(FieldVector{}))}),
                          pool, {keys_.builder(), vals_.builder()}));
  }

  // Builder for the keys of the dictionary
  SequenceBuilder& keys() { return keys_; }

  // Builder for the values of the dictionary
  SequenceBuilder& vals() { return vals_; }

  // Construct an Arrow StructArray representing the dictionary.
  // Contains a field "keys" for the keys and "vals" for the values.
  Status Finish(std::shared_ptr<Array>* out) { return builder_->Finish(out); }

  std::shared_ptr<StructBuilder> builder() { return builder_; }

 private:
  SequenceBuilder keys_;
  SequenceBuilder vals_;
  std::shared_ptr<StructBuilder> builder_;
};
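// Illustrative: a dict is encoded as a list<struct<keys: dense_union,
// vals: dense_union>>, so {"a": 1} becomes one struct entry whose "keys"
// union holds the string "a" and whose "vals" union holds the int64 1.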
Status SequenceBuilder::AppendDict(PyObject* context, PyObject* dict,
                                   int32_t recursion_depth,
                                   SerializedPyObject* blobs_out) {
  if (recursion_depth >= kMaxRecursionDepth) {
    return Status::NotImplemented(
        "This object exceeds the maximum recursion depth. It may contain itself "
        "recursively.");
  }
  RETURN_NOT_OK(CreateAndUpdate(&dicts_, PythonType::DICT, [this]() {
    dict_values_.reset(new DictBuilder(pool_));
    return new ListBuilder(pool_, dict_values_->builder());
  }));
  RETURN_NOT_OK(dicts_->Append());
  PyObject* key;
  PyObject* value;
  Py_ssize_t pos = 0;
  while (PyDict_Next(dict, &pos, &key, &value)) {
    RETURN_NOT_OK(dict_values_->builder()->Append());
    RETURN_NOT_OK(
        Append(context, key, &dict_values_->keys(), recursion_depth + 1, blobs_out));
    RETURN_NOT_OK(
        Append(context, value, &dict_values_->vals(), recursion_depth + 1, blobs_out));
  }

  // This block is used to decrement the reference counts of the results
  // returned by the serialization callback, which is called in AppendArray,
  // in SerializeDict and in Append
  static PyObject* py_type = PyUnicode_FromString("_pytype_");
  if (PyDict_Contains(dict, py_type)) {
    // If the dictionary contains the key "_pytype_", then the user has to
    // have registered a callback.
    if (context == Py_None) {
      return Status::Invalid("No serialization callback set");
    }
    Py_XDECREF(dict);
  }
  return Status::OK();
}

Status CallCustomCallback(PyObject* context, PyObject* method_name, PyObject* elem,
                          PyObject** result) {
  if (context == Py_None) {
    *result = NULL;
    return Status::SerializationError("error while calling callback on ",
                                      internal::PyObject_StdStringRepr(elem),
                                      ": handler not registered");
  } else {
    *result = PyObject_CallMethodObjArgs(context, method_name, elem, NULL);
    return CheckPyError();
  }
}

Status CallSerializeCallback(PyObject* context, PyObject* value,
                             PyObject** serialized_object) {
  OwnedRef method_name(PyUnicode_FromString("_serialize_callback"));
  RETURN_NOT_OK(CallCustomCallback(context, method_name.obj(), value, serialized_object));
  if (!PyDict_Check(*serialized_object)) {
    return Status::TypeError("serialization callback must return a valid dictionary");
  }
  return Status::OK();
}

Status CallDeserializeCallback(PyObject* context, PyObject* value,
                               PyObject** deserialized_object) {
  OwnedRef method_name(PyUnicode_FromString("_deserialize_callback"));
  return CallCustomCallback(context, method_name.obj(), value, deserialized_object);
}
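// Callback protocol (a sketch, not enforced beyond the checks above):
// context._serialize_callback(obj) must return a dict, conventionally
// carrying a "_pytype_" marker key so that context._deserialize_callback
// can later reconstruct the original object from that dict.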
Status AppendArray(PyObject* context, PyArrayObject* array, SequenceBuilder* builder,
                   int32_t recursion_depth, SerializedPyObject* blobs_out);

template <typename NumpyScalarObject>
Status AppendIntegerScalar(PyObject* obj, SequenceBuilder* builder) {
  int64_t value = reinterpret_cast<NumpyScalarObject*>(obj)->obval;
  return builder->AppendInt64(value);
}

// Append a potentially 64-bit wide unsigned Numpy scalar.
// Must check for overflow as we reinterpret it as signed int64.
template <typename NumpyScalarObject>
Status AppendLargeUnsignedScalar(PyObject* obj, SequenceBuilder* builder) {
  constexpr uint64_t max_value = std::numeric_limits<int64_t>::max();

  uint64_t value = reinterpret_cast<NumpyScalarObject*>(obj)->obval;
  if (value > max_value) {
    return Status::Invalid("cannot serialize Numpy uint64 scalar >= 2**63");
  }
  return builder->AppendInt64(static_cast<int64_t>(value));
}

Status AppendScalar(PyObject* obj, SequenceBuilder* builder) {
  if (PyArray_IsScalar(obj, Bool)) {
    return builder->AppendBool(reinterpret_cast<PyBoolScalarObject*>(obj)->obval != 0);
  } else if (PyArray_IsScalar(obj, Half)) {
    return builder->AppendHalfFloat(reinterpret_cast<PyHalfScalarObject*>(obj)->obval);
  } else if (PyArray_IsScalar(obj, Float)) {
    return builder->AppendFloat(reinterpret_cast<PyFloatScalarObject*>(obj)->obval);
  } else if (PyArray_IsScalar(obj, Double)) {
    return builder->AppendDouble(reinterpret_cast<PyDoubleScalarObject*>(obj)->obval);
  }
  if (PyArray_IsScalar(obj, Byte)) {
    return AppendIntegerScalar<PyByteScalarObject>(obj, builder);
  } else if (PyArray_IsScalar(obj, Short)) {
    return AppendIntegerScalar<PyShortScalarObject>(obj, builder);
  } else if (PyArray_IsScalar(obj, Int)) {
    return AppendIntegerScalar<PyIntScalarObject>(obj, builder);
  } else if (PyArray_IsScalar(obj, Long)) {
    return AppendIntegerScalar<PyLongScalarObject>(obj, builder);
  } else if (PyArray_IsScalar(obj, LongLong)) {
    return AppendIntegerScalar<PyLongLongScalarObject>(obj, builder);
  } else if (PyArray_IsScalar(obj, Int64)) {
    return AppendIntegerScalar<PyInt64ScalarObject>(obj, builder);
  } else if (PyArray_IsScalar(obj, UByte)) {
    return AppendIntegerScalar<PyUByteScalarObject>(obj, builder);
  } else if (PyArray_IsScalar(obj, UShort)) {
    return AppendIntegerScalar<PyUShortScalarObject>(obj, builder);
  } else if (PyArray_IsScalar(obj, UInt)) {
    return AppendIntegerScalar<PyUIntScalarObject>(obj, builder);
  } else if (PyArray_IsScalar(obj, ULong)) {
    return AppendLargeUnsignedScalar<PyULongScalarObject>(obj, builder);
  } else if (PyArray_IsScalar(obj, ULongLong)) {
    return AppendLargeUnsignedScalar<PyULongLongScalarObject>(obj, builder);
  } else if (PyArray_IsScalar(obj, UInt64)) {
    return AppendLargeUnsignedScalar<PyUInt64ScalarObject>(obj, builder);
  }
  return Status::NotImplemented("Numpy scalar type not recognized");
}
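// Illustrative: all Numpy integer scalars are widened to int64, so e.g.
// np.uint64(2**63 - 1) round-trips while np.uint64(2**63) fails with
// Invalid above; there is no unsigned child type in the union.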
Status Append(PyObject* context, PyObject* elem, SequenceBuilder* builder,
              int32_t recursion_depth, SerializedPyObject* blobs_out) {
  // The bool case must precede the int case (PyInt_Check passes for bools)
  if (PyBool_Check(elem)) {
    RETURN_NOT_OK(builder->AppendBool(elem == Py_True));
  } else if (PyArray_IsScalar(elem, Half)) {
    npy_half halffloat = reinterpret_cast<PyHalfScalarObject*>(elem)->obval;
    RETURN_NOT_OK(builder->AppendHalfFloat(halffloat));
  } else if (PyFloat_Check(elem)) {
    RETURN_NOT_OK(builder->AppendDouble(PyFloat_AS_DOUBLE(elem)));
  } else if (PyLong_Check(elem)) {
    int overflow = 0;
    int64_t data = PyLong_AsLongLongAndOverflow(elem, &overflow);
    if (!overflow) {
      RETURN_NOT_OK(builder->AppendInt64(data));
    } else {
      // Attempt to serialize the object using the custom callback.
      PyObject* serialized_object;
      // The reference count of serialized_object will be decremented in SerializeDict
      RETURN_NOT_OK(CallSerializeCallback(context, elem, &serialized_object));
      RETURN_NOT_OK(
          builder->AppendDict(context, serialized_object, recursion_depth, blobs_out));
    }
  } else if (PyBytes_Check(elem)) {
    auto data = reinterpret_cast<uint8_t*>(PyBytes_AS_STRING(elem));
    int32_t size = -1;
    RETURN_NOT_OK(internal::CastSize(PyBytes_GET_SIZE(elem), &size));
    RETURN_NOT_OK(builder->AppendBytes(data, size));
  } else if (PyUnicode_Check(elem)) {
    ARROW_ASSIGN_OR_RAISE(auto view, PyBytesView::FromUnicode(elem));
    int32_t size = -1;
    RETURN_NOT_OK(internal::CastSize(view.size, &size));
    RETURN_NOT_OK(builder->AppendString(view.bytes, size));
  } else if (PyList_CheckExact(elem)) {
    RETURN_NOT_OK(builder->AppendList(context, elem, recursion_depth, blobs_out));
  } else if (PyDict_CheckExact(elem)) {
    RETURN_NOT_OK(builder->AppendDict(context, elem, recursion_depth, blobs_out));
  } else if (PyTuple_CheckExact(elem)) {
    RETURN_NOT_OK(builder->AppendTuple(context, elem, recursion_depth, blobs_out));
  } else if (PySet_Check(elem)) {
    RETURN_NOT_OK(builder->AppendSet(context, elem, recursion_depth, blobs_out));
  } else if (PyArray_IsScalar(elem, Generic)) {
    RETURN_NOT_OK(AppendScalar(elem, builder));
  } else if (PyArray_CheckExact(elem)) {
    RETURN_NOT_OK(AppendArray(context, reinterpret_cast<PyArrayObject*>(elem), builder,
                              recursion_depth, blobs_out));
  } else if (elem == Py_None) {
    RETURN_NOT_OK(builder->AppendNone());
  } else if (PyDateTime_Check(elem)) {
    PyDateTime_DateTime* datetime = reinterpret_cast<PyDateTime_DateTime*>(elem);
    RETURN_NOT_OK(builder->AppendDate64(internal::PyDateTime_to_us(datetime)));
  } else if (is_buffer(elem)) {
    RETURN_NOT_OK(builder->AppendBuffer(static_cast<int32_t>(blobs_out->buffers.size())));
    ARROW_ASSIGN_OR_RAISE(auto buffer, unwrap_buffer(elem));
    blobs_out->buffers.push_back(buffer);
  } else if (is_tensor(elem)) {
    RETURN_NOT_OK(builder->AppendTensor(static_cast<int32_t>(blobs_out->tensors.size())));
    ARROW_ASSIGN_OR_RAISE(auto tensor, unwrap_tensor(elem));
    blobs_out->tensors.push_back(tensor);
  } else if (is_sparse_coo_tensor(elem)) {
    RETURN_NOT_OK(builder->AppendSparseCOOTensor(
        static_cast<int32_t>(blobs_out->sparse_tensors.size())));
    ARROW_ASSIGN_OR_RAISE(auto tensor, unwrap_sparse_coo_tensor(elem));
    blobs_out->sparse_tensors.push_back(tensor);
  } else if (is_sparse_csr_matrix(elem)) {
    RETURN_NOT_OK(builder->AppendSparseCSRMatrix(
        static_cast<int32_t>(blobs_out->sparse_tensors.size())));
    ARROW_ASSIGN_OR_RAISE(auto matrix, unwrap_sparse_csr_matrix(elem));
    blobs_out->sparse_tensors.push_back(matrix);
  } else if (is_sparse_csc_matrix(elem)) {
    RETURN_NOT_OK(builder->AppendSparseCSCMatrix(
        static_cast<int32_t>(blobs_out->sparse_tensors.size())));
    ARROW_ASSIGN_OR_RAISE(auto matrix, unwrap_sparse_csc_matrix(elem));
    blobs_out->sparse_tensors.push_back(matrix);
  } else if (is_sparse_csf_tensor(elem)) {
    RETURN_NOT_OK(builder->AppendSparseCSFTensor(
        static_cast<int32_t>(blobs_out->sparse_tensors.size())));
    ARROW_ASSIGN_OR_RAISE(auto tensor, unwrap_sparse_csf_tensor(elem));
    blobs_out->sparse_tensors.push_back(tensor);
  } else {
    // Attempt to serialize the object using the custom callback.
    PyObject* serialized_object;
    // The reference count of serialized_object will be decremented in SerializeDict
    RETURN_NOT_OK(CallSerializeCallback(context, elem, &serialized_object));
    RETURN_NOT_OK(
        builder->AppendDict(context, serialized_object, recursion_depth, blobs_out));
  }
  return Status::OK();
}
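// Dispatch-order note: bools must be checked before ints because Python bools
// are ints; anything unmatched above (including oversized Python ints) falls
// through to the user-supplied serialization callback, whose dict result is
// then appended like a regular dict.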
Status AppendArray(PyObject* context, PyArrayObject* array, SequenceBuilder* builder,
                   int32_t recursion_depth, SerializedPyObject* blobs_out) {
  int dtype = PyArray_TYPE(array);
  switch (dtype) {
    case NPY_UINT8:
    case NPY_INT8:
    case NPY_UINT16:
    case NPY_INT16:
    case NPY_UINT32:
    case NPY_INT32:
    case NPY_UINT64:
    case NPY_INT64:
    case NPY_HALF:
    case NPY_FLOAT:
    case NPY_DOUBLE: {
      RETURN_NOT_OK(
          builder->AppendNdarray(static_cast<int32_t>(blobs_out->ndarrays.size())));
      std::shared_ptr<Tensor> tensor;
      RETURN_NOT_OK(NdarrayToTensor(default_memory_pool(),
                                    reinterpret_cast<PyObject*>(array), {}, &tensor));
      blobs_out->ndarrays.push_back(tensor);
    } break;
    default: {
      PyObject* serialized_object;
      // The reference count of serialized_object will be decremented in SerializeDict
      RETURN_NOT_OK(CallSerializeCallback(context, reinterpret_cast<PyObject*>(array),
                                          &serialized_object));
      RETURN_NOT_OK(builder->AppendDict(context, serialized_object, recursion_depth + 1,
                                        blobs_out));
    }
  }
  return Status::OK();
}

std::shared_ptr<RecordBatch> MakeBatch(std::shared_ptr<Array> data) {
  auto field = std::make_shared<Field>("list", data->type());
  auto schema = ::arrow::schema({field});
  return RecordBatch::Make(schema, data->length(), {data});
}

Status SerializeObject(PyObject* context, PyObject* sequence, SerializedPyObject* out) {
  PyAcquireGIL lock;
  SequenceBuilder builder;
  RETURN_NOT_OK(internal::VisitIterable(
      sequence, [&](PyObject* obj, bool* keep_going /* unused */) {
        return Append(context, obj, &builder, 0, out);
      }));
  std::shared_ptr<Array> array;
  RETURN_NOT_OK(builder.Finish(&array));
  out->batch = MakeBatch(array);
  return Status::OK();
}

Status SerializeNdarray(std::shared_ptr<Tensor> tensor, SerializedPyObject* out) {
  std::shared_ptr<Array> array;
  SequenceBuilder builder;
  RETURN_NOT_OK(builder.AppendNdarray(static_cast<int32_t>(out->ndarrays.size())));
  out->ndarrays.push_back(tensor);
  RETURN_NOT_OK(builder.Finish(&array));
  out->batch = MakeBatch(array);
  return Status::OK();
}

Status WriteNdarrayHeader(std::shared_ptr<DataType> dtype,
                          const std::vector<int64_t>& shape, int64_t tensor_num_bytes,
                          io::OutputStream* dst) {
  auto empty_tensor = std::make_shared<Tensor>(
      dtype, std::make_shared<Buffer>(nullptr, tensor_num_bytes), shape);
  SerializedPyObject serialized_tensor;
  RETURN_NOT_OK(SerializeNdarray(empty_tensor, &serialized_tensor));
  return serialized_tensor.WriteTo(dst);
}

SerializedPyObject::SerializedPyObject()
    : ipc_options(ipc::IpcWriteOptions::Defaults()) {}
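// Wire layout produced by WriteTo below (a sketch of the format): four
// little-endian int32 counts (tensors, sparse tensors, ndarrays, buffers),
// the record batch stream describing the object structure, then each
// tensor/sparse tensor/ndarray body aligned to 64 bytes, and finally the raw
// buffers, each prefixed by its int64 size.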
Status SerializedPyObject::WriteTo(io::OutputStream* dst) {
  int32_t num_tensors = static_cast<int32_t>(this->tensors.size());
  int32_t num_sparse_tensors = static_cast<int32_t>(this->sparse_tensors.size());
  int32_t num_ndarrays = static_cast<int32_t>(this->ndarrays.size());
  int32_t num_buffers = static_cast<int32_t>(this->buffers.size());
  RETURN_NOT_OK(
      dst->Write(reinterpret_cast<const uint8_t*>(&num_tensors), sizeof(int32_t)));
  RETURN_NOT_OK(dst->Write(reinterpret_cast<const uint8_t*>(&num_sparse_tensors),
                           sizeof(int32_t)));
  RETURN_NOT_OK(
      dst->Write(reinterpret_cast<const uint8_t*>(&num_ndarrays), sizeof(int32_t)));
  RETURN_NOT_OK(
      dst->Write(reinterpret_cast<const uint8_t*>(&num_buffers), sizeof(int32_t)));

  // Align stream to 8-byte offset
  RETURN_NOT_OK(ipc::AlignStream(dst, ipc::kArrowIpcAlignment));
  RETURN_NOT_OK(ipc::WriteRecordBatchStream({this->batch}, this->ipc_options, dst));

  // Align stream to 64-byte offset so tensor bodies are 64-byte aligned
  RETURN_NOT_OK(ipc::AlignStream(dst, ipc::kTensorAlignment));

  int32_t metadata_length;
  int64_t body_length;
  for (const auto& tensor : this->tensors) {
    RETURN_NOT_OK(ipc::WriteTensor(*tensor, dst, &metadata_length, &body_length));
    RETURN_NOT_OK(ipc::AlignStream(dst, ipc::kTensorAlignment));
  }

  for (const auto& sparse_tensor : this->sparse_tensors) {
    RETURN_NOT_OK(
        ipc::WriteSparseTensor(*sparse_tensor, dst, &metadata_length, &body_length));
    RETURN_NOT_OK(ipc::AlignStream(dst, ipc::kTensorAlignment));
  }

  for (const auto& tensor : this->ndarrays) {
    RETURN_NOT_OK(ipc::WriteTensor(*tensor, dst, &metadata_length, &body_length));
    RETURN_NOT_OK(ipc::AlignStream(dst, ipc::kTensorAlignment));
  }

  for (const auto& buffer : this->buffers) {
    int64_t size = buffer->size();
    RETURN_NOT_OK(dst->Write(reinterpret_cast<const uint8_t*>(&size), sizeof(int64_t)));
    RETURN_NOT_OK(dst->Write(buffer->data(), size));
  }
  return Status::OK();
}

namespace {

Status CountSparseTensors(
    const std::vector<std::shared_ptr<SparseTensor>>& sparse_tensors, PyObject** out) {
  OwnedRef num_sparse_tensors(PyDict_New());
  size_t num_coo = 0;
  size_t num_csr = 0;
  size_t num_csc = 0;
  size_t num_csf = 0;
  size_t ndim_csf = 0;

  for (const auto& sparse_tensor : sparse_tensors) {
    switch (sparse_tensor->format_id()) {
      case SparseTensorFormat::COO:
        ++num_coo;
        break;
      case SparseTensorFormat::CSR:
        ++num_csr;
        break;
      case SparseTensorFormat::CSC:
        ++num_csc;
        break;
      case SparseTensorFormat::CSF:
        ++num_csf;
        ndim_csf += sparse_tensor->ndim();
        break;
    }
  }

  PyDict_SetItemString(num_sparse_tensors.obj(), "coo", PyLong_FromSize_t(num_coo));
  PyDict_SetItemString(num_sparse_tensors.obj(), "csr", PyLong_FromSize_t(num_csr));
  PyDict_SetItemString(num_sparse_tensors.obj(), "csc", PyLong_FromSize_t(num_csc));
  PyDict_SetItemString(num_sparse_tensors.obj(), "csf", PyLong_FromSize_t(num_csf));
  PyDict_SetItemString(num_sparse_tensors.obj(), "ndim_csf",
                       PyLong_FromSize_t(ndim_csf));
  RETURN_IF_PYERROR();

  *out = num_sparse_tensors.detach();
  return Status::OK();
}

}  // namespace
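// Illustrative: for one COO tensor and two CSF tensors of 3 and 4 dimensions,
// CountSparseTensors yields {"coo": 1, "csr": 0, "csc": 0, "csf": 2,
// "ndim_csf": 7}; ndim_csf accumulates the dimensionality of all CSF tensors.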
Status SerializedPyObject::GetComponents(MemoryPool* memory_pool, PyObject** out) {
  PyAcquireGIL py_gil;

  OwnedRef result(PyDict_New());
  PyObject* buffers = PyList_New(0);
  PyObject* num_sparse_tensors = nullptr;

  // TODO(wesm): Not sure how pedantic we need to be about checking the return
  // values of these functions. There are other places where we do not check
  // PyDict_SetItem/SetItemString return value, but these failures would be
  // quite esoteric
  PyDict_SetItemString(result.obj(), "num_tensors",
                       PyLong_FromSize_t(this->tensors.size()));
  RETURN_NOT_OK(CountSparseTensors(this->sparse_tensors, &num_sparse_tensors));
  PyDict_SetItemString(result.obj(), "num_sparse_tensors", num_sparse_tensors);
  PyDict_SetItemString(result.obj(), "num_ndarrays",
                       PyLong_FromSize_t(this->ndarrays.size()));
  PyDict_SetItemString(result.obj(), "num_buffers",
                       PyLong_FromSize_t(this->buffers.size()));
  PyDict_SetItemString(result.obj(), "data", buffers);
  RETURN_IF_PYERROR();

  Py_DECREF(buffers);

  auto PushBuffer = [&buffers](const std::shared_ptr<Buffer>& buffer) {
    PyObject* wrapped_buffer = wrap_buffer(buffer);
    RETURN_IF_PYERROR();
    if (PyList_Append(buffers, wrapped_buffer) < 0) {
      Py_DECREF(wrapped_buffer);
      RETURN_IF_PYERROR();
    }
    Py_DECREF(wrapped_buffer);
    return Status::OK();
  };

  constexpr int64_t kInitialCapacity = 1024;

  // Write the record batch describing the object structure
  py_gil.release();
  ARROW_ASSIGN_OR_RAISE(auto stream,
                        io::BufferOutputStream::Create(kInitialCapacity, memory_pool));
  RETURN_NOT_OK(
      ipc::WriteRecordBatchStream({this->batch}, this->ipc_options, stream.get()));
  ARROW_ASSIGN_OR_RAISE(auto buffer, stream->Finish());
  py_gil.acquire();

  RETURN_NOT_OK(PushBuffer(buffer));

  // For each tensor, get a metadata buffer and a buffer for the body
  for (const auto& tensor : this->tensors) {
    ARROW_ASSIGN_OR_RAISE(std::unique_ptr<ipc::Message> message,
                          ipc::GetTensorMessage(*tensor, memory_pool));
    RETURN_NOT_OK(PushBuffer(message->metadata()));
    RETURN_NOT_OK(PushBuffer(message->body()));
  }

  // For each sparse tensor, get a metadata buffer and buffers containing index and data
  for (const auto& sparse_tensor : this->sparse_tensors) {
    ipc::IpcPayload payload;
    RETURN_NOT_OK(ipc::GetSparseTensorPayload(*sparse_tensor, memory_pool, &payload));
    RETURN_NOT_OK(PushBuffer(payload.metadata));
    for (const auto& body : payload.body_buffers) {
      RETURN_NOT_OK(PushBuffer(body));
    }
  }

  // For each ndarray, get a metadata buffer and a buffer for the body
  for (const auto& ndarray : this->ndarrays) {
    ARROW_ASSIGN_OR_RAISE(std::unique_ptr<ipc::Message> message,
                          ipc::GetTensorMessage(*ndarray, memory_pool));
    RETURN_NOT_OK(PushBuffer(message->metadata()));
    RETURN_NOT_OK(PushBuffer(message->body()));
  }

  for (const auto& buf : this->buffers) {
    RETURN_NOT_OK(PushBuffer(buf));
  }

  *out = result.detach();
  return Status::OK();
}

}  // namespace py
}  // namespace arrow