Compare commits

..

No commits in common. "master" and "v0.22.115" have entirely different histories.

19 changed files with 236 additions and 612 deletions

View file

@ -18,7 +18,7 @@ builds:
ldflags: ldflags:
- -s -w - -s -w
- -buildid= - -buildid=
- -X github.com/lbryfoundation/lbcd/version.appTag={{ .Tag }} - -X github.com/lbryio/lbcd/version.appTag={{ .Tag }}
targets: targets:
- linux_amd64 - linux_amd64
- linux_arm64 - linux_arm64
@ -35,7 +35,7 @@ builds:
ldflags: ldflags:
- -s -w - -s -w
- -buildid= - -buildid=
- -X github.com/lbryfoundation/lbcd/version.appTag={{ .Tag }} - -X github.com/lbryio/lbcd/version.appTag={{ .Tag }}
env: env:
- CGO_ENABLED=0 - CGO_ENABLED=0
targets: targets:
@ -60,8 +60,8 @@ dockers:
- use: buildx - use: buildx
dockerfile: Dockerfile.goreleaser dockerfile: Dockerfile.goreleaser
image_templates: image_templates:
- "docker.io/lbryfoundation/lbcd:{{ .Tag }}" - "docker.io/lbry/lbcd:{{ .Tag }}"
- "docker.io/lbryfoundation/lbcd:latest" - "docker.io/lbry/lbcd:latest"
release: release:
draft: true draft: true

View file

