Backend/e-suite.Service.WorkflowProcessor/WorkflowProcessor.cs

157 lines
6.1 KiB
C#

using e_suite.API.Common.repository;
using e_suite.Database.Audit;
using e_suite.Database.Core.Tables.Activity;
using e_suite.Messaging.Common;
using e_suite.Workflow.Core;
using e_suite.Workflow.Core.Extensions;
using eSuite.Core.Clock;
using eSuite.Core.Enums;
using eSuite.Core.Miscellaneous;
using Microsoft.Extensions.Logging;
namespace e_suite.Service.WorkflowProcessor;
public class WorkflowProcessor : IWorkflowProcessor
{
private readonly ILogger _logger;
private readonly IClock _clock;
private readonly IWorkflowTemplateRepository _workflowTemplateRepository;
private readonly IWorkflowConverter _workflowConverter;
private readonly IActivityRepository _activityRepository;
private readonly IActivityMessageSender _activityMessageSender;
public WorkflowProcessor(ILogger logger, IClock clock, IWorkflowTemplateRepository workflowTemplateRepository, IActivityRepository activityRepository, IWorkflowConverter workflowConverter, IActivityMessageSender activityMessageSender)
{
_logger = logger;
_clock = clock;
_workflowTemplateRepository = workflowTemplateRepository;
_activityRepository = activityRepository;
_workflowConverter = workflowConverter;
_activityMessageSender = activityMessageSender;
}
public async Task ProgressActivity(
AuditUserDetails auditUserDetails,
GeneralIdRef activityId,
CancellationToken cancellationToken
)
{
_logger.LogInformation("{DateTime}: Progressing Activity {messageId}", _clock.GetNow, activityId);
var activityInstance = await _activityRepository.GetActivityInstanceAsync(activityId, cancellationToken);
if (activityInstance == null)
{
_logger.LogInformation("{DateTime}: Unable to find activity {messageId}", _clock.GetNow, activityId);
throw new InvalidDataException("activityInstance not found");
}
if (activityInstance.ActivityState == ActivityState.Cancelled)
{
_logger.LogInformation("{DateTime}: Activity {messageId} is cancelled, skipping", _clock.GetNow,
activityId);
return;
}
if (activityInstance.ActivityState == ActivityState.Completed)
{
_logger.LogInformation("{DateTime}: Activity {messageId} is already completed, skipping", _clock.GetNow,
activityId);
return;
}
var workflowVersion = _workflowConverter.DeserialiseFromDatabase(activityInstance.WorkflowVersion);
var hasCompletableTask = false;
await _activityRepository.TransactionAsync(async () =>
{
switch (activityInstance.ActivityState)
{
case ActivityState.Pending:
{
hasCompletableTask = await StartActivity(auditUserDetails, cancellationToken, activityInstance,
workflowVersion);
break;
}
case ActivityState.Active:
throw new NotImplementedException("don't know how to progress a running instance.");
case ActivityState.ReadyToComplete:
throw new NotImplementedException("don't know how to progress a ReadyToComplete instance.");
}
});
if (hasCompletableTask)
{
_activityMessageSender.ProgressActivity(activityId);
}
}
private async Task<bool> StartActivity(
AuditUserDetails auditUserDetails,
CancellationToken cancellationToken,
Activity? activityInstance,
WorkflowVersion workflowVersion
)
{
bool hasCompletableTask = false;
activityInstance.ActivityState = ActivityState.Active;
await _activityRepository.UpdateActivityInstanceAsync(auditUserDetails, activityInstance,
cancellationToken);
var tasks = await PlanTaskExecution(auditUserDetails, activityInstance, workflowVersion, cancellationToken);
await StartInitialTasks(auditUserDetails, tasks, workflowVersion, cancellationToken);
if (tasks.Any(task => task.ActivityState == ActivityState.ReadyToComplete))
{
hasCompletableTask = true;
}
return hasCompletableTask;
}
private async Task StartInitialTasks(AuditUserDetails auditUserDetails, ICollection<ActivityTask> tasks, WorkflowVersion workflowVersion, CancellationToken cancellationToken)
{
var startTasks = workflowVersion.GetStartTasks().ToList();
var startTaskGuids = startTasks.Select(t => t.Guid).ToHashSet();
foreach (var task in tasks)
{
if (startTaskGuids.Contains(task.TaskGuid))
{
var taskDefinition = workflowVersion.FindTask(task.TaskGuid)!;
await taskDefinition.StartTask(task, _clock);
}
}
await _activityRepository.UpdateActivityTasksAsync(auditUserDetails, tasks, cancellationToken);
}
private async Task<ICollection<ActivityTask>> PlanTaskExecution(AuditUserDetails auditUserDetails, Activity activityInstance, WorkflowVersion workflowVersion, CancellationToken cancellationToken)
{
var activityOrdinalCounter = 1;
//Create task instances for the activity
var activityTasks = new List<ActivityTask>();
foreach (var task in workflowVersion.Tasks)
{
var activityTask = new ActivityTask
{
Guid = Guid.NewGuid(),
Activity = activityInstance,
ActivityState = ActivityState.Pending,
TaskGuid = task.Guid,
TaskType = task.GetType().FullName!,
TaskName = task.Name,
ActivityOrdinal = activityOrdinalCounter++,
};
activityTasks.Add(activityTask);
}
await _activityRepository.AddActivityTasksAsync(auditUserDetails, activityTasks, cancellationToken);
return activityTasks;
}
}