Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
### Go ###
# IDE
.idea

# Binaries for programs and plugins
*.exe
*.exe~
Expand Down
28 changes: 23 additions & 5 deletions cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"log"
"os"
"strings"
"sync"

commandline "github.com/aws/amazon-ec2-instance-selector/v2/pkg/cli"
"github.com/aws/amazon-ec2-instance-selector/v2/pkg/selector"
Expand Down Expand Up @@ -185,11 +186,29 @@ Full docs can be found at github.com/aws/amazon-` + binName
flags[region] = sess.Config.Region

instanceSelector := selector.New(sess)
if _, ok := flags[pricePerHour]; ok {
if flags[usageClass] == nil || *flags[usageClass].(*string) == "on-demand" {
instanceSelector.EC2Pricing.HydrateOndemandCache()
outputFlag := cli.StringMe(flags[output])
if outputFlag != nil && *outputFlag == tableWideOutput {
// If output type is `table-wide`, simply print both prices for better comparison,
// even if the actual filter is applied on any one of those based on usage class

// Save time by hydrating in parallel
wg := &sync.WaitGroup{}
wg.Add(2)
go func(waitGroup *sync.WaitGroup) {
defer waitGroup.Done()
_ = instanceSelector.EC2Pricing.HydrateOndemandCache()
}(wg)
go func(waitGroup *sync.WaitGroup) {
defer waitGroup.Done()
_ = instanceSelector.EC2Pricing.HydrateSpotCache(30)
}(wg)
wg.Wait()
} else if flags[pricePerHour] != nil {
// Else, if price filters are applied, only hydrate the respective cache as we don't have to print the prices
if flags[usageClass] == nil || *cli.StringMe(flags[usageClass]) == "on-demand" {
_ = instanceSelector.EC2Pricing.HydrateOndemandCache()
} else {
instanceSelector.EC2Pricing.HydrateSpotCache(30)
_ = instanceSelector.EC2Pricing.HydrateSpotCache(30)
}
}

Expand Down Expand Up @@ -249,7 +268,6 @@ Full docs can be found at github.com/aws/amazon-` + binName
}
}

outputFlag := cli.StringMe(flags[output])
outputFn := getOutputFn(outputFlag, selector.InstanceTypesOutputFn(resultsOutputFn))

instanceTypes, itemsTruncated, err := instanceSelector.FilterWithOutput(filters, outputFn)
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,6 @@ require (
github.com/smartystreets/goconvey v1.6.4 // indirect
github.com/spf13/cobra v0.0.7
github.com/spf13/pflag v1.0.3
go.uber.org/multierr v1.1.0
gopkg.in/ini.v1 v1.57.0
)
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,9 @@ github.com/ugorji/go v1.1.4/go.mod h1:uQMGLiO92mf5W77hV/PUCpI3pbzQx3CRekS0kk+RGr
github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2/go.mod h1:UETIi67q53MR2AWcXfiuqkDkRtnGDLqkBTpCHuJHxtU=
github.com/xordataexchange/crypt v0.0.3-0.20170626215501-b2862e3d0a77/go.mod h1:aYKd//L2LvnjZzWKhF00oedf4jCCReLcmhLdhm1A27Q=
go.etcd.io/bbolt v1.3.2/go.mod h1:IbVyRI1SCnLcuJnV2u8VeU0CEYM7e686BmAb1XKL+uU=
go.uber.org/atomic v1.4.0 h1:cxzIVoETapQEqDhQu3QfnvXAV4AlzcvUCxkVUFw3+EU=
go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE=
go.uber.org/multierr v1.1.0 h1:HoEmRHQPVSqub6w2z2d2EOVs2fjyFRGyofhKuyDq0QI=
go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0=
go.uber.org/zap v1.10.0/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q=
golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
Expand Down
137 changes: 93 additions & 44 deletions pkg/ec2pricing/ec2pricing.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,17 @@ package ec2pricing

import (
"fmt"
"github.com/aws/aws-sdk-go/aws/endpoints"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you move these non-standard lib imports to the next stanza?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure. Will add that to my fmt rules for this project.

"github.com/aws/aws-sdk-go/service/lightsail"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why is lightsail brought in here? I see it added to the main struct as well, but no use for it as far as I can tell.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My bad.

"github.com/aws/aws-sdk-go/service/lightsail/lightsailiface"
"go.uber.org/multierr"
"math"
"sort"
"strconv"
"strings"
"time"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/endpoints"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/ec2"
"github.com/aws/aws-sdk-go/service/ec2/ec2iface"
Expand All @@ -25,19 +28,26 @@ const (

// EC2Pricing is the public struct to interface with AWS pricing APIs
type EC2Pricing struct {
PricingClient pricingiface.PricingAPI
EC2Client ec2iface.EC2API
AWSSession *session.Session
cache map[string]float64
spotCache map[string]map[string][]spotPricingEntry
PricingClient pricingiface.PricingAPI
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To fix https://docs.aws.amazon.com/awsaccountbilling/latest/aboutv2/using-pelong.html#pe-endpoint , I believe we can just copy the session and change the region to always use us-east-1 just for the PricingClient. The filters already include the location. I didn't realize pricing was a global endpoint :) I can do this fix if you'd like.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I realized this. Will make the change and other changes.

EC2Client ec2iface.EC2API
LightsailClient lightsailiface.LightsailAPI
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remove lightsail client

AWSSession *session.Session
onDemandCache map[string]float64
spotCache map[string]map[string][]spotPricingEntry
lastOnDemandCachedUTC *time.Time // Updated on successful cache write
lastSpotCachedUTC *time.Time // Updated on successful cache write
}

// EC2PricingIface is the EC2Pricing interface mainly used to mock out ec2pricing during testing
type EC2PricingIface interface {
GetOndemandInstanceTypeCost(instanceType string) (float64, error)
GetSpotInstanceTypeNDayAvgCost(instanceType string, availabilityZones []string, days int) (float64, error)
// Keep hydrate functions thread safe by keeping different write data points
// In simple words, make sure they don't write the same variable/file/row etc. which they don't (they have different cache maps)
HydrateOndemandCache() error
HydrateSpotCache(days int) error
LastOnDemandCachedUTC() *time.Time
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you change to LastOnDemandCacheUTC removing the d. Same for the spot timestamp

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Okie.

LastSpotCachedUTC() *time.Time
}

type spotPricingEntry struct {
Expand All @@ -48,12 +58,27 @@ type spotPricingEntry struct {
// New creates an instance of instance-selector EC2Pricing
func New(sess *session.Session) *EC2Pricing {
return &EC2Pricing{
PricingClient: pricing.New(sess),
EC2Client: ec2.New(sess),
AWSSession: sess,
PricingClient: pricing.New(sess),
EC2Client: ec2.New(sess),
LightsailClient: lightsail.New(sess),
AWSSession: sess,
lastOnDemandCachedUTC: nil,
lastSpotCachedUTC: nil,
}
}

// LastOnDemandCachedUTC returns the UTC timestamp when the onDemandCache was last refreshed
// Returns nil if the onDemandCache has not been initialized
func (p *EC2Pricing) LastOnDemandCachedUTC() *time.Time {
return p.lastOnDemandCachedUTC
}

// LastSpotCachedUTC returns the UTC timestamp when the spotCache was last refreshed
// Returns nil if the spotCache has not been initialized
func (p *EC2Pricing) LastSpotCachedUTC() *time.Time {
return p.lastSpotCachedUTC
}

// GetSpotInstanceTypeNDayAvgCost retrieves the spot price history for a given AZ from the past N days and averages the price
// Passing an empty list for availabilityZones will retrieve avg cost for all AZs in the current AWSSession's region
func (p *EC2Pricing) GetSpotInstanceTypeNDayAvgCost(instanceType string, availabilityZones []string, days int) (float64, error) {
Expand All @@ -66,28 +91,31 @@ func (p *EC2Pricing) GetSpotInstanceTypeNDayAvgCost(instanceType string, availab
EndTime: &endTime,
InstanceTypes: []*string{&instanceType},
}
zoneToPriceEntries := map[string][]spotPricingEntry{}
zoneToPriceEntries := make(map[string][]spotPricingEntry)

if _, ok := p.spotCache[instanceType]; !ok {
var processingErr error
err := p.EC2Client.DescribeSpotPriceHistoryPages(&spotPriceHistInput, func(dspho *ec2.DescribeSpotPriceHistoryOutput, b bool) bool {
errAPI := p.EC2Client.DescribeSpotPriceHistoryPages(&spotPriceHistInput, func(dspho *ec2.DescribeSpotPriceHistoryOutput, b bool) bool {
for _, history := range dspho.SpotPriceHistory {
var spotPrice float64
spotPrice, processingErr = strconv.ParseFloat(*history.SpotPrice, 64)
spotPrice, errParse := strconv.ParseFloat(*history.SpotPrice, 64)
if errParse != nil {
processingErr = multierr.Append(processingErr, errParse)
continue
}
zone := *history.AvailabilityZone

zoneToPriceEntries[zone] = append(zoneToPriceEntries[zone], spotPricingEntry{
Timestamp: *history.Timestamp,
SpotPrice: spotPrice,
})
}
return true
})
if err != nil {
return float64(0), err
if errAPI != nil {
return float64(-1), errAPI
}
if processingErr != nil {
return float64(0), processingErr
return float64(-1), processingErr
}
} else {
for zone, priceEntries := range p.spotCache[instanceType] {
Expand All @@ -112,7 +140,7 @@ func (p *EC2Pricing) GetSpotInstanceTypeNDayAvgCost(instanceType string, availab
aggregateZonePriceSum += p.calculateSpotAggregate(priceEntries)
}

return (aggregateZonePriceSum / float64(numOfZones)), nil
return aggregateZonePriceSum / float64(numOfZones), nil
}

func (p *EC2Pricing) calculateSpotAggregate(spotPriceEntries []spotPricingEntry) float64 {
Expand All @@ -133,11 +161,16 @@ func (p *EC2Pricing) calculateSpotAggregate(spotPriceEntries []spotPricingEntry)
duration := spotPriceEntries[int(math.Max(float64(i-1), 0))].Timestamp.Sub(entry.Timestamp).Minutes()
priceSum += duration * entry.SpotPrice
}
return (priceSum / totalDuration)
return priceSum / totalDuration
}

// GetOndemandInstanceTypeCost retrieves the on-demand hourly cost for the specified instance type
func (p *EC2Pricing) GetOndemandInstanceTypeCost(instanceType string) (float64, error) {
// Check cache first and return it if available
if price, ok := p.onDemandCache[instanceType]; ok {
return price, nil
}

regionDescription := p.getRegionForPricingAPI()
// TODO: mac.metal instances cannot be found with the below filters
productInput := pricing.GetProductsInput{
Expand All @@ -153,25 +186,27 @@ func (p *EC2Pricing) GetOndemandInstanceTypeCost(instanceType string) (float64,
},
}

// Check cache first and return it if available
if price, ok := p.cache[instanceType]; ok {
return price, nil
}

pricePerUnitInUSD := float64(-1)
err := p.PricingClient.GetProductsPages(&productInput, func(pricingOutput *pricing.GetProductsOutput, nextPage bool) bool {
var err error
var processingErr error
// FIXME Only works for us-east-1 and ap-south-1
// https://docs.aws.amazon.com/awsaccountbilling/latest/aboutv2/using-pelong.html#pe-endpoint
errAPI := p.PricingClient.GetProductsPages(&productInput, func(pricingOutput *pricing.GetProductsOutput, nextPage bool) bool {
var errParse error
for _, priceDoc := range pricingOutput.PriceList {
_, pricePerUnitInUSD, err = parseOndemandUnitPrice(priceDoc)
_, pricePerUnitInUSD, errParse = parseOndemandUnitPrice(priceDoc)
}
if err != nil {
if errParse != nil {
processingErr = multierr.Append(processingErr, errParse)
// keep going through pages if we can't parse the pricing doc
return true
}
return false
})
if err != nil {
return -1, err
if errAPI != nil {
return -1, errAPI
}
if processingErr != nil {
return -1, processingErr
}
return pricePerUnitInUSD, nil
}
Expand All @@ -181,7 +216,7 @@ func (p *EC2Pricing) GetOndemandInstanceTypeCost(instanceType string) (float64,
// There is no TTL on cache entries
// You'll only want to use this if you don't mind a long startup time (around 30 seconds) and will query the cache often after that.
func (p *EC2Pricing) HydrateSpotCache(days int) error {
newCache := map[string]map[string][]spotPricingEntry{}
newCache := make(map[string]map[string][]spotPricingEntry)

endTime := time.Now().UTC()
startTime := endTime.Add(time.Hour * time.Duration(24*-1*days))
Expand All @@ -191,14 +226,17 @@ func (p *EC2Pricing) HydrateSpotCache(days int) error {
EndTime: &endTime,
}
var processingErr error
err := p.EC2Client.DescribeSpotPriceHistoryPages(&spotPriceHistInput, func(dspho *ec2.DescribeSpotPriceHistoryOutput, b bool) bool {
errAPI := p.EC2Client.DescribeSpotPriceHistoryPages(&spotPriceHistInput, func(dspho *ec2.DescribeSpotPriceHistoryOutput, b bool) bool {
for _, history := range dspho.SpotPriceHistory {
var spotPrice float64
spotPrice, processingErr = strconv.ParseFloat(*history.SpotPrice, 64)
spotPrice, errFloat := strconv.ParseFloat(*history.SpotPrice, 64)
if errFloat != nil {
processingErr = multierr.Append(processingErr, errFloat)
continue
}
instanceType := *history.InstanceType
zone := *history.AvailabilityZone
if _, ok := newCache[instanceType]; !ok {
newCache[instanceType] = map[string][]spotPricingEntry{}
newCache[instanceType] = make(map[string][]spotPricingEntry)
}
newCache[instanceType][zone] = append(newCache[instanceType][zone], spotPricingEntry{
Timestamp: *history.Timestamp,
Expand All @@ -207,20 +245,21 @@ func (p *EC2Pricing) HydrateSpotCache(days int) error {
}
return true
})
if err != nil {
return err
if errAPI != nil {
return errAPI
}
cTime := time.Now().UTC()
p.spotCache = newCache
p.lastSpotCachedUTC = &cTime
return processingErr
}

// HydrateOndemandCache makes a bulk request to the pricing api to retrieve all instance type pricing and stores them in a local cache
// If HydrateOndemandCache is called more than once, the cache will be fully refreshed
// There is no TTL on cache entries
func (p *EC2Pricing) HydrateOndemandCache() error {
if p.cache == nil {
p.cache = make(map[string]float64)
}
newOnDemandCache := make(map[string]float64)

regionDescription := p.getRegionForPricingAPI()
productInput := pricing.GetProductsInput{
ServiceCode: aws.String(serviceCode),
Expand All @@ -233,17 +272,27 @@ func (p *EC2Pricing) HydrateOndemandCache() error {
{Type: aws.String(pricing.FilterTypeTermMatch), Field: aws.String("tenancy"), Value: aws.String("shared")},
},
}
err := p.PricingClient.GetProductsPages(&productInput, func(pricingOutput *pricing.GetProductsOutput, nextPage bool) bool {
var processingErr error
// FIXME Only works for us-east-1 and ap-south-1
// https://docs.aws.amazon.com/awsaccountbilling/latest/aboutv2/using-pelong.html#pe-endpoint
errAPI := p.PricingClient.GetProductsPages(&productInput, func(pricingOutput *pricing.GetProductsOutput, nextPage bool) bool {
for _, priceDoc := range pricingOutput.PriceList {
instanceTypeName, price, err := parseOndemandUnitPrice(priceDoc)
if err != nil {
instanceTypeName, price, errParse := parseOndemandUnitPrice(priceDoc)
if errParse != nil {
processingErr = multierr.Append(processingErr, errParse)
continue
}
p.cache[instanceTypeName] = price
newOnDemandCache[instanceTypeName] = price
}
return true
})
return err
if errAPI != nil {
return errAPI
}
cTime := time.Now().UTC()
p.onDemandCache = newOnDemandCache
p.lastOnDemandCachedUTC = &cTime
return processingErr
}

// getRegionForPricingAPI attempts to retrieve the region description based on the AWS session used to create
Expand Down
Loading