Skip to content

Commit ce0b92e

Browse files
Domas Monkus0x2b3bfa0
andauthored
K8s support for specifying an existing persistent volume claim (#661)
* K8s support for specifying an existing persistent volume claim. Co-authored-by: Helio Machado <[email protected]>
1 parent dc3374d commit ce0b92e

File tree

4 files changed

+136
-45
lines changed

4 files changed

+136
-45
lines changed
Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
package resources
2+
3+
import (
4+
"context"
5+
6+
kubernetes_core "k8s.io/api/core/v1"
7+
kubernetes_errors "k8s.io/apimachinery/pkg/api/errors"
8+
kubernetes_meta "k8s.io/apimachinery/pkg/apis/meta/v1"
9+
_ "k8s.io/client-go/plugin/pkg/client/auth"
10+
11+
"terraform-provider-iterative/task/common"
12+
"terraform-provider-iterative/task/k8s/client"
13+
)
14+
15+
// NewExistingPersistentVolumeClaim creates a new ExistingPersistentVolumeClaim object.
16+
func NewExistingPersistentVolumeClaim(client *client.Client, storageParams common.RemoteStorage) *ExistingPersistentVolumeClaim {
17+
return &ExistingPersistentVolumeClaim{
18+
client: client,
19+
params: storageParams,
20+
}
21+
}
22+
23+
// ExistingPersistentVolumeClaim refers to a pre-allocated persistent volume to be used
24+
// as storage for the job.
25+
type ExistingPersistentVolumeClaim struct {
26+
client *client.Client
27+
params common.RemoteStorage
28+
resource *kubernetes_core.PersistentVolumeClaim
29+
}
30+
31+
// Read verifies the persistent volume.
32+
func (p *ExistingPersistentVolumeClaim) Read(ctx context.Context) error {
33+
persistentVolumeClaim, err := p.client.Services.Core.PersistentVolumeClaims(p.client.Namespace).Get(ctx, p.params.Container, kubernetes_meta.GetOptions{})
34+
if err != nil {
35+
if statusErr, ok := err.(*kubernetes_errors.StatusError); ok && statusErr.ErrStatus.Code == 404 {
36+
return common.NotFoundError
37+
}
38+
return err
39+
}
40+
41+
p.resource = persistentVolumeClaim
42+
return nil
43+
}
44+
45+
// VolumeInfo returns information for attaching the persistent volume claim to the job.
46+
func (p *ExistingPersistentVolumeClaim) VolumeInfo(ctx context.Context) (string /*subpath*/, *kubernetes_core.PersistentVolumeClaimVolumeSource) {
47+
pvc := &kubernetes_core.PersistentVolumeClaimVolumeSource{
48+
ClaimName: p.params.Container,
49+
}
50+
return p.params.Path, pvc
51+
52+
}

task/k8s/resources/resource_job.go

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ import (
2727
"terraform-provider-iterative/task/k8s/client"
2828
)
2929

30-
func NewJob(client *client.Client, identifier common.Identifier, persistentVolumeClaim *PersistentVolumeClaim, configMap *ConfigMap, permissionSet *PermissionSet, task common.Task) *Job {
30+
func NewJob(client *client.Client, identifier common.Identifier, persistentVolumeClaim VolumeInfoProvider, configMap *ConfigMap, permissionSet *PermissionSet, task common.Task) *Job {
3131
j := new(Job)
3232
j.Client = client
3333
j.Identifier = identifier.Long()
@@ -50,9 +50,9 @@ type Job struct {
5050
Events []common.Event
5151
}
5252
Dependencies struct {
53-
*PersistentVolumeClaim
54-
*ConfigMap
55-
*PermissionSet
53+
PersistentVolumeClaim VolumeInfoProvider
54+
ConfigMap *ConfigMap
55+
PermissionSet *PermissionSet
5656
}
5757
Resource *kubernetes_batch.Job
5858
}
@@ -168,16 +168,16 @@ func (j *Job) Create(ctx context.Context) error {
168168
}
169169

170170
if j.Attributes.Task.Environment.Directory != "" {
171+
volumeSubPath, volumeClaim := j.Dependencies.PersistentVolumeClaim.VolumeInfo(ctx)
171172
jobVolumeMounts = append(jobVolumeMounts, kubernetes_core.VolumeMount{
172173
Name: j.Identifier + "-pvc",
173174
MountPath: "/directory",
175+
SubPath: volumeSubPath,
174176
})
175177
jobVolumes = append(jobVolumes, kubernetes_core.Volume{
176178
Name: j.Identifier + "-pvc",
177179
VolumeSource: kubernetes_core.VolumeSource{
178-
PersistentVolumeClaim: &kubernetes_core.PersistentVolumeClaimVolumeSource{
179-
ClaimName: j.Dependencies.PersistentVolumeClaim.Identifier,
180-
},
180+
PersistentVolumeClaim: volumeClaim,
181181
},
182182
})
183183
}
@@ -368,3 +368,8 @@ func (j *Job) Logs(ctx context.Context) ([]string, error) {
368368

369369
return result, nil
370370
}
371+
372+
// VolumeInfoProvider is implemented by persistent volume claims.
373+
type VolumeInfoProvider interface {
374+
VolumeInfo(context.Context) (string /*subpath*/, *kubernetes_core.PersistentVolumeClaimVolumeSource)
375+
}

task/k8s/resources/resource_persistent_volume_claim.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -102,3 +102,13 @@ func (p *PersistentVolumeClaim) Delete(ctx context.Context) error {
102102
}
103103
return nil
104104
}
105+
106+
// VolumeInfo returns information for attaching the persistent volume claim to the job.
107+
func (p *PersistentVolumeClaim) VolumeInfo(ctx context.Context) (string /*subpath*/, *kubernetes_core.PersistentVolumeClaimVolumeSource) {
108+
pvc := &kubernetes_core.PersistentVolumeClaimVolumeSource{
109+
ClaimName: p.Identifier,
110+
}
111+
// PersistentVolumeClaims are mounted at root.
112+
return "", pvc
113+
114+
}

task/k8s/task.go

Lines changed: 62 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -39,29 +39,12 @@ func New(ctx context.Context, cloud common.Cloud, identifier common.Identifier,
3939
return nil, err
4040
}
4141

42-
persistentVolumeClaimStorageClass := ""
43-
persistentVolumeClaimSize := task.Size.Storage
44-
persistentVolumeDirectory := task.Environment.Directory
45-
46-
match := regexp.MustCompile(`^([^:]+):(?:(\d+):)?(.+)$`).FindStringSubmatch(task.Environment.Directory)
47-
if match != nil {
48-
persistentVolumeClaimStorageClass = match[1]
49-
if match[2] != "" {
50-
number, err := strconv.Atoi(match[2])
51-
if err != nil {
52-
return nil, err
53-
}
54-
persistentVolumeClaimSize = int(number)
55-
}
56-
persistentVolumeDirectory = match[3]
57-
}
58-
5942
t := new(Task)
6043
t.Client = client
6144
t.Identifier = identifier
6245
t.Attributes.Task = task
63-
t.Attributes.Directory = persistentVolumeDirectory
64-
t.Attributes.DirectoryOut = persistentVolumeDirectory
46+
t.Attributes.Directory = task.Environment.Directory
47+
t.Attributes.DirectoryOut = task.Environment.Directory
6548
if task.Environment.DirectoryOut != "" {
6649
t.Attributes.DirectoryOut = task.Environment.DirectoryOut
6750
}
@@ -75,17 +58,43 @@ func New(ctx context.Context, cloud common.Cloud, identifier common.Identifier,
7558
t.Identifier,
7659
map[string]string{"script": t.Attributes.Task.Environment.Script},
7760
)
78-
t.Resources.PersistentVolumeClaim = resources.NewPersistentVolumeClaim(
79-
t.Client,
80-
t.Identifier,
81-
persistentVolumeClaimStorageClass,
82-
persistentVolumeClaimSize,
83-
t.Attributes.Task.Parallelism > 1,
84-
)
61+
var pvc resources.VolumeInfoProvider
62+
if task.RemoteStorage != nil {
63+
t.DataSources.ExistingPersistentVolumeClaim = resources.NewExistingPersistentVolumeClaim(
64+
t.Client, *task.RemoteStorage)
65+
pvc = t.DataSources.ExistingPersistentVolumeClaim
66+
} else {
67+
var persistentVolumeDirectory string
68+
var persistentVolumeClaimStorageClass string
69+
persistentVolumeClaimSize := task.Size.Storage
70+
71+
match := regexp.MustCompile(`^([^:]+):(?:(\d+):)?(.+)$`).FindStringSubmatch(task.Environment.Directory)
72+
if match != nil {
73+
persistentVolumeClaimStorageClass = match[1]
74+
if match[2] != "" {
75+
number, err := strconv.Atoi(match[2])
76+
if err != nil {
77+
return nil, err
78+
}
79+
persistentVolumeClaimSize = int(number)
80+
}
81+
persistentVolumeDirectory = match[3]
82+
t.Attributes.Directory = persistentVolumeDirectory
83+
}
84+
85+
t.Resources.PersistentVolumeClaim = resources.NewPersistentVolumeClaim(
86+
t.Client,
87+
t.Identifier,
88+
persistentVolumeClaimStorageClass,
89+
persistentVolumeClaimSize,
90+
t.Attributes.Task.Parallelism > 1,
91+
)
92+
pvc = t.Resources.PersistentVolumeClaim
93+
}
8594
t.Resources.Job = resources.NewJob(
8695
t.Client,
8796
t.Identifier,
88-
t.Resources.PersistentVolumeClaim,
97+
pvc,
8998
t.Resources.ConfigMap,
9099
t.DataSources.PermissionSet,
91100
t.Attributes.Task,
@@ -102,12 +111,13 @@ type Task struct {
102111
DirectoryOut string
103112
}
104113
DataSources struct {
105-
*resources.PermissionSet
114+
PermissionSet *resources.PermissionSet
115+
ExistingPersistentVolumeClaim *resources.ExistingPersistentVolumeClaim
106116
}
107117
Resources struct {
108-
*resources.ConfigMap
109-
*resources.PersistentVolumeClaim
110-
*resources.Job
118+
ConfigMap *resources.ConfigMap
119+
PersistentVolumeClaim *resources.PersistentVolumeClaim
120+
Job *resources.Job
111121
}
112122
}
113123

@@ -119,10 +129,13 @@ func (t *Task) Create(ctx context.Context) error {
119129
}, {
120130
Description: "Creating ConfigMap...",
121131
Action: t.Resources.ConfigMap.Create,
122-
}, {
123-
Description: "Creating PersistentVolumeClaim...",
124-
Action: t.Resources.PersistentVolumeClaim.Create,
125132
}}
133+
if t.Resources.PersistentVolumeClaim != nil {
134+
steps = append(steps, common.Step{
135+
Description: "Creating PersistentVolumeClaim...",
136+
Action: t.Resources.PersistentVolumeClaim.Create,
137+
})
138+
}
126139

