Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
oci: be more precise about channels and routines
Be careful about leaking anything

Signed-off-by: Peter Hunt <[email protected]>
  • Loading branch information
haircommander committed Jul 16, 2021
commit fd6dcf33dc69bf8164ccf26fd1f2a9229940c851
36 changes: 28 additions & 8 deletions internal/oci/runtime_oci.go
Original file line number Diff line number Diff line change
Expand Up @@ -343,7 +343,8 @@ func (r *runtimeOCI) ExecSyncContainer(ctx context.Context, c *Container, comman
cmd.Stdout = &stdoutBuf
cmd.Stderr = &stderrBuf

pidFileCreatedCh, err := WatchForFile(pidFile, notify.InModify, notify.InMovedTo)
pidFileCreatedDone := make(chan struct{}, 1)
pidFileCreatedCh, err := WatchForFile(pidFile, pidFileCreatedDone, notify.InModify, notify.InMovedTo)
if err != nil {
return nil, errors.Wrapf(err, "failed to watch %s", pidFile)
}
Expand All @@ -357,6 +358,7 @@ func (r *runtimeOCI) ExecSyncContainer(ctx context.Context, c *Container, comman
done := make(chan error, 1)
go func() {
done <- cmd.Wait()
close(done)
}()

// First, wait for the pid file to be created.
Expand All @@ -366,8 +368,8 @@ func (r *runtimeOCI) ExecSyncContainer(ctx context.Context, c *Container, comman
select {
case <-pidFileCreatedCh:
case doneErr = <-done:
close(done)
}
close(pidFileCreatedDone)

switch {
case doneErr != nil:
Expand Down Expand Up @@ -1039,15 +1041,22 @@ func (r *runtimeOCI) ReopenContainerLog(ctx context.Context, c *Container) error
}
defer controlFile.Close()

ch, err := WatchForFile(c.LogPath(), notify.InCreate, notify.InModify)
done := make(chan struct{}, 1)
ch, err := WatchForFile(c.LogPath(), done, notify.InCreate, notify.InModify)
if err != nil {
return errors.Wrapf(err, "failed to create watch for %s", c.LogPath())
}

if _, err = fmt.Fprintf(controlFile, "%d %d %d\n", 2, 0, 0); err != nil {
log.Debugf(ctx, "Failed to write to control file to reopen log file: %v", err)
}
<-ch
select {
case <-ch:
case <-time.After(time.Minute * 3):
// Give up after 3 minutes, as something probably went wrong
log.Errorf(ctx, "Failed to reopen log file for container %s: timed out", c.ID())
}
close(done)

return nil
}
Expand Down Expand Up @@ -1092,7 +1101,11 @@ func (c *Container) conmonPidFilePath() string {
return filepath.Join(c.bundlePath, "conmon-pidfile")
}

func WatchForFile(path string, opsToWatch ...notify.Event) (chan struct{}, error) {
// WatchForFile creates a watch on the parent directory of path for the events in opsToWatch.
// It returns immediately with a channel that receives a value once path has had one of those events.
// done can be used to stop the watch before such an event occurs.
// WatchForFile is responsible for closing all internal channels and the returned channel, but not for closing done.
func WatchForFile(path string, done chan struct{}, opsToWatch ...notify.Event) (chan struct{}, error) {
eiCh := make(chan notify.EventInfo, 1)
ch := make(chan struct{})

Expand All @@ -1101,10 +1114,17 @@ func WatchForFile(path string, opsToWatch ...notify.Event) (chan struct{}, error
return nil, err
}
go func() {
defer close(ch)
defer close(eiCh)
defer notify.Stop(eiCh)
for ei := range eiCh {
if ei.Path() == path {
ch <- struct{}{}
for {
select {
case ei := <-eiCh:
if ei.Path() == path {
ch <- struct{}{}
return
}
case <-done:
return
}
}
Expand Down
18 changes: 15 additions & 3 deletions internal/oci/runtime_oci_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,12 +147,14 @@ var _ = t.Describe("Oci", func() {
})
t.Describe("WatchForFile", func() {
var notifyFile string
var done chan struct{}
BeforeEach(func() {
notifyFile = filepath.Join(t.MustTempDir("watch"), "file")
done = make(chan struct{}, 1)
})
It("should catch file creation", func() {
// Given
ch, err := oci.WatchForFile(notifyFile, notify.InCreate, notify.InModify)
ch, err := oci.WatchForFile(notifyFile, done, notify.InCreate, notify.InModify)
Expect(err).To(BeNil())

// When
Expand All @@ -164,7 +166,7 @@ var _ = t.Describe("Oci", func() {
})
It("should not catch file create if doesn't exist", func() {
// Given
ch, err := oci.WatchForFile(notifyFile, notify.InCreate, notify.InModify)
ch, err := oci.WatchForFile(notifyFile, done, notify.InCreate, notify.InModify)
Expect(err).To(BeNil())

// When
Expand All @@ -182,7 +184,7 @@ var _ = t.Describe("Oci", func() {
})
It("should only catch file write", func() {
// Given
ch, err := oci.WatchForFile(notifyFile, notify.InModify)
ch, err := oci.WatchForFile(notifyFile, done, notify.InModify)
Expect(err).To(BeNil())

// When
Expand All @@ -197,6 +199,16 @@ var _ = t.Describe("Oci", func() {

<-ch
})
It("should give up after sending on done", func() {
// Given
ch, err := oci.WatchForFile(notifyFile, done, notify.InModify)
Expect(err).To(BeNil())

// When
checkChannelEmpty(ch)
done <- struct{}{}
<-ch
})
})
})

Expand Down