Skip to content

Commit a1b2de6

Browse files
committed
wip: ts only task updates
1 parent dab784d commit a1b2de6

File tree

2 files changed

+46
-34
lines changed

2 files changed

+46
-34
lines changed

pkg/api/v1/client/client_orchestrator.go

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,9 +44,17 @@ func (c *Client) ControllerCheckin(ctx context.Context, serverID, conditionID uu
4444
return c.get(ctx, path)
4545
}
4646

47-
func (c *Client) ConditionTaskPublish(ctx context.Context, conditionKind rctypes.Kind, serverID, conditionID uuid.UUID, task *rctypes.Task[any, any]) (*v1types.ServerResponse, error) {
47+
func (c *Client) ConditionTaskPublish(ctx context.Context, conditionKind rctypes.Kind, serverID, conditionID uuid.UUID, task *rctypes.Task[any, any], onlyUpdateTimestamp bool) (*v1types.ServerResponse, error) {
4848
path := fmt.Sprintf("servers/%s/condition-task/%s/%s", serverID.String(), conditionKind, conditionID.String())
4949

50+
if onlyUpdateTimestamp {
51+
path += "&ts_update=true"
52+
} else {
53+
if task == nil {
54+
task = &rctypes.Task[any, any]{}
55+
}
56+
}
57+
5058
return c.post(ctx, path, task)
5159
}
5260

pkg/api/v1/routes/handlers_orchestrator.go

Lines changed: 37 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import (
2222
"go.opentelemetry.io/otel/trace"
2323

2424
v1types "github.com/metal-toolbox/conditionorc/pkg/api/v1/types"
25+
"github.com/metal-toolbox/rivets/condition"
2526
rctypes "github.com/metal-toolbox/rivets/condition"
2627
rkv "github.com/metal-toolbox/rivets/events/pkg/kv"
2728
"github.com/metal-toolbox/rivets/events/registry"
@@ -165,21 +166,11 @@ func (r *Routes) conditionQueuePop(c *gin.Context) (int, *v1types.ServerResponse
165166
}
166167
}
167168

168-
// create Task entry
169-
task := &rctypes.Task[any, any]{
170-
StructVersion: rctypes.TaskVersion1,
171-
ID: cond.ID,
172-
Kind: conditionKind,
173-
State: rctypes.Pending,
174-
Status: st,
175-
WorkerID: controllerID.String(),
176-
Asset: &rctypes.Asset{ID: serverID},
177-
TraceID: trace.SpanFromContext(ctx).SpanContext().TraceID().String(),
178-
SpanID: trace.SpanFromContext(ctx).SpanContext().SpanID().String(),
179-
Parameters: cond.Parameters,
180-
Data: json.RawMessage(`{"empty": true}`), // placeholder value
181-
Fault: cond.Fault,
182-
}
169+
// setup task to be published
170+
task := condition.NewTaskFromCondition(cond)
171+
task.WorkerID = controllerID.String()
172+
task.TraceID = trace.SpanFromContext(ctx).SpanContext().TraceID().String()
173+
task.SpanID = trace.SpanFromContext(ctx).SpanContext().SpanID().String()
183174

184175
if err := r.kvPublishTask(
185176
ctx,
@@ -188,6 +179,7 @@ func (r *Routes) conditionQueuePop(c *gin.Context) (int, *v1types.ServerResponse
188179
conditionKind,
189180
task,
190181
true,
182+
false,
191183
); err != nil {
192184
r.logger.WithField("condition.id", cond.ID).WithError(err).Info("task KV publish error")
193185

@@ -409,7 +401,7 @@ func (r *Routes) kvPublishStatusValue(
409401
controllerID registry.ControllerID,
410402
conditionKind rctypes.Kind,
411403
newSV *rctypes.StatusValue,
412-
create bool,
404+
create,
413405
onlyTimestamp bool,
414406
) error {
415407
statusKV, err := status.GetConditionKV(conditionKind)
@@ -763,15 +755,22 @@ func (r *Routes) taskPublish(c *gin.Context) (int, *v1types.ServerResponse) {
763755
attribute.KeyValue{Key: "serverId", Value: attribute.StringValue(c.Param("uuid"))},
764756
attribute.KeyValue{Key: "conditionKind", Value: attribute.StringValue(c.Param("conditionKind"))},
765757
attribute.KeyValue{Key: "conditionID", Value: attribute.StringValue(c.Param("conditionID"))},
758+
attribute.KeyValue{Key: "timestampUpdate", Value: attribute.StringValue(c.Request.URL.Query().Get("ts_update"))},
766759
)
767760
defer span.End()
768761

769762
var task rctypes.Task[any, any]
770-
if err := c.ShouldBindJSON(&task); err != nil {
771-
r.logger.WithError(err).Warn("unmarshal Task payload")
763+
var onlyTimestampUpdate bool
772764

773-
return http.StatusBadRequest, &v1types.ServerResponse{
774-
Message: "invalid Task payload: " + err.Error(),
765+
if c.Request.URL.Query().Get("ts_update") == "true" {
766+
onlyTimestampUpdate = true
767+
} else {
768+
if err := c.ShouldBindJSON(&task); err != nil {
769+
r.logger.WithError(err).Warn("unmarshal Task payload")
770+
771+
return http.StatusBadRequest, &v1types.ServerResponse{
772+
Message: "invalid Task payload: " + err.Error(),
773+
}
775774
}
776775
}
777776

@@ -829,6 +828,7 @@ func (r *Routes) taskPublish(c *gin.Context) (int, *v1types.ServerResponse) {
829828
conditionKind,
830829
&task,
831830
false,
831+
onlyTimestampUpdate,
832832
); err != nil {
833833
return http.StatusInternalServerError, &v1types.ServerResponse{
834834
Message: "Task publish error: " + err.Error(),
@@ -873,7 +873,15 @@ func (r *Routes) registerRegistryKVHandle() error {
873873
}
874874

875875
// Publish implements the ConditionTaskRepository interface, it transparently publishes the given json.RawMessage to the KV
876-
func (r *Routes) kvPublishTask(ctx context.Context, serverID, conditionID string, conditionKind rctypes.Kind, task *rctypes.Task[any, any], create bool) error {
876+
func (r *Routes) kvPublishTask(
877+
ctx context.Context,
878+
serverID,
879+
conditionID string,
880+
conditionKind rctypes.Kind,
881+
task *rctypes.Task[any, any],
882+
create,
883+
onlyTimestamp bool,
884+
) error {
877885
_, span := otel.Tracer(pkgName).Start(
878886
ctx,
879887
"controller.Publish.KV.Task",
@@ -947,24 +955,20 @@ func (r *Routes) kvPublishTask(ctx context.Context, serverID, conditionID string
947955
return failed(errors.Wrap(err, "Task object deserialize error"))
948956
}
949957

950-
// verify conditionID matches before update
951-
if currTask.ID.String() != conditionID && !rctypes.StateIsComplete(currTask.State) {
952-
msg := fmt.Errorf(
953-
"existing Task object %s in in-complete state, does not match conditionID: %s, must be purged before proceeding",
954-
currTask.ID.String(),
955-
conditionID,
956-
)
957-
958-
return failed(msg)
958+
if onlyTimestamp {
959+
currTask.UpdatedAt = time.Now()
960+
} else {
961+
if err := currTask.Update(task); err != nil {
962+
return failed(err)
963+
}
959964
}
960965

961-
task.UpdatedAt = time.Now()
962-
taskJSON, err := task.Marshal()
966+
currTaskJSON, err := currTask.Marshal()
963967
if err != nil {
964968
return failed(err)
965969
}
966970

967-
if _, err := taskKV.Update(key, taskJSON, curr.Revision()); err != nil {
971+
if _, err := taskKV.Update(key, currTaskJSON, curr.Revision()); err != nil {
968972
return failed(err)
969973
}
970974

0 commit comments

Comments
 (0)