@ -977,8 +977,8 @@ func NewImportMultiCmd(requests []ImportMultiRequest, options *ImportMultiOption
// RescanBlockchainCmd defines the RescanBlockchain JSON-RPC command. // RescanBlockchainCmd defines the RescanBlockchain JSON-RPC command.
type RescanBlockchainCmd struct { type RescanBlockchainCmd struct {
StartHeight *int32 `jsonrpcdefault:"0"` StartHeight *int64 `jsonrpcdefault:"0"`
StopHeight *int32 StopHeight *int64 `jsonrpcdefault:"0"`
} }
// NewRescanBlockchainCmd returns a new instance which can be used to issue // NewRescanBlockchainCmd returns a new instance which can be used to issue
@ -986,7 +986,7 @@ type RescanBlockchainCmd struct {
// //
// The parameters which are pointers indicate they are optional. Passing nil // The parameters which are pointers indicate they are optional. Passing nil
// for optional parameters will use the default value. // for optional parameters will use the default value.
func NewRescanBlockchainCmd(startHeight *int32, stopHeight *int32) *RescanBlockchainCmd { func NewRescanBlockchainCmd(startHeight *int64, stopHeight *int64) *RescanBlockchainCmd {
return &RescanBlockchainCmd{ return &RescanBlockchainCmd{
StartHeight: startHeight, StartHeight: startHeight,
StopHeight: stopHeight, StopHeight: stopHeight,

View file

@ -319,8 +319,8 @@ type ListUnspentResult struct {
// RescanBlockchainResult models the data returned from the rescanblockchain command. // RescanBlockchainResult models the data returned from the rescanblockchain command.
type RescanBlockchainResult struct { type RescanBlockchainResult struct {
StartHeight int32 `json:"start_height"` StartHeight int64 `json:"start_height"`
StoptHeight int32 `json:"stop_height"` StoptHeight int64 `json:"stop_height"`
} }
// SignRawTransactionError models the data that contains script verification // SignRawTransactionError models the data that contains script verification

View file

@ -4,7 +4,9 @@ import (
"bytes" "bytes"
"fmt" "fmt"
"path/filepath" "path/filepath"
"runtime"
"sort" "sort"
"sync"
"github.com/pkg/errors" "github.com/pkg/errors"
@ -247,17 +249,17 @@ func (ct *ClaimTrie) AppendBlock(temporary bool) error {
names = append(names, expirations...) names = append(names, expirations...)
names = removeDuplicates(names) names = removeDuplicates(names)
for _, name := range names { nhns := ct.makeNameHashNext(names, false, nil)
for nhn := range nhns {
hash, next := ct.nodeManager.Hash(name) ct.merkleTrie.Update(nhn.Name, nhn.Hash, true)
ct.merkleTrie.Update(name, hash, true) if nhn.Next <= 0 {
if next <= 0 {
continue continue
} }
newName := normalization.NormalizeIfNecessary(name, next) newName := normalization.NormalizeIfNecessary(nhn.Name, nhn.Next)
updateNames = append(updateNames, newName) updateNames = append(updateNames, newName)
updateHeights = append(updateHeights, next) updateHeights = append(updateHeights, nhn.Next)
} }
if !temporary && len(updateNames) > 0 { if !temporary && len(updateNames) > 0 {
err = ct.temporalRepo.SetNodesAt(updateNames, updateHeights) err = ct.temporalRepo.SetNodesAt(updateNames, updateHeights)
@ -354,29 +356,22 @@ func (ct *ClaimTrie) ResetHeight(height int32) error {
} }
func (ct *ClaimTrie) runFullTrieRebuild(names [][]byte, interrupt <-chan struct{}) { func (ct *ClaimTrie) runFullTrieRebuild(names [][]byte, interrupt <-chan struct{}) {
var nhns chan NameHashNext
if names == nil { if names == nil {
node.Log("Building the entire claim trie in RAM...") node.Log("Building the entire claim trie in RAM...")
ct.claimLogger = newClaimProgressLogger("Processed", node.GetLogger()) ct.claimLogger = newClaimProgressLogger("Processed", node.GetLogger())
nhns = ct.makeNameHashNext(nil, true, interrupt)
ct.nodeManager.IterateNames(func(name []byte) bool {
if interruptRequested(interrupt) {
return false
}
clone := make([]byte, len(name))
copy(clone, name)
hash, _ := ct.nodeManager.Hash(clone)
ct.merkleTrie.Update(clone, hash, false)
ct.claimLogger.LogName(name)
return true
})
} else { } else {
for _, name := range names { ct.claimLogger = nil
hash, _ := ct.nodeManager.Hash(name) nhns = ct.makeNameHashNext(names, false, interrupt)
ct.merkleTrie.Update(name, hash, false)
}
} }
for nhn := range nhns {
ct.merkleTrie.Update(nhn.Name, nhn.Hash, false)
if ct.claimLogger != nil {
ct.claimLogger.LogName(nhn.Name)
}
}
} }
// MerkleHash returns the Merkle Hash of the claimTrie. // MerkleHash returns the Merkle Hash of the claimTrie.
@ -442,6 +437,12 @@ func (ct *ClaimTrie) FlushToDisk() {
} }
} }
type NameHashNext struct {
Name []byte
Hash *chainhash.Hash
Next int32
}
func interruptRequested(interrupted <-chan struct{}) bool { func interruptRequested(interrupted <-chan struct{}) bool {
select { select {
case <-interrupted: // should never block on nil case <-interrupted: // should never block on nil
@ -451,3 +452,53 @@ func interruptRequested(interrupted <-chan struct{}) bool {
return false return false
} }
func (ct *ClaimTrie) makeNameHashNext(names [][]byte, all bool, interrupt <-chan struct{}) chan NameHashNext {
inputs := make(chan []byte, 512)
outputs := make(chan NameHashNext, 512)
var wg sync.WaitGroup
hashComputationWorker := func() {
for name := range inputs {
hash, next := ct.nodeManager.Hash(name)
outputs <- NameHashNext{name, hash, next}
}
wg.Done()
}
threads := int(0.8 * float32(runtime.GOMAXPROCS(0)))
if threads < 1 {
threads = 1
}
for threads > 0 {
threads--
wg.Add(1)
go hashComputationWorker()
}
go func() {
if all {
ct.nodeManager.IterateNames(func(name []byte) bool {
if interruptRequested(interrupt) {
return false
}
clone := make([]byte, len(name))
copy(clone, name) // iteration name buffer is reused on future loops
inputs <- clone
return true
})
} else {
for _, name := range names {
if interruptRequested(interrupt) {
break
}
inputs <- name
}
}
close(inputs)
}()
go func() {
wg.Wait()
close(outputs)
}()
return outputs
}

View file

@ -1,85 +0,0 @@
package node
import (
"container/list"
"github.com/lbryio/lbcd/claimtrie/change"
)
type cacheLeaf struct {
node *Node
element *list.Element
changes []change.Change
height int32
}
type Cache struct {
nodes map[string]*cacheLeaf
order *list.List
limit int
}
func (nc *Cache) insert(name []byte, n *Node, height int32) {
key := string(name)
existing := nc.nodes[key]
if existing != nil {
existing.node = n
existing.height = height
existing.changes = nil
nc.order.MoveToFront(existing.element)
return
}
for nc.order.Len() >= nc.limit {
// TODO: maybe ensure that we don't remove nodes that have a lot of changes?
delete(nc.nodes, nc.order.Back().Value.(string))
nc.order.Remove(nc.order.Back())
}
element := nc.order.PushFront(key)
nc.nodes[key] = &cacheLeaf{node: n, element: element, height: height}
}
func (nc *Cache) fetch(name []byte, height int32) (*Node, []change.Change, int32) {
key := string(name)
existing := nc.nodes[key]
if existing != nil && existing.height <= height {
nc.order.MoveToFront(existing.element)
return existing.node, existing.changes, existing.height
}
return nil, nil, -1
}
func (nc *Cache) addChanges(changes []change.Change, height int32) {
for _, c := range changes {
key := string(c.Name)
existing := nc.nodes[key]
if existing != nil && existing.height <= height {
existing.changes = append(existing.changes, c)
}
}
}
func (nc *Cache) drop(names [][]byte) {
for _, name := range names {
key := string(name)
existing := nc.nodes[key]
if existing != nil {
// we can't roll it backwards because we don't know its previous height value; just toast it
delete(nc.nodes, key)
nc.order.Remove(existing.element)
}
}
}
func (nc *Cache) clear() {
nc.nodes = map[string]*cacheLeaf{}
nc.order = list.New()
// we'll let the GC sort out the remains...
}
func NewCache(limit int) *Cache {
return &Cache{limit: limit, nodes: map[string]*cacheLeaf{}, order: list.New()}
}

View file

@ -21,7 +21,6 @@ type Manager interface {
IterateNames(predicate func(name []byte) bool) IterateNames(predicate func(name []byte) bool)
Hash(name []byte) (*chainhash.Hash, int32) Hash(name []byte) (*chainhash.Hash, int32)
Flush() error Flush() error
ClearCache()
} }
type BaseManager struct { type BaseManager struct {
@ -31,28 +30,19 @@ type BaseManager struct {
changes []change.Change changes []change.Change
tempChanges map[string][]change.Change tempChanges map[string][]change.Change
cache *Cache
} }
func NewBaseManager(repo Repo) (*BaseManager, error) { func NewBaseManager(repo Repo) (*BaseManager, error) {
nm := &BaseManager{ nm := &BaseManager{
repo: repo, repo: repo,
cache: NewCache(10000), // TODO: how many should we cache?
} }
return nm, nil return nm, nil
} }
func (nm *BaseManager) ClearCache() {
nm.cache.clear()
}
func (nm *BaseManager) NodeAt(height int32, name []byte) (*Node, error) { func (nm *BaseManager) NodeAt(height int32, name []byte) (*Node, error) {
n, changes, oldHeight := nm.cache.fetch(name, height)
if n == nil {
changes, err := nm.repo.LoadChanges(name) changes, err := nm.repo.LoadChanges(name)
if err != nil { if err != nil {
return nil, errors.Wrap(err, "in load changes") return nil, errors.Wrap(err, "in load changes")
@ -62,32 +52,10 @@ func (nm *BaseManager) NodeAt(height int32, name []byte) (*Node, error) {
changes = append(changes, nm.tempChanges[string(name)]...) changes = append(changes, nm.tempChanges[string(name)]...)
} }
n, err = nm.newNodeFromChanges(changes, height) n, err := nm.newNodeFromChanges(changes, height)
if err != nil { if err != nil {
return nil, errors.Wrap(err, "in new node") return nil, errors.Wrap(err, "in new node")
} }
// TODO: how can we tell what needs to be cached?
if nm.tempChanges == nil && height == nm.height && n != nil && (len(changes) > 4 || len(name) < 12) {
nm.cache.insert(name, n, height)
}
} else {
if nm.tempChanges != nil { // making an assumption that we only ever have tempChanges for a single block
changes = append(changes, nm.tempChanges[string(name)]...)
n = n.Clone()
} else if height != nm.height {
n = n.Clone()
}
updated, err := nm.updateFromChanges(n, changes, height)
if err != nil {
return nil, errors.Wrap(err, "in update from changes")
}
if !updated {
n.AdjustTo(oldHeight, height, name)
}
if nm.tempChanges == nil && height == nm.height {
nm.cache.insert(name, n, height)
}
}
return n, nil return n, nil
} }
@ -98,13 +66,17 @@ func (nm *BaseManager) node(name []byte) (*Node, error) {
return nm.NodeAt(nm.height, name) return nm.NodeAt(nm.height, name)
} }
func (nm *BaseManager) updateFromChanges(n *Node, changes []change.Change, height int32) (bool, error) { // newNodeFromChanges returns a new Node constructed from the changes.
// The changes must preserve their order received.
func (nm *BaseManager) newNodeFromChanges(changes []change.Change, height int32) (*Node, error) {
count := len(changes) if len(changes) == 0 {
if count == 0 { return nil, nil
return false, nil
} }
n := New()
previous := changes[0].Height previous := changes[0].Height
count := len(changes)
for i, chg := range changes { for i, chg := range changes {
if chg.Height < previous { if chg.Height < previous {
@ -123,37 +95,15 @@ func (nm *BaseManager) updateFromChanges(n *Node, changes []change.Change, heigh
delay := nm.getDelayForName(n, chg) delay := nm.getDelayForName(n, chg)
err := n.ApplyChange(chg, delay) err := n.ApplyChange(chg, delay)
if err != nil { if err != nil {
return false, errors.Wrap(err, "in apply change") return nil, errors.Wrap(err, "in apply change")
} }
} }
if count <= 0 { if count <= 0 {
// we applied no changes, which means we shouldn't exist if we had all the changes return nil, nil
// or might mean nothing significant if we are applying a partial changeset
return false, nil
} }
lastChange := changes[count-1] lastChange := changes[count-1]
n.AdjustTo(lastChange.Height, height, lastChange.Name) return n.AdjustTo(lastChange.Height, height, lastChange.Name), nil
return true, nil
}
// newNodeFromChanges returns a new Node constructed from the changes.
// The changes must preserve their order received.
func (nm *BaseManager) newNodeFromChanges(changes []change.Change, height int32) (*Node, error) {
if len(changes) == 0 {
return nil, nil
}
n := New()
updated, err := nm.updateFromChanges(n, changes, height)
if err != nil {
return nil, errors.Wrap(err, "in update from changes")
}
if updated {
return n, nil
}
return nil, nil
} }
func (nm *BaseManager) AppendChange(chg change.Change) { func (nm *BaseManager) AppendChange(chg change.Change) {
@ -270,7 +220,6 @@ func (nm *BaseManager) IncrementHeightTo(height int32, temporary bool) ([][]byte
} }
if !temporary { if !temporary {
nm.cache.addChanges(nm.changes, height)
if err := nm.repo.AppendChanges(nm.changes); err != nil { // destroys names if err := nm.repo.AppendChanges(nm.changes); err != nil { // destroys names
return nil, errors.Wrap(err, "in append changes") return nil, errors.Wrap(err, "in append changes")
} }
@ -306,8 +255,6 @@ func (nm *BaseManager) DecrementHeightTo(affectedNames [][]byte, height int32) (
return affectedNames, errors.Wrap(err, "in drop changes") return affectedNames, errors.Wrap(err, "in drop changes")
} }
} }
nm.cache.drop(affectedNames)
} }
nm.height = height nm.height = height

View file

@ -110,7 +110,7 @@ func (n *Node) ApplyChange(chg change.Change, delay int32) error {
} }
// AdjustTo activates claims and computes takeovers until it reaches the specified height. // AdjustTo activates claims and computes takeovers until it reaches the specified height.
func (n *Node) AdjustTo(height, maxHeight int32, name []byte) { func (n *Node) AdjustTo(height, maxHeight int32, name []byte) *Node {
changed := n.handleExpiredAndActivated(height) > 0 changed := n.handleExpiredAndActivated(height) > 0
n.updateTakeoverHeight(height, name, changed) n.updateTakeoverHeight(height, name, changed)
if maxHeight > height { if maxHeight > height {
@ -120,6 +120,7 @@ func (n *Node) AdjustTo(height, maxHeight int32, name []byte) {
height = h height = h
} }
} }
return n
} }
func (n *Node) updateTakeoverHeight(height int32, name []byte, refindBest bool) { func (n *Node) updateTakeoverHeight(height int32, name []byte, refindBest bool) {
@ -339,28 +340,3 @@ func (n *Node) SortClaimsByBid() {
return OutPointLess(n.Claims[j].OutPoint, n.Claims[i].OutPoint) return OutPointLess(n.Claims[j].OutPoint, n.Claims[i].OutPoint)
}) })
} }
func (n *Node) Clone() *Node {
clone := New()
if n.SupportSums != nil {
clone.SupportSums = map[string]int64{}
for key, value := range n.SupportSums {
clone.SupportSums[key] = value
}
}
clone.Supports = make(ClaimList, len(n.Supports))
for i, support := range n.Supports {
clone.Supports[i] = &Claim{}
*clone.Supports[i] = *support
}
clone.Claims = make(ClaimList, len(n.Claims))
for i, claim := range n.Claims {
clone.Claims[i] = &Claim{}
*clone.Claims[i] = *claim
}
clone.TakenOverAt = n.TakenOverAt
if n.BestClaim != nil {
clone.BestClaim = clone.Claims.find(byID(n.BestClaim.ClaimID))
}
return clone
}

View file

@ -34,7 +34,6 @@ func (nm *NormalizingManager) IncrementHeightTo(height int32, temporary bool) ([
func (nm *NormalizingManager) DecrementHeightTo(affectedNames [][]byte, height int32) ([][]byte, error) { func (nm *NormalizingManager) DecrementHeightTo(affectedNames [][]byte, height int32) ([][]byte, error) {
if nm.normalizedAt > height { if nm.normalizedAt > height {
nm.normalizedAt = -1 nm.normalizedAt = -1
nm.ClearCache()
} }
return nm.Manager.DecrementHeightTo(affectedNames, height) return nm.Manager.DecrementHeightTo(affectedNames, height)
} }
@ -111,7 +110,5 @@ func (nm *NormalizingManager) addNormalizationForkChangesIfNecessary(height int3
return true return true
} }
nm.Manager.ClearCache()
nm.Manager.IterateNames(predicate) nm.Manager.IterateNames(predicate)
} }

View file

@ -111,8 +111,6 @@ type config struct {
SigNet bool `long:"signet" description:"Connect to signet (default RPC server: localhost:49245)"` SigNet bool `long:"signet" description:"Connect to signet (default RPC server: localhost:49245)"`
Wallet bool `long:"wallet" description:"Connect to wallet RPC server instead (default: localhost:9244, testnet: localhost:19244, regtest: localhost:29244)"` Wallet bool `long:"wallet" description:"Connect to wallet RPC server instead (default: localhost:9244, testnet: localhost:19244, regtest: localhost:29244)"`
ShowVersion bool `short:"V" long:"version" description:"Display version information and exit"` ShowVersion bool `short:"V" long:"version" description:"Display version information and exit"`
Timed bool `short:"t" long:"timed" description:"Display RPC response time"`
Quiet bool `short:"q" long:"quiet" description:"Do not output results to stdout"`
} }
// normalizeAddress returns addr with the passed default port appended if // normalizeAddress returns addr with the passed default port appended if

View file

@ -9,7 +9,6 @@ import (
"os" "os"
"path/filepath" "path/filepath"
"strings" "strings"
"time"
"github.com/lbryio/lbcd/btcjson" "github.com/lbryio/lbcd/btcjson"
) )
@ -134,8 +133,6 @@ func main() {
os.Exit(1) os.Exit(1)
} }
started := time.Now()
// Send the JSON-RPC request to the server using the user-specified // Send the JSON-RPC request to the server using the user-specified
// connection configuration. // connection configuration.
result, err := sendPostRequest(marshalledJSON, cfg) result, err := sendPostRequest(marshalledJSON, cfg)
@ -144,16 +141,6 @@ func main() {
os.Exit(1) os.Exit(1)
} }
if cfg.Timed {
elapsed := time.Since(started)
defer fmt.Fprintf(os.Stderr, "%s\n", elapsed)
}
var output io.Writer = os.Stdout
if cfg.Quiet {
output = io.Discard
}
// Choose how to display the result based on its type. // Choose how to display the result based on its type.
strResult := string(result) strResult := string(result)
if strings.HasPrefix(strResult, "{") || strings.HasPrefix(strResult, "[") { if strings.HasPrefix(strResult, "{") || strings.HasPrefix(strResult, "[") {
@ -163,7 +150,7 @@ func main() {
err) err)
os.Exit(1) os.Exit(1)
} }
fmt.Fprintln(output, dst.String()) fmt.Println(dst.String())
} else if strings.HasPrefix(strResult, `"`) { } else if strings.HasPrefix(strResult, `"`) {
var str string var str string
@ -172,9 +159,9 @@ func main() {
err) err)
os.Exit(1) os.Exit(1)
} }
fmt.Fprintln(output, str) fmt.Println(str)
} else if strResult != "null" { } else if strResult != "null" {
fmt.Fprintln(output, strResult) fmt.Println(strResult)
} }
} }

