Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 17 additions & 29 deletions src/hashtree.erl
Original file line number Diff line number Diff line change
Expand Up @@ -171,9 +171,6 @@

-define(NUM_KEYS_REQUIRED, 1000).

-define(SHA_LENGTH, 6).
-define(EPOCH_LENGTH, 4).

%% Fixed-size binary of exactly 176 bits (22 bytes).
%% NOTE(review): presumably the encoded tree id - confirm against encode_id.
-type tree_id_bin() :: <<_:176>>.
%% Binary of at least 256 bits (32 bytes) followed by any whole number of
%% additional bytes.
%% NOTE(review): presumably an encoded segment key with a variable-length
%% suffix - confirm against encode.
-type segment_bin() :: <<_:256, _:_*8>>.
%% Fixed-size binary of exactly 320 bits (40 bytes).
%% NOTE(review): presumably an encoded bucket key - confirm against its encoder.
-type bucket_bin() :: <<_:320>>.
Expand Down Expand Up @@ -210,8 +207,7 @@
write_buffer :: [{put, binary(), binary()} |
{delete, binary()}],
write_buffer_count :: integer(),
dirty_segments :: hashtree_array(),
itr_filter_fun :: function()
dirty_segments :: hashtree_array()
}).

-record(itr_state, {itr :: term(),
Expand Down Expand Up @@ -259,7 +255,6 @@ new({Index,TreeId}, LinkedStore, Options) ->
NumSegments = proplists:get_value(segments, Options, ?NUM_SEGMENTS),
Width = proplists:get_value(width, Options, ?WIDTH),
MemLevels = proplists:get_value(mem_levels, Options, ?MEM_LEVELS),
ItrFilterFun = proplists:get_value(itr_filter_fun, Options, undefined),
NumLevels = erlang:trunc(math:log(NumSegments) / math:log(Width)) + 1,
State = #state{id=encode_id(TreeId),
index=Index,
Expand All @@ -272,8 +267,7 @@ new({Index,TreeId}, LinkedStore, Options) ->
next_rebuild=full,
write_buffer=[],
write_buffer_count=0,
tree=dict:new(),
itr_filter_fun=ItrFilterFun},
tree=dict:new()},
State2 = share_segment_store(State, LinkedStore),
State2.

Expand Down Expand Up @@ -951,7 +945,7 @@ snapshot(State) ->

