Skip to content

Commit 184d62d

Browse files
committed
Update the binary to use a sequence instead of a list and fix the close function
1 parent 0cdb70d commit 184d62d

File tree

3 files changed

+58
-42
lines changed

3 files changed

+58
-42
lines changed

bin/db.ml

Lines changed: 48 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -4,55 +4,59 @@ type command =
44
| Lookup of Rowex.key
55
| Noop
66

7-
let clean commands =
8-
let active, awaits = List.partition Bancos.is_running commands in
9-
let rec go : Bancos.command list -> unit = function
10-
| [] -> ()
11-
| cmd :: rest ->
12-
begin
13-
match Bancos.await cmd with
14-
| `Not_found key ->
15-
Logs.err (fun m -> m "%S not found" (key :> string))
16-
| `Found (key, value) ->
17-
Logs.info (fun m -> m "%S => %x" (key :> string) value)
18-
| `Duplicate key ->
19-
Logs.err (fun m -> m "%S already exists" (key :> string))
20-
| `Exists key -> Logs.info (fun m -> m "%S exists" (key :> string))
21-
| `Ok -> ()
22-
end;
23-
go rest
24-
in
25-
go awaits;
26-
active
27-
28-
let execute ?quiet:_ commands ~readers ~writers filepath =
7+
let iter ?(quiet = false) to_delete node =
8+
let cmd = Miou.Sequence.data node in
9+
if Bancos.is_running cmd then ()
10+
else begin
11+
to_delete := node :: !to_delete;
12+
match Bancos.await cmd with
13+
| `Not_found key -> Logs.err (fun m -> m "%S not found" (key :> string))
14+
| `Found (key, value) ->
15+
if not quiet then Fmt.pr "%S => %d\n%!" (key :> string) value;
16+
Logs.info (fun m -> m "%S => %x" (key :> string) value)
17+
| `Duplicate key ->
18+
Logs.err (fun m -> m "%S already exists" (key :> string))
19+
| `Exists key -> Logs.info (fun m -> m "%S exists" (key :> string))
20+
| `Ok -> ()
21+
end
22+
23+
let clean ?(quiet = false) commands =
24+
let to_delete = ref [] in
25+
Miou.Sequence.iter_node ~f:(iter ~quiet to_delete) commands;
26+
List.iter Miou.Sequence.remove !to_delete
27+
28+
let execute ?(quiet = false) ?(and_remove = false) commands ~readers ~writers
29+
filepath =
2930
Miou.run ~domains:(readers + writers) @@ fun () ->
3031
let t = Bancos.openfile ~readers ~writers filepath in
3132
Logs.debug (fun m -> m "ROWEX file loaded");
32-
let rec go active_commands =
33-
let active_commands = clean active_commands in
33+
let seq = Miou.Sequence.create () in
34+
let rec go () =
35+
clean ~quiet seq;
3436
match commands () with
35-
| None -> active_commands
36-
| Some Noop -> go active_commands
37+
| None -> ()
38+
| Some Noop -> go ()
3739
| Some (Lookup key) ->
3840
let cmd = Bancos.lookup t key in
39-
go (cmd :: active_commands)
41+
ignore Miou.Sequence.(add Left seq cmd);
42+
go ()
4043
| Some (Insert (key, value)) ->
4144
let cmd = Bancos.insert t key value in
42-
go (cmd :: active_commands)
45+
ignore Miou.Sequence.(add Left seq cmd);
46+
go ()
4347
| Some (Remove key) ->
4448
let cmd = Bancos.remove t key in
45-
go (cmd :: active_commands)
49+
ignore Miou.Sequence.(add Left seq cmd);
50+
go ()
4651
in
47-
let active_commands = go [] in
52+
go ();
4853
Logs.debug (fun m -> m "Commands sended, start to clean-up results");
49-
let rec go = function
50-
| [] -> ()
51-
| active_commands -> go (clean active_commands)
52-
in
53-
go active_commands;
54+
while Miou.Sequence.is_empty seq = false do
55+
clean ~quiet seq
56+
done;
5457
Logs.debug (fun m -> m "Results consumed, start to close the db file");
55-
Bancos.close t
58+
Bancos.close t;
59+
if and_remove then Unix.unlink filepath
5660

5761
let parse line =
5862
match String.split_on_char ' ' line with
@@ -91,8 +95,9 @@ let setup_commands input =
9195

9296
let error_msgf fmt = Fmt.kstr (fun msg -> Error (`Msg msg)) fmt
9397

94-
let run quiet commands filepath readers writers =
95-
execute ~quiet commands ~readers ~writers (Fpath.to_string filepath);
98+
let run quiet commands filepath readers writers and_remove =
99+
execute ~quiet ~and_remove commands ~readers ~writers
100+
(Fpath.to_string filepath);
96101
`Ok ()
97102

98103
open Cmdliner
@@ -113,6 +118,10 @@ let index =
113118
let filepath = Arg.conv (parser, pp) in
114119
Arg.(required & opt (some filepath) None & info [ "i"; "index" ] ~doc)
115120

121+
let and_remove =
122+
let doc = "Remove the idx file produced." in
123+
Arg.(value & flag & info [ "and-remove" ] ~doc)
124+
116125
let commands =
117126
let doc =
118127
"A file which contains different commands to execute into the index file."
@@ -134,7 +143,7 @@ let term =
134143
Term.(
135144
ret
136145
(const run $ term_setup_logs $ term_setup_commands $ index $ readers
137-
$ writers))
146+
$ writers $ and_remove))
138147

139148
let cmd =
140149
let doc = "A simple tool to manipulate an KV-store (parallel)." in

lib/bancos.ml

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -185,8 +185,14 @@ let close t =
185185
let rec go backoff =
186186
let closed = Atomic.get t.close in
187187
if (not closed) && Atomic.compare_and_set t.close false true then begin
188-
Miou.Condition.broadcast (snd t.rxs_locker);
189-
Miou.Condition.broadcast (snd t.txs_locker);
188+
Miou.Mutex.protect (fst t.rxs_locker)
189+
begin
190+
fun () -> Miou.Condition.broadcast (snd t.rxs_locker)
191+
end;
192+
Miou.Mutex.protect (fst t.txs_locker)
193+
begin
194+
fun () -> Miou.Condition.broadcast (snd t.txs_locker)
195+
end;
190196
try
191197
while true do
192198
Miou.Mutex.lock (fst t.idle);

lib/rowex.mli

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,8 @@ module type S = sig
9999

100100
val pause_intrinsic : unit -> unit t
101101
(** [pause_intrinsic] provides a hint to the processor that the code
102-
sequence is a spin-wait loop. *)
102+
sequence is a spin-wait loop. If ROWEX runs under a scheduler that offers a
103+
[Yield] directive, prefer that directive over this processor hint. *)
103104

104105
val get : memory -> 'a rd Addr.t -> ('t, 'v) value -> 'v t
105106

0 commit comments

Comments
 (0)