View file

@ -1,21 +1,15 @@
# lbcdbloknotify # lbcd Websockets Example
This bridge program subscribes to lbcd's notifications over websockets using the rpcclient package. This example shows how to use the rpcclient package to connect to a btcd RPC
Users can specify supported actions upon receiving this notifications. server using TLS-secured websockets, register for block connected and block
disconnected notifications, and get the current block count.
## Building(or Running) the Program ## Running the Example
Clone the lbcd package: The first step is to clone the lbcd package:
```bash ```bash
$ git clone github.com/lbryio/lbcd $ git clone github.com/lbryio/lbcd
$ cd lbcd/rpcclient/examples
# build the program
$ go build .
# or directly run it (build implicitly behind the scene)
$ go run .
``` ```
Display available options: Display available options:
@ -36,30 +30,19 @@ $ go run . -h
-stratumpass string -stratumpass string
Stratum server password (default "password") Stratum server password (default "password")
-quiet -quiet
Do not print periodic logs Do not print logs
``` ```
Running the program: Start the program:
```bash ```bash
# Send stratum mining.update_block mesage upon receving block connected notifiations. $ go run . -stratumpass <STRATUM PASSWD> -rpcuser <RPC USERNAME> -rpcpass <RPC PASSWD>
$ go run . -rpcuser <RPC USERNAME> -rpcpass <RPC PASSWD> --notls -stratum <STRATUM SERVER> -stratumpass <STRATUM PASSWD>
2022/01/10 23:16:21 Current block count: 1093112 2022/01/10 23:16:21 NotifyBlocks: Registration Complete
2022/01/10 23:16:21 Block count: 1093112
... ...
# Execute a custome command (with blockhash) upon receving block connected notifiations.
$ go run . -rpcuser <RPC USERNAME> -rpcpass <RPC PASSWD> --notls -run "echo %s"
``` ```
## Notes
* Stratum TCP connection is persisted with auto-reconnect. (retry backoff increases from 1s to 60s maximum)
* Stratum update_block jobs on previous notifications are canceled when a new notification arrives.
Usually, the jobs are so short and completed immediately. However, if the Stratum connection is broken, this
prevents the bridge from accumulating stale jobs.
## License ## License
This example is licensed under the [copyfree](http://copyfree.org) ISC License. This example is licensed under the [copyfree](http://copyfree.org) ISC License.

View file

@ -1,20 +0,0 @@
package main
import (
"github.com/lbryio/lbcd/wire"
"github.com/lbryio/lbcutil"
)
type eventBlockConected struct {
height int32
header *wire.BlockHeader
txns []*lbcutil.Tx
}
type adapter struct {
*bridge
}
func (a *adapter) onFilteredBlockConnected(height int32, header *wire.BlockHeader, txns []*lbcutil.Tx) {
a.eventCh <- &eventBlockConected{height, header, txns}
}

View file

@ -1,172 +0,0 @@
package main
import (
"context"
"errors"
"fmt"
"log"
"net"
"os"
"os/exec"
"strings"
"sync"
"syscall"
"time"
)
type bridge struct {
ctx context.Context
prevJobContext context.Context
prevJobCancel context.CancelFunc
eventCh chan interface{}
errorc chan error
wg sync.WaitGroup
stratum *stratumClient
customCmd string
}
func newBridge(stratumServer, stratumPass, coinid string) *bridge {
s := &bridge{
ctx: context.Background(),
eventCh: make(chan interface{}),
errorc: make(chan error),
}
if len(stratumServer) > 0 {
s.stratum = newStratumClient(stratumServer, stratumPass, coinid)
}
return s
}
func (b *bridge) start() {
if b.stratum != nil {
backoff := time.Second
for {
err := b.stratum.dial()
if err == nil {
break
}
log.Printf("WARN: stratum.dial() error: %s, retry in %s", err, backoff)
time.Sleep(backoff)
if backoff < 60*time.Second {
backoff += time.Second
}
}
}
for e := range b.eventCh {
switch e := e.(type) {
case *eventBlockConected:
b.handleFilteredBlockConnected(e)
default:
b.errorc <- fmt.Errorf("unknown event type: %T", e)
return
}
}
}
func (b *bridge) handleFilteredBlockConnected(e *eventBlockConected) {
if !*quiet {
log.Printf("Block connected: %s (%d) %v", e.header.BlockHash(), e.height, e.header.Timestamp)
}
hash := e.header.BlockHash().String()
height := e.height
// Cancel jobs on previous block. It's safe if they are already done.
if b.prevJobContext != nil {
select {
case <-b.prevJobContext.Done():
log.Printf("prev one canceled")
default:
b.prevJobCancel()
}
}
// Wait until all previous jobs are done or canceled.
b.wg.Wait()
// Create and save cancelable subcontext for new jobs.
ctx, cancel := context.WithCancel(b.ctx)
b.prevJobContext, b.prevJobCancel = ctx, cancel
if len(b.customCmd) > 0 {
go b.execCustomCommand(ctx, hash, height)
}
// Send stratum update block message
if b.stratum != nil {
go b.stratumUpdateBlock(ctx, hash, height)
}
}
func (s *bridge) stratumUpdateBlock(ctx context.Context, hash string, height int32) {
s.wg.Add(1)
defer s.wg.Done()
backoff := time.Second
retry := func(err error) {
if backoff < 60*time.Second {
backoff += time.Second
}
log.Printf("WARN: stratum.send() on block %d error: %s", height, err)
time.Sleep(backoff)
s.stratum.dial()
}
msg := stratumUpdateBlockMsg(*stratumPass, *coinid, hash)
for {
switch err := s.stratum.send(ctx, msg); {
case err == nil:
return
case errors.Is(err, context.Canceled):
log.Printf("INFO: stratum.send() on block %d: %s.", height, err)
return
case errors.Is(err, syscall.EPIPE):
errClose := s.stratum.conn.Close()
if errClose != nil {
log.Printf("WARN: stratum.conn.Close() on block %d: %s.", height, errClose)
}
retry(err)
case errors.Is(err, net.ErrClosed):
retry(err)
default:
retry(err)
}
}
}
func (s *bridge) execCustomCommand(ctx context.Context, hash string, height int32) {
s.wg.Add(1)
defer s.wg.Done()
cmd := strings.ReplaceAll(s.customCmd, "%s", hash)
err := doExecCustomCommand(ctx, cmd)
if err != nil {
log.Printf("ERROR: execCustomCommand on block %s(%d): %s", hash, height, err)
}
}
func doExecCustomCommand(ctx context.Context, cmd string) error {
strs := strings.Split(cmd, " ")
path, err := exec.LookPath(strs[0])
if errors.Is(err, exec.ErrDot) {
err = nil
}
if err != nil {
return err
}
c := exec.CommandContext(ctx, path, strs[1:]...)
c.Stdout = os.Stdout
return c.Run()
}

View file

@ -1,53 +0,0 @@
package main
import (
"io/ioutil"
"log"
"path/filepath"
"github.com/lbryio/lbcd/rpcclient"
)
func newLbcdClient(server, user, pass string, notls bool, adpt adapter) *rpcclient.Client {
ntfnHandlers := rpcclient.NotificationHandlers{
OnFilteredBlockConnected: adpt.onFilteredBlockConnected,
}
// Config lbcd RPC client with websockets.
connCfg := &rpcclient.ConnConfig{
Host: server,
Endpoint: "ws",
User: user,
Pass: pass,
DisableTLS: true,
}
if !notls {
cert, err := ioutil.ReadFile(filepath.Join(lbcdHomeDir, "rpc.cert"))
if err != nil {
log.Fatalf("can't read lbcd certificate: %s", err)
}
connCfg.Certificates = cert
connCfg.DisableTLS = false
}
client, err := rpcclient.New(connCfg, &ntfnHandlers)
if err != nil {
log.Fatalf("can't create rpc client: %s", err)
}
// Register for block connect and disconnect notifications.
if err = client.NotifyBlocks(); err != nil {
log.Fatalf("can't register block notification: %s", err)
}
// Get the current block count.
blockCount, err := client.GetBlockCount()
if err != nil {
log.Fatalf("can't get block count: %s", err)
}
log.Printf("Current block count: %d", blockCount)
return client
}

View file

@ -1,63 +1,136 @@
// Copyright (c) 2014-2017 The btcsuite developers
// Use of this source code is governed by an ISC
// license that can be found in the LICENSE file.
package main package main
import ( import (
"errors"
"flag" "flag"
"fmt"
"io/ioutil"
"log" "log"
"net"
"os"
"os/exec" "os/exec"
"path/filepath" "path/filepath"
"strings" "strings"
"github.com/lbryio/lbcd/rpcclient"
"github.com/lbryio/lbcd/wire"
"github.com/lbryio/lbcutil" "github.com/lbryio/lbcutil"
) )
var (
lbcdHomeDir = lbcutil.AppDataDir("lbcd", false)
defaultCert = filepath.Join(lbcdHomeDir, "rpc.cert")
)
var ( var (
coinid = flag.String("coinid", "1425", "Coin ID") coinid = flag.String("coinid", "1425", "Coin ID")
stratumServer = flag.String("stratum", "", "Stratum server") stratum = flag.String("stratum", "", "Stratum server")
stratumPass = flag.String("stratumpass", "", "Stratum server password") stratumPass = flag.String("stratumpass", "", "Stratum server password")
rpcserver = flag.String("rpcserver", "localhost:9245", "LBCD RPC server") rpcserver = flag.String("rpcserver", "localhost:9245", "LBCD RPC server")
rpcuser = flag.String("rpcuser", "rpcuser", "LBCD RPC username") rpcuser = flag.String("rpcuser", "rpcuser", "LBCD RPC username")
rpcpass = flag.String("rpcpass", "rpcpass", "LBCD RPC password") rpcpass = flag.String("rpcpass", "rpcpass", "LBCD RPC password")
rpccert = flag.String("rpccert", defaultCert, "LBCD RPC certificate")
notls = flag.Bool("notls", false, "Connect to LBCD with TLS disabled") notls = flag.Bool("notls", false, "Connect to LBCD with TLS disabled")
run = flag.String("run", "", "Run custom shell command") run = flag.String("run", "", "Run custom shell command")
quiet = flag.Bool("quiet", false, "Do not print logs") quiet = flag.Bool("quiet", false, "Do not print logs")
) )
func onFilteredBlockConnected(height int32, header *wire.BlockHeader, txns []*lbcutil.Tx) {
blockHash := header.BlockHash().String()
if !*quiet {
log.Printf("Block connected: %v (%d) %v", blockHash, height, header.Timestamp)
}
if cmd := *run; len(cmd) != 0 {
cmd = strings.ReplaceAll(cmd, "%s", blockHash)
err := execCustomCommand(cmd)
if err != nil {
log.Printf("ERROR: execCustomCommand: %s", err)
}
}
if len(*stratum) > 0 && len(*stratumPass) > 0 {
err := stratumUpdateBlock(*stratum, *stratumPass, *coinid, blockHash)
if err != nil {
log.Printf("ERROR: stratumUpdateBlock: %s", err)
}
}
}
func execCustomCommand(cmd string) error {
strs := strings.Split(cmd, " ")
path, err := exec.LookPath(strs[0])
if errors.Is(err, exec.ErrDot) {
err = nil
}
if err != nil {
return err
}
c := exec.Command(path, strs[1:]...)
c.Stdout = os.Stdout
return c.Run()
}
func stratumUpdateBlock(stratum, stratumPass, coinid, blockHash string) error {
addr, err := net.ResolveTCPAddr("tcp", stratum)
if err != nil {
return fmt.Errorf("can't resolve addr: %w", err)
}
conn, err := net.DialTCP("tcp", nil, addr)
if err != nil {
return fmt.Errorf("can't dial tcp: %w", err)
}
defer conn.Close()
msg := fmt.Sprintf(`{"id":1,"method":"mining.update_block","params":[%q,%s,%q]}`,
stratumPass, coinid, blockHash)
_, err = conn.Write([]byte(msg))
if err != nil {
return fmt.Errorf("can't write message: %w", err)
}
return nil
}
func main() { func main() {
flag.Parse() flag.Parse()
// Setup notification handler ntfnHandlers := rpcclient.NotificationHandlers{
b := newBridge(*stratumServer, *stratumPass, *coinid) OnFilteredBlockConnected: onFilteredBlockConnected,
}
if len(*run) > 0 { // Connect to local lbcd RPC server using websockets.
// Check if ccommand exists. lbcdHomeDir := lbcutil.AppDataDir("lbcd", false)
strs := strings.Split(*run, " ") certs, err := ioutil.ReadFile(filepath.Join(lbcdHomeDir, "rpc.cert"))
cmd := strs[0]
_, err := exec.LookPath(cmd)
if err != nil { if err != nil {
log.Fatalf("ERROR: %s not found: %s", cmd, err) log.Fatalf("can't read lbcd certificate: %s", err)
} }
b.customCmd = *run connCfg := &rpcclient.ConnConfig{
Host: *rpcserver,
Endpoint: "ws",
User: *rpcuser,
Pass: *rpcpass,
Certificates: certs,
DisableTLS: *notls,
}
client, err := rpcclient.New(connCfg, &ntfnHandlers)
if err != nil {
log.Fatalf("can't create rpc client: %s", err)
} }
// Start the eventt handler. // Register for block connect and disconnect notifications.
go b.start() if err = client.NotifyBlocks(); err != nil {
log.Fatalf("can't register block notification: %s", err)
}
log.Printf("NotifyBlocks: Registration Complete")
// Adaptater receives lbcd notifications, and emit events. // Get the current block count.
adpt := adapter{b} blockCount, err := client.GetBlockCount()
if err != nil {
client := newLbcdClient(*rpcserver, *rpcuser, *rpcpass, *notls, adpt) log.Fatalf("can't get block count: %s", err)
}
go func() { log.Printf("Block count: %d", blockCount)
err := <-b.errorc
log.Fatalf("ERROR: %s", err)
client.Shutdown()
}()
// Wait until the client either shuts down gracefully (or the user // Wait until the client either shuts down gracefully (or the user
// terminates the process with Ctrl+C). // terminates the process with Ctrl+C).

View file

@ -1,56 +0,0 @@
package main
import (
"context"
"fmt"
"net"
)
type stratumClient struct {
server string
passwd string
coinid string
conn *net.TCPConn
}
func newStratumClient(server, passwd, coinid string) *stratumClient {
return &stratumClient{
server: server,
}
}
func (c *stratumClient) dial() error {
addr, err := net.ResolveTCPAddr("tcp", c.server)
if err != nil {
return fmt.Errorf("resolve tcp addr: %w", err)
}
conn, err := net.DialTCP("tcp", nil, addr)
if err != nil {
return fmt.Errorf("dial tcp: %w", err)
}
c.conn = conn
return nil
}
func (c *stratumClient) send(ctx context.Context, msg string) error {
select {
case <-ctx.Done():
return ctx.Err()
default:
}
_, err := c.conn.Write([]byte(msg))
return err
}
func stratumUpdateBlockMsg(stratumPass, coinid, blockHash string) string {
return fmt.Sprintf(`{"id":1,"method":"mining.update_block","params":[%q,%s,%q]}`,
stratumPass, coinid, blockHash)
}

View file

@ -774,8 +774,7 @@ func (c *Client) handleSendPostMessage(jReq *jsonRequest) {
tries := 10 tries := 10
for i := 0; tries == 0 || i < tries; i++ { for i := 0; tries == 0 || i < tries; i++ {
bodyReader := bytes.NewReader(jReq.marshalledJSON) bodyReader := bytes.NewReader(jReq.marshalledJSON)
var httpReq *http.Request httpReq, err := http.NewRequest("POST", url, bodyReader)
httpReq, err = http.NewRequest("POST", url, bodyReader)
if err != nil { if err != nil {
jReq.responseChan <- &Response{result: nil, err: err} jReq.responseChan <- &Response{result: nil, err: err}
return return
@ -787,8 +786,7 @@ func (c *Client) handleSendPostMessage(jReq *jsonRequest) {
} }
// Configure basic access authorization. // Configure basic access authorization.
var user, pass string user, pass, err := c.config.getAuth()
user, pass, err = c.config.getAuth()
if err != nil { if err != nil {
jReq.responseChan <- &Response{result: nil, err: err} jReq.responseChan <- &Response{result: nil, err: err}
return return

View file

@ -2062,14 +2062,14 @@ func (r FutureRescanBlockchainResult) Receive() (*btcjson.RescanBlockchainResult
// returned instance. // returned instance.
// //
// See RescanBlockchain for the blocking version and more details. // See RescanBlockchain for the blocking version and more details.
func (c *Client) RescanBlockchainAsync(startHeight *int32, stopHeight *int32) FutureRescanBlockchainResult { func (c *Client) RescanBlockchainAsync(startHeight *int64, stopHeight *int64) FutureRescanBlockchainResult {
cmd := btcjson.NewRescanBlockchainCmd(startHeight, stopHeight) cmd := btcjson.NewRescanBlockchainCmd(startHeight, stopHeight)
return c.SendCmd(cmd) return c.SendCmd(cmd)
} }
// RescanBlockchain rescans the local blockchain for wallet related // RescanBlockchain rescans the local blockchain for wallet related
// transactions from the startHeight to the the inclusive stopHeight. // transactions from the startHeight to the the inclusive stopHeight.
func (c *Client) RescanBlockchain(startHeight *int32, stopHeight *int32) (*btcjson.RescanBlockchainResult, error) { func (c *Client) RescanBlockchain(startHeight *int64, stopHeight *int64) (*btcjson.RescanBlockchainResult, error) {
return c.RescanBlockchainAsync(startHeight, stopHeight).Receive() return c.RescanBlockchainAsync(startHeight, stopHeight).Receive()
} }

View file

@ -206,7 +206,7 @@ func StripClaimScriptPrefix(script []byte) []byte {
return script[cs.Size:] return script[cs.Size:]
} }
const illegalChars = "=&#:$%?/;\\\b\n\t\r\x00" const illegalChars = "=&#:*$%?/;\\\b\n\t\r\x00"
func AllClaimsAreSane(script []byte, enforceSoftFork bool) error { func AllClaimsAreSane(script []byte, enforceSoftFork bool) error {
cs, err := ExtractClaimScript(script) cs, err := ExtractClaimScript(script)