1919
2020package org .apache .druid .query .aggregation ;
2121
22- import com .google .common .base .Preconditions ;
22+ import it .unimi .dsi .fastutil .longs .LongArrayList ;
23+ import org .apache .druid .error .DruidException ;
2324import org .apache .druid .segment .serde .cell .IOIterator ;
24- import org .apache .druid .segment .serde .cell .IntSerializer ;
2525import org .apache .druid .segment .serde .cell .StagedSerde ;
2626import org .apache .druid .segment .writeout .WriteOutBytes ;
2727
2828import javax .annotation .Nullable ;
29- import java .io .BufferedInputStream ;
3029import java .io .IOException ;
31- import java .io .InputStream ;
3230import java .nio .ByteBuffer ;
3331import java .nio .ByteOrder ;
32+ import java .nio .IntBuffer ;
3433import java .util .NoSuchElementException ;
3534
3635/**
@@ -45,109 +44,181 @@ public class SerializedStorage<T>
4544{
4645 private final WriteOutBytes writeOutBytes ;
4746 private final StagedSerde <T > serde ;
48- private final IntSerializer intSerializer = new IntSerializer ();
47+ private final ByteBuffer itemOffsetsBytes ;
48+ private final IntBuffer itemSizes ;
49+
50+ private final LongArrayList rowChunkOffsets = new LongArrayList ();
51+ private int numStored = 0 ;
52+ private int maxSize = 0 ;
4953
/**
 * Creates a storage whose item-size tables hold 4096 entries each.
 *
 * @param writeOutBytes medium that receives the serialized bytes
 * @param serde         serde used to serialize stored values and deserialize them on iteration
 */
