Commit 2726c6f

Minor refactors to processing
Some refactors across Druid to clean up the code and add utility functions where required.
1 parent 17215cd commit 2726c6f

25 files changed: +621 -254 lines

extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java

Lines changed: 1 addition & 2 deletions
@@ -511,7 +511,6 @@ public String getFormatString()
         binder -> binder.bind(SegmentManager.class).toInstance(EasyMock.createMock(SegmentManager.class)),
         new JoinableFactoryModule(),
         new IndexingServiceTuningConfigModule(),
-        new MSQIndexingModule(),
         Modules.override(new MSQSqlModule()).with(
             binder -> {
               // Our Guice configuration currently requires bindings to exist even if they aren't ever used, the
@@ -540,6 +539,7 @@ public String getFormatString()
 
     objectMapper = setupObjectMapper(injector);
     objectMapper.registerModules(new StorageConnectorModule().getJacksonModules());
+    objectMapper.registerModules(new MSQIndexingModule().getJacksonModules());
     objectMapper.registerModules(sqlModule.getJacksonModules());
     objectMapper.registerModules(BuiltInTypesModule.getJacksonModulesList());
 
@@ -697,7 +697,6 @@ protected Supplier<ResourceHolder<CompleteSegment>> getSupplierForSegment(
         break;
       default:
         throw new ISE("Cannot query segment %s in test runner", segmentId);
-
     }
     Segment segment = new Segment()
     {
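
These hunks swap a full Guice module installation for plain Jackson-module registration: MSQIndexingModule no longer needs to be part of the injector, because the test only needs its serializers. A minimal sketch of that pattern, assuming MSQIndexingModule lives in org.apache.druid.msq.guice (the import path is not shown in the diff):

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.druid.msq.guice.MSQIndexingModule; // assumed package, not shown in the diff

public class JacksonOnlyRegistration
{
  public static ObjectMapper mapperWithMsqTypes()
  {
    final ObjectMapper mapper = new ObjectMapper();
    // getJacksonModules() returns an Iterable of Jackson modules, which
    // ObjectMapper.registerModules() accepts directly, no injector involved.
    mapper.registerModules(new MSQIndexingModule().getJacksonModules());
    return mapper;
  }
}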

processing/src/main/java/org/apache/druid/query/aggregation/SerializedStorage.java

Lines changed: 133 additions & 62 deletions
@@ -19,18 +19,17 @@
 
 package org.apache.druid.query.aggregation;
 
-import com.google.common.base.Preconditions;
+import it.unimi.dsi.fastutil.longs.LongArrayList;
+import org.apache.druid.error.DruidException;
 import org.apache.druid.segment.serde.cell.IOIterator;
-import org.apache.druid.segment.serde.cell.IntSerializer;
 import org.apache.druid.segment.serde.cell.StagedSerde;
 import org.apache.druid.segment.writeout.WriteOutBytes;
 
 import javax.annotation.Nullable;
-import java.io.BufferedInputStream;
 import java.io.IOException;
-import java.io.InputStream;
 import java.nio.ByteBuffer;
 import java.nio.ByteOrder;
+import java.nio.IntBuffer;
 import java.util.NoSuchElementException;
 
 /**
@@ -45,109 +44,181 @@ public class SerializedStorage<T>
 {
   private final WriteOutBytes writeOutBytes;
   private final StagedSerde<T> serde;
-  private final IntSerializer intSerializer = new IntSerializer();
+  private final ByteBuffer itemOffsetsBytes;
+  private final IntBuffer itemSizes;
+
+  private final LongArrayList rowChunkOffsets = new LongArrayList();
+  private int numStored = 0;
+  private int maxSize = 0;
 
   public SerializedStorage(WriteOutBytes writeOutBytes, StagedSerde<T> serde)
+  {
+    this(writeOutBytes, serde, 4096);
+  }
+
+  public SerializedStorage(WriteOutBytes writeOutBytes, StagedSerde<T> serde, int chunkSize)
   {
     this.writeOutBytes = writeOutBytes;
     this.serde = serde;
+
+    this.itemOffsetsBytes = ByteBuffer.allocate(chunkSize * Integer.BYTES).order(ByteOrder.nativeOrder());
+    this.itemSizes = itemOffsetsBytes.asIntBuffer();
   }
 
   public void store(@Nullable T value) throws IOException
   {
     byte[] bytes = serde.serialize(value);
 
-    writeOutBytes.write(intSerializer.serialize(bytes.length));
-    writeOutBytes.write(bytes);
+    maxSize = Math.max(maxSize, bytes.length);
+    itemSizes.put(bytes.length);
+    if (bytes.length > 0) {
+      writeOutBytes.write(bytes);
+    }
+
+    ++numStored;
+    if (itemSizes.remaining() == 0) {
+      rowChunkOffsets.add(writeOutBytes.size());
+      writeOutBytes.write(itemOffsetsBytes);
+      itemOffsetsBytes.clear();
+      itemSizes.clear();
+    }
   }
 
+  public int numStored()
+  {
+    return numStored;
+  }
+
+  /**
+   * Generates an iterator over everything that has been stored. Also signifies the end of storing objects.
+   * iterator() can be called multiple times if needed, but after iterator() is called, store() can no longer be
+   * called.
+   *
+   * @return an iterator
+   * @throws IOException on failure
+   */
   public IOIterator<T> iterator() throws IOException
   {
-    return new DeserializingIOIterator<>(writeOutBytes.asInputStream(), serde);
+    if (itemSizes.position() != itemSizes.limit()) {
+      rowChunkOffsets.add(writeOutBytes.size());
+      itemOffsetsBytes.limit(itemSizes.position() * Integer.BYTES);
+      writeOutBytes.write(itemOffsetsBytes);
+
+      // Move the limit to the position so that we fail subsequent writes and indicate that we are done
+      itemSizes.limit(itemSizes.position());
+    }
+
+    return new DeserializingIOIterator<>(
+        writeOutBytes,
+        rowChunkOffsets,
+        numStored,
+        itemSizes.capacity(),
+        maxSize,
+        serde
+    );
   }
 
   private static class DeserializingIOIterator<T> implements IOIterator<T>
   {
-    private static final int NEEDS_READ = -2;
-    private static final int EOF = -1;
+    private static final ByteBuffer EMPTY_BUFFER = ByteBuffer.allocate(0).asReadOnlyBuffer();
 
-    private final byte[] intBytes;
-    private final BufferedInputStream inputStream;
+    private final WriteOutBytes medium;
+    private final LongArrayList rowChunkOffsets;
+    private final int numEntries;
+    private ByteBuffer tmpBuf;
     private final StagedSerde<T> serde;
 
-    private int nextSize;
-
-    public DeserializingIOIterator(InputStream inputStream, StagedSerde<T> serde)
+    private final ByteBuffer itemOffsetsBytes;
+    private final int[] itemSizes;
+
+    private long itemStartOffset;
+    private int chunkId = 0;
+    private int currId = 0;
+    private int itemIndex;
+
+    public DeserializingIOIterator(
+        WriteOutBytes medium,
+        LongArrayList rowChunkOffsets,
+        int numEntries,
+        int chunkSize,
+        int maxSize,
+        StagedSerde<T> serde
+    )
     {
-      this.inputStream = new BufferedInputStream(inputStream);
+      this.medium = medium;
+      this.rowChunkOffsets = rowChunkOffsets;
+      this.numEntries = numEntries;
+      this.tmpBuf = ByteBuffer.allocate(maxSize).order(ByteOrder.nativeOrder());
       this.serde = serde;
-      intBytes = new byte[Integer.BYTES];
-      nextSize = NEEDS_READ;
+
+      this.itemOffsetsBytes = ByteBuffer.allocate(chunkSize * Integer.BYTES).order(ByteOrder.nativeOrder());
      this.itemSizes = new int[chunkSize];
      this.itemIndex = chunkSize;
     }
 
     @Override
-    public boolean hasNext() throws IOException
+    public boolean hasNext()
     {
-      return getNextSize() > EOF;
+      return currId < numEntries;
     }
 
     @Override
     public T next() throws IOException
     {
-      int currentNextSize = getNextSize();
-
-      if (currentNextSize == -1) {
-        throw new NoSuchElementException("end of buffer reached");
+      if (currId >= numEntries) {
+        throw new NoSuchElementException();
       }
 
-      byte[] nextBytes = new byte[currentNextSize];
-      int bytesRead = 0;
-
-      while (bytesRead < currentNextSize) {
-        int result = inputStream.read(nextBytes, bytesRead, currentNextSize - bytesRead);
-
-        if (result == -1) {
-          throw new NoSuchElementException("unexpected end of buffer reached");
+      if (itemIndex >= itemSizes.length) {
+        if (chunkId == 0) {
+          itemStartOffset = 0;
+        } else {
+          if (itemStartOffset != rowChunkOffsets.getLong(chunkId - 1)) {
+            throw DruidException.defensive(
+                "Should have read up to the start of the offsets [%,d], "
+                + "but for some reason the values [%,d] don't align. Possible corruption?",
+                rowChunkOffsets.getLong(chunkId - 1),
+                itemStartOffset
+            );
+          }
+          itemStartOffset += (((long) itemSizes.length) * Integer.BYTES);
         }
 
-        bytesRead += result;
-      }
-
-      Preconditions.checkState(bytesRead == currentNextSize);
-      T value = serde.deserialize(nextBytes);
-
-      nextSize = NEEDS_READ;
-
-      return value;
-    }
-
-    private int getNextSize() throws IOException
-    {
-      if (nextSize == NEEDS_READ) {
-        int bytesRead = 0;
-
-        while (bytesRead < Integer.BYTES) {
-          int result = inputStream.read(intBytes, bytesRead, Integer.BYTES - bytesRead);
+        int numToRead = Math.min(itemSizes.length, numEntries - (chunkId * itemSizes.length));
+        final long readOffset = rowChunkOffsets.getLong(chunkId++);
+        itemOffsetsBytes.clear();
+        itemOffsetsBytes.limit(numToRead * Integer.BYTES);
+        medium.readFully(readOffset, itemOffsetsBytes);
+        itemOffsetsBytes.flip();
+        itemOffsetsBytes.asIntBuffer().get(itemSizes, 0, numToRead);
 
-          if (result == -1) {
-            nextSize = EOF;
-            return EOF;
-          } else {
-            bytesRead += result;
-          }
-        }
-        Preconditions.checkState(bytesRead == Integer.BYTES);
+        itemIndex = 0;
      }
 
-        nextSize = ByteBuffer.wrap(intBytes).order(ByteOrder.nativeOrder()).getInt();
+      int bytesToRead = itemSizes[itemIndex];
+      final T retVal;
+      if (bytesToRead == 0) {
+        retVal = serde.deserialize(EMPTY_BUFFER);
+      } else {
+        tmpBuf.clear();
+        tmpBuf.limit(bytesToRead);
+        medium.readFully(itemStartOffset, tmpBuf);
+        tmpBuf.flip();
+
+        retVal = serde.deserialize(tmpBuf);
       }
 
-      return nextSize;
+      itemStartOffset += bytesToRead;
+      ++itemIndex;
+      ++currId;
+
+      return retVal;
     }
 
     @Override
-    public void close() throws IOException
+    public void close()
     {
-      inputStream.close();
+
     }
   }
 }
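
The rewrite changes the storage format from "length prefix, then value" pairs read through a sequential InputStream to a layout that supports positional reads: values are written back-to-back, their sizes are buffered in an IntBuffer, and each time a chunk of sizes (4096 by default) fills up it is appended to the stream with its absolute offset recorded in rowChunkOffsets. The iterator then fetches each size table with readFully() at a recorded offset and reuses a single tmpBuf sized to the largest stored value, instead of allocating a byte[] per entry. Here is a self-contained sketch of the same layout in plain Java; all names are illustrative, and the real class works against WriteOutBytes and StagedSerde rather than the types below:

import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class ChunkedLayoutSketch
{
  static final int CHUNK = 4; // SerializedStorage defaults to 4096

  final ByteArrayOutputStream out = new ByteArrayOutputStream();
  final ByteBuffer sizeTable = ByteBuffer.allocate(CHUNK * Integer.BYTES).order(ByteOrder.nativeOrder());
  final List<Long> sizeTableOffsets = new ArrayList<>();
  int numStored = 0;

  void store(byte[] value)
  {
    sizeTable.putInt(value.length);
    out.write(value, 0, value.length); // values are contiguous, no per-value header
    if (++numStored % CHUNK == 0) {
      flushSizeTable();
    }
  }

  void flushSizeTable()
  {
    sizeTableOffsets.add((long) out.size()); // a reader seeks here for the sizes
    out.write(sizeTable.array(), 0, sizeTable.position());
    sizeTable.clear();
  }

  // Like the real iterator(), this finalizes the stream; calling store()
  // afterwards is unsupported in this sketch.
  List<byte[]> readAll()
  {
    if (sizeTable.position() > 0) {
      flushSizeTable(); // flush the final, partially filled size table
    }
    final byte[] bytes = out.toByteArray();
    final List<byte[]> values = new ArrayList<>();
    int valueOffset = 0;
    for (int chunk = 0; chunk * CHUNK < numStored; chunk++) {
      final int n = Math.min(CHUNK, numStored - chunk * CHUNK);
      final int tableOffset = sizeTableOffsets.get(chunk).intValue();
      final ByteBuffer sizes =
          ByteBuffer.wrap(bytes, tableOffset, n * Integer.BYTES).order(ByteOrder.nativeOrder());
      for (int i = 0; i < n; i++) {
        final int size = sizes.getInt();
        values.add(Arrays.copyOfRange(bytes, valueOffset, valueOffset + size));
        valueOffset += size;
      }
      valueOffset = tableOffset + n * Integer.BYTES; // skip the interleaved size table
    }
    return values;
  }

  public static void main(String[] args)
  {
    ChunkedLayoutSketch storage = new ChunkedLayoutSketch();
    for (String s : new String[]{"foo", "", "quux", "droid", "bar"}) {
      storage.store(s.getBytes(StandardCharsets.UTF_8)); // "" mimics a zero-length (null) entry
    }
    for (byte[] v : storage.readAll()) {
      System.out.println("[" + new String(v, StandardCharsets.UTF_8) + "]");
    }
  }
}

The sketch's readAll() mirrors the iterator's bookkeeping: after one chunk's values are consumed, the read cursor must skip past the size table that was interleaved into the stream, which is what the real code's itemStartOffset alignment check guards.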

processing/src/main/java/org/apache/druid/segment/IndexIO.java

Lines changed: 2 additions & 1 deletion
@@ -456,7 +456,8 @@ public QueryableIndex load(File inDir, ObjectMapper mapper, boolean lazy, Segmen
             new StringUtf8DictionaryEncodedColumnSupplier<>(
                 index.getDimValueUtf8Lookup(dimension)::singleThreaded,
                 null,
-                Suppliers.ofInstance(index.getDimColumn(dimension))
+                Suppliers.ofInstance(index.getDimColumn(dimension)),
+                LEGACY_FACTORY.getBitmapFactory()
             )
         );
         GenericIndexed<ImmutableBitmap> bitmaps = index.getBitmapIndexes().get(dimension);

processing/src/main/java/org/apache/druid/segment/IndexSpec.java

Lines changed: 1 addition & 1 deletion
@@ -43,7 +43,7 @@
  */
 public class IndexSpec
 {
-  public static IndexSpec DEFAULT = IndexSpec.builder().build();
+  public static final IndexSpec DEFAULT = IndexSpec.builder().build();
 
   public static Builder builder()
   {
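
The one-word change matters because without final, DEFAULT is shared mutable state: any caller can reassign it and every later reader silently sees the new value. An illustrative snippet with a hypothetical Config type, not Druid code:

public class Config
{
  // Before the fix: reassignable by anyone.
  public static Config DEFAULT = new Config();

  static void oops()
  {
    // Compiles against the non-final field and breaks every other user.
    // With `public static final Config DEFAULT`, this line is a compile-time error.
    Config.DEFAULT = null;
  }
}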

processing/src/main/java/org/apache/druid/segment/column/StringUtf8DictionaryEncodedColumn.java

Lines changed: 10 additions & 1 deletion
@@ -20,6 +20,7 @@
 package org.apache.druid.segment.column;
 
 import com.google.common.collect.Lists;
+import org.apache.druid.collections.bitmap.BitmapFactory;
 import org.apache.druid.common.semantic.SemanticUtils;
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.query.extraction.ExtractionFn;
@@ -73,16 +74,19 @@ public class StringUtf8DictionaryEncodedColumn implements DictionaryEncodedColum
   @Nullable
   private final ColumnarMultiInts multiValueColumn;
   private final Indexed<ByteBuffer> utf8Dictionary;
+  private final BitmapFactory bitmapFactory;
 
   public StringUtf8DictionaryEncodedColumn(
       @Nullable ColumnarInts singleValueColumn,
       @Nullable ColumnarMultiInts multiValueColumn,
-      Indexed<ByteBuffer> utf8Dictionary
+      Indexed<ByteBuffer> utf8Dictionary,
+      BitmapFactory bitmapFactory
   )
   {
     this.column = singleValueColumn;
     this.multiValueColumn = multiValueColumn;
     this.utf8Dictionary = utf8Dictionary;
+    this.bitmapFactory = bitmapFactory;
   }
 
   @Override
@@ -135,6 +139,11 @@ public int getCardinality()
     return utf8Dictionary.size();
   }
 
+  public BitmapFactory getBitmapFactory()
+  {
+    return bitmapFactory;
+  }
+
   @Override
   public HistoricalDimensionSelector makeDimensionSelector(
       final ReadableOffset offset,
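
Together with the IndexIO hunk above (which passes LEGACY_FACTORY.getBitmapFactory()), this means the column now carries the BitmapFactory that built its bitmap indexes and exposes it through the new accessor. A wiring sketch, assuming RoaringBitmapFactory as the concrete factory; the dictionary and int-column arguments are taken as parameters rather than built here:

import java.nio.ByteBuffer;
import javax.annotation.Nullable;
import org.apache.druid.collections.bitmap.RoaringBitmapFactory;
import org.apache.druid.segment.column.StringUtf8DictionaryEncodedColumn;
import org.apache.druid.segment.data.ColumnarInts;
import org.apache.druid.segment.data.Indexed;

public class ColumnWiringSketch
{
  public static StringUtf8DictionaryEncodedColumn wire(
      @Nullable ColumnarInts singleValueColumn,
      Indexed<ByteBuffer> utf8Dictionary
  )
  {
    return new StringUtf8DictionaryEncodedColumn(
        singleValueColumn,
        null,                       // no multi-value column in this sketch
        utf8Dictionary,
        new RoaringBitmapFactory()  // the new fourth argument
    );
  }
}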

processing/src/main/java/org/apache/druid/segment/data/BlockLayoutColumnarDoublesSupplier.java

Lines changed: 10 additions & 2 deletions
@@ -36,6 +36,7 @@ public class BlockLayoutColumnarDoublesSupplier implements Supplier<ColumnarDoub
 
   // The number of doubles per buffer.
   private final int sizePer;
+  private final CompressionStrategy strategy;
 
   public BlockLayoutColumnarDoublesSupplier(
       int totalSize,
@@ -45,7 +46,8 @@ public BlockLayoutColumnarDoublesSupplier(
       CompressionStrategy strategy
   )
   {
-    baseDoubleBuffers = GenericIndexed.read(fromBuffer, DecompressingByteBufferObjectStrategy.of(byteOrder, strategy));
+    this.strategy = strategy;
+    this.baseDoubleBuffers = GenericIndexed.read(fromBuffer, DecompressingByteBufferObjectStrategy.of(byteOrder, strategy));
     this.totalSize = totalSize;
     this.sizePer = sizePer;
   }
@@ -78,7 +80,8 @@ public double get(int index)
     }
   }
 
-  private class BlockLayoutColumnarDoubles implements ColumnarDoubles
+  // This needs to be a public class so that SemanticCreator is able to call it.
+  public class BlockLayoutColumnarDoubles implements ColumnarDoubles
   {
     final Indexed<ResourceHolder<ByteBuffer>> singleThreadedDoubleBuffers = baseDoubleBuffers.singleThreaded();
 
@@ -91,6 +94,11 @@ private class BlockLayoutColumnarDoubles implements ColumnarDoubles
     @Nullable
     DoubleBuffer doubleBuffer;
 
+    public CompressionStrategy getCompressionStrategy()
+    {
+      return strategy;
+    }
+
     @Override
     public int size()
     {
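
Per the new comment in the diff, the visibility change exists so that SemanticCreator can invoke getCompressionStrategy() reflectively; reflection cannot call a method whose declaring class is inaccessible, even when the method itself is public. A hedged sketch of what the accessor enables for ordinary callers; the instanceof dispatch below is illustrative, not Druid's actual discovery mechanism:

import javax.annotation.Nullable;
import org.apache.druid.segment.data.BlockLayoutColumnarDoublesSupplier;
import org.apache.druid.segment.data.ColumnarDoubles;
import org.apache.druid.segment.data.CompressionStrategy;

public class CompressionStrategySketch
{
  @Nullable
  public static CompressionStrategy strategyOf(ColumnarDoubles doubles)
  {
    // Now that the inner class is public, its accessor is reachable here.
    if (doubles instanceof BlockLayoutColumnarDoublesSupplier.BlockLayoutColumnarDoubles) {
      return ((BlockLayoutColumnarDoublesSupplier.BlockLayoutColumnarDoubles) doubles).getCompressionStrategy();
    }
    return null; // other layouts do not expose a strategy
  }
}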
