aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGrégoire Duchêne <gduchene@awhk.org>2022-06-05 23:52:55 +0100
committerGrégoire Duchêne <gduchene@awhk.org>2022-06-05 23:52:55 +0100
commit17148b7d2981d98309f7346435f5d3e77c586ab4 (patch)
treebf97393e633a6bb530e96e62af11c93919013fbe
parent0315338ef5c5fadd739088684323b82535fc904b (diff)
Add ListenPipe
Also, PipeListener.Close now returns syscall.EINVAL if the PipeListener was closed already. This imitates the behavior of net.UnixListener.
-rw-r--r--net.go32
-rw-r--r--net_test.go12
2 files changed, 22 insertions, 22 deletions
diff --git a/net.go b/net.go
index b1331e9..bf2081b 100644
--- a/net.go
+++ b/net.go
@@ -4,7 +4,7 @@ import (
"context"
"net"
"strings"
- "sync"
+ "sync/atomic"
"syscall"
)
@@ -24,21 +24,20 @@ func Listen(addr string) (net.Listener, error) {
// PipeListener is a net.Listener that works over a pipe. It provides
// dialer functions that can be used in an HTTP client or gRPC options.
//
-// Its zero value is safe to use. PipeListener must not be copied after
-// its first use.
+// PipeListener must not be copied after its first use.
type PipeListener struct {
- conns chan net.Conn
- done chan struct{}
-
- closeOnce sync.Once
- initOnce sync.Once
+ closed int32
+ conns chan net.Conn
+ done chan struct{}
}
var _ net.Listener = &PipeListener{}
-func (p *PipeListener) Accept() (net.Conn, error) {
- p.initOnce.Do(p.init)
+func ListenPipe() *PipeListener {
+ return &PipeListener{conns: make(chan net.Conn), done: make(chan struct{})}
+}
+func (p *PipeListener) Accept() (net.Conn, error) {
select {
case conn := <-p.conns:
return conn, nil
@@ -50,8 +49,10 @@ func (p *PipeListener) Accept() (net.Conn, error) {
func (p *PipeListener) Addr() net.Addr { return pipeListenerAddr{} }
func (p *PipeListener) Close() error {
- p.initOnce.Do(p.init)
- p.closeOnce.Do(func() { close(p.done) })
+ if !atomic.CompareAndSwapInt32(&p.closed, 0, 1) {
+ return syscall.EINVAL
+ }
+ close(p.done)
return nil
}
@@ -60,8 +61,6 @@ func (p *PipeListener) Dial(_, _ string) (net.Conn, error) {
}
func (p *PipeListener) DialContext(ctx context.Context, _, _ string) (net.Conn, error) {
- p.initOnce.Do(p.init)
-
s, c := net.Pipe()
select {
case p.conns <- s:
@@ -77,11 +76,6 @@ func (p *PipeListener) DialContextGRPC(ctx context.Context, _ string) (net.Conn,
return p.DialContext(ctx, "", "")
}
-func (p *PipeListener) init() {
- p.conns = make(chan net.Conn)
- p.done = make(chan struct{})
-}
-
type pipeListenerAddr struct{}
func (pipeListenerAddr) Network() string { return "pipe" }
diff --git a/net_test.go b/net_test.go
index 505579e..3c01a15 100644
--- a/net_test.go
+++ b/net_test.go
@@ -12,7 +12,7 @@ func TestPipeListener(s *testing.T) {
t := core.T{T: s}
t.Run("Success", func(t *core.T) {
- p := &core.PipeListener{}
+ p := core.ListenPipe()
t.Go(func() {
conn, err := p.Accept()
@@ -26,7 +26,7 @@ func TestPipeListener(s *testing.T) {
})
t.Run("WhenClosed", func(t *core.T) {
- p := &core.PipeListener{}
+ p := core.ListenPipe()
p.Close()
conn, err := p.Accept()
@@ -38,8 +38,14 @@ func TestPipeListener(s *testing.T) {
t.AssertEqual(nil, conn)
})
+ t.Run("WhenClosedTwice", func(t *core.T) {
+ p := core.ListenPipe()
+ t.AssertEqual(nil, p.Close())
+ t.AssertEqual(syscall.EINVAL, p.Close())
+ })
+
t.Run("WhenContextCanceled", func(t *core.T) {
- p := &core.PipeListener{}
+ p := core.ListenPipe()
ctx, cancel := context.WithCancel(context.Background())
cancel()