Skip to content

Commit 18d6a98

Browse files
committed
feat(trigger): add support for concurrent trigger execution (#311)
Fixes: #311
1 parent 386d4a1 commit 18d6a98

File tree

5 files changed

+28
-15
lines changed

5 files changed

+28
-15
lines changed

core/src/main/java/io/kestra/core/models/triggers/AbstractTrigger.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,12 @@ abstract public class AbstractTrigger implements TriggerInterface {
8282
@PluginProperty(hidden = true, group = PluginProperty.CORE_GROUP)
8383
private boolean failOnTriggerError = false;
8484

85+
@PluginProperty(group = PluginProperty.CORE_GROUP)
86+
@Schema(
87+
title = "Specifies whether a trigger is allowed to start a new execution even if a previous run is still in progress."
88+
)
89+
private boolean allowConcurrent = false;
90+
8591
/**
8692
* For backward compatibility: we rename minLogLevel to logLevel.
8793
* @deprecated use {@link #logLevel} instead

core/src/main/java/io/kestra/core/repositories/TriggerRepositoryInterface.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,8 @@
1616
public interface TriggerRepositoryInterface extends QueryBuilderInterface<Triggers.Fields> {
1717
Optional<Trigger> findLast(TriggerContext trigger);
1818

19-
Optional<Trigger> findByExecution(Execution execution);
20-
19+
Optional<Trigger> findByUid(String uid);
20+
2121
List<Trigger> findAll(String tenantId);
2222

2323
List<Trigger> findAllForAllTenants();

jdbc/src/main/java/io/kestra/jdbc/repository/AbstractJdbcTriggerRepository.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -72,12 +72,12 @@ public AbstractJdbcTriggerRepository(io.kestra.jdbc.AbstractJdbcRepository<Trigg
7272

7373
@Override
7474
public Optional<Trigger> findLast(TriggerContext trigger) {
75-
return findOne(DSL.trueCondition(), field("key").eq(trigger.uid()));
75+
return findByUid(trigger.uid());
7676
}
7777

7878
@Override
79-
public Optional<Trigger> findByExecution(Execution execution) {
80-
return findOne(execution.getTenantId(), field("execution_id").eq(execution.getId()));
79+
public Optional<Trigger> findByUid(String uid) {
80+
return findOne(DSL.trueCondition(), field("key").eq(uid));
8181
}
8282

8383
public List<Trigger> findByNextExecutionDateReadyForAllTenants(ZonedDateTime now, ScheduleContextInterface scheduleContextInterface) {

jdbc/src/main/java/io/kestra/jdbc/runner/JdbcExecutor.java

Lines changed: 3 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
import io.kestra.core.models.tasks.Task;
1414
import io.kestra.core.models.tasks.WorkerGroup;
1515
import io.kestra.core.models.topologies.FlowTopology;
16+
import io.kestra.core.models.triggers.Trigger;
1617
import io.kestra.core.models.triggers.multipleflows.MultipleCondition;
1718
import io.kestra.core.models.triggers.multipleflows.MultipleConditionStorageInterface;
1819
import io.kestra.core.queues.QueueException;
@@ -1143,9 +1144,7 @@ private void toExecution(Executor executor, boolean ignoreFailure) {
11431144
execution.getTrigger().getId()
11441145
);
11451146
} else {
1146-
triggerRepository
1147-
.findByExecution(execution)
1148-
.ifPresent(trigger -> this.triggerState.update(executionService.resetExecution(flow, execution, trigger)));
1147+
triggerRepository.findByUid(Trigger.uid(execution)).ifPresent(trigger -> this.triggerState.update(executionService.resetExecution(flow, execution, trigger)));
11491148
}
11501149
}
11511150

@@ -1239,11 +1238,7 @@ private void toExecution(Executor executor, boolean ignoreFailure) {
12391238
// purge the trigger: reset scheduler trigger at end
12401239
if (execution.getTrigger() != null) {
12411240
FlowWithSource flow = executor.getFlow();
1242-
triggerRepository
1243-
.findByExecution(execution)
1244-
.ifPresent(trigger -> {
1245-
this.triggerState.update(executionService.resetExecution(flow, execution, trigger));
1246-
});
1241+
triggerRepository.findByUid(Trigger.uid(execution)).ifPresent(trigger -> this.triggerState.update(executionService.resetExecution(flow, execution, trigger)));
12471242
}
12481243

12491244
// Purge the workerTaskResultQueue and the workerJobQueue

scheduler/src/main/java/io/kestra/scheduler/AbstractScheduler.java

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -288,7 +288,7 @@ public void run() {
288288
disableInvalidTrigger(workerTriggerResult.getTriggerContext(), e);
289289
return;
290290
}
291-
this.handleEvaluateWorkerTriggerResult(triggerExecution, nextExecutionDate);
291+
this.handleEvaluateWorkerTriggerResult(triggerExecution, nextExecutionDate, workerTriggerResult.getTrigger());
292292
} else {
293293
ZonedDateTime nextExecutionDate;
294294
try {
@@ -786,7 +786,7 @@ private List<FlowWithSource> getFlowsWithDefaults() {
786786
}
787787

788788
private void handleEvaluateWorkerTriggerResult(SchedulerExecutionWithTrigger result, ZonedDateTime
789-
nextExecutionDate) {
789+
nextExecutionDate, AbstractTrigger abstractTrigger) {
790790
Optional.ofNullable(result)
791791
.ifPresent(executionWithTrigger -> {
792792
log(executionWithTrigger);
@@ -797,6 +797,12 @@ private void handleEvaluateWorkerTriggerResult(SchedulerExecutionWithTrigger res
797797
nextExecutionDate
798798
);
799799

800+
// if the trigger is allowed to run concurrently we do not attached the executio-id to the trigger state
801+
// i.e., the trigger will not be locked
802+
if (abstractTrigger.isAllowConcurrent()) {
803+
trigger = trigger.toBuilder().executionId(null).build();
804+
}
805+
800806
// Worker triggers result is evaluated in another thread with the workerTriggerResultQueue.
801807
// We can then update the trigger directly.
802808
this.saveLastTriggerAndEmitExecution(executionWithTrigger.getExecution(), trigger, triggerToSave -> this.triggerState.update(triggerToSave));
@@ -818,6 +824,12 @@ private void handleEvaluateSchedulingTriggerResult(Schedulable schedule, Schedul
818824
if (result.getExecution().getState().getCurrent() == State.Type.FAILED) {
819825
trigger = trigger.resetExecution(State.Type.FAILED);
820826
}
827+
828+
// if the trigger is allowed to run concurrently we do not attached the executio-id to the trigger state
829+
// i.e., the trigger will not be locked
830+
if (((AbstractTrigger)schedule).isAllowConcurrent()) {
831+
trigger = trigger.toBuilder().executionId(null).build();
832+
}
821833

822834
// Schedule triggers are being executed directly from the handle method within the context where triggers are locked.
823835
// So we must save them by passing the scheduleContext.

0 commit comments

Comments
 (0)