public SerializedStorage(WriteOutBytes writeOutBytes, StagedSerde<T> serde)
{
  this(writeOutBytes, serde, 4096);
}
58+
59+ public SerializedStorage (WriteOutBytes writeOutBytes , StagedSerde <T > serde , int chunkSize )
5160 {
5261 this .writeOutBytes = writeOutBytes ;
5362 this .serde = serde ;
63+
64+ this .itemOffsetsBytes = ByteBuffer .allocate (chunkSize * Integer .BYTES ).order (ByteOrder .nativeOrder ());
65+ this .itemSizes = itemOffsetsBytes .asIntBuffer ();
5466 }
5567
5668 public void store (@ Nullable T value ) throws IOException
5769 {
5870 byte [] bytes = serde .serialize (value );
5971
60- writeOutBytes .write (intSerializer .serialize (bytes .length ));
61- writeOutBytes .write (bytes );
72+ maxSize = Math .max (maxSize , bytes .length );
73+ itemSizes .put (bytes .length );
74+ if (bytes .length > 0 ) {
75+ writeOutBytes .write (bytes );
76+ }
77+
78+ ++numStored ;
79+ if (itemSizes .remaining () == 0 ) {
80+ rowChunkOffsets .add (writeOutBytes .size ());
81+ writeOutBytes .write (itemOffsetsBytes );
82+ itemOffsetsBytes .clear ();
83+ itemSizes .clear ();
84+ }
6285 }
6386
87+ public int numStored ()
88+ {
89+ return numStored ;
90+ }
91+
92+ /**
93+ * Generates an iterator over everything that has been stored. Also signifies the end of storing objects.
94+ * iterator() can be called multiple times if needed, but after iterator() is called, store() can no longer be
95+ * called.
96+ *
97+ * @return an iterator
98+ * @throws IOException on failure
99+ */
64100 public IOIterator <T > iterator () throws IOException
65101 {
66- return new DeserializingIOIterator <>(writeOutBytes .asInputStream (), serde );
102+ if (itemSizes .position () != itemSizes .limit ()) {
103+ rowChunkOffsets .add (writeOutBytes .size ());
104+ itemOffsetsBytes .limit (itemSizes .position () * Integer .BYTES );
105+ writeOutBytes .write (itemOffsetsBytes );
106+
107+ // Move the limit to the position so that we fail subsequent writes and indicate that we are done
108+ itemSizes .limit (itemSizes .position ());
109+ }
110+
111+ return new DeserializingIOIterator <>(
112+ writeOutBytes ,
113+ rowChunkOffsets ,
114+ numStored ,
115+ itemSizes .capacity (),
116+ maxSize ,
117+ serde
118+ );
67119 }
68120
69121 private static class DeserializingIOIterator <T > implements IOIterator <T >
70122 {
71- private static final int NEEDS_READ = -2 ;
72- private static final int EOF = -1 ;
123+ private static final ByteBuffer EMPTY_BUFFER = ByteBuffer .allocate (0 ).asReadOnlyBuffer ();
73124
74- private final byte [] intBytes ;
75- private final BufferedInputStream inputStream ;
125+ private final WriteOutBytes medium ;
126+ private final LongArrayList rowChunkOffsets ;
127+ private final int numEntries ;
128+ private ByteBuffer tmpBuf ;
76129 private final StagedSerde <T > serde ;
77130
78- private int nextSize ;
79-
80- public DeserializingIOIterator (InputStream inputStream , StagedSerde <T > serde )
131+ private final ByteBuffer itemOffsetsBytes ;
132+ private final int [] itemSizes ;
133+
134+ private long itemStartOffset ;
135+ private int chunkId = 0 ;
136+ private int currId = 0 ;
137+ private int itemIndex ;
138+
139+ public DeserializingIOIterator (
140+ WriteOutBytes medium ,
141+ LongArrayList rowChunkOffsets ,
142+ int numEntries ,
143+ int chunkSize ,
144+ int maxSize ,
145+ StagedSerde <T > serde
146+ )
81147 {
82- this .inputStream = new BufferedInputStream (inputStream );
148+ this .medium = medium ;
149+ this .rowChunkOffsets = rowChunkOffsets ;
150+ this .numEntries = numEntries ;
151+ this .tmpBuf = ByteBuffer .allocate (maxSize ).order (ByteOrder .nativeOrder ());
83152 this .serde = serde ;
84- intBytes = new byte [Integer .BYTES ];
85- nextSize = NEEDS_READ ;
153+
154+ this .itemOffsetsBytes = ByteBuffer .allocate (chunkSize * Integer .BYTES ).order (ByteOrder .nativeOrder ());
155+ this .itemSizes = new int [chunkSize ];
156+ this .itemIndex = chunkSize ;
86157 }
87158
88159 @ Override
89- public boolean hasNext () throws IOException
160+ public boolean hasNext ()
90161 {
91- return getNextSize () > EOF ;
162+ return currId < numEntries ;
92163 }
93164
94165 @ Override
95166 public T next () throws IOException
96167 {
97- int currentNextSize = getNextSize ();
98-
99- if (currentNextSize == -1 ) {
100- throw new NoSuchElementException ("end of buffer reached" );
168+ if (currId >= numEntries ) {
169+ throw new NoSuchElementException ();
101170 }
102171
103- byte [] nextBytes = new byte [currentNextSize ];
104- int bytesRead = 0 ;
105-
106- while (bytesRead < currentNextSize ) {
107- int result = inputStream .read (nextBytes , bytesRead , currentNextSize - bytesRead );
108-
109- if (result == -1 ) {
110- throw new NoSuchElementException ("unexpected end of buffer reached" );
172+ if (itemIndex >= itemSizes .length ) {
173+ if (chunkId == 0 ) {
174+ itemStartOffset = 0 ;
175+ } else {
176+ if (itemStartOffset != rowChunkOffsets .getLong (chunkId - 1 )) {
177+ throw DruidException .defensive (
178+ "Should have read up to the start of the offsets [%,d], "
179+ + "but for some reason the values [%,d] don't align. Possible corruption?" ,
180+ rowChunkOffsets .getLong (chunkId - 1 ),
181+ itemStartOffset
182+ );
183+ }
184+ itemStartOffset += (((long ) itemSizes .length ) * Integer .BYTES );
111185 }
112186
113- bytesRead += result ;
114- }
115-
116- Preconditions .checkState (bytesRead == currentNextSize );
117- T value = serde .deserialize (nextBytes );
118-
119- nextSize = NEEDS_READ ;
120-
121- return value ;
122- }
123-
124- private int getNextSize () throws IOException
125- {
126- if (nextSize == NEEDS_READ ) {
127- int bytesRead = 0 ;
128-
129- while (bytesRead < Integer .BYTES ) {
130- int result = inputStream .read (intBytes , bytesRead , Integer .BYTES - bytesRead );
187+ int numToRead = Math .min (itemSizes .length , numEntries - (chunkId * itemSizes .length ));
188+ final long readOffset = rowChunkOffsets .getLong (chunkId ++);
189+ itemOffsetsBytes .clear ();
190+ itemOffsetsBytes .limit (numToRead * Integer .BYTES );
191+ medium .readFully (readOffset , itemOffsetsBytes );
192+ itemOffsetsBytes .flip ();
193+ itemOffsetsBytes .asIntBuffer ().get (itemSizes , 0 , numToRead );
131194
132- if (result == -1 ) {
133- nextSize = EOF ;
134- return EOF ;
135- } else {
136- bytesRead += result ;
137- }
138- }
139- Preconditions .checkState (bytesRead == Integer .BYTES );
195+ itemIndex = 0 ;
196+ }
140197
141- nextSize = ByteBuffer .wrap (intBytes ).order (ByteOrder .nativeOrder ()).getInt ();
198+ int bytesToRead = itemSizes [itemIndex ];
199+ final T retVal ;
200+ if (bytesToRead == 0 ) {
201+ retVal = serde .deserialize (EMPTY_BUFFER );
202+ } else {
203+ tmpBuf .clear ();
204+ tmpBuf .limit (bytesToRead );
205+ medium .readFully (itemStartOffset , tmpBuf );
206+ tmpBuf .flip ();
207+
208+ retVal = serde .deserialize (tmpBuf );
142209 }
143210
144- return nextSize ;
211+ itemStartOffset += bytesToRead ;
212+ ++itemIndex ;
213+ ++currId ;
214+
215+ return retVal ;
145216 }
146217
147218 @ Override
148- public void close () throws IOException
219+ public void close ()
149220 {
150- inputStream . close ();
221+
151222 }
152223 }
153224}
0 commit comments