Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions client/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -292,8 +292,10 @@ func WaitBPDatabaseCreation(
}
fmt.Printf("\rQuerying SQLChain Profile %vs", count*int(period.Seconds()))
case <-ctx.Done():
err = ctx.Err()
return
if err != nil {
return errors.Wrapf(ctx.Err(), "last error: %s", err.Error())
}
return ctx.Err()
}
}
}
Expand Down
3 changes: 2 additions & 1 deletion cmd/cql/internal/create.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,7 @@ func runCreate(cmd *Command, args []string) {
SetExitStatus(1)
return
}
fmt.Printf("\nThe database is accecpted by blockproducer, DSN: %#v\n", dsn)

var ctx, cancel = context.WithTimeout(context.Background(), waitTxConfirmationMaxDuration)
defer cancel()
Expand All @@ -197,7 +198,7 @@ func runCreate(cmd *Command, args []string) {
}
}

fmt.Printf("\nThe newly created database is: %#v\n", dsn)
fmt.Printf("\nThe database is created on miners, DSN: %#v\n", dsn)
storeOneDSN(dsn)
fmt.Printf("The connecting string beginning with 'covenantsql://' could be used as a dsn for `cql console`\n or any command, or be used in website like https://web.covenantsql.io\n")
}
54 changes: 38 additions & 16 deletions sqlchain/chain.go
Original file line number Diff line number Diff line change
Expand Up @@ -575,15 +575,19 @@ func (c *Chain) syncHead() (err error) {
child, cancel = context.WithTimeout(c.rt.ctx, c.rt.tick)
wg = &sync.WaitGroup{}

totalCount, succCount uint32
totalCount, succCount, initiatingCount uint32
)
defer func() {
wg.Wait()
cancel()

if totalCount > 0 && succCount == 0 {
// Set error if all RPC calls are failed
err = errors.New("all remote peers are unreachable")
if initiatingCount == totalCount {
err = ErrInitiating
} else {
// Set error if all RPC calls are failed
err = errors.New("all remote peers are unreachable")
}
}
}()

Expand All @@ -607,22 +611,26 @@ func (c *Chain) syncHead() (err error) {
resp = &MuxFetchBlockResp{}
)

