mirror of
https://github.com/FlipsideCrypto/dcrd.git
synced 2026-02-06 10:56:47 +00:00
Add pipes for parent process IPC. (#332)
Rewrite startup/shutdown logic to simplify shutdown signaling. All cleanup is now run from deferred functions in the main function. Add two new config options to set the read and write ends of a pair of pipes. This is used as a simple mechanism for a parent process to communicate with, observe, and manage the lifetime of a child dcrd process. When the RX (read end) pipe is closed, clean shutdown automatically begins. Add a new flag --lifetimeevents to create and send lifetime event notifications over the TX (write end) pipe during bringup and shutdown. This allows the parent process to observe which subsystems are currently starting, running, and stopping. Fixes #297. Fixes #298.
This commit is contained in:
parent
0a9a0f1969
commit
0d8104b2cf
@ -149,6 +149,9 @@ type config struct {
|
||||
NoPeerBloomFilters bool `long:"nopeerbloomfilters" description:"Disable bloom filtering support."`
|
||||
SigCacheMaxSize uint `long:"sigcachemaxsize" description:"The maximum number of entries in the signature verification cache."`
|
||||
BlocksOnly bool `long:"blocksonly" description:"Do not accept transactions from remote peers."`
|
||||
PipeRx uint `long:"piperx" description:"File descriptor of read end pipe to enable parent -> child process communication"`
|
||||
PipeTx uint `long:"pipetx" description:"File descriptor of write end pipe to enable parent <- child process communication"`
|
||||
LifetimeEvents bool `long:"lifetimeevents" description:"Send lifetime notifications over the TX pipe"`
|
||||
onionlookup func(string) ([]net.IP, error)
|
||||
lookup func(string) ([]net.IP, error)
|
||||
oniondial func(string, string) (net.Conn, error)
|
||||
|
||||
111
dcrd.go
111
dcrd.go
@ -1,5 +1,5 @@
|
||||
// Copyright (c) 2013-2014 The btcsuite developers
|
||||
// Copyright (c) 2015 The Decred developers
|
||||
// Copyright (c) 2015-2016 The Decred developers
|
||||
// Use of this source code is governed by an ISC
|
||||
// license that can be found in the LICENSE file.
|
||||
|
||||
@ -16,13 +16,11 @@ import (
|
||||
"runtime/pprof"
|
||||
"time"
|
||||
|
||||
"github.com/decred/dcrd/blockchain/stake"
|
||||
"github.com/decred/dcrd/limits"
|
||||
)
|
||||
|
||||
var (
|
||||
cfg *config
|
||||
shutdownChannel = make(chan struct{})
|
||||
)
|
||||
var cfg *config
|
||||
|
||||
// winServiceMain is only invoked on Windows. It detects when dcrd is running
|
||||
// as a service and reacts accordingly.
|
||||
@ -43,6 +41,9 @@ func dcrdMain(serverChan chan<- *server) error {
|
||||
cfg = tcfg
|
||||
defer backendLog.Flush()
|
||||
|
||||
interrupted := interruptListener()
|
||||
defer dcrdLog.Info("Shutdown complete")
|
||||
|
||||
// Show version at startup.
|
||||
dcrdLog.Infof("Version %s", version())
|
||||
// Show dcrd home dir location
|
||||
@ -91,19 +92,50 @@ func dcrdMain(serverChan chan<- *server) error {
|
||||
}()
|
||||
}
|
||||
|
||||
var lifetimeNotifier lifetimeEventServer
|
||||
if cfg.LifetimeEvents {
|
||||
lifetimeNotifier = newLifetimeEventServer(outgoingPipeMessages)
|
||||
}
|
||||
|
||||
if cfg.PipeRx != 0 {
|
||||
go serviceControlPipeRx(uintptr(cfg.PipeRx))
|
||||
}
|
||||
if cfg.PipeTx != 0 {
|
||||
go serviceControlPipeTx(uintptr(cfg.PipeTx))
|
||||
} else {
|
||||
go drainOutgoingPipeMessages()
|
||||
}
|
||||
|
||||
if interruptRequested(interrupted) {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Perform upgrades to dcrd as new versions require it.
|
||||
if err := doUpgrades(); err != nil {
|
||||
dcrdLog.Errorf("%v", err)
|
||||
return err
|
||||
}
|
||||
|
||||
if interruptRequested(interrupted) {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Load the block database.
|
||||
lifetimeNotifier.notifyStartupEvent(lifetimeEventDBOpen)
|
||||
db, err := loadBlockDB()
|
||||
if err != nil {
|
||||
dcrdLog.Errorf("%v", err)
|
||||
return err
|
||||
}
|
||||
defer db.Close()
|
||||
defer func() {
|
||||
lifetimeNotifier.notifyShutdownEvent(lifetimeEventDBOpen)
|
||||
dcrdLog.Infof("Gracefully shutting down the database...")
|
||||
db.Close()
|
||||
}()
|
||||
|
||||
if interruptRequested(interrupted) {
|
||||
return nil
|
||||
}
|
||||
|
||||
if cfg.DropAddrIndex {
|
||||
dcrdLog.Info("Deleting entire addrindex.")
|
||||
@ -116,26 +148,41 @@ func dcrdMain(serverChan chan<- *server) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
tmdb, err := loadTicketDB(db, activeNetParams.Params)
|
||||
if err != nil {
|
||||
dcrdLog.Errorf("%v", err)
|
||||
return err
|
||||
// The ticket "DB" takes ages to load and serialize back out to a file.
|
||||
// Load it asynchronously and if the process is interrupted during the
|
||||
// load, discard the result since no cleanup is necessary.
|
||||
lifetimeNotifier.notifyStartupEvent(lifetimeEventTicketDB)
|
||||
type ticketDBResult struct {
|
||||
ticketDB *stake.TicketDB
|
||||
err error
|
||||
}
|
||||
ticketDBResultChan := make(chan ticketDBResult)
|
||||
go func() {
|
||||
tmdb, err := loadTicketDB(db, activeNetParams.Params)
|
||||
ticketDBResultChan <- ticketDBResult{tmdb, err}
|
||||
}()
|
||||
var tmdb *stake.TicketDB
|
||||
select {
|
||||
case <-interrupted:
|
||||
return nil
|
||||
case r := <-ticketDBResultChan:
|
||||
if r.err != nil {
|
||||
dcrdLog.Errorf("%v", err)
|
||||
return err
|
||||
}
|
||||
tmdb = r.ticketDB
|
||||
}
|
||||
defer func() {
|
||||
lifetimeNotifier.notifyShutdownEvent(lifetimeEventTicketDB)
|
||||
tmdb.Close()
|
||||
err := tmdb.Store(cfg.DataDir, "ticketdb.gob")
|
||||
if err != nil {
|
||||
dcrdLog.Errorf("Failed to store ticket database: %v", err.Error())
|
||||
}
|
||||
}()
|
||||
defer tmdb.Close()
|
||||
|
||||
// Ensure the databases are sync'd and closed on Ctrl+C.
|
||||
addInterruptHandler(func() {
|
||||
dcrdLog.Infof("Gracefully shutting down the database...")
|
||||
db.RollbackClose()
|
||||
})
|
||||
|
||||
// Create server and start it.
|
||||
lifetimeNotifier.notifyStartupEvent(lifetimeEventP2PServer)
|
||||
server, err := newServer(cfg.Listeners, db, tmdb, activeNetParams.Params)
|
||||
if err != nil {
|
||||
// TODO(oga) this logging could do with some beautifying.
|
||||
@ -143,32 +190,28 @@ func dcrdMain(serverChan chan<- *server) error {
|
||||
cfg.Listeners, err)
|
||||
return err
|
||||
}
|
||||
addInterruptHandler(func() {
|
||||
defer func() {
|
||||
lifetimeNotifier.notifyShutdownEvent(lifetimeEventP2PServer)
|
||||
dcrdLog.Infof("Gracefully shutting down the server...")
|
||||
server.Stop()
|
||||
server.WaitForShutdown()
|
||||
})
|
||||
srvrLog.Infof("Server shutdown complete")
|
||||
}()
|
||||
|
||||
server.Start()
|
||||
if serverChan != nil {
|
||||
serverChan <- server
|
||||
}
|
||||
|
||||
// Monitor for graceful server shutdown and signal the main goroutine
|
||||
// when done. This is done in a separate goroutine rather than waiting
|
||||
// directly so the main goroutine can be signaled for shutdown by either
|
||||
// a graceful shutdown or from the main interrupt handler. This is
|
||||
// necessary since the main goroutine must be kept running long enough
|
||||
// for the interrupt handler goroutine to finish.
|
||||
go func() {
|
||||
server.WaitForShutdown()
|
||||
srvrLog.Infof("Server shutdown complete")
|
||||
shutdownChannel <- struct{}{}
|
||||
}()
|
||||
if interruptRequested(interrupted) {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Wait for shutdown signal from either a graceful server stop or from
|
||||
// the interrupt handler.
|
||||
<-shutdownChannel
|
||||
dcrdLog.Info("Shutdown complete")
|
||||
lifetimeNotifier.notifyStartupComplete()
|
||||
|
||||
// Wait until the interrupt signal is received from an OS signal or
|
||||
// shutdown is requested through the RPC server.
|
||||
<-interrupted
|
||||
return nil
|
||||
}
|
||||
|
||||
|
||||
211
ipc.go
Normal file
211
ipc.go
Normal file
@ -0,0 +1,211 @@
|
||||
// Copyright (c) 2016 The Decred developers
|
||||
// Use of this source code is governed by an ISC
|
||||
// license that can be found in the LICENSE file.
|
||||
|
||||
package main
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"encoding/binary"
|
||||
"fmt"
|
||||
"io"
|
||||
"os"
|
||||
)
|
||||
|
||||
// Messages sent over a pipe are encoded using a simple binary message format:
|
||||
//
|
||||
// - Protocol version (1 byte, currently 1)
|
||||
// - Message type length (1 byte)
|
||||
// - Message type string (encoded as UTF8, no longer than 255 bytes)
|
||||
// - Message payload length (4 bytes, little endian)
|
||||
// - Message payload bytes (no longer than 2^32 - 1 bytes)
|
||||
type pipeMessage interface {
|
||||
Type() string
|
||||
PayloadSize() uint32
|
||||
WritePayload(w io.Writer) error
|
||||
}
|
||||
|
||||
var outgoingPipeMessages = make(chan pipeMessage)
|
||||
|
||||
// serviceControlPipeRx reads from the file descriptor fd of a read end pipe.
|
||||
// This is intended to be used as a simple control mechanism for parent
|
||||
// processes to communicate with and and manage the lifetime of a dcrd child
|
||||
// process using a unidirectional pipe (on Windows, this is an anonymous pipe,
|
||||
// not a named pipe).
|
||||
//
|
||||
// When the pipe is closed or any other errors occur reading the control
|
||||
// message, shutdown begins. This prevents dcrd from continuing to run
|
||||
// unsupervised after the parent process closes unexpectedly.
|
||||
//
|
||||
// No control messages are currently defined and the only use for the pipe is to
|
||||
// start clean shutdown when the pipe is closed. Control messages that follow
|
||||
// the pipe message format can be added later as needed.
|
||||
func serviceControlPipeRx(fd uintptr) {
|
||||
pipe := os.NewFile(fd, fmt.Sprintf("|%v", fd))
|
||||
r := bufio.NewReader(pipe)
|
||||
for {
|
||||
_, err := r.Discard(1024)
|
||||
if err == io.EOF {
|
||||
err = nil
|
||||
break
|
||||
}
|
||||
if err != nil {
|
||||
dcrdLog.Errorf("Failed to read from pipe: %v", err)
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
select {
|
||||
case shutdownRequestChannel <- struct{}{}:
|
||||
default:
|
||||
}
|
||||
}
|
||||
|
||||
// serviceControlPipeTx sends pipe messages to the file descriptor fd of a write
|
||||
// end pipe. This is intended to be a simple response and notification system
|
||||
// for a child dcrd process to communicate with a parent process without the
|
||||
// need to go through the RPC server.
|
||||
//
|
||||
// See the comment on the pipeMessage interface for the binary encoding of a
|
||||
// pipe message.
|
||||
func serviceControlPipeTx(fd uintptr) {
|
||||
defer drainOutgoingPipeMessages()
|
||||
|
||||
pipe := os.NewFile(fd, fmt.Sprintf("|%v", fd))
|
||||
w := bufio.NewWriter(pipe)
|
||||
headerBuffer := make([]byte, 0, 1+1+255+4) // capped to max header size
|
||||
var err error
|
||||
for m := range outgoingPipeMessages {
|
||||
const protocolVersion byte = 1
|
||||
|
||||
mtype := m.Type()
|
||||
psize := m.PayloadSize()
|
||||
|
||||
headerBuffer = append(headerBuffer, protocolVersion)
|
||||
headerBuffer = append(headerBuffer, byte(len(mtype)))
|
||||
headerBuffer = append(headerBuffer, mtype...)
|
||||
buf := make([]byte, 4)
|
||||
binary.LittleEndian.PutUint32(buf, psize)
|
||||
headerBuffer = append(headerBuffer, buf...)
|
||||
|
||||
_, err = w.Write(headerBuffer)
|
||||
if err != nil {
|
||||
break
|
||||
}
|
||||
|
||||
err = m.WritePayload(w)
|
||||
if err != nil {
|
||||
break
|
||||
}
|
||||
|
||||
err = w.Flush()
|
||||
if err != nil {
|
||||
break
|
||||
}
|
||||
|
||||
headerBuffer = headerBuffer[:0]
|
||||
}
|
||||
|
||||
dcrdLog.Errorf("Failed to write to pipe: %v", err)
|
||||
}
|
||||
|
||||
func drainOutgoingPipeMessages() {
|
||||
for range outgoingPipeMessages {
|
||||
}
|
||||
}
|
||||
|
||||
// The lifetimeEvent describes a startup or shutdown event. The message type
|
||||
// string is "lifetimeevent".
|
||||
//
|
||||
// The payload size is always 2 bytes long. The first byte describes whether a
|
||||
// service or event is about to run or whether startup has completed. The
|
||||
// second byte, when applicable, describes which event or service is about to
|
||||
// start or stop.
|
||||
//
|
||||
// 0 <event id>: The startup event is about to run
|
||||
// 1 <ignored>: All startup tasks have completed
|
||||
// 2 <event id>: The shutdown event is about to run
|
||||
//
|
||||
// Event IDs can take on the following values:
|
||||
//
|
||||
// 0: Database opening/closing
|
||||
// 1: Ticket database opening/closing
|
||||
// 2: Peer-to-peer server starting/stopping
|
||||
//
|
||||
// Note that not all subsystems are started/stopped or events run during the
|
||||
// program's lifetime depending on what features are enabled through the config.
|
||||
//
|
||||
// As an example, the following messages may be sent during a typical execution:
|
||||
//
|
||||
// 0 0: The database is being opened
|
||||
// 0 1: The ticket DB is being opened
|
||||
// 0 2: The P2P server is starting
|
||||
// 1 0: All startup tasks have completed
|
||||
// 2 2: The P2P server is stopping
|
||||
// 2 1: The ticket DB is being closed and written to disk
|
||||
// 2 0: The database is being closed
|
||||
type lifetimeEvent struct {
|
||||
event lifetimeEventID
|
||||
action lifetimeAction
|
||||
}
|
||||
|
||||
var _ pipeMessage = (*lifetimeEvent)(nil)
|
||||
|
||||
type lifetimeEventID byte
|
||||
|
||||
const (
|
||||
startupEvent lifetimeEventID = iota
|
||||
startupComplete
|
||||
shutdownEvent
|
||||
)
|
||||
|
||||
type lifetimeAction byte
|
||||
|
||||
const (
|
||||
lifetimeEventDBOpen lifetimeAction = iota
|
||||
lifetimeEventTicketDB
|
||||
lifetimeEventP2PServer
|
||||
)
|
||||
|
||||
func (*lifetimeEvent) Type() string { return "lifetimeevent" }
|
||||
func (e *lifetimeEvent) PayloadSize() uint32 { return 2 }
|
||||
func (e *lifetimeEvent) WritePayload(w io.Writer) error {
|
||||
_, err := w.Write([]byte{byte(e.event), byte(e.action)})
|
||||
return err
|
||||
}
|
||||
|
||||
type lifetimeEventServer chan<- pipeMessage
|
||||
|
||||
func newLifetimeEventServer(outChan chan<- pipeMessage) lifetimeEventServer {
|
||||
return lifetimeEventServer(outChan)
|
||||
}
|
||||
|
||||
func (s lifetimeEventServer) notifyStartupEvent(action lifetimeAction) {
|
||||
if s == nil {
|
||||
return
|
||||
}
|
||||
s <- &lifetimeEvent{
|
||||
event: startupEvent,
|
||||
action: action,
|
||||
}
|
||||
}
|
||||
|
||||
func (s lifetimeEventServer) notifyStartupComplete() {
|
||||
if s == nil {
|
||||
return
|
||||
}
|
||||
s <- &lifetimeEvent{
|
||||
event: startupComplete,
|
||||
action: 0,
|
||||
}
|
||||
}
|
||||
|
||||
func (s lifetimeEventServer) notifyShutdownEvent(action lifetimeAction) {
|
||||
if s == nil {
|
||||
return
|
||||
}
|
||||
s <- &lifetimeEvent{
|
||||
event: shutdownEvent,
|
||||
action: action,
|
||||
}
|
||||
}
|
||||
64
rpcserver.go
64
rpcserver.go
@ -5051,7 +5051,10 @@ func handleSetGenerate(s *rpcServer, cmd interface{}, closeChan <-chan struct{})
|
||||
|
||||
// handleStop implements the stop command.
|
||||
func handleStop(s *rpcServer, cmd interface{}, closeChan <-chan struct{}) (interface{}, error) {
|
||||
s.server.Stop()
|
||||
select {
|
||||
case s.requestProcessShutdown <- struct{}{}:
|
||||
default:
|
||||
}
|
||||
return "dcrd stopping.", nil
|
||||
}
|
||||
|
||||
@ -5731,23 +5734,24 @@ func handleVerifyMessage(s *rpcServer, cmd interface{}, closeChan <-chan struct{
|
||||
// rpcServer holds the items the rpc server may need to access (config,
|
||||
// shutdown, main server, etc.)
|
||||
type rpcServer struct {
|
||||
started int32
|
||||
shutdown int32
|
||||
policy *mining.Policy
|
||||
server *server
|
||||
authsha [fastsha256.Size]byte
|
||||
limitauthsha [fastsha256.Size]byte
|
||||
ntfnMgr *wsNotificationManager
|
||||
numClients int32
|
||||
statusLines map[int]string
|
||||
statusLock sync.RWMutex
|
||||
wg sync.WaitGroup
|
||||
listeners []net.Listener
|
||||
workState *workState
|
||||
gbtWorkState *gbtWorkState
|
||||
templatePool map[[merkleRootPairSize]byte]*workStateBlockInfo
|
||||
helpCacher *helpCacher
|
||||
quit chan int
|
||||
started int32
|
||||
shutdown int32
|
||||
policy *mining.Policy
|
||||
server *server
|
||||
authsha [fastsha256.Size]byte
|
||||
limitauthsha [fastsha256.Size]byte
|
||||
ntfnMgr *wsNotificationManager
|
||||
numClients int32
|
||||
statusLines map[int]string
|
||||
statusLock sync.RWMutex
|
||||
wg sync.WaitGroup
|
||||
listeners []net.Listener
|
||||
workState *workState
|
||||
gbtWorkState *gbtWorkState
|
||||
templatePool map[[merkleRootPairSize]byte]*workStateBlockInfo
|
||||
helpCacher *helpCacher
|
||||
requestProcessShutdown chan struct{}
|
||||
quit chan int
|
||||
|
||||
// coin supply caching values
|
||||
coinSupplyMtx sync.Mutex
|
||||
@ -5836,6 +5840,13 @@ func (s *rpcServer) Stop() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// RequestedProcessShutdown returns a channel that is sent to when an authorized
|
||||
// RPC client requests the process to shutdown. If the request can not be read
|
||||
// immediately, it is dropped.
|
||||
func (s *rpcServer) RequestedProcessShutdown() <-chan struct{} {
|
||||
return s.requestProcessShutdown
|
||||
}
|
||||
|
||||
// limitConnections responds with a 503 service unavailable and returns true if
|
||||
// adding another client would exceed the maximum allow RPC clients.
|
||||
//
|
||||
@ -6219,14 +6230,15 @@ func genCertPair(certFile, keyFile string) error {
|
||||
// newRPCServer returns a new instance of the rpcServer struct.
|
||||
func newRPCServer(listenAddrs []string, policy *mining.Policy, s *server) (*rpcServer, error) {
|
||||
rpc := rpcServer{
|
||||
policy: policy,
|
||||
server: s,
|
||||
statusLines: make(map[int]string),
|
||||
workState: newWorkState(),
|
||||
templatePool: make(map[[merkleRootPairSize]byte]*workStateBlockInfo),
|
||||
gbtWorkState: newGbtWorkState(s.timeSource),
|
||||
helpCacher: newHelpCacher(),
|
||||
quit: make(chan int),
|
||||
policy: policy,
|
||||
server: s,
|
||||
statusLines: make(map[int]string),
|
||||
workState: newWorkState(),
|
||||
templatePool: make(map[[merkleRootPairSize]byte]*workStateBlockInfo),
|
||||
gbtWorkState: newGbtWorkState(s.timeSource),
|
||||
helpCacher: newHelpCacher(),
|
||||
requestProcessShutdown: make(chan struct{}),
|
||||
quit: make(chan int),
|
||||
}
|
||||
if cfg.RPCUser != "" && cfg.RPCPass != "" {
|
||||
login := cfg.RPCUser + ":" + cfg.RPCPass
|
||||
|
||||
@ -2629,6 +2629,12 @@ func newServer(listenAddrs []string, database database.Db, tmdb *stake.TicketDB,
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Signal process shutdown when the RPC server requests it.
|
||||
go func() {
|
||||
<-s.rpcServer.RequestedProcessShutdown()
|
||||
shutdownRequestChannel <- struct{}{}
|
||||
}()
|
||||
}
|
||||
|
||||
return &s, nil
|
||||
|
||||
@ -86,17 +86,11 @@ loop:
|
||||
// more commands while pending.
|
||||
changes <- svc.Status{State: svc.StopPending}
|
||||
|
||||
// Stop the main server gracefully when it is
|
||||
// already setup or just break out and allow
|
||||
// the service to exit immediately if it's not
|
||||
// setup yet. Note that calling Stop will cause
|
||||
// dcrdMain to exit in the goroutine above which
|
||||
// will in turn send a signal (and a potential
|
||||
// error) to doneChan.
|
||||
if mainServer != nil {
|
||||
mainServer.Stop()
|
||||
} else {
|
||||
break loop
|
||||
// Signal the main function to exit if shutdown
|
||||
// was not already requested.
|
||||
select {
|
||||
case shutdownRequestChannel <- struct{}{}:
|
||||
default:
|
||||
}
|
||||
|
||||
default:
|
||||
|
||||
98
signal.go
98
signal.go
@ -1,5 +1,5 @@
|
||||
// Copyright (c) 2013-2014 The btcsuite developers
|
||||
// Copyright (c) 2015 The Decred developers
|
||||
// Copyright (c) 2015-2016 The Decred developers
|
||||
// Use of this source code is governed by an ISC
|
||||
// license that can be found in the LICENSE file.
|
||||
|
||||
@ -10,79 +10,45 @@ import (
|
||||
"os/signal"
|
||||
)
|
||||
|
||||
// interruptChannel is used to receive SIGINT (Ctrl+C) signals.
|
||||
var interruptChannel chan os.Signal
|
||||
// shutdownRequestChannel is used to initiate shutdown from one of the
|
||||
// subsystems using the same code paths as when an interrupt signal is received.
|
||||
var shutdownRequestChannel = make(chan struct{})
|
||||
|
||||
// addHandlerChannel is used to add an interrupt handler to the list of handlers
|
||||
// to be invoked on SIGINT (Ctrl+C) signals.
|
||||
var addHandlerChannel = make(chan func())
|
||||
// interruptSignals defines the default signals to catch in order to do a proper
|
||||
// shutdown. This may be modified during init depending on the platform.
|
||||
var interruptSignals = []os.Signal{os.Interrupt}
|
||||
|
||||
// signals defines the default signals to catch in order to do a proper
|
||||
// shutdown.
|
||||
var signals = []os.Signal{os.Interrupt}
|
||||
// interruptListener listens for SIGINT (Ctrl+C) signals and shutdown requests
|
||||
// from shutdownRequestChannel. It returns a channel that is closed when either
|
||||
// signal is received.
|
||||
func interruptListener() <-chan struct{} {
|
||||
c := make(chan struct{})
|
||||
|
||||
// mainInterruptHandler listens for SIGINT (Ctrl+C) signals on the
|
||||
// interruptChannel and invokes the registered interruptCallbacks accordingly.
|
||||
// It also listens for callback registration. It must be run as a goroutine.
|
||||
func mainInterruptHandler() {
|
||||
// interruptCallbacks is a list of callbacks to invoke when a
|
||||
// SIGINT (Ctrl+C) is received.
|
||||
var interruptCallbacks []func()
|
||||
go func() {
|
||||
interruptChannel := make(chan os.Signal, 1)
|
||||
signal.Notify(interruptChannel, interruptSignals...)
|
||||
|
||||
// isShutdown is a flag which is used to indicate whether or not
|
||||
// the shutdown signal has already been received and hence any future
|
||||
// attempts to add a new interrupt handler should invoke them
|
||||
// immediately.
|
||||
var isShutdown bool
|
||||
|
||||
for {
|
||||
select {
|
||||
case sig := <-interruptChannel:
|
||||
// Ignore more than one shutdown signal.
|
||||
if isShutdown {
|
||||
dcrdLog.Infof("Received signal (%s). "+
|
||||
"Already shutting down...", sig)
|
||||
continue
|
||||
}
|
||||
|
||||
isShutdown = true
|
||||
dcrdLog.Infof("Received signal (%s). Shutting down...",
|
||||
sig)
|
||||
|
||||
// Run handlers in LIFO order.
|
||||
for i := range interruptCallbacks {
|
||||
idx := len(interruptCallbacks) - 1 - i
|
||||
callback := interruptCallbacks[idx]
|
||||
callback()
|
||||
}
|
||||
|
||||
// Signal the main goroutine to shutdown.
|
||||
go func() {
|
||||
shutdownChannel <- struct{}{}
|
||||
}()
|
||||
|
||||
case handler := <-addHandlerChannel:
|
||||
// The shutdown signal has already been received, so
|
||||
// just invoke and new handlers immediately.
|
||||
if isShutdown {
|
||||
handler()
|
||||
}
|
||||
|
||||
interruptCallbacks = append(interruptCallbacks, handler)
|
||||
dcrdLog.Infof("Received signal (%s). Shutting down...", sig)
|
||||
case <-shutdownRequestChannel:
|
||||
dcrdLog.Infof("Shutdown requested. Shutting down...")
|
||||
}
|
||||
}
|
||||
|
||||
close(c)
|
||||
}()
|
||||
|
||||
return c
|
||||
}
|
||||
|
||||
// addInterruptHandler adds a handler to call when a SIGINT (Ctrl+C) is
|
||||
// received.
|
||||
func addInterruptHandler(handler func()) {
|
||||
// Create the channel and start the main interrupt handler which invokes
|
||||
// all other callbacks and exits if not already done.
|
||||
if interruptChannel == nil {
|
||||
interruptChannel = make(chan os.Signal, 1)
|
||||
signal.Notify(interruptChannel, signals...)
|
||||
go mainInterruptHandler()
|
||||
// interruptRequested returns true when the channel returned by
|
||||
// interruptListener was closed. This simplifies early shutdown slightly since
|
||||
// the caller can just use an if statement instead of a select.
|
||||
func interruptRequested(interrupted <-chan struct{}) bool {
|
||||
select {
|
||||
case <-interrupted:
|
||||
return true
|
||||
default:
|
||||
return false
|
||||
}
|
||||
|
||||
addHandlerChannel <- handler
|
||||
}
|
||||
|
||||
@ -12,5 +12,5 @@ import (
|
||||
)
|
||||
|
||||
func init() {
|
||||
signals = []os.Signal{os.Interrupt, syscall.SIGTERM}
|
||||
interruptSignals = []os.Signal{os.Interrupt, syscall.SIGTERM}
|
||||
}
|
||||
|
||||
Loading…
Reference in New Issue
Block a user