diff --git a/util/addblock/addblock.go b/util/addblock/addblock.go
index 1ec671ae..ccab7d43 100644
--- a/util/addblock/addblock.go
+++ b/util/addblock/addblock.go
@@ -1,222 +1,134 @@
-// Copyright (c) 2013 Conformal Systems LLC.
+// Copyright (c) 2013-2014 Conformal Systems LLC.
 // Use of this source code is governed by an ISC
 // license that can be found in the LICENSE file.
 
 package main
 
 import (
-	"encoding/binary"
-	"fmt"
+	"github.com/conformal/btcchain"
 	"github.com/conformal/btcd/limits"
 	"github.com/conformal/btcdb"
 	_ "github.com/conformal/btcdb/ldb"
 	"github.com/conformal/btclog"
-	"github.com/conformal/btcutil"
-	"github.com/conformal/btcwire"
-	"github.com/conformal/go-flags"
-	"io"
 	"os"
 	"path/filepath"
 	"runtime"
 )
 
-type ShaHash btcwire.ShaHash
-
-type config struct {
-	DataDir  string `short:"b" long:"datadir" description:"Directory to store data"`
-	DbType   string `long:"dbtype" description:"Database backend"`
-	TestNet3 bool   `long:"testnet" description:"Use the test network"`
-	Progress bool   `short:"p" description:"show progress"`
-	InFile   string `short:"i" long:"infile" description:"File containing the block(s)" required:"true"`
-}
-
 const (
-	ArgSha = iota
-	ArgHeight
+	// blockDbNamePrefix is the prefix for the btcd block database.
+	blockDbNamePrefix = "blocks"
 )
 
 var (
-	btcdHomeDir    = btcutil.AppDataDir("btcd", false)
-	defaultDataDir = filepath.Join(btcdHomeDir, "data")
-	log            btclog.Logger
+	cfg *config
+	log btclog.Logger
 )
 
-type bufQueue struct {
-	height int64
-	blkbuf []byte
-}
-
-type blkQueue struct {
-	complete chan bool
-	height   int64
-	blk      *btcutil.Block
-}
-
-func main() {
-	cfg := config{
-		DbType:  "leveldb",
-		DataDir: defaultDataDir,
-	}
-	parser := flags.NewParser(&cfg, flags.Default)
-	_, err := parser.Parse()
-	if err != nil {
-		if e, ok := err.(*flags.Error); !ok || e.Type != flags.ErrHelp {
-			parser.WriteHelp(os.Stderr)
-		}
-		return
-	}
-
-	// Use all processor cores.
-	runtime.GOMAXPROCS(runtime.NumCPU())
-
-	// Up some limits.
-	if err := limits.SetLimits(); err != nil {
-		os.Exit(1)
-	}
-
-	backendLogger := btclog.NewDefaultBackendLogger()
-	defer backendLogger.Flush()
-	log = btclog.NewSubsystemLogger(backendLogger, "")
-	btcdb.UseLogger(log)
-
-	var testnet string
-	if cfg.TestNet3 {
-		testnet = "testnet"
-	} else {
-		testnet = "mainnet"
-	}
-
-	cfg.DataDir = filepath.Join(cfg.DataDir, testnet)
-
-	err = os.MkdirAll(cfg.DataDir, 0700)
-	if err != nil {
-		fmt.Printf("unable to create db repo area %v, %v", cfg.DataDir, err)
-	}
-
-	blockDbNamePrefix := "blocks"
+// loadBlockDB opens the block database and returns a handle to it.
+func loadBlockDB() (btcdb.Db, error) {
+	// The database name is based on the database type.
 	dbName := blockDbNamePrefix + "_" + cfg.DbType
 	if cfg.DbType == "sqlite" {
 		dbName = dbName + ".db"
 	}
 	dbPath := filepath.Join(cfg.DataDir, dbName)
 
-	log.Infof("loading db")
-	db, err := btcdb.CreateDB(cfg.DbType, dbPath)
+	log.Infof("Loading block database from '%s'", dbPath)
+	db, err := btcdb.OpenDB(cfg.DbType, dbPath)
 	if err != nil {
-		log.Warnf("db open failed: %v", err)
-		return
+		// Return the error if it's not because the database doesn't
+		// exist.
+		if err != btcdb.DbDoesNotExist {
+			return nil, err
+		}
+
+		// Create the db if it does not exist.
+		err = os.MkdirAll(cfg.DataDir, 0700)
+		if err != nil {
+			return nil, err
+		}
+		db, err = btcdb.CreateDB(cfg.DbType, dbPath)
+		if err != nil {
+			return nil, err
+		}
 	}
+
+	// Get the latest block height from the database.
+	_, height, err := db.NewestSha()
+	if err != nil {
+		db.Close()
+		return nil, err
+	}
+
+	log.Infof("Block database loaded with block height %d", height)
+	return db, nil
+}
+
+// realMain is the real main function for the utility. It is necessary to work
+// around the fact that deferred functions do not run when os.Exit() is called.
+func realMain() error {
+	// Load configuration and parse command line.
+	tcfg, _, err := loadConfig()
+	if err != nil {
+		return err
+	}
+	cfg = tcfg
+
+	// Setup logging.
+	backendLogger := btclog.NewDefaultBackendLogger()
+	defer backendLogger.Flush()
+	log = btclog.NewSubsystemLogger(backendLogger, "")
+	btcdb.UseLogger(btclog.NewSubsystemLogger(backendLogger, "BCDB: "))
+	btcchain.UseLogger(btclog.NewSubsystemLogger(backendLogger, "CHAN: "))
+
+	// Load the block database.
+	db, err := loadBlockDB()
+	if err != nil {
+		log.Errorf("Failed to load database: %v", err)
+		return err
 	}
 	defer db.Close()
-	log.Infof("db created")
 
-	var fi io.ReadCloser
-
-	fi, err = os.Open(cfg.InFile)
+	fi, err := os.Open(cfg.InFile)
 	if err != nil {
-		log.Warnf("failed to open file %v, err %v", cfg.InFile, err)
+		log.Errorf("Failed to open file %v: %v", cfg.InFile, err)
+		return err
 	}
-	defer func() {
-		if err := fi.Close(); err != nil {
-			log.Warn("failed to close file %v %v", cfg.InFile, err)
-		}
-	}()
+	defer fi.Close()
 
-	bufqueue := make(chan *bufQueue, 2)
-	blkqueue := make(chan *blkQueue, 2)
+	// Create a block importer for the database and input file.
+	importer := newBlockImporter(db, fi)
 
-	for i := 0; i < runtime.NumCPU(); i++ {
-		go processBuf(i, bufqueue, blkqueue)
+	// Perform the import asynchronously. This allows blocks to be
+	// processed and read in parallel. The results channel returned from
+	// Import contains the statistics about the import including an error
+	// if something went wrong.
+	log.Info("Starting import")
+	resultsChan := importer.Import()
+	results := <-resultsChan
+	if results.err != nil {
+		log.Errorf("%v", results.err)
+		return results.err
 	}
-	go processBuf(0, bufqueue, blkqueue)
 
-	go readBlocks(fi, bufqueue)
-
-	var eheight int64
-	doneMap := map[int64]*blkQueue{}
-	for {
-
-		select {
-		case blkM := <-blkqueue:
-			doneMap[blkM.height] = blkM
-
-			for {
-				if blkP, ok := doneMap[eheight]; ok {
-					delete(doneMap, eheight)
-					blkP.complete <- true
-					db.InsertBlock(blkP.blk)
-
-					if cfg.Progress && eheight%int64(10000) == 0 {
-						log.Infof("Processing block %v", eheight)
-					}
-					eheight++
-				} else {
-					break
-				}
-			}
-		}
-	}
-	if cfg.Progress {
-		log.Infof("Processing block %v", eheight)
-	}
+	log.Infof("Processed a total of %d blocks (%d imported, %d already "+
+		"known)", results.blocksProcessed, results.blocksImported,
+		results.blocksProcessed-results.blocksImported)
+	return nil
 }
 
-func processBuf(idx int, bufqueue chan *bufQueue, blkqueue chan *blkQueue) {
-	complete := make(chan bool)
-	for {
-		select {
-		case bq := <-bufqueue:
-			var blkmsg blkQueue
+func main() {
+	// Use all processor cores and up some limits.
+	runtime.GOMAXPROCS(runtime.NumCPU())
+	if err := limits.SetLimits(); err != nil {
+		os.Exit(1)
+	}
 
-			blkmsg.height = bq.height
-
-			if len(bq.blkbuf) == 0 {
-				// we are done
-				blkqueue <- &blkmsg
-			}
-
-			blk, err := btcutil.NewBlockFromBytes(bq.blkbuf)
-			if err != nil {
-				fmt.Printf("failed to parse block %v", bq.height)
-				return
-			}
-			blkmsg.blk = blk
-			blkmsg.complete = complete
-			blkqueue <- &blkmsg
-			select {
-			case <-complete:
-			}
-		}
-	}
-}
-
-func readBlocks(fi io.Reader, bufqueue chan *bufQueue) {
-	var height int64
-	for {
-		var net, blen uint32
-
-		var bufM bufQueue
-		bufM.height = height
-
-		// generate and write header values
-		err := binary.Read(fi, binary.LittleEndian, &net)
-		if err != nil {
-			break
-			bufqueue <- &bufM
-		}
-		if net != uint32(btcwire.MainNet) {
-			fmt.Printf("network mismatch %v %v",
-				net, uint32(btcwire.MainNet))
-
-			bufqueue <- &bufM
-		}
-		err = binary.Read(fi, binary.LittleEndian, &blen)
-		if err != nil {
-			bufqueue <- &bufM
-		}
-		blkbuf := make([]byte, blen)
-		err = binary.Read(fi, binary.LittleEndian, blkbuf)
-		bufM.blkbuf = blkbuf
-		bufqueue <- &bufM
-		height++
+
+	// Work around defer not working after os.Exit()
+	if err := realMain(); err != nil {
+		os.Exit(1)
 	}
 }
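Note: the rewritten addblock.go above moves all of the work into realMain so that deferred cleanup (database close, log flush) still runs on failure; main only maps the returned error to an exit code, because os.Exit skips deferred functions. A minimal standalone sketch of that pattern (the doWork name and file path are illustrative, not from the patch):

    package main

    import (
    	"fmt"
    	"os"
    )

    // doWork stands in for realMain: any function that uses defer for
    // cleanup and reports failure as an error instead of exiting itself.
    func doWork() error {
    	f, err := os.Create("example.out")
    	if err != nil {
    		return err
    	}
    	// This defer runs even on the error path below, which it would
    	// not if doWork called os.Exit directly.
    	defer f.Close()

    	if _, err := fmt.Fprintln(f, "hello"); err != nil {
    		return err
    	}
    	return nil
    }

    func main() {
    	// Only main calls os.Exit, after every defer in doWork has run.
    	if err := doWork(); err != nil {
    		fmt.Fprintln(os.Stderr, err)
    		os.Exit(1)
    	}
    }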
diff --git a/util/addblock/config.go b/util/addblock/config.go
new file mode 100644
index 00000000..7c03c8f4
--- /dev/null
+++ b/util/addblock/config.go
@@ -0,0 +1,125 @@
+// Copyright (c) 2013-2014 Conformal Systems LLC.
+// Use of this source code is governed by an ISC
+// license that can be found in the LICENSE file.
+
+package main
+
+import (
+	"fmt"
+	"github.com/conformal/btcdb"
+	_ "github.com/conformal/btcdb/ldb"
+	"github.com/conformal/btcutil"
+	"github.com/conformal/btcwire"
+	"github.com/conformal/go-flags"
+	"os"
+	"path/filepath"
+)
+
+const (
+	defaultDbType   = "leveldb"
+	defaultDataFile = "bootstrap.dat"
+	defaultProgress = 10000
+)
+
+var (
+	btcdHomeDir    = btcutil.AppDataDir("btcd", false)
+	defaultDataDir = filepath.Join(btcdHomeDir, "data")
+	knownDbTypes   = btcdb.SupportedDBs()
+	activeNetwork  = btcwire.MainNet
+)
+
+// config defines the configuration options for addblock.
+//
+// See loadConfig for details on the configuration load process.
+type config struct {
+	DataDir  string `short:"b" long:"datadir" description:"Location of the btcd data directory"`
+	DbType   string `long:"dbtype" description:"Database backend to use for the Block Chain"`
+	TestNet3 bool   `long:"testnet" description:"Use the test network"`
+	InFile   string `short:"i" long:"infile" description:"File containing the block(s)"`
+	Progress int    `short:"p" long:"progress" description:"Show a progress message every time this number of blocks is processed -- Use 0 to disable progress announcements"`
+}
+
+// fileExists reports whether the named file or directory exists.
+func fileExists(name string) bool {
+	if _, err := os.Stat(name); err != nil {
+		if os.IsNotExist(err) {
+			return false
+		}
+	}
+	return true
+}
+
+// validDbType returns whether or not dbType is a supported database type.
+func validDbType(dbType string) bool {
+	for _, knownType := range knownDbTypes {
+		if dbType == knownType {
+			return true
+		}
+	}
+
+	return false
+}
+
+// netName returns a human-readable name for the passed bitcoin network.
+func netName(btcnet btcwire.BitcoinNet) string {
+	net := "mainnet"
+	if btcnet == btcwire.TestNet3 {
+		net = "testnet"
+	}
+	return net
+}
+
+// loadConfig initializes and parses the config using command line options.
+func loadConfig() (*config, []string, error) {
+	// Default config.
+	cfg := config{
+		DataDir:  defaultDataDir,
+		DbType:   defaultDbType,
+		InFile:   defaultDataFile,
+		Progress: defaultProgress,
+	}
+
+	// Parse command line options.
+	parser := flags.NewParser(&cfg, flags.Default)
+	remainingArgs, err := parser.Parse()
+	if err != nil {
+		if e, ok := err.(*flags.Error); !ok || e.Type != flags.ErrHelp {
+			parser.WriteHelp(os.Stderr)
+		}
+		return nil, nil, err
+	}
+
+	// Choose the active network based on the flags.
+	if cfg.TestNet3 {
+		activeNetwork = btcwire.TestNet3
+	}
+
+	// Validate database type.
+	if !validDbType(cfg.DbType) {
+		str := "%s: The specified database type [%v] is invalid -- " +
+			"supported types %v"
+		err := fmt.Errorf(str, "loadConfig", cfg.DbType, knownDbTypes)
+		fmt.Fprintln(os.Stderr, err)
+		parser.WriteHelp(os.Stderr)
+		return nil, nil, err
+	}
+
+	// Append the network type to the data directory so it is "namespaced"
+	// per network. In addition to the block database, there are other
+	// pieces of data that are saved to disk such as address manager state.
+	// All data is specific to a network, so namespacing the data directory
+	// means each individual piece of serialized data does not have to
+	// worry about changing names per network and such.
+	cfg.DataDir = filepath.Join(cfg.DataDir, netName(activeNetwork))
+
+	// Ensure the specified block file exists.
+	if !fileExists(cfg.InFile) {
+		str := "%s: The specified block file [%v] does not exist"
+		err := fmt.Errorf(str, "loadConfig", cfg.InFile)
+		fmt.Fprintln(os.Stderr, err)
+		parser.WriteHelp(os.Stderr)
+		return nil, nil, err
+	}
+
+	return &cfg, remainingArgs, nil
+}
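Note: config.go drives all option handling through go-flags struct tags, so defaults, short/long flag names, and help text live next to the fields they configure. A minimal sketch of that approach, assuming the same github.com/conformal/go-flags package used above (the options struct and its fields here are illustrative):

    package main

    import (
    	"fmt"
    	"os"

    	"github.com/conformal/go-flags"
    )

    // options mirrors the tag-driven style of the config struct above.
    type options struct {
    	InFile   string `short:"i" long:"infile" description:"File containing the block(s)"`
    	Progress int    `short:"p" long:"progress" description:"Progress interval in blocks"`
    }

    func main() {
    	// Defaults are plain field values; command line flags override them.
    	opts := options{InFile: "bootstrap.dat", Progress: 10000}
    	parser := flags.NewParser(&opts, flags.Default)
    	if _, err := parser.Parse(); err != nil {
    		// flags.Default already prints the help message for --help.
    		os.Exit(1)
    	}
    	fmt.Printf("importing %s, reporting every %d blocks\n",
    		opts.InFile, opts.Progress)
    }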
diff --git a/util/addblock/import.go b/util/addblock/import.go
new file mode 100644
index 00000000..931002f5
--- /dev/null
+++ b/util/addblock/import.go
@@ -0,0 +1,251 @@
+// Copyright (c) 2013-2014 Conformal Systems LLC.
+// Use of this source code is governed by an ISC
+// license that can be found in the LICENSE file.
+
+package main
+
+import (
+	"encoding/binary"
+	"fmt"
+	"github.com/conformal/btcchain"
+	"github.com/conformal/btcdb"
+	_ "github.com/conformal/btcdb/ldb"
+	"github.com/conformal/btcutil"
+	"github.com/conformal/btcwire"
+	"io"
+	"sync"
+)
+
+var zeroHash = btcwire.ShaHash{}
+
+// importResults houses the stats and result of an import operation.
+type importResults struct {
+	blocksProcessed int64
+	blocksImported  int64
+	err             error
+}
+
+// blockImporter houses information about an ongoing import from a block data
+// file to the block database.
+type blockImporter struct {
+	db              btcdb.Db
+	chain           *btcchain.BlockChain
+	r               io.ReadSeeker
+	processQueue    chan []byte
+	doneChan        chan bool
+	errChan         chan error
+	quit            chan bool
+	wg              sync.WaitGroup
+	blocksProcessed int64
+	blocksImported  int64
+}
+
+// readBlock reads the next block from the input file.
+func (bi *blockImporter) readBlock() ([]byte, error) {
+	// The block file format is:
+	//  <network> <block length> <serialized block>
+	var net uint32
+	err := binary.Read(bi.r, binary.LittleEndian, &net)
+	if err != nil {
+		if err != io.EOF {
+			return nil, err
+		}
+
+		// No block and no error means there are no more blocks to read.
+		return nil, nil
+	}
+	if net != uint32(activeNetwork) {
+		return nil, fmt.Errorf("network mismatch -- got %x, want %x",
+			net, uint32(activeNetwork))
+	}
+
+	// Read the block length and ensure it is sane.
+	var blockLen uint32
+	if err := binary.Read(bi.r, binary.LittleEndian, &blockLen); err != nil {
+		return nil, err
+	}
+	if blockLen > btcwire.MaxBlockPayload {
+		return nil, fmt.Errorf("block payload of %d bytes is larger "+
+			"than the max allowed %d bytes", blockLen,
+			btcwire.MaxBlockPayload)
+	}
+
+	serializedBlock := make([]byte, blockLen)
+	if _, err := io.ReadFull(bi.r, serializedBlock); err != nil {
+		return nil, err
+	}
+
+	return serializedBlock, nil
+}
+
+// processBlock potentially imports the block into the database. It first
+// deserializes the raw block while checking for errors. Already known blocks
+// are skipped and orphan blocks are considered errors. Finally, it runs the
+// block through the chain rules to ensure it follows all rules and matches
+// up to the known checkpoint. Returns whether the block was imported along
+// with any potential errors.
+func (bi *blockImporter) processBlock(serializedBlock []byte) (bool, error) {
+	// Deserialize the block which includes checks for malformed blocks.
+	block, err := btcutil.NewBlockFromBytes(serializedBlock)
+	if err != nil {
+		return false, err
+	}
+
+	blockSha, err := block.Sha()
+	if err != nil {
+		return false, err
+	}
+
+	// Skip blocks that already exist.
+	if bi.db.ExistsSha(blockSha) {
+		return false, nil
+	}
+
+	// Don't bother trying to process orphans.
+	prevHash := &block.MsgBlock().Header.PrevBlock
+	if !prevHash.IsEqual(&zeroHash) && !bi.db.ExistsSha(prevHash) {
+		return false, fmt.Errorf("import file contains block %v which "+
+			"does not link to the available block chain", blockSha)
+	}
+
+	// Ensure the block follows all of the chain rules and matches up to
+	// the known checkpoints.
+	if err := bi.chain.ProcessBlock(block, true); err != nil {
+		return false, err
+	}
+
+	return true, nil
+}
+
+// readHandler is the main handler for reading blocks from the import file.
+// This allows block processing to take place in parallel with block reads.
+// It must be run as a goroutine.
+func (bi *blockImporter) readHandler() {
+out:
+	for {
+		// Read the next block from the file and if anything goes wrong
+		// notify the status handler with the error and bail.
+		serializedBlock, err := bi.readBlock()
+		if err != nil {
+			bi.errChan <- fmt.Errorf("Error reading from input "+
+				"file: %v", err.Error())
+			break out
+		}
+
+		// A nil block with no error means we're done.
+		if serializedBlock == nil {
+			break out
+		}
+
+		// Send the block or quit if we've been signalled to exit by
+		// the status handler due to an error elsewhere.
+		select {
+		case bi.processQueue <- serializedBlock:
+		case <-bi.quit:
+			break out
+		}
+	}
+
+	// Close the processing channel to signal no more blocks are coming.
+	close(bi.processQueue)
+	bi.wg.Done()
+}
+
+// processHandler is the main handler for processing blocks. This allows block
+// processing to take place in parallel with block reads from the import file.
+// It must be run as a goroutine.
+func (bi *blockImporter) processHandler() {
+out:
+	for {
+		select {
+		case serializedBlock, ok := <-bi.processQueue:
+			// We're done when the channel is closed.
+			if !ok {
+				break out
+			}
+
+			bi.blocksProcessed++
+			imported, err := bi.processBlock(serializedBlock)
+			if err != nil {
+				bi.errChan <- err
+				break out
+			}
+
+			if imported {
+				bi.blocksImported++
+			}
+
+			if cfg.Progress != 0 && bi.blocksProcessed > 0 &&
+				bi.blocksProcessed%int64(cfg.Progress) == 0 {
+				log.Infof("Processed %d blocks", bi.blocksProcessed)
+			}
+
+		case <-bi.quit:
+			break out
+		}
+	}
+	bi.wg.Done()
+}
+
+// statusHandler waits for updates from the import operation and notifies
+// the passed doneChan with the results of the import. It also causes all
+// goroutines to exit if an error is reported from any of them.
+func (bi *blockImporter) statusHandler(resultsChan chan *importResults) {
+	select {
+	// An error from either of the goroutines means we're done so signal
+	// caller with the error and signal all goroutines to quit.
+	case err := <-bi.errChan:
+		resultsChan <- &importResults{
+			blocksProcessed: bi.blocksProcessed,
+			blocksImported:  bi.blocksImported,
+			err:             err,
+		}
+		close(bi.quit)
+
+	// The import finished normally.
+	case <-bi.doneChan:
+		resultsChan <- &importResults{
+			blocksProcessed: bi.blocksProcessed,
+			blocksImported:  bi.blocksImported,
+			err:             nil,
+		}
+	}
+}
+
+// Import is the core function which handles importing the blocks from the file
+// associated with the block importer to the database. It returns a channel
+// on which the results will be returned when the operation has completed.
+func (bi *blockImporter) Import() chan *importResults {
+	// Start up the read and process handling goroutines. This setup allows
+	// blocks to be read from disk in parallel while being processed.
+	bi.wg.Add(2)
+	go bi.readHandler()
+	go bi.processHandler()
+
+	// Wait for the import to finish in a separate goroutine and signal
+	// the status handler when done.
+	go func() {
+		bi.wg.Wait()
+		bi.doneChan <- true
+	}()
+
+	// Start the status handler and return the results channel that it
+	// will send the results on when the import is done.
+	resultChan := make(chan *importResults)
+	go bi.statusHandler(resultChan)
+	return resultChan
+}
+
+// newBlockImporter returns a new importer for the provided file reader seeker
+// and database.
+func newBlockImporter(db btcdb.Db, r io.ReadSeeker) *blockImporter {
+	return &blockImporter{
+		db:           db,
+		r:            r,
+		processQueue: make(chan []byte, 2),
+		doneChan:     make(chan bool),
+		errChan:      make(chan error),
+		quit:         make(chan bool),
+		chain:        btcchain.New(db, activeNetwork, nil),
+	}
+}
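Note: readBlock above consumes entries laid out as a 4-byte little-endian network magic, a 4-byte little-endian block length, and then the serialized block itself. A minimal sketch of a writer for that layout (writeBlockEntry, the hard-coded magic constant, and the dummy payload are illustrative; the real tool takes the magic from btcwire and the payload from serialized blocks):

    package main

    import (
    	"bytes"
    	"encoding/binary"
    	"fmt"
    )

    // mainNetMagic matches the value of btcwire.MainNet.
    const mainNetMagic uint32 = 0xd9b4bef9

    // writeBlockEntry appends one <network> <block length> <serialized block>
    // entry, mirroring what readBlock expects to consume.
    func writeBlockEntry(buf *bytes.Buffer, serializedBlock []byte) error {
    	if err := binary.Write(buf, binary.LittleEndian, mainNetMagic); err != nil {
    		return err
    	}
    	blockLen := uint32(len(serializedBlock))
    	if err := binary.Write(buf, binary.LittleEndian, blockLen); err != nil {
    		return err
    	}
    	_, err := buf.Write(serializedBlock)
    	return err
    }

    func main() {
    	var buf bytes.Buffer
    	// A real caller would pass btcwire-serialized block bytes here.
    	if err := writeBlockEntry(&buf, []byte{0x01, 0x02, 0x03}); err != nil {
    		panic(err)
    	}
    	fmt.Printf("entry is %d bytes (8-byte header + 3-byte payload)\n", buf.Len())
    }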