127140
if t.Attributes.Directory != "" {
128141
env := map[string]string{
@@ -166,7 +179,14 @@ func (t *Task) Read(ctx context.Context) error {
166179
Action: t.Resources.ConfigMap.Read,
167180
}, {
168181
Description: "Reading PersistentVolumeClaim...",
169-
Action: t.Resources.PersistentVolumeClaim.Read,
182+
Action: func(ctx context.Context) error {
183+
if t.Resources.PersistentVolumeClaim != nil {
184+
return t.Resources.PersistentVolumeClaim.Read(ctx)
185+
} else if t.DataSources.ExistingPersistentVolumeClaim != nil {
186+
return t.DataSources.ExistingPersistentVolumeClaim.Read(ctx)
187+
}
188+
return fmt.Errorf("misconfigured storage")
189+
},
170190
}, {
171191
Description: "Reading Job...",
172192
Action: t.Resources.Job.Read,
@@ -211,13 +231,17 @@ func (t *Task) Delete(ctx context.Context) error {
211231
steps = append(steps, []common.Step{{
212232
Description: "Deleting Job...",
213233
Action: t.Resources.Job.Delete,
214-
}, {
215-
Description: "Deleting PersistentVolumeClaim...",
216-
Action: t.Resources.PersistentVolumeClaim.Delete,
217234
}, {
218235
Description: "Deleting ConfigMap...",
219236
Action: t.Resources.ConfigMap.Delete,
220237
}}...)
238+
if t.Resources.PersistentVolumeClaim != nil {
239+
steps = append(steps, common.Step{
240+
Description: "Deleting PersistentVolumeClaim...",
241+
Action: t.Resources.PersistentVolumeClaim.Delete,
242+
})
243+
}
244+
221245
if err := common.RunSteps(ctx, steps); err != nil {
222246
return err
223247
}

0 commit comments

Comments
 (0)