atomic.AddUint32(&totalCount, 1)
if err := c.cl.CallNodeWithContext(
child, node, route.SQLCFetchBlock.String(), req, resp,
); err != nil {
ile.WithError(err).Error("failed to fetch block from peer")
if strings.Contains(err.Error(), ErrUnknownMuxRequest.Error()) {
// TODO(leventeliu): this omits initiating peers. Need redesign.
if !strings.Contains(err.Error(), ErrUnknownMuxRequest.Error()) {
ile.WithError(err).Error("failed to fetch block from peer")
return
}
atomic.AddUint32(&totalCount, 1)
atomic.AddUint32(&initiatingCount, 1)
return
}
atomic.AddUint32(&totalCount, 1)

if resp.Block == nil {
atomic.AddUint32(&succCount, 1)
ile.Debug("fetch block request reply: no such block")
// If block is nil, resp.Height returns the current head height of the remote peer
if resp.Height <= req.Height {
atomic.AddUint32(&initiatingCount, 1)
} else {
atomic.AddUint32(&succCount, 1)
}
return
}

Expand Down Expand Up @@ -655,7 +663,7 @@ func (c *Chain) runCurrentTurn(now time.Time, d time.Duration) {
defer func() {
c.stat()
c.pruneBlockCache()
c.rt.setNextTurn()
c.rt.IncNextTurn()
c.ai.advance(c.rt.getMinValidHeight())
// Info the block processing goroutine that the chain height has grown, so please return
// any stashed blocks for further check.
Expand All @@ -668,7 +676,7 @@ func (c *Chain) runCurrentTurn(now time.Time, d time.Duration) {

le.Debug("run current turn")
if c.rt.getHead().Height < c.rt.getNextTurn()-1 {
le.Error("a block will be skipped")
le.Debug("a block will be skipped")
}
if !c.rt.isMyTurn() {
return
Expand All @@ -691,8 +699,10 @@ func (c *Chain) mainCycle(ctx context.Context) {
return
default:
if err := c.syncHead(); err != nil {
c.logEntry().WithError(err).Error("failed to sync head")
continue
if err != ErrInitiating {
c.logEntry().WithError(err).Error("failed to sync head")
continue
}
}
if t, d := c.rt.nextTick(); d > 0 {
time.Sleep(d)
Expand Down Expand Up @@ -724,10 +734,16 @@ func (c *Chain) sync() (err error) {
}
for c.rt.getNextTurn() <= height {
if err = c.syncHead(); err != nil {
le.WithError(err).Errorf("failed to sync block at height %d", height)
return
if err != ErrInitiating {
le.WithError(err).Errorf("failed to sync block at height %d", height)
return
}
// Skip sync and reset error
c.rt.SetNextTurn(height + 1)
err = nil
} else {
c.rt.IncNextTurn()
}
c.rt.setNextTurn()
}
}
return
Expand Down Expand Up @@ -836,11 +852,13 @@ func (c *Chain) processBlocks(ctx context.Context) {
func (c *Chain) Start() (err error) {
c.rt.goFunc(c.processBlocks)
if err = c.sync(); err != nil {
c.logEntryWithHeadState().WithError(err).Error("failed to start, chain process terminated")
_ = c.Stop()
return
}
c.rt.goFunc(c.mainCycle)
c.rt.startService(c)
c.logEntryWithHeadState().Info("started successfully")
return
}

Expand Down Expand Up @@ -1205,3 +1223,7 @@ func (c *Chain) updateMetrics() {

c.expVars.Get(mwMinerChainBlockTimestamp).(*expvar.String).Set(b.Timestamp().String())
}

func (c *Chain) getCurrentHeight() int32 {
return c.rt.getHead().Height
}
3 changes: 3 additions & 0 deletions sqlchain/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,4 +42,7 @@ var (
// ErrResponseSeqNotMatch indicates that a response sequence id doesn't match the original one
// in the index.
ErrResponseSeqNotMatch = errors.New("response sequence id doesn't match")
// ErrInitiating indicates that a sqlchain is in initiate state and is not available for sync
// requests.
ErrInitiating = errors.New("sqlchain is in initiate")
)
3 changes: 3 additions & 0 deletions sqlchain/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,5 +57,8 @@ func (s *ChainRPCService) AdviseNewBlock(req *AdviseNewBlockReq, resp *AdviseNew
func (s *ChainRPCService) FetchBlock(req *FetchBlockReq, resp *FetchBlockResp) (err error) {
resp.Height = req.Height
resp.Block, err = s.chain.FetchBlock(req.Height)
if err == nil && resp.Block == nil {
resp.Height = s.chain.getCurrentHeight()
}
return
}
11 changes: 9 additions & 2 deletions sqlchain/runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,13 +196,20 @@ func (r *runtime) getNextTurn() int32 {
return r.nextTurn
}

// setNextTurn prepares the runtime state for the next turn.
func (r *runtime) setNextTurn() {
// IncNextTurn prepares the runtime state for the next turn.
func (r *runtime) IncNextTurn() {
r.stateMutex.Lock()
defer r.stateMutex.Unlock()
r.nextTurn++
}

// SetNextTurn sets the runtime state to the given turn.
func (r *runtime) SetNextTurn(turn int32) {
r.stateMutex.Lock()
defer r.stateMutex.Unlock()
r.nextTurn = turn
}

// stop sends a signal to the Runtime stop channel by closing it.
func (r *runtime) stop(dbID proto.DatabaseID) {
r.stopService(dbID)
Expand Down
15 changes: 3 additions & 12 deletions worker/dbms.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,9 @@ import (
"os"
"path/filepath"
"sync"
"sync/atomic"
"time"

"github.com/pkg/errors"
mw "github.com/zserge/metric"

"github.com/CovenantSQL/CovenantSQL/blockproducer/interfaces"
"github.com/CovenantSQL/CovenantSQL/conf"
Expand Down Expand Up @@ -57,7 +55,7 @@ const (
)

var (
dbCount = mw.NewGauge("5m1m")
dbCount = new(expvar.Int)
)

func init() {
Expand All @@ -68,7 +66,6 @@ func init() {
type DBMS struct {
cfg *DBMSConfig
dbMap sync.Map
dbCount int64
kayakMux *DBKayakMuxService
chainMux *sqlchain.MuxService
rpc *DBMSRPCService
Expand Down Expand Up @@ -375,7 +372,6 @@ func (dbms *DBMS) initDatabases(
) {
currentInstance := make(map[proto.DatabaseID]bool)
wg := &sync.WaitGroup{}
errCh := make(chan error, len(profiles))

for id, profile := range profiles {
currentInstance[id] = true
Expand All @@ -390,15 +386,10 @@ func (dbms *DBMS) initDatabases(
log.WithFields(log.Fields{
"id": instance.DatabaseID,
}).WithError(err).Error("failed to create database instance")
errCh <- errors.Wrapf(err, "failed to create database %s", instance.DatabaseID)
}
}()
}
wg.Wait()
close(errCh)
for err := range errCh {
return err // omit any other error after this instance
}

// calculate to drop databases
toDropInstance := make(map[proto.DatabaseID]bool)
Expand Down Expand Up @@ -478,7 +469,7 @@ func (dbms *DBMS) Create(instance *types.ServiceInstance, cleanup bool) (err err
err = dbms.addMeta(instance.DatabaseID, db)

// update metrics
dbCount.Add(float64(atomic.AddInt64(&dbms.dbCount, 1)))
dbCount.Add(1)

return
}
Expand All @@ -498,7 +489,7 @@ func (dbms *DBMS) Drop(dbID proto.DatabaseID) (err error) {
}

// update metrics
dbCount.Add(float64(atomic.AddInt64(&dbms.dbCount, -1)))
dbCount.Add(-1)

// remove meta
return dbms.removeMeta(dbID)
Expand Down