Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
2f441dc
enhance: [skip e2e]increase timeout for image build (#29083)
yellow-shine Jan 5, 2024
23183ff
feat: Add import reader for json (#29252)
bigsheeper Jan 5, 2024
a0cec40
fix: make the entity num metric accurate (#29643)
yah01 Jan 5, 2024
5be9099
enhance: add MockSerializer generation command into Makefile (#29713)
congqixia Jan 5, 2024
b5f039a
fix: Assertion all async invocations in test case (#29737)
congqixia Jan 7, 2024
5dc300c
fix: Fix bug for pk index doesn't have raw data (#29711)
xiaocai2333 Jan 7, 2024
156a0dd
feat: Add import reader for Parquet (#29618)
bigsheeper Jan 7, 2024
635a7f7
feat: add clustering key in create/describe collection (#29506)
wayblink Jan 7, 2024
271edc6
fix: throw exception when upload file failed for DiskIndex (#29627)
foxspy Jan 7, 2024
4b3de64
enhance: add rust to install_dep.sh (#29586)
longjiquan Jan 7, 2024
a3bae80
enhance: print total memory when milvus starts (#29351)
longjiquan Jan 7, 2024
d07197a
enhance: add compare simd function (#29432)
zhagnlu Jan 7, 2024
20fb847
enhance: load delta logs concurrently (#29623)
longjiquan Jan 7, 2024
e9f3df3
fix: inverted index file not found (#29695)
longjiquan Jan 7, 2024
cd34de7
enhance:[skip e2e] use docker pugin to do same thing instead (#29667)
yellow-shine Jan 8, 2024
fe47dee
fix: Set & Return correct SegmentLevel in querynode segment manager (…
congqixia Jan 8, 2024
7e6f73a
feat: Authorize users to query grant info of their roles (#29747)
czs007 Jan 8, 2024
9702cef
feat: Support multiple vector search (#29433)
xige-16 Jan 8, 2024
97e4ec5
enhance: use random root path for minio unit tests (#29753)
yah01 Jan 8, 2024
b9d76f7
Restore the MVCC functionality.
czs007 Dec 27, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
feat: Add import reader for json (milvus-io#29252)
This PR implements a new json reader for import.

issue: milvus-io#28521

---------

Signed-off-by: bigsheeper <[email protected]>
  • Loading branch information
bigsheeper authored Jan 5, 2024
commit 23183ffb0fd4698be0b81c6cd6c3c15d86e20240
11 changes: 5 additions & 6 deletions internal/storage/insert_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,13 +78,12 @@ func (i *InsertData) GetRowNum() int {
if i.Data == nil || len(i.Data) == 0 {
return 0
}

data, ok := i.Data[common.RowIDField]
if !ok {
return 0
var rowNum int
for _, data := range i.Data {
rowNum = data.RowNum()
break
}

return data.RowNum()
return rowNum
}

func (i *InsertData) GetMemorySize() int {
Expand Down
143 changes: 143 additions & 0 deletions internal/util/importutilv2/json/reader.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package json

import (
"encoding/json"
"fmt"
"io"
"strings"

"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/pkg/util/merr"
)

// RowRootNode is the (case-insensitive) root key that must wrap the rows
// array when the import file uses the object layout: {"rows": [...]}.
const RowRootNode = "rows"

// Row holds field values keyed by field ID.
// NOTE(review): presumably one parsed import row as produced by RowParser — confirm.
type Row = map[storage.FieldID]any

// reader streams rows out of a JSON import file. The document may be either a
// top-level array of rows ([...]) or an object whose "rows" key holds that
// array ({"rows": [...]}).
type reader struct {
	// dec is the streaming decoder over the input; Init switches it to UseNumber.
	dec *json.Decoder
	// schema is the collection schema used to allocate each InsertData batch.
	schema *schemapb.CollectionSchema

	// bufferSize: Read stops appending rows once InsertData.GetMemorySize()
	// reaches this value.
	bufferSize int
	// isOldFormat is set by Init to true when the document starts with '{';
	// Read consults it to decide whether the "rows" key and its '[' still
	// have to be consumed.
	isOldFormat bool

	// parser converts each decoded JSON value into a row for InsertData.Append.
	parser RowParser
}

// NewReader builds a JSON import reader over r for the given collection schema.
//
// bufferSize bounds how much data a single Read accumulates: Read returns as
// soon as the batch's InsertData.GetMemorySize() reaches it. The opening
// delimiter of the document is consumed here (via Init), so a stream that does
// not begin with '{' or '[' is rejected at construction time.
func NewReader(r io.Reader, schema *schemapb.CollectionSchema, bufferSize int) (*reader, error) {
	parser, err := NewRowParser(schema)
	if err != nil {
		return nil, err
	}

	rd := &reader{
		dec:        json.NewDecoder(r),
		schema:     schema,
		bufferSize: bufferSize,
		parser:     parser,
	}
	if err := rd.Init(); err != nil {
		return nil, err
	}
	return rd, nil
}

// Init prepares the decoder and consumes the first token of the document.
//
// The first token must be '{' (rows wrapped in an object under RowRootNode)
// or '[' (a bare array of rows); anything else is an import error. The result
// is recorded in isOldFormat so Read knows whether the root key is still
// pending.
func (j *reader) Init() error {
	// Decode numbers as json.Number (string-backed) rather than float64:
	// float64 cannot represent every int64 exactly, so large integer values
	// (more than ~15 digits) would otherwise be silently corrupted.
	j.dec.UseNumber()

	tok, err := j.dec.Token()
	if err != nil {
		return merr.WrapErrImportFailed(fmt.Sprintf("failed to decode JSON, error: %v", err))
	}

	switch tok {
	case json.Delim('{'):
		j.isOldFormat = true
	case json.Delim('['):
		j.isOldFormat = false
	default:
		return merr.WrapErrImportFailed("invalid JSON format, the content should be started with '{' or '['")
	}
	return nil
}

// Read decodes the next batch of rows from the JSON stream.
//
// Rows are appended to a fresh InsertData until its memory size reaches
// bufferSize or the rows array is exhausted, so one call may return only part
// of the file. Callers should keep calling Read until it returns (nil, nil),
// which signals that no rows remain.
//
// For the object layout ({"rows": [...]}) the root key and the opening '['
// are consumed on the first call only. After that the decoder sits inside the
// rows array exactly as it does for the bare-array layout, so isOldFormat is
// cleared. Without this, a second call (after a buffer-size split) would try
// to read the root key again from inside the array and fail.
func (j *reader) Read() (*storage.InsertData, error) {
	insertData, err := storage.NewInsertData(j.schema)
	if err != nil {
		return nil, err
	}
	// Nothing left at the current nesting level: every row has been consumed.
	if !j.dec.More() {
		return nil, nil
	}
	if j.isOldFormat {
		// read the root key
		t, err := j.dec.Token()
		if err != nil {
			return nil, merr.WrapErrImportFailed(fmt.Sprintf("failed to decode the JSON file, error: %v", err))
		}
		// Checked assertion: an unexpected token type must surface as an
		// import error, not a panic.
		key, ok := t.(string)
		if !ok {
			return nil, merr.WrapErrImportFailed(fmt.Sprintf("invalid JSON format, the root key should be '%s', but get '%v'", RowRootNode, t))
		}
		// the root key should be RowRootNode (matched case-insensitively)
		if strings.ToLower(key) != RowRootNode {
			return nil, merr.WrapErrImportFailed(fmt.Sprintf("invalid JSON format, the root key should be '%s', but get '%s'", RowRootNode, key))
		}

		// the rows list must start with '['
		t, err = j.dec.Token()
		if err != nil {
			return nil, merr.WrapErrImportFailed(fmt.Sprintf("failed to decode the JSON file, error: %v", err))
		}
		if t != json.Delim('[') {
			return nil, merr.WrapErrImportFailed("invalid JSON format, rows list should begin with '['")
		}

		// The header is consumed; later calls must resume inside the array.
		j.isOldFormat = false
	}

	for j.dec.More() {
		var value any
		if err = j.dec.Decode(&value); err != nil {
			return nil, merr.WrapErrImportFailed(fmt.Sprintf("failed to parse row, error: %v", err))
		}
		row, err := j.parser.Parse(value)
		if err != nil {
			return nil, err
		}
		if err = insertData.Append(row); err != nil {
			return nil, err
		}
		// Stop early once the batch is large enough; the remaining rows are
		// picked up by the next call.
		if insertData.GetMemorySize() >= j.bufferSize {
			break
		}
	}

	// If the loop ended because the array is exhausted (rather than on the
	// buffer limit), consume and validate the closing ']'.
	if !j.dec.More() {
		t, err := j.dec.Token()
		if err != nil {
			return nil, merr.WrapErrImportFailed(fmt.Sprintf("failed to decode JSON, error: %v", err))
		}
		if t != json.Delim(']') {
			return nil, merr.WrapErrImportFailed("invalid JSON format, rows list should end with ']'")
		}
	}

	return insertData, nil
}

// Close is a no-op. The io.Reader passed to NewReader is not closed here.
// NOTE(review): presumably the caller owns and closes that reader — confirm.
func (j *reader) Close() {}
Loading