-spec multi_select_segment(hashtree(), list('*'|integer()), select_fun(T))
-> [{integer(), T}].
multi_select_segment(#state{id=Id, itr=Itr} = State, Segments, F) ->
multi_select_segment(#state{id=Id, itr=Itr}, Segments, F) ->
[First | Rest] = Segments,
IS1 = #itr_state{itr=Itr,
id=Id,
Expand All @@ -967,7 +961,7 @@ multi_select_segment(#state{id=Id, itr=Itr} = State, Segments, F) ->
encode(Id, First, <<>>)
end,
IS2 = try
iterate(iterator_move(Itr, Seek), IS1, State)
iterate(iterator_move(Itr, Seek), IS1)
after
%% Always call prefetch stop to ensure the iterator
%% is safe to use in the compare. Requires
Expand Down Expand Up @@ -1006,20 +1000,19 @@ iterator_move(Itr, Seek) ->
end.

-spec iterate({'error','invalid_iterator'} | {'ok',binary(),binary()},
#itr_state{}, #state{}) -> #itr_state{}.
#itr_state{}) -> #itr_state{}.

%% Ended up at an invalid_iterator, most likely because a dirty segment has no
%% entries left - e.g. the segment was marked dirty, but its last entries were
%% subsequently removed.
iterate({error, invalid_iterator}, IS=#itr_state{current_segment='*'}, _State) ->
iterate({error, invalid_iterator}, IS=#itr_state{current_segment='*'}) ->
IS;
iterate({error, invalid_iterator}, IS=#itr_state{itr=Itr,
id=Id,
current_segment=CurSeg,
remaining_segments=Segments,
acc_fun=F,
segment_acc=Acc,
final_acc=FinalAcc},
State = #state{itr_filter_fun = _ItrFilterFun}) ->
final_acc=FinalAcc}) ->
case Segments of
[] ->
IS;
Expand All @@ -1031,53 +1024,48 @@ iterate({error, invalid_iterator}, IS=#itr_state{itr=Itr,
remaining_segments=Remaining,
segment_acc=[],
final_acc=[{CurSeg, F(Acc)} | FinalAcc]},
iterate(iterator_move(Itr, Seek), IS2, State)
iterate(iterator_move(Itr, Seek), IS2)
end;
iterate({ok, K, V}, IS=#itr_state{itr=Itr,
id=Id,
current_segment=CurSeg,
remaining_segments=Segments,
acc_fun=F,
segment_acc=Acc,
final_acc=FinalAcc},
State = #state{itr_filter_fun = ItrFilterFun}) ->
final_acc=FinalAcc}) ->
{SegId, Seg, _} = safe_decode(K),
Segment = case CurSeg of
'*' ->
Seg;
_ ->
CurSeg
end,
KVAcc = case ItrFilterFun of
undefined -> [{K, V}];
_ -> ItrFilterFun(K, V, State)
end,
case {SegId, Seg, Segments, IS#itr_state.prefetch} of
{bad, -1, _, _} ->
%% Non-segment encountered, end traversal
IS;
{Id, Segment, _, _} ->
%% Still reading existing segment
IS2 = IS#itr_state{current_segment=Segment,
segment_acc=KVAcc ++ Acc,
segment_acc=[{K,V} | Acc],
prefetch=true},
iterate(iterator_move(Itr, prefetch), IS2, State);
iterate(iterator_move(Itr, prefetch), IS2);
{Id, _, [Seg|Remaining], _} ->
%% Pointing at next segment we are interested in
IS2 = IS#itr_state{current_segment=Seg,
remaining_segments=Remaining,
segment_acc=KVAcc,
segment_acc=[{K,V}],
final_acc=[{Segment, F(Acc)} | FinalAcc],
prefetch=true},
iterate(iterator_move(Itr, prefetch), IS2, State);
iterate(iterator_move(Itr, prefetch), IS2);
{Id, _, ['*'], _} ->
%% Pointing at next segment we are interested in
IS2 = IS#itr_state{current_segment=Seg,
remaining_segments=['*'],
segment_acc=KVAcc,
segment_acc=[{K,V}],
final_acc=[{Segment, F(Acc)} | FinalAcc],
prefetch=true},
iterate(iterator_move(Itr, prefetch), IS2, State);
iterate(iterator_move(Itr, prefetch), IS2);
{Id, _, [NextSeg | Remaining], true} ->
%% Pointing at uninteresting segment, but need to halt the
%% prefetch to ensure the iterator can be reused
Expand All @@ -1088,15 +1076,15 @@ iterate({ok, K, V}, IS=#itr_state{itr=Itr,
prefetch=true}, % will be after second move
_ = iterator_move(Itr, prefetch_stop), % ignore the pre-fetch,
Seek = encode(Id, NextSeg, <<>>), % and risk wasting a reseek
iterate(iterator_move(Itr, Seek), IS2, State);% to get to the next segment
iterate(iterator_move(Itr, Seek), IS2);% to get to the next segment
{Id, _, [NextSeg | Remaining], false} ->
%% Pointing at uninteresting segment, seek to next interesting one
Seek = encode(Id, NextSeg, <<>>),
IS2 = IS#itr_state{current_segment=NextSeg,
remaining_segments=Remaining,
segment_acc=[],
final_acc=[{Segment, F(Acc)} | FinalAcc]},
iterate(iterator_move(Itr, Seek), IS2, State);
iterate(iterator_move(Itr, Seek), IS2);
{_, _, _, true} ->
%% Done with traversal, but need to stop the prefetch to
%% ensure the iterator can be reused. The next operation
Expand Down