Skip to content

Commit 3a3c46a

Browse files
committed
use SubscribeTransactionReceipts
1 parent a463b32 commit 3a3c46a

File tree

4 files changed

+130
-77
lines changed

4 files changed

+130
-77
lines changed

internal/ethapi/api.go

Lines changed: 28 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -1652,6 +1652,11 @@ func (api *TransactionAPI) SendRawTransaction(ctx context.Context, input hexutil
16521652
return SubmitTransaction(ctx, api.b, tx)
16531653
}
16541654

1655+
type ReceiptWithTx struct {
1656+
Receipt *types.Receipt
1657+
Tx *types.Transaction
1658+
}
1659+
16551660
// SendRawTransactionSync will add the signed transaction to the transaction pool
16561661
// and wait until the transaction has been included in a block and return the receipt, or the timeout.
16571662
func (api *TransactionAPI) SendRawTransactionSync(ctx context.Context, input hexutil.Bytes, timeoutMs *hexutil.Uint64) (map[string]interface{}, error) {
@@ -1685,61 +1690,17 @@ func (api *TransactionAPI) SendRawTransactionSync(ctx context.Context, input hex
16851690
return r, nil
16861691
}
16871692

1688-
// Subscribe to new block events and check the receipt on each new block.
1689-
heads := make(chan core.ChainHeadEvent, 16)
1690-
sub := api.b.SubscribeChainHeadEvent(heads)
1693+
// Subscribe to receipt stream (filtered to this tx)
1694+
rcpts := make(chan []*ReceiptWithTx, 1)
1695+
sub := api.b.SubscribeTransactionReceipts([]common.Hash{hash}, rcpts)
16911696
defer sub.Unsubscribe()
1692-
subErrCh := sub.Err()
1693-
1694-
// calculate poll/settle intervals
1695-
const (
1696-
pollFraction = 20
1697-
pollMin = 25 * time.Millisecond
1698-
pollMax = 500 * time.Millisecond
1699-
)
1700-
settleInterval := timeout / pollFraction
1701-
if settleInterval < pollMin {
1702-
settleInterval = pollMin
1703-
} else if settleInterval > pollMax {
1704-
settleInterval = pollMax
1705-
}
17061697

1707-
// Settle: short delay to bridge receipt-indexing lag after a new head.
1708-
// resetSettle re-arms a single timer safely (stop+drain+reset).
1709-
// On head: check once immediately, then reset; on timer: re-check; repeat until deadline.
1710-
var (
1711-
settle *time.Timer
1712-
settleCh <-chan time.Time
1713-
)
1714-
resetSettle := func(d time.Duration) {
1715-
if settle == nil {
1716-
settle = time.NewTimer(d)
1717-
settleCh = settle.C
1718-
return
1719-
}
1720-
if !settle.Stop() {
1721-
select {
1722-
case <-settle.C:
1723-
default:
1724-
}
1725-
}
1726-
settle.Reset(d)
1727-
}
1728-
defer func() {
1729-
if settle != nil && !settle.Stop() {
1730-
select {
1731-
case <-settle.C:
1732-
default:
1733-
}
1734-
}
1735-
}()
1736-
1737-
// Start a settle cycle immediately to cover a missed-head race.
1738-
resetSettle(settleInterval)
1698+
subErrCh := sub.Err()
17391699

17401700
for {
17411701
select {
17421702
case <-receiptCtx.Done():
1703+
// Upstream cancellation -> bubble it; otherwise emit our timeout error
17431704
if err := ctx.Err(); err != nil {
17441705
return nil, err
17451706
}
@@ -1750,24 +1711,30 @@ func (api *TransactionAPI) SendRawTransactionSync(ctx context.Context, input hex
17501711

17511712
case err, ok := <-subErrCh:
17521713
if !ok || err == nil {
1753-
// subscription closed; disable this case and rely on settle timer
1714+
// subscription closed; disable this case
17541715
subErrCh = nil
17551716
continue
17561717
}
17571718
return nil, err
17581719

1759-
case <-heads:
1760-
// Immediate re-check on new head, then grace to bridge indexing lag.
1761-
if r, err := api.GetTransactionReceipt(receiptCtx, hash); err == nil && r != nil {
1762-
return r, nil
1763-
}
1764-
resetSettle(settleInterval)
1765-
1766-
case <-settleCh:
1767-
if r, err := api.GetTransactionReceipt(receiptCtx, hash); err == nil && r != nil {
1768-
return r, nil
1720+
case batch := <-rcpts:
1721+
for _, rwt := range batch {
1722+
if rwt == nil || rwt.Receipt == nil || rwt.Receipt.TxHash != hash {
1723+
continue
1724+
}
1725+
1726+
if rwt.Receipt.BlockNumber != nil && rwt.Receipt.BlockHash != (common.Hash{}) {
1727+
return MarshalReceipt(
1728+
rwt.Receipt,
1729+
rwt.Receipt.BlockHash,
1730+
rwt.Receipt.BlockNumber.Uint64(),
1731+
api.signer,
1732+
rwt.Tx,
1733+
int(rwt.Receipt.TransactionIndex),
1734+
), nil
1735+
}
1736+
return api.GetTransactionReceipt(receiptCtx, hash)
17691737
}
1770-
resetSettle(settleInterval)
17711738
}
17721739
}
17731740
}

internal/ethapi/api_test.go

Lines changed: 98 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -442,13 +442,19 @@ type testBackend struct {
442442
pendingReceipts types.Receipts
443443

444444
// test-only fields for SendRawTransactionSync
445-
autoMine bool
446-
mined bool
445+
receiptsFeed *event.Feed
446+
headFeed *event.Feed // keep if other tests use it; otherwise remove
447+
autoMine bool
448+
449+
sentTx *types.Transaction
450+
sentTxHash common.Hash
451+
447452
syncDefaultTO time.Duration
448453
syncMaxTO time.Duration
449-
sentTx *types.Transaction
450-
sentTxHash common.Hash
451-
headFeed *event.Feed
454+
}
455+
456+
func fakeBlockHash(txh common.Hash) common.Hash {
457+
return crypto.Keccak256Hash([]byte("testblock"), txh.Bytes())
452458
}
453459

454460
func newTestBackend(t *testing.T, n int, gspec *core.Genesis, engine consensus.Engine, generator func(i int, b *core.BlockGen)) *testBackend {
@@ -476,6 +482,7 @@ func newTestBackend(t *testing.T, n int, gspec *core.Genesis, engine consensus.E
476482
pending: blocks[n],
477483
pendingReceipts: receipts[n],
478484
headFeed: new(event.Feed),
485+
receiptsFeed: new(event.Feed),
479486
}
480487
return backend
481488
}
@@ -605,23 +612,38 @@ func (b testBackend) SubscribeChainHeadEvent(ch chan<- core.ChainHeadEvent) even
605612
func (b *testBackend) SendTx(ctx context.Context, tx *types.Transaction) error {
606613
b.sentTx = tx
607614
b.sentTxHash = tx.Hash()
608-
if b.autoMine {
609-
b.mined = true
615+
616+
if b.autoMine && b.receiptsFeed != nil {
617+
num := b.chain.CurrentHeader().Number.Uint64() + 1
618+
bh := fakeBlockHash(tx.Hash())
619+
620+
r := &types.Receipt{
621+
TxHash: tx.Hash(),
622+
Status: types.ReceiptStatusSuccessful,
623+
BlockHash: bh,
624+
BlockNumber: new(big.Int).SetUint64(num),
625+
TransactionIndex: 0,
626+
CumulativeGasUsed: 21000,
627+
GasUsed: 21000,
628+
}
629+
b.receiptsFeed.Send([]*ReceiptWithTx{{Receipt: r, Tx: tx}})
610630
}
611631
return nil
612632
}
613-
func (b testBackend) GetCanonicalTransaction(txHash common.Hash) (bool, *types.Transaction, common.Hash, uint64, uint64) {
614-
// in-memory fast path for tests
615-
if b.mined && txHash == b.sentTxHash {
616-
return true, b.sentTx, common.HexToHash("0x01"), 1, 0
633+
func (b *testBackend) GetCanonicalTransaction(txHash common.Hash) (bool, *types.Transaction, common.Hash, uint64, uint64) {
634+
// Treat the auto-mined tx as canonically placed at head+1.
635+
if b.autoMine && txHash == b.sentTxHash {
636+
num := b.chain.CurrentHeader().Number.Uint64() + 1
637+
return true, b.sentTx, fakeBlockHash(txHash), num, 0
617638
}
618-
// fallback to existing DB-backed path
619639
tx, blockHash, blockNumber, index := rawdb.ReadCanonicalTransaction(b.db, txHash)
620640
return tx != nil, tx, blockHash, blockNumber, index
621641
}
622-
func (b testBackend) GetCanonicalReceipt(tx *types.Transaction, blockHash common.Hash, blockNumber, blockIndex uint64) (*types.Receipt, error) {
623-
// In-memory fast path used by tests
624-
if b.mined && tx != nil && tx.Hash() == b.sentTxHash && blockHash == common.HexToHash("0x01") && blockNumber == 1 && blockIndex == 0 {
642+
func (b *testBackend) GetCanonicalReceipt(tx *types.Transaction, blockHash common.Hash, blockNumber, blockIndex uint64) (*types.Receipt, error) {
643+
if b.autoMine && tx != nil && tx.Hash() == b.sentTxHash &&
644+
blockHash == fakeBlockHash(tx.Hash()) &&
645+
blockIndex == 0 &&
646+
blockNumber == b.chain.CurrentHeader().Number.Uint64()+1 {
625647
return &types.Receipt{
626648
Type: tx.Type(),
627649
Status: types.ReceiptStatusSuccessful,
@@ -631,9 +653,9 @@ func (b testBackend) GetCanonicalReceipt(tx *types.Transaction, blockHash common
631653
BlockHash: blockHash,
632654
BlockNumber: new(big.Int).SetUint64(blockNumber),
633655
TransactionIndex: 0,
656+
TxHash: tx.Hash(),
634657
}, nil
635658
}
636-
// Fallback: use the chain's canonical receipt (DB-backed)
637659
return b.chain.GetCanonicalReceipt(tx, blockHash, blockNumber, blockIndex)
638660
}
639661
func (b testBackend) TxIndexDone() bool {
@@ -3961,6 +3983,66 @@ func makeSignedRaw(t *testing.T, api *TransactionAPI, from, to common.Address, v
39613983
func makeSelfSignedRaw(t *testing.T, api *TransactionAPI, addr common.Address) (hexutil.Bytes, *types.Transaction) {
39623984
return makeSignedRaw(t, api, addr, addr, big.NewInt(0))
39633985
}
3986+
3987+
func (b *testBackend) SubscribeTransactionReceipts(txHashes []common.Hash, ch chan<- []*ReceiptWithTx) event.Subscription {
3988+
// If no feed is wired for this test, return a no-op subscription
3989+
if b.receiptsFeed == nil {
3990+
return event.NewSubscription(func(quit <-chan struct{}) error {
3991+
<-quit
3992+
return nil
3993+
})
3994+
}
3995+
3996+
// No filter => forward batches directly
3997+
if len(txHashes) == 0 {
3998+
return b.receiptsFeed.Subscribe(ch)
3999+
}
4000+
4001+
// Filtered: wrap the underlying feed and only forward matching receipts
4002+
in := make(chan []*ReceiptWithTx, 1)
4003+
sub := b.receiptsFeed.Subscribe(in)
4004+
4005+
// Build a hash set for quick filtering
4006+
wanted := make(map[common.Hash]struct{}, len(txHashes))
4007+
for _, h := range txHashes {
4008+
wanted[h] = struct{}{}
4009+
}
4010+
4011+
return event.NewSubscription(func(quit <-chan struct{}) error {
4012+
defer sub.Unsubscribe()
4013+
for {
4014+
select {
4015+
case batch, ok := <-in:
4016+
if !ok {
4017+
return nil
4018+
}
4019+
var out []*ReceiptWithTx
4020+
for _, r := range batch {
4021+
if r != nil && r.Receipt != nil {
4022+
if _, ok := wanted[r.Receipt.TxHash]; ok {
4023+
out = append(out, r)
4024+
}
4025+
}
4026+
}
4027+
if len(out) == 0 {
4028+
continue
4029+
}
4030+
select {
4031+
case ch <- out:
4032+
case <-quit:
4033+
return nil
4034+
}
4035+
case err, ok := <-sub.Err():
4036+
if !ok || err == nil {
4037+
return nil
4038+
}
4039+
return err
4040+
case <-quit:
4041+
return nil
4042+
}
4043+
}
4044+
})
4045+
}
39644046
func TestSendRawTransactionSync_Success(t *testing.T) {
39654047
t.Parallel()
39664048
genesis := &core.Genesis{

internal/ethapi/backend.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,7 @@ type Backend interface {
9898
GetLogs(ctx context.Context, blockHash common.Hash, number uint64) ([][]*types.Log, error)
9999
SubscribeRemovedLogsEvent(ch chan<- core.RemovedLogsEvent) event.Subscription
100100
SubscribeLogsEvent(ch chan<- []*types.Log) event.Subscription
101+
SubscribeTransactionReceipts(txHashes []common.Hash, ch chan<- []*ReceiptWithTx) event.Subscription
101102

102103
CurrentView() *filtermaps.ChainView
103104
NewMatcherBackend() filtermaps.MatcherBackend

internal/ethapi/transaction_args_test.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -411,3 +411,6 @@ func (b *backendMock) CurrentView() *filtermaps.ChainView { return nil
411411
func (b *backendMock) NewMatcherBackend() filtermaps.MatcherBackend { return nil }
412412

413413
func (b *backendMock) HistoryPruningCutoff() uint64 { return 0 }
414+
func (b *backendMock) SubscribeTransactionReceipts(txHashes []common.Hash, ch chan<- []*ReceiptWithTx) event.Subscription {
415+
return nil
416+
}

0 commit comments

Comments
 (0)