Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 9 additions & 6 deletions api/types/types.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
package types

import "time"
import (
"database/sql"
"time"
)

type NodeInfo struct {
ID int
Expand All @@ -11,9 +14,9 @@ type NodeInfo struct {
LastContact time.Time
Unschedulable bool

Name string
StartupTime time.Time
Tasks string
Layers string
Miners string
Name sql.NullString // Can be NULL from harmony_machine_details
StartupTime sql.NullTime // Can be NULL from harmony_machine_details
Tasks sql.NullString // Can be NULL from harmony_machine_details
Layers sql.NullString // Can be NULL from harmony_machine_details
Miners sql.NullString // Can be NULL from harmony_machine_details
}
82 changes: 71 additions & 11 deletions build/openrpc/curio.json
Original file line number Diff line number Diff line change
Expand Up @@ -423,11 +423,26 @@
"HostPort": "string value",
"LastContact": "0001-01-01T00:00:00Z",
"Unschedulable": true,
"Name": "string value",
"StartupTime": "0001-01-01T00:00:00Z",
"Tasks": "string value",
"Layers": "string value",
"Miners": "string value"
"Name": {
"String": "string value",
"Valid": true
},
"StartupTime": {
"Time": "0001-01-01T00:00:00Z",
"Valid": true
},
"Tasks": {
"String": "string value",
"Valid": true
},
"Layers": {
"String": "string value",
"Valid": true
},
"Miners": {
"String": "string value",
"Valid": true
}
}
],
"additionalProperties": false,
Expand All @@ -451,24 +466,69 @@
"type": "string"
},
"Layers": {
"type": "string"
"additionalProperties": false,
"properties": {
"String": {
"type": "string"
},
"Valid": {
"type": "boolean"
}
},
"type": "object"
},
"Miners": {
"type": "string"
"additionalProperties": false,
"properties": {
"String": {
"type": "string"
},
"Valid": {
"type": "boolean"
}
},
"type": "object"
},
"Name": {
"type": "string"
"additionalProperties": false,
"properties": {
"String": {
"type": "string"
},
"Valid": {
"type": "boolean"
}
},
"type": "object"
},
"RAM": {
"title": "number",
"type": "number"
},
"StartupTime": {
"format": "date-time",
"type": "string"
"additionalProperties": false,
"properties": {
"Time": {
"format": "date-time",
"type": "string"
},
"Valid": {
"type": "boolean"
}
},
"type": "object"
},
"Tasks": {
"type": "string"
"additionalProperties": false,
"properties": {
"String": {
"type": "string"
},
"Valid": {
"type": "boolean"
}
},
"type": "object"
},
"Unschedulable": {
"type": "boolean"
Expand Down
28 changes: 23 additions & 5 deletions cmd/curio/info.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,17 +26,35 @@ var infoCmd = &cli.Command{
}
fmt.Printf("Node Info:\n")
fmt.Printf("ID: %d\n", info.ID)
fmt.Printf("Name: %s\n", info.Name)
if info.Name.Valid {
fmt.Printf("Name: %s\n", info.Name.String)
}
fmt.Printf("CPU: %d\n", info.CPU)
fmt.Printf("RAM: %s\n", humanize.Bytes(uint64(info.RAM)))
fmt.Printf("GPU: %.2f\n", info.GPU)
fmt.Printf("Schedulable: %t\n", !info.Unschedulable)
fmt.Printf("HostPort: %s\n", info.HostPort)
fmt.Printf("Tasks: %s\n", info.Tasks)
fmt.Printf("Layers: %s\n", info.Layers)
fmt.Printf("Miners: %s\n", info.Miners)
if info.Tasks.Valid {
fmt.Printf("Tasks: %s\n", info.Tasks.String)
} else {
fmt.Printf("Tasks: None\n")
}
if info.Layers.Valid {
fmt.Printf("Layers: %s\n", info.Layers.String)
} else {
fmt.Printf("Layers: None\n")
}
if info.Miners.Valid {
fmt.Printf("Miners: %s\n", info.Miners.String)
} else {
fmt.Printf("Miners: None\n")
}
fmt.Printf("LastContact: %s\n", info.LastContact)
fmt.Printf("StartupTime: %s\n", info.StartupTime)
if info.StartupTime.Valid {
fmt.Printf("StartupTime: %s\n", info.StartupTime.Time)
} else {
fmt.Printf("StartupTime: N/A\n")
}
return nil
},
}
6 changes: 5 additions & 1 deletion deps/stats/wallet_exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,10 +140,14 @@ func attoToNano(atto types.BigInt) int64 {

func StartWalletExporter(ctx context.Context, db *harmonydb.DB, api api.FullNode, spIDs []address.Address) {
go func() {
ticker := time.NewTicker(WalletExporterInterval)
defer ticker.Stop()

for {
select {
case <-ctx.Done():
case <-time.After(WalletExporterInterval):
return
case <-ticker.C:
walletExporterCycle(ctx, db, api, spIDs)
}
}
Expand Down
25 changes: 20 additions & 5 deletions documentation/en/api.md
Original file line number Diff line number Diff line change
Expand Up @@ -143,11 +143,26 @@ Response:
"HostPort": "string value",
"LastContact": "0001-01-01T00:00:00Z",
"Unschedulable": true,
"Name": "string value",
"StartupTime": "0001-01-01T00:00:00Z",
"Tasks": "string value",
"Layers": "string value",
"Miners": "string value"
"Name": {
"String": "string value",
"Valid": true
},
"StartupTime": {
"Time": "0001-01-01T00:00:00Z",
"Valid": true
},
"Tasks": {
"String": "string value",
"Valid": true
},
"Layers": {
"String": "string value",
"Valid": true
},
"Miners": {
"String": "string value",
"Valid": true
}
}
```

Expand Down
9 changes: 7 additions & 2 deletions harmony/harmonytask/harmonytask.go
Original file line number Diff line number Diff line change
Expand Up @@ -288,15 +288,19 @@ func (e *TaskEngine) GracefullyTerminate() {

func (e *TaskEngine) poller() {
nextWait := POLL_NEXT_DURATION
timer := time.NewTimer(nextWait)
defer timer.Stop()

for {
stats.Record(context.Background(), TaskMeasures.PollerIterations.M(1))

select {
case <-time.After(nextWait): // Find work periodically
case <-timer.C: // Find work periodically
nextWait = POLL_DURATION
timer.Reset(nextWait)
case <-e.ctx.Done(): ///////////////////// Graceful exit
return
}
nextWait = POLL_DURATION

// Check if the machine is schedulable
schedulable, err := e.checkNodeFlags()
Expand All @@ -310,6 +314,7 @@ func (e *TaskEngine) poller() {
accepted := e.pollerTryAllWork(schedulable)
if accepted {
nextWait = POLL_NEXT_DURATION
timer.Reset(nextWait)
}

if !schedulable {
Expand Down
15 changes: 8 additions & 7 deletions lib/dealdata/dealdata.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package dealdata

import (
"context"
"database/sql"
"encoding/json"
"io"
"net/http"
Expand Down Expand Up @@ -33,9 +34,9 @@ type dealMetadata struct {
PieceCID string `db:"piece_cid"`
PieceSize int64 `db:"piece_size"`

DataUrl *string `db:"data_url"`
DataHeaders []byte `db:"data_headers"`
DataRawSize *int64 `db:"data_raw_size"`
DataUrl sql.NullString `db:"data_url"`
DataHeaders []byte `db:"data_headers"`
DataRawSize sql.NullInt64 `db:"data_raw_size"`

DataDelOnFinalize bool `db:"data_delete_on_finalize"`
}
Expand Down Expand Up @@ -173,8 +174,8 @@ func getDealMetadata(ctx context.Context, db *harmonydb.DB, sc *ffi.SealCalls, s

// make pieceReader
if !commDOnly {
if p.DataUrl != nil {
dataUrl := *p.DataUrl
if p.DataUrl.Valid {
dataUrl := p.DataUrl.String

goUrl, err := url.Parse(dataUrl)
if err != nil {
Expand Down Expand Up @@ -215,10 +216,10 @@ func getDealMetadata(ctx context.Context, db *harmonydb.DB, sc *ffi.SealCalls, s

closers = append(closers, pr)

reader, _ := padreader.New(pr, uint64(*p.DataRawSize))
reader, _ := padreader.New(pr, uint64(p.DataRawSize.Int64))
pieceReaders = append(pieceReaders, reader)
} else {
reader, _ := padreader.New(NewUrlReader(nil, dataUrl, hdrs, *p.DataRawSize), uint64(*p.DataRawSize))
reader, _ := padreader.New(NewUrlReader(nil, dataUrl, hdrs, p.DataRawSize.Int64), uint64(p.DataRawSize.Int64))
pieceReaders = append(pieceReaders, reader)
}

Expand Down
1 change: 1 addition & 0 deletions lib/ffi/cunative/decode_snap.go
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,7 @@ func workerSnap(wg *sync.WaitGroup, jobs <-chan jobSnap, results chan<- resultSn

pool.Put(j.rbuf)
pool.Put(j.kbuf)
pool.Put(rhoInvsBytes) // Return rhoInvsBytes to pool

results <- resultSnap{obuf, j.size, j.chunkID}
}
Expand Down
27 changes: 18 additions & 9 deletions lib/paths/db_index.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,8 @@ func (dbi *DBIndex) StorageList(ctx context.Context) (map[storiface.ID][]storifa
}

// skip sector info for storage paths with no sectors
if !entry.MinerId.Valid {
// All sector_location fields must be valid (they come from LEFT JOIN)
if !entry.MinerId.Valid || !entry.SectorNum.Valid || !entry.SectorFiletype.Valid {
continue
}

Expand Down Expand Up @@ -591,8 +592,6 @@ func (dbi *DBIndex) StorageFindSector(ctx context.Context, sector abi.SectorID,

func (dbi *DBIndex) findSectorUncached(ctx context.Context, s abi.SectorID, ft storiface.SectorFileType, ssize abi.SectorSize, allowFetch bool) ([]storiface.SectorStorageInfo, error) {

var result []storiface.SectorStorageInfo

allowList := make(map[string]struct{})
storageWithSector := map[string]bool{}

Expand Down Expand Up @@ -638,11 +637,15 @@ func (dbi *DBIndex) findSectorUncached(ctx context.Context, s abi.SectorID, ft s
return nil, xerrors.Errorf("Finding sector storage from DB fails with err: %w", err)
}

result := make([]storiface.SectorStorageInfo, 0, len(rows))

for _, row := range rows {

// Parse all urls
var urls, burls []string
for _, u := range splitString(row.Urls) {
splitUrls := splitString(row.Urls)
urls := make([]string, 0, len(splitUrls))
burls := make([]string, 0, len(splitUrls))
for _, u := range splitUrls {
rl, err := url.Parse(u)
if err != nil {
return nil, xerrors.Errorf("failed to parse url: %w", err)
Expand Down Expand Up @@ -762,8 +765,10 @@ func (dbi *DBIndex) findSectorUncached(ctx context.Context, s abi.SectorID, ft s
}
}

var urls, burls []string
for _, u := range splitString(row.Urls) {
splitUrls := splitString(row.Urls)
urls := make([]string, 0, len(splitUrls))
burls := make([]string, 0, len(splitUrls))
for _, u := range splitUrls {
rl, err := url.Parse(u)
if err != nil {
return nil, xerrors.Errorf("failed to parse url: %w", err)
Expand Down Expand Up @@ -893,7 +898,7 @@ func (dbi *DBIndex) StorageBestAlloc(ctx context.Context, allocate storiface.Sec
return nil, xerrors.Errorf("Querying for best storage sectors fails with err %w: ", err)
}

var result []storiface.StorageInfo
result := make([]storiface.StorageInfo, 0, len(rows))

for _, row := range rows {
// Matching with 0 as a workaround to avoid having minerID
Expand Down Expand Up @@ -1090,6 +1095,9 @@ func (dbi *DBIndex) StorageLock(ctx context.Context, sector abi.SectorID, read s
lockUuid := uuid.New()

// retry with exponential backoff and block until lock is acquired
timer := time.NewTimer(time.Duration(waitTime) * time.Second)
defer timer.Stop()

for {
locked, err := dbi.lock(ctx, sector, read, write, lockUuid)
// if err is not nil and is not because we cannot acquire lock, retry
Expand All @@ -1105,10 +1113,11 @@ func (dbi *DBIndex) StorageLock(ctx context.Context, sector abi.SectorID, read s
}

select {
case <-time.After(time.Duration(waitTime) * time.Second):
case <-timer.C:
if waitTime < maxWaitTime {
waitTime *= 2
}
timer.Reset(time.Duration(waitTime) * time.Second)
case <-ctx.Done():
return ctx.Err()
}
Expand Down
Loading