
Commit 18c0b68

Improved NodeTypeIterator loop detection (armadaproject#3435)
* Improved NodeTypeIterator loop detection
* Comments
* Comments
* Include job ids in scheduler errors
1 parent dba5383 commit 18c0b68

8 files changed: +163 -32 lines changed

internal/scheduler/context/context.go

Lines changed: 9 additions & 0 deletions
@@ -559,6 +559,15 @@ func NewGangSchedulingContext(jctxs []*JobSchedulingContext) *GangSchedulingCont
     }
 }
 
+// JobIds returns a slice composed of the ids of the jobs that make up the gang.
+func (gctx *GangSchedulingContext) JobIds() []string {
+    rv := make([]string, len(gctx.JobSchedulingContexts))
+    for i, jctx := range gctx.JobSchedulingContexts {
+        rv[i] = jctx.JobId
+    }
+    return rv
+}
+
 // Cardinality returns the number of jobs in the gang.
 func (gctx *GangSchedulingContext) Cardinality() int {
     return len(gctx.JobSchedulingContexts)

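For illustration, a minimal, self-contained version of the new helper; GangSchedulingContext and JobSchedulingContext are stubbed down to the one field the method touches, so this is a sketch rather than the real scheduler types:

package main

import "fmt"

// Stubbed-down stand-ins for the scheduler context types.
type JobSchedulingContext struct{ JobId string }

type GangSchedulingContext struct {
    JobSchedulingContexts []*JobSchedulingContext
}

// JobIds mirrors the method added in this commit: it collects the id of
// every job in the gang into a slice.
func (gctx *GangSchedulingContext) JobIds() []string {
    rv := make([]string, len(gctx.JobSchedulingContexts))
    for i, jctx := range gctx.JobSchedulingContexts {
        rv[i] = jctx.JobId
    }
    return rv
}

func main() {
    gctx := &GangSchedulingContext{
        JobSchedulingContexts: []*JobSchedulingContext{{JobId: "job-a"}, {JobId: "job-b"}},
    }
    fmt.Println(gctx.JobIds()) // [job-a job-b]
}
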
internal/scheduler/gang_scheduler.go

Lines changed: 3 additions & 1 deletion
@@ -4,6 +4,7 @@ import (
     "fmt"
 
     "github.com/hashicorp/go-memdb"
+    "github.com/pkg/errors"
 
     "github.com/armadaproject/armada/internal/common/armadacontext"
     "github.com/armadaproject/armada/internal/common/util"
@@ -106,8 +107,9 @@ func (sch *GangScheduler) Schedule(ctx *armadacontext.Context, gctx *schedulerco
     // This deferred function ensures unschedulable jobs are registered as such.
     gangAddedToSchedulingContext := false
     defer func() {
-        // Do nothing if an error occurred.
+        // If an error occurred, augment the error message and return.
         if err != nil {
+            err = errors.WithMessagef(err, "failed scheduling gang %s composed of jobs %v", gctx.Id, gctx.JobIds())
             return
         }

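A small sketch of what the new deferred wrapping produces, using github.com/pkg/errors as the diff does; the gang id and job ids below are hypothetical stand-ins for gctx.Id and gctx.JobIds():

package main

import (
    "fmt"

    "github.com/pkg/errors"
)

func main() {
    // A base error, standing in for whatever the scheduler returned.
    err := errors.New("node type mismatch")
    // The deferred function annotates it with the gang id and job ids.
    err = errors.WithMessagef(err, "failed scheduling gang %s composed of jobs %v", "gang-0", []string{"job-a", "job-b"})
    fmt.Println(err)
    // Output: failed scheduling gang gang-0 composed of jobs [job-a job-b]: node type mismatch
}
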
internal/scheduler/nodedb/encoding.go

Lines changed: 26 additions & 6 deletions
@@ -8,12 +8,16 @@ import (
     "github.com/armadaproject/armada/internal/scheduler/schedulerobjects"
 )
 
-// NodeIndexKey returns a []byte to be used as a key with the NodeIndex memdb index with layout
+// NodeIndexKey returns a []byte to be used as a key with the NodeIndex memdb index.
+// This key should be used for lookup. Use the rounded version below for inserts.
 //
-//    0            8              16             32
-//    | nodeTypeId | resources[0] | resources[1] | ... |
+// The layout of the key is:
 //
-// where the numbers indicate number of bytes.
+//    0            8              16             32     x           x+8
+//    | nodeTypeId | resources[0] | resources[1] | ... | nodeIndex |
+//
+// where the numbers indicate byte index.
+// The nodeIndex ensures each key is unique and so must itself be unique across all nodes.
 //
 // The key layout is such that an index is ordered first by nodeTypeId, then resources[0], and so on.
 // The byte representation is appended to out, which is returned.
@@ -22,20 +26,32 @@ func NodeIndexKey(out []byte, nodeTypeId uint64, resources []resource.Quantity)
     for _, q := range resources {
         out = EncodeQuantity(out, q)
     }
+    // Because the key returned by this function should be used with a lower-bound operation on allocatable resources,
+    // we set the nodeIndex to 0.
+    out = EncodeUint64(out, 0)
     return out
 }
 
 // RoundedNodeIndexKeyFromResourceList works like NodeIndexKey, except that prior to constructing the key
 // the i-th resource is rounded down to the closest multiple of resourceResolutionMillis[i].
+// This rounding makes iterating over nodes with at least some amount of available resources more efficient.
 // It also takes as arguments a list of resource names and a resourceList, instead of a list of resources.
-func RoundedNodeIndexKeyFromResourceList(out []byte, nodeTypeId uint64, resourceNames []string, resourceResolutionMillis []int64, rl schedulerobjects.ResourceList) []byte {
+func RoundedNodeIndexKeyFromResourceList(
+    out []byte,
+    nodeTypeId uint64,
+    resourceNames []string,
+    resourceResolutionMillis []int64,
+    rl schedulerobjects.ResourceList,
+    nodeIndex uint64,
+) []byte {
     out = EncodeUint64(out, nodeTypeId)
     for i, name := range resourceNames {
         resolution := resourceResolutionMillis[i]
         q := rl.Get(name)
         q = roundQuantityToResolution(q, resolution)
         out = EncodeQuantity(out, q)
     }
+    out = EncodeUint64(out, nodeIndex)
     return out
 }
 
@@ -52,7 +68,7 @@ func EncodeQuantity(out []byte, val resource.Quantity) []byte {
     return EncodeInt64(out, val.MilliValue())
 }
 
-// EncodeInt64 returns the canonical byte representation of a int64 used within the nodeDb.
+// EncodeInt64 returns the canonical byte representation of an int64 used within the nodeDb.
 // The resulting []byte is such that for two int64 a and b, a.Cmp(b) = bytes.Compare(enc(a), enc(b)).
 // The byte representation is appended to out, which is returned.
 func EncodeInt64(out []byte, val int64) []byte {
@@ -65,6 +81,10 @@ func EncodeInt64(out []byte, val int64) []byte {
     // becomes the maximum positive uint.
     scaled := val ^ int64(-1<<(size*8-1))
 
+    // TODO(albin): It's possible (though unlikely) that this shifting causes nodeType clashes,
+    // since they're computed by hashing labels etc. and so may be big integers.
+    // This would reduce the efficiency of nodeType indexing but shouldn't affect correctness.
+
     binary.BigEndian.PutUint64(out[len(out)-8:], uint64(scaled))
     return out
 }

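The sign-bit flip in EncodeInt64 is what makes these keys comparable with bytes.Compare. A standalone sketch of the same trick (encodeInt64 below is a local copy for demonstration, not the nodedb function):

package main

import (
    "bytes"
    "encoding/binary"
    "fmt"
)

// encodeInt64 mirrors the XOR in EncodeInt64 above: flipping the sign bit
// maps math.MinInt64 to 0 and math.MaxInt64 to the largest uint64, so
// big-endian byte order equals numeric order.
func encodeInt64(val int64) []byte {
    scaled := val ^ int64(-1<<63)
    var buf [8]byte
    binary.BigEndian.PutUint64(buf[:], uint64(scaled))
    return buf[:]
}

func main() {
    for _, pair := range [][2]int64{{-5, 3}, {-5, -4}, {3, 4}} {
        // Prints -1 each time: the encoding preserves numeric order.
        fmt.Println(bytes.Compare(encodeInt64(pair[0]), encodeInt64(pair[1])))
    }
}
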
internal/scheduler/nodedb/encoding_test.go

Lines changed: 47 additions & 0 deletions
@@ -99,6 +99,51 @@ func TestEncodeQuantity(t *testing.T) {
     }
 }
 
+func TestRoundQuantityToResolution(t *testing.T) {
+    tests := map[string]struct {
+        q                resource.Quantity
+        resolutionMillis int64
+        expected         resource.Quantity
+    }{
+        "1Ki": {
+            q:                resource.MustParse("1Ki"),
+            resolutionMillis: 1,
+            expected:         resource.MustParse("1Ki"),
+        },
+        "resolution equal to quantity": {
+            q:                resource.MustParse("1Ki"),
+            resolutionMillis: 1024 * 1000,
+            expected:         resource.MustParse("1Ki"),
+        },
+        "0": {
+            q:                resource.MustParse("0"),
+            resolutionMillis: 1,
+            expected:         resource.MustParse("0"),
+        },
+        "1m": {
+            q:                resource.MustParse("1m"),
+            resolutionMillis: 1,
+            expected:         resource.MustParse("1m"),
+        },
+        "1": {
+            q:                resource.MustParse("1"),
+            resolutionMillis: 1,
+            expected:         resource.MustParse("1"),
+        },
+        "resolution 3": {
+            q:                resource.MustParse("1"),
+            resolutionMillis: 3,
+            expected:         resource.MustParse("999m"),
+        },
+    }
+    for name, tc := range tests {
+        t.Run(name, func(t *testing.T) {
+            actual := roundQuantityToResolution(tc.q, tc.resolutionMillis)
+            assert.Truef(t, actual.Equal(tc.expected), "expected %s, but got %s", tc.expected.String(), actual.String())
+        })
+    }
+}
+
 func TestNodeIndexKey(t *testing.T) {
     type nodeIndexKeyValues struct {
         nodeTypeId uint64
@@ -205,6 +250,7 @@ func TestRoundedNodeIndexKeyFromResourceList(t *testing.T) {
             schedulerobjects.ResourceList{
                 Resources: map[string]resource.Quantity{"foo": resource.MustParse("1"), "bar": resource.MustParse("2")},
             },
+            0,
         ),
     )
     assert.NotEqual(
@@ -218,6 +264,7 @@
             schedulerobjects.ResourceList{
                 Resources: map[string]resource.Quantity{"foo": resource.MustParse("1"), "bar": resource.MustParse("2")},
             },
+            0,
         ),
     )
 }

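The diff adds this test but not roundQuantityToResolution itself. A hedged sketch consistent with the cases above, assuming the function rounds the quantity's milli-value down to the nearest multiple of the resolution (roundQuantityToResolutionSketch is a hypothetical stand-in, not the real implementation):

package main

import (
    "fmt"

    "k8s.io/apimachinery/pkg/api/resource"
)

// roundQuantityToResolutionSketch rounds q's milli-value down to the
// nearest multiple of resolutionMillis, matching the test cases: e.g.
// 1 (i.e. 1000m) at resolution 3 becomes 999m.
func roundQuantityToResolutionSketch(q resource.Quantity, resolutionMillis int64) resource.Quantity {
    rounded := (q.MilliValue() / resolutionMillis) * resolutionMillis
    return *resource.NewMilliQuantity(rounded, q.Format)
}

func main() {
    q := resource.MustParse("1")
    r := roundQuantityToResolutionSketch(q, 3)
    fmt.Println(r.String()) // 999m
}
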
internal/scheduler/nodedb/nodedb.go

Lines changed: 39 additions & 17 deletions
@@ -39,9 +39,15 @@ const (
 var empty struct{}
 
 type Node struct {
-    Id       string
-    Name     string
+    // Unique id and index of this node.
+    // TODO(albin): Having both id and index is redundant.
+    // Currently, the id is "cluster name" + "node name" and the index is an integer assigned on node creation.
+    Id    string
+    Index uint64
+
+    // Executor this node belongs to and node name, which must be unique per executor.
     Executor string
+    Name     string
 
     // We need to store taints and labels separately from the node type: the latter only includes
     // indexed taints and labels, but we need all of them when checking pod requirements.
@@ -65,9 +71,11 @@
 // shallow copies of fields that are not mutated by methods of NodeDb.
 func (node *Node) UnsafeCopy() *Node {
     return &Node{
-        Id:       node.Id,
-        Name:     node.Name,
+        Id:    node.Id,
+        Index: node.Index,
+
         Executor: node.Executor,
+        Name:     node.Name,
 
         Taints: node.Taints,
         Labels: node.Labels,
@@ -139,16 +147,19 @@ func (nodeDb *NodeDb) create(node *schedulerobjects.Node) (*Node, error) {
             nodeDb.indexedNodeLabelValues[key][value] = empty
         }
     }
+    index := uint64(nodeDb.numNodes)
     nodeDb.numNodes++
     nodeDb.numNodesByNodeType[nodeType.Id]++
     nodeDb.totalResources.Add(totalResources)
     nodeDb.nodeTypes[nodeType.Id] = nodeType
     nodeDb.mu.Unlock()
 
     entry := &Node{
-        Id:       node.Id,
-        Name:     node.Name,
+        Id:    node.Id,
+        Index: index,
+
         Executor: node.Executor,
+        Name:     node.Name,
 
         Taints: taints,
         Labels: labels,
@@ -256,8 +267,10 @@ type NodeDb struct {
     //
     // Lower resolution makes scheduling faster, but may lead to jobs incorrectly being considered unschedulable.
     indexedResourceResolutionMillis []int64
-    // Map from priority class priority to the index tracking allocatable resources at that priority.
+    // Map from priority class priority to the database index tracking allocatable resources at that priority.
     indexNameByPriority map[int32]string
+    // Map from priority class priority to the index of node.Keys corresponding to that priority.
+    keyIndexByPriority map[int32]int
     // Taint keys to create indexes for.
     // Should include taints frequently used for scheduling.
     // Since the NodeDb can efficiently sort out nodes with taints not tolerated
@@ -317,7 +330,7 @@ func NewNodeDb(
     nodeDbPriorities = append(nodeDbPriorities, types.AllowedPriorities(priorityClasses)...)
 
     indexedResourceNames := util.Map(indexedResources, func(v configuration.IndexedResource) string { return v.Name })
-    schema, indexNameByPriority := nodeDbSchema(nodeDbPriorities, indexedResourceNames)
+    schema, indexNameByPriority, keyIndexByPriority := nodeDbSchema(nodeDbPriorities, indexedResourceNames)
     db, err := memdb.NewMemDB(schema)
     if err != nil {
         return nil, errors.WithStack(err)
@@ -359,6 +372,7 @@
            func(v configuration.IndexedResource) int64 { return v.Resolution.MilliValue() },
         ),
         indexNameByPriority:    indexNameByPriority,
+        keyIndexByPriority:     keyIndexByPriority,
         indexedTaints:          mapFromSlice(indexedTaints),
         indexedNodeLabels:      mapFromSlice(indexedNodeLabels),
         indexedNodeLabelValues: indexedNodeLabelValues,
@@ -432,7 +446,7 @@ func (nodeDb *NodeDb) IndexedNodeLabelValues(label string) (map[string]struct{},
 func (nodeDb *NodeDb) NumNodes() int {
     nodeDb.mu.Lock()
     defer nodeDb.mu.Unlock()
-    return nodeDb.numNodes
+    return int(nodeDb.numNodes)
 }
 
 func (nodeDb *NodeDb) TotalResources() schedulerobjects.ResourceList {
@@ -791,11 +805,16 @@ func (nodeDb *NodeDb) selectNodeForPodAtPriority(
     if !ok {
         return nil, errors.Errorf("no index for priority %d; must be in %v", priority, nodeDb.indexNameByPriority)
     }
+    keyIndex, ok := nodeDb.keyIndexByPriority[priority]
+    if !ok {
+        return nil, errors.Errorf("no key index for priority %d; must be in %v", priority, nodeDb.keyIndexByPriority)
+    }
     it, err := NewNodeTypesIterator(
         txn,
         matchingNodeTypeIds,
         indexName,
         priority,
+        keyIndex,
         nodeDb.indexedResources,
         indexResourceRequests,
         nodeDb.indexedResourceResolutionMillis,
@@ -1158,7 +1177,7 @@ func (nodeDb *NodeDb) Upsert(node *Node) error {
 func (nodeDb *NodeDb) UpsertWithTxn(txn *memdb.Txn, node *Node) error {
     keys := make([][]byte, len(nodeDb.nodeDbPriorities))
     for i, p := range nodeDb.nodeDbPriorities {
-        keys[i] = nodeDb.nodeDbKey(keys[i], node.NodeTypeId, node.AllocatableByPriority[p])
+        keys[i] = nodeDb.nodeDbKey(keys[i], node.NodeTypeId, node.AllocatableByPriority[p], node.Index)
     }
     node.Keys = keys
 
@@ -1204,38 +1223,40 @@ func (nodeDb *NodeDb) AddEvictedJobSchedulingContextWithTxn(txn *memdb.Txn, inde
     return nil
 }
 
-func nodeDbSchema(priorities []int32, resources []string) (*memdb.DBSchema, map[int32]string) {
-    nodesTable, indexNameByPriority := nodesTableSchema(priorities, resources)
+func nodeDbSchema(priorities []int32, resources []string) (*memdb.DBSchema, map[int32]string, map[int32]int) {
+    nodesTable, indexNameByPriority, keyIndexByPriority := nodesTableSchema(priorities, resources)
     evictionsTable := evictionsTableSchema()
     return &memdb.DBSchema{
         Tables: map[string]*memdb.TableSchema{
             nodesTable.Name:     nodesTable,
             evictionsTable.Name: evictionsTable,
         },
-    }, indexNameByPriority
+    }, indexNameByPriority, keyIndexByPriority
 }
 
-func nodesTableSchema(priorities []int32, resources []string) (*memdb.TableSchema, map[int32]string) {
+func nodesTableSchema(priorities []int32, resources []string) (*memdb.TableSchema, map[int32]string, map[int32]int) {
     indexes := make(map[string]*memdb.IndexSchema, len(priorities)+1)
     indexes["id"] = &memdb.IndexSchema{
         Name:    "id",
         Unique:  true,
         Indexer: &memdb.StringFieldIndex{Field: "Id"},
     }
     indexNameByPriority := make(map[int32]string, len(priorities))
+    keyIndexByPriority := make(map[int32]int, len(priorities))
     for i, priority := range priorities {
         name := nodeIndexName(i)
         indexNameByPriority[priority] = name
+        keyIndexByPriority[priority] = i
         indexes[name] = &memdb.IndexSchema{
             Name:    name,
-            Unique:  false,
+            Unique:  true,
             Indexer: &NodeIndex{KeyIndex: i},
         }
     }
     return &memdb.TableSchema{
         Name:    "nodes",
         Indexes: indexes,
-    }, indexNameByPriority
+    }, indexNameByPriority, keyIndexByPriority
 }
 
 func evictionsTableSchema() *memdb.TableSchema {
@@ -1278,12 +1299,13 @@ func (nodeDb *NodeDb) stringFromPodRequirementsNotMetReason(reason PodRequiremen
 // nodeDbKey returns the index key for a particular node.
 // Allocatable resources are rounded down to the closest multiple of nodeDb.indexedResourceResolutionMillis.
 // This improves efficiency by reducing the number of distinct values in the index.
-func (nodeDb *NodeDb) nodeDbKey(out []byte, nodeTypeId uint64, allocatable schedulerobjects.ResourceList) []byte {
+func (nodeDb *NodeDb) nodeDbKey(out []byte, nodeTypeId uint64, allocatable schedulerobjects.ResourceList, nodeIndex uint64) []byte {
     return RoundedNodeIndexKeyFromResourceList(
         out,
         nodeTypeId,
         nodeDb.indexedResources,
         nodeDb.indexedResourceResolutionMillis,
         allocatable,
+        nodeIndex,
     )
 }

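A conceptual sketch of why the per-priority index can now be Unique and why lookups still work: every key ends in the node's index, so no two nodes collide, and a lookup key built with nodeIndex 0 (as NodeIndexKey does) sorts at or before every stored key sharing its prefix. The one-byte prefix below stands in for the (nodeTypeId, rounded resources) part of the key; this is not the real NodeTypesIterator or go-memdb:

package main

import (
    "bytes"
    "encoding/binary"
    "fmt"
    "sort"
)

// key builds prefix || nodeIndex, echoing the layout documented in encoding.go.
func key(prefix byte, nodeIndex uint64) []byte {
    out := []byte{prefix}
    var buf [8]byte
    binary.BigEndian.PutUint64(buf[:], nodeIndex)
    return append(out, buf[:]...)
}

func main() {
    // Stored keys for four nodes; each is unique thanks to the index suffix.
    keys := [][]byte{key(2, 1), key(1, 0), key(2, 0), key(1, 1)}
    sort.Slice(keys, func(i, j int) bool { return bytes.Compare(keys[i], keys[j]) < 0 })

    // A lower-bound seek with nodeIndex 0 lands on the first node whose key
    // is at least the requested prefix; uniqueness means iteration from here
    // can never revisit a node.
    lookup := key(2, 0)
    i := sort.Search(len(keys), func(i int) bool { return bytes.Compare(keys[i], lookup) >= 0 })
    fmt.Println(keys[i]) // [2 0 0 0 0 0 0 0 0]
}
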
internal/scheduler/nodedb/nodedb_test.go

Lines changed: 1 addition & 1 deletion
@@ -23,7 +23,7 @@ import (
 )
 
 func TestNodeDbSchema(t *testing.T) {
-    schema, _ := nodeDbSchema(testfixtures.TestPriorities, testfixtures.TestResourceNames)
+    schema, _, _ := nodeDbSchema(testfixtures.TestPriorities, testfixtures.TestResourceNames)
     assert.NoError(t, schema.Validate())
 }
