MigrationProcessService.java

package io.vanillabp.integration.adapter.migration.processervice;

import java.util.List;
import java.util.Map;

import com.gruelbox.transactionoutbox.TransactionOutbox;

import io.vanillabp.integration.adapter.migration.config.MigrationAdapterProperties;
import io.vanillabp.intergration.adapter.spi.AggregatePersistenceAware;
import io.vanillabp.intergration.adapter.spi.MigratableProcessService;
import io.vanillabp.intergration.adapter.spi.MigratableProcessServicePhaseTwo;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class MigrationProcessService<A> {

  @Getter
  private final String workflowModuleId;

  @Getter
  private final String bpmnProcessId;

  @Getter
  private final Class<A> workflowAggregateClass;

  /**
   * Map of known adapters. The key is the adapter id, the value is the adapter type.
   */
  @Getter
  private final Map<String, String> adapters;

  /**
   * List of adapter ids sorted by priority.
   */
  @Getter
  private final List<String> prioritizedAdapters;

  private final List<MigratableProcessService<A>> adapterProcessServices;

  private final AggregatePersistenceAware<A> aggregatePersistenceSupport;

  private final TransactionOutbox transactionOutbox;

  public MigrationProcessService(
      final String workflowModuleId,
      final String bpmnProcessId,
      final Class<A> workflowAggregateClass,
      final MigrationAdapterProperties properties,
      final AggregatePersistenceAware<A> aggregatePersistenceSupport,
      final List<MigratableProcessService<A>> processServices,
      final TransactionOutbox transactionOutbox) {

    this.workflowModuleId = workflowModuleId;
    this.bpmnProcessId = bpmnProcessId;
    this.workflowAggregateClass = workflowAggregateClass;
    this.adapters = properties.getAdapters();
    this.prioritizedAdapters = properties.getPrioritizedAdaptersFor(workflowModuleId, bpmnProcessId);
    this.aggregatePersistenceSupport = aggregatePersistenceSupport;
    this.adapterProcessServices = prioritizedAdapters
        .stream()
        .flatMap(adapterId -> processServices
            .stream()
            .filter(processService -> processService.getAdapterId().equals(adapterId))
            .findFirst()
            .stream())
        .toList();
    this.transactionOutbox = transactionOutbox;

  }

  public boolean needsTransactionForStartingWorkflows() {

    return adapterProcessServices
        .getFirst()
        .needsTwoPhaseCommitForStartingWorkflows();

  }

  public A startWorkflow(
      final A workflowAggregate) {

    // persist to get ID in case of @Id @GeneratedValue
    // or force optimistic locking exceptions before running
    // the workflow if aggregate was already persisted before
    final var attachedAggregate = aggregatePersistenceSupport
        .save(workflowAggregate);

    final var aggregateId = aggregatePersistenceSupport
        .getAggregateId(attachedAggregate);

    final var adapter = adapterProcessServices
        .getFirst();

    adapter.startWorkflowPhaseOne(aggregatePersistenceSupport, workflowAggregate);

    if (adapter.needsTwoPhaseCommitForStartingWorkflows()) {
      transactionOutbox
          .with()
          .schedule(MigratableProcessServicePhaseTwo.class)
          .startWorkflowPhaseTwo(
              workflowModuleId,
              bpmnProcessId,
              adapter.getAdapterId(),
              aggregateId);
    }

    return attachedAggregate;

  }

  public void startWorkflowPhaseTwo(
      final String adapterId,
      final Object workflowAggregateId) {

    final var adapter = adapterProcessServices
        .stream()
        .filter(processService -> processService.getAdapterId().equals(adapterId))
        .findFirst()
        .orElseThrow(() -> new IllegalStateException(
            "No adapter found for ID '%s'! Maybe it was available in a previous version your software?"
                .formatted(adapterId)));

    adapter.startWorkflowPhaseTwo(workflowAggregateId);

  }

  /**
   * Connect to BPMS after bean creation
   */
  public void initialize() {

  }

}