@@ -30,6 +30,9 @@ import (
3030// that don't call a server.
3131type Transactable interface {
3232 Transact (cmd string , expectedResults int , args ... []byte ) ([][]byte , error )
33+ TransactAsync (cmd string , expectedResults int , args [][]byte , callback func ([][]byte ) error )
34+ Barrier (callback func ()) error
35+ Flush () error
3336}
3437
3538// Subprocess is a "middle" layer that interacts with a FIPS module via running
@@ -39,6 +42,24 @@ type Subprocess struct {
3942 stdin io.WriteCloser
4043 stdout io.ReadCloser
4144 primitives map [string ]primitive
45+ // supportsFlush is true if the modulewrapper indicated that it wants to receive flush commands.
46+ supportsFlush bool
47+ // pendingReads is a queue of expected responses. `readerRoutine` reads each response and calls the callback in the matching pendingRead.
48+ pendingReads chan pendingRead
49+ // readerFinished is a channel that is closed if `readerRoutine` has finished (e.g. because of a read error).
50+ readerFinished chan struct {}
51+ }
52+
53+ // pendingRead represents an expected response from the modulewrapper.
54+ type pendingRead struct {
55+ // barrierCallback is called as soon as this pendingRead is the next in the queue, before any read from the modulewrapper.
56+ barrierCallback func ()
57+
58+ // callback is called with the result from the modulewrapper. If this is nil then no read is performed.
59+ callback func (result [][]byte ) error
60+ // cmd is the command that requested this read for logging purposes.
61+ cmd string
62+ expectedNumResults int
4263}
4364
4465// New returns a new Subprocess middle layer that runs the given binary.
@@ -61,13 +82,18 @@ func New(path string) (*Subprocess, error) {
6182 return NewWithIO (cmd , stdin , stdout ), nil
6283}
6384
85+ // maxPending is the maximum number of requests that can be in the pipeline.
86+ const maxPending = 4096
87+
6488// NewWithIO returns a new Subprocess middle layer with the given ReadCloser and
6589// WriteCloser. The returned Subprocess will call Wait on the Cmd when closed.
6690func NewWithIO (cmd * exec.Cmd , in io.WriteCloser , out io.ReadCloser ) * Subprocess {
6791 m := & Subprocess {
68- cmd : cmd ,
69- stdin : in ,
70- stdout : out ,
92+ cmd : cmd ,
93+ stdin : in ,
94+ stdout : out ,
95+ pendingReads : make (chan pendingRead , maxPending ),
96+ readerFinished : make (chan struct {}),
7197 }
7298
7399 m .primitives = map [string ]primitive {
@@ -107,6 +133,7 @@ func NewWithIO(cmd *exec.Cmd, in io.WriteCloser, out io.ReadCloser) *Subprocess
107133 "hmacDRBG" : & drbg {"hmacDRBG" , map [string ]bool {"SHA-1" : true , "SHA2-224" : true , "SHA2-256" : true , "SHA2-384" : true , "SHA2-512" : true }},
108134 "KDF" : & kdfPrimitive {},
109135 "KDA" : & hkdf {},
136+ "TLS-v1.3" : & tls13 {},
110137 "CMAC-AES" : & keyedMACPrimitive {"CMAC-AES" },
111138 "RSA" : & rsa {},
112139 "kdf-components" : & kdfComp {"kdf-components" },
@@ -117,6 +144,7 @@ func NewWithIO(cmd *exec.Cmd, in io.WriteCloser, out io.ReadCloser) *Subprocess
117144 }
118145 m .primitives ["ECDSA" ] = & ecdsa {"ECDSA" , map [string ]bool {"P-224" : true , "P-256" : true , "P-384" : true , "P-521" : true }, m .primitives }
119146
147+ go m .readerRoutine ()
120148 return m
121149}
122150
@@ -125,10 +153,58 @@ func (m *Subprocess) Close() {
125153 m .stdout .Close ()
126154 m .stdin .Close ()
127155 m .cmd .Wait ()
156+ close (m .pendingReads )
157+ <- m .readerFinished
158+ }
159+
160+ func (m * Subprocess ) flush () error {
161+ if ! m .supportsFlush {
162+ return nil
163+ }
164+
165+ const cmd = "flush"
166+ buf := make ([]byte , 8 , 8 + len (cmd ))
167+ binary .LittleEndian .PutUint32 (buf , 1 )
168+ binary .LittleEndian .PutUint32 (buf [4 :], uint32 (len (cmd )))
169+ buf = append (buf , []byte (cmd )... )
170+
171+ if _ , err := m .stdin .Write (buf ); err != nil {
172+ return err
173+ }
174+ return nil
128175}
129176
130- // Transact performs a single request--response pair with the subprocess.
131- func (m * Subprocess ) Transact (cmd string , expectedResults int , args ... []byte ) ([][]byte , error ) {
177+ func (m * Subprocess ) enqueueRead (pending pendingRead ) error {
178+ select {
179+ case <- m .readerFinished :
180+ panic ("attempted to enqueue request after the reader failed" )
181+ default :
182+ }
183+
184+ select {
185+ case m .pendingReads <- pending :
186+ break
187+ default :
188+ // `pendingReads` is full. Ensure that the modulewrapper will process
189+ // some outstanding requests to free up space in the queue.
190+ if err := m .flush (); err != nil {
191+ return err
192+ }
193+ m .pendingReads <- pending
194+ }
195+
196+ return nil
197+ }
198+
199+ // TransactAsync performs a single request--response pair with the subprocess.
200+ // The callback will run at some future point, in a separate goroutine. All
201+ // callbacks will, however, be run in the order that TransactAsync was called.
202+ // Use Flush to wait for all outstanding callbacks.
203+ func (m * Subprocess ) TransactAsync (cmd string , expectedNumResults int , args [][]byte , callback func (result [][]byte ) error ) {
204+ if err := m .enqueueRead (pendingRead {nil , callback , cmd , expectedNumResults }); err != nil {
205+ panic (err )
206+ }
207+
132208 argLength := len (cmd )
133209 for _ , arg := range args {
134210 argLength += len (arg )
@@ -146,22 +222,93 @@ func (m *Subprocess) Transact(cmd string, expectedResults int, args ...[]byte) (
146222 }
147223
148224 if _ , err := m .stdin .Write (buf ); err != nil {
149- return nil , fmt .Errorf ("Failed to write buff: %s" , err )
225+ panic (err )
226+ }
227+ }
228+
229+ // Flush tells the subprocess to complete all outstanding requests and waits
230+ // for all outstanding TransactAsync callbacks to complete.
231+ func (m * Subprocess ) Flush () error {
232+ if m .supportsFlush {
233+ m .flush ()
234+ }
235+
236+ done := make (chan struct {})
237+ if err := m .enqueueRead (pendingRead {barrierCallback : func () {
238+ close (done )
239+ }}); err != nil {
240+ return err
150241 }
151242
152- buf = buf [:4 ]
243+ <- done
244+ return nil
245+ }
246+
247+ // Barrier runs callback after all outstanding TransactAsync callbacks have
248+ // been run.
249+ func (m * Subprocess ) Barrier (callback func ()) error {
250+ return m .enqueueRead (pendingRead {barrierCallback : callback })
251+ }
252+
253+ func (m * Subprocess ) Transact (cmd string , expectedNumResults int , args ... []byte ) ([][]byte , error ) {
254+ done := make (chan struct {})
255+ var result [][]byte
256+ m .TransactAsync (cmd , expectedNumResults , args , func (r [][]byte ) error {
257+ result = r
258+ close (done )
259+ return nil
260+ })
261+
262+ if err := m .flush (); err != nil {
263+ return nil , err
264+ }
265+
266+ select {
267+ case <- done :
268+ return result , nil
269+ case <- m .readerFinished :
270+ panic ("was still waiting for a result when the reader finished" )
271+ }
272+ }
273+
274+ func (m * Subprocess ) readerRoutine () {
275+ defer close (m .readerFinished )
276+
277+ for pendingRead := range m .pendingReads {
278+ if pendingRead .barrierCallback != nil {
279+ pendingRead .barrierCallback ()
280+ }
281+
282+ if pendingRead .callback == nil {
283+ continue
284+ }
285+
286+ result , err := m .readResult (pendingRead .cmd , pendingRead .expectedNumResults )
287+ if err != nil {
288+ panic (fmt .Errorf ("failed to read from subprocess: %w" , err ))
289+ }
290+
291+ if err := pendingRead .callback (result ); err != nil {
292+ panic (fmt .Errorf ("result from subprocess was rejected: %w" , err ))
293+ }
294+ }
295+ }
296+
297+ func (m * Subprocess ) readResult (cmd string , expectedNumResults int ) ([][]byte , error ) {
298+ buf := make ([]byte , 4 )
299+
153300 if _ , err := io .ReadFull (m .stdout , buf ); err != nil {
154- return nil , fmt . Errorf ( "Failed to read the length of sections section: %s" , err )
301+ return nil , err
155302 }
156303
157304 numResults := binary .LittleEndian .Uint32 (buf )
158- if int (numResults ) != expectedResults {
159- return nil , fmt .Errorf ("expected %d results from %q but got %d" , expectedResults , cmd , numResults )
305+ if int (numResults ) != expectedNumResults {
306+ return nil , fmt .Errorf ("expected %d results from %q but got %d" , expectedNumResults , cmd , numResults )
160307 }
161308
162309 buf = make ([]byte , 4 * numResults )
163310 if _ , err := io .ReadFull (m .stdout , buf ); err != nil {
164- return nil , fmt . Errorf ( "Failed to read the length of each section: %s" , err )
311+ return nil , err
165312 }
166313
167314 var resultsLength uint64
@@ -175,7 +322,7 @@ func (m *Subprocess) Transact(cmd string, expectedResults int, args ...[]byte) (
175322
176323 results := make ([]byte , resultsLength )
177324 if _ , err := io .ReadFull (m .stdout , results ); err != nil {
178- return nil , fmt . Errorf ( "Failed to read total results: %s" , err )
325+ return nil , err
179326 }
180327
181328 ret := make ([][]byte , 0 , numResults )
@@ -198,16 +345,25 @@ func (m *Subprocess) Config() ([]byte, error) {
198345 return nil , err
199346 }
200347 var config []struct {
201- Algorithm string `json:"algorithm"`
348+ Algorithm string `json:"algorithm"`
349+ Features []string `json:"features"`
202350 }
203351 if err := json .Unmarshal (results [0 ], & config ); err != nil {
204352 return nil , errors .New ("failed to parse config response from wrapper: " + err .Error ())
205353 }
206354 for _ , algo := range config {
207- if _ , ok := m .primitives [algo .Algorithm ]; ! ok {
355+ if algo .Algorithm == "acvptool" {
356+ for _ , feature := range algo .Features {
357+ switch feature {
358+ case "batch" :
359+ m .supportsFlush = true
360+ }
361+ }
362+ } else if _ , ok := m .primitives [algo .Algorithm ]; ! ok {
208363 return nil , fmt .Errorf ("wrapper config advertises support for unknown algorithm %q" , algo .Algorithm )
209364 }
210365 }
366+
211367 return results [0 ], nil
212368}
213369
0 commit comments