Source: lib/helper/sync/store.js

import {
  distinctUntilChanged,
  map,
  scan,
  filter,
  startWith,
  withLatestFrom,
  tap,
  sample,
  pairwise,
  distinctUntilKeyChanged,
  take,
  switchMap,
  share, takeWhile, finalize, first
} from "rxjs/operators";
import { bind, shareLatest } from "@react-rxjs/core";
import { createSignal } from "@react-rxjs/utils";
import { arrayIsTheSame } from "@/shared/helper/general";
import { generateId } from "@/shared/helper/general/id";
import getStore from "@/shared/store/index";
import {combineLatest, merge, of, partition, interval, EMPTY, BehaviorSubject, ReplaySubject, Subject} from "rxjs";
import GeneralSyncHelper from "@/shared/helper/sync/general";
import getGenericFormHelper from "@/shared/helper/form/generic";

/**
 * Registry of the sync helpers that have been created so far.
 * Every supported store type owns its own Map, in which helpers are looked up by store id.
 */
const StoreSyncHelperInstances = Object.fromEntries(
  ["meeting", "task", "persons", "questionnaire", "investigation"].map((supportedType) => [supportedType, new Map()])
);

// Fallback for commits that do not bring their own numeric `maxRetries`.
const DEFAULT_MAX_AMOUNT_OF_RETRIES = 3;
// By default, non-high priority commits are sent to the server every 0.25 seconds (value in milliseconds).
const DEFAULT_SERVER_COMMIT_PACE = 250; // Future todo: read the internet connection type (if available) or whether the reduce-data is set, to set the default speed.

/**
 * Fills in the defaults of a commit object before it enters the sync pipeline.
 *
 * The object is mutated in place and the same reference is returned. Anything that is not a
 * non-null object is returned untouched.
 *
 * Defaults that are applied when the corresponding property has the wrong type or is missing:
 * - `priority` (string): "HIGH" when `commit.actions` includes "sign", otherwise "NORMAL".
 * - `id` (string): a freshly generated id.
 * - `maxRetries` (number): DEFAULT_MAX_AMOUNT_OF_RETRIES.
 *
 * @param {*} obj Commit candidate.
 * @returns {*} The value that was passed in.
 */
function enrichCommit(obj) {
  // `typeof null` is "object" as well, so null is excluded explicitly; otherwise the property reads below would throw.
  if (obj === null || typeof obj !== "object") {
    return obj;
  }
  if (typeof obj.priority !== "string") {
    obj.priority = obj.commit?.actions?.includes("sign") ? "HIGH" : "NORMAL";
  }
  if (typeof obj.id !== "string") {
    obj.id = generateId();
  }
  if (typeof obj.maxRetries !== "number") {
    obj.maxRetries = DEFAULT_MAX_AMOUNT_OF_RETRIES;
  }
  return obj;
}

/**
 * Translates a pace request into the numeric pace used by the server commit stream.
 *
 * - a number is used as is;
 * - `true` and "PRIO" result in 1;
 * - "INTERRUPT" results in 0;
 * - everything else (including `false` and unknown strings) results in DEFAULT_SERVER_COMMIT_PACE.
 *
 * @param {*} input Requested pace.
 * @returns {number} The pace to use.
 */
function decideUponCommitPace(input) {
  switch (typeof input) {
    case "number":
      return input;
    case "boolean":
      return input ? 1 : DEFAULT_SERVER_COMMIT_PACE;
    case "string": {
      if (input === "INTERRUPT") {
        return 0;
      }
      return input === "PRIO" ? 1 : DEFAULT_SERVER_COMMIT_PACE;
    }
    default:
      return DEFAULT_SERVER_COMMIT_PACE;
  }
}

/**
 * Defaults for the options accepted by getStoreSyncHelper.
 */
const defaultInstanceOptions = {
  /**
   * Create an instance when no instance is registered yet for the requested type and id.
   * @type boolean
   * @default true
   */
  createInstance: true,
  /**
   * When an instance is created and this boolean is set, the instance subscribes to its deviatedHeaderAndValue$ observable, retrieves changes and commits these through the commitFormUpdate function.
   * Besides making the developer's life easy, it is also handy for changes after related machines and hooks are unmounted.
   * @type boolean
   * @default true
   */
  selfCommitChanges: true,
  /**
   * Indicate that when a store is cleaned up/deleted and pending commits are blocked based on extra commit checks,
   * the commits are dropped and the store status is set to 'in sync'.
   * @type boolean
   * @default true
   */
  enforceDeletionOfBlockedCommitsOnStoreDelete: true,
  /**
   * If blocked commits are detected, mark the store as concept. This implies that the store is visually shown as in sync, when it is not.
   * @type boolean
   * @default false
   */
  markStoreAsConceptOnBlockedCommits: false,
};
/**
 * Defaults for the options accepted by StoreSyncHelper itself; every value mirrors the
 * option with the same name in defaultInstanceOptions.
 */
const defaultStoreSyncHelperOptions = {
  /**
   * @see defaultInstanceOptions.selfCommitChanges
   */
  selfCommitChanges: defaultInstanceOptions.selfCommitChanges,
  /**
   * @see defaultInstanceOptions.enforceDeletionOfBlockedCommitsOnStoreDelete
   */
  enforceDeletionOfBlockedCommitsOnStoreDelete: defaultInstanceOptions.enforceDeletionOfBlockedCommitsOnStoreDelete,
  /**
   * Must mirror markStoreAsConceptOnBlockedCommits (false), not the deletion flag (true):
   * this value is what resetMarkStoreAsConceptOnBlockedCommits falls back to.
   * @see defaultInstanceOptions.markStoreAsConceptOnBlockedCommits
   */
  markStoreAsConceptOnBlockedCommits: defaultInstanceOptions.markStoreAsConceptOnBlockedCommits,
};

/**
 * Builds the sync helper for one store. The helper wires the store's observables together into:
 * - a stream of commits that may be sent to the server (`serverCommits$`),
 * - a stream of commits that exhausted their retries (`problematicCommits$`),
 * - a boolean stream that tells whether the local heads equal the last synced heads (`isStoreInSyncWithServer$`).
 *
 * @param {string} type Store type; also the key used in StoreSyncHelperInstances.
 * @param {*} id Id of the store, passed on to getStore and getGenericFormHelper.
 * @param {Object} [opts=defaultStoreSyncHelperOptions] Overrides for defaultStoreSyncHelperOptions.
 * @param {boolean} [opts.selfCommitChanges] Let the helper commit detected changes itself.
 * @param {boolean} [opts.enforceDeletionOfBlockedCommitsOnStoreDelete]
 * @param {boolean} [opts.markStoreAsConceptOnBlockedCommits]
 * @returns {{instance: Object, subscriptions: Array}} The public helper object and the values of the internal subscription registry.
 */
const StoreSyncHelper = (type, id, opts = defaultStoreSyncHelperOptions) => {
  const options = {
    ...defaultStoreSyncHelperOptions,
    ...opts,
  };
  let storeId = id;
  const storeType = type;
  const extraCommitChecks = new Set();
  let extraCommitChecksIsInitialized = false;
  const [rawExtraCommitChecks$, updateExtraCommitChecks] = createSignal();
  // Every committed form update is passed through enrichCommit (priority, id and maxRetries defaults).
  const [formUpdateSyncToServer$, commitFormUpdate] = createSignal(enrichCommit);
  const [syncedHeaders$, updateSyncedHeaders] = createSignal();
  // TODO: presumably this might be better to replace with BehaviorSubject.
  const [commitStatus$, updateCommitStatus] = createSignal();
  let lastCommitStatus = { status: "COMPLETED", id: -1, retries: 0 };
  // Again: mobx might be better here
  // Remember the most recent commit status, so enrichedCommitStatus$ can start with it.
  commitStatus$.subscribe((commitSt) => {
    lastCommitStatus = commitSt;
  });
  const [serverCommitStreamPace$, changeServerCommitPace] = createSignal(decideUponCommitPace);
  const Store = getStore(storeType, storeId);
  const { storePipe$, storeHeadsPipe$, storeId$, formStateStorePipe$, isStopped$ } = Store.getStore();

  // Super careful release, so only fix for tasks until we tested a bit more
  // TODO: set this as default for all storeTypes
  // For tasks the default check passes once the form state store emits a truthy value; for other types it always passes.
  const defaultExtraCommitCheck = storeType === "task" ? formStateStorePipe$.pipe(
    map((v) => !!v),
    distinctUntilChanged(),
    startWith(false),
    switchMap((v) => of(v)),
  ) : of(true);

  // Emits the latest result of every registered extra commit check (or of the default check when none is registered).
  // Nothing is emitted until the checks have been initialized through renewExtraCommitChecks.
  const extraCommitChecks$ = rawExtraCommitChecks$.pipe(
    startWith(extraCommitChecksIsInitialized ? Array.from(extraCommitChecks) : null),
    map((v) => {
      if (v === null) {
        // The start value is fixed at construction time, so re-evaluate the initialized state at subscription time.
        return extraCommitChecksIsInitialized ? Array.from(extraCommitChecks) : null;
      }
      return v;
    }),
    filter((v) => Array.isArray(v)),
    switchMap((arr) => combineLatest(arr.length === 0 ? [defaultExtraCommitCheck] : [...arr])),
    shareLatest()
  );
  const renewExtraCommitChecks = () => {
    extraCommitChecksIsInitialized = true;
    // Subscription is added for cold starts, where extraCommitChecks$ has no subscribers and therefore the value is 'lost'.
    // Adding a short subscription for a second prevents that.
    // New note: subscription could be removed.
    const tempSubscription = extraCommitChecks$.subscribe();
    updateExtraCommitChecks(Array.from(extraCommitChecks));
    setTimeout(() => {
      tempSubscription.unsubscribe();
    }, 1000);
  };

  // Follow id changes of the store: the helper registered under the original id is also registered under the new id.
  storeId$.subscribe((currentStoreId) => {
    if (storeId !== currentStoreId) {
      // Current workaround to add 'this' also as the other identifier.
      const originalStore = StoreSyncHelperInstances[storeType].get(id);
      if (originalStore) {
        StoreSyncHelperInstances[storeType].set(currentStoreId, originalStore);
      }
    }
    storeId = currentStoreId;
  });

  // The most recent form update, as long as its id is not lower than the ids of the updates kept in the window.
  const latestFormUpdate$ = formUpdateSyncToServer$.pipe(
    filter((obj) => typeof obj === "object"),
    scan((prev, curr) => {
      // Not expecting to need more than 10 updates to compare.
      return [...prev.slice(-10), curr];
    }, []),
    filter((arr) => {
      if (arr.length <= 1) {
        return arr.length === 1;
      }
      // Timestamp must be higher (or the same in case of errors) than last 10 commits.
      // The ids are ordered with the default sort, of which the last (highest) one is compared.
      return (
        arr[arr.length - 1].id >=
        arr
          .slice(0, -1)
          .map(({ id }) => id)
          .sort()
          .slice(-1)[0]
      );
    }),
    map((arr) => arr.slice(-1)[0]),
    shareLatest()
  );

  let lastKnownSyncedHeaders = [];
  const uniqueSyncedHeaders$ = syncedHeaders$.pipe(
    distinctUntilChanged(arrayIsTheSame),
    // Remember the last synced heads, in the same way lastKnownStoreHeaders is remembered below.
    tap((v) => {
      lastKnownSyncedHeaders = v;
    }),
    shareLatest()
  );
  let lastKnownStoreHeaders = [];
  const storeAndSyncedHeaders$ = storeHeadsPipe$.pipe(
    startWith([]), // The first value is an empty array, to ensure that distinctUntilChanged does not pass through the first synced heads from the server itself.
    tap((v) => lastKnownStoreHeaders = v),
    withLatestFrom(uniqueSyncedHeaders$)
  );
  // Emits [store, localHeads, lastReceivedRemoteHeads] whenever the local heads deviate from the last synced heads.
  const deviatedHeaderAndValue$ = storeAndSyncedHeaders$.pipe(
    // The comparator ignores the previous value: an emission is dropped when its local heads equal its synced heads.
    distinctUntilChanged((_, curr) => arrayIsTheSame(curr[0], curr[1])),
    filter(([localHeads]) => localHeads.length > 0), // Stop here the first value (empty array), unless of course there is a head.
    withLatestFrom(storePipe$), // Sidenote: the store pipe must have been subscribed to
    map(([[localHeads, lastReceivedRemoteHeads], store]) => [store, localHeads, lastReceivedRemoteHeads]), // Combine the store value with the heads, so everything is in one place.
    shareLatest()
  );

  // Commit status enriched with the amount of retries of the commit it belongs to.
  const enrichedCommitStatus$ =
    // To ensure the pipeline starts with two start values because of pairwise fn
    commitStatus$.pipe(
      startWith(null),
      map((v) => v ?? lastCommitStatus),
    ).pipe(
    // In case the mutation is called twice (by react dev mode for example), filter out the same error.
    distinctUntilChanged((prev, curr) => JSON.stringify(prev) === JSON.stringify(curr)),
    // Start with an empty value to ensure pairwise can process the first actual value
    startWith({ status: "COMPLETED", id: 0, retries: 0 }),
    // Get the last value and the current one
    pairwise(),
    // Increase the amount of retries of the commit if it was an error.
    map(([{ status: prevStatus, id: prevId, retries: prevRetries = 0 }, curr]) => {
      const { status: currStatus, id: currId } = curr;
      if (currStatus === "ERROR") {
        // An error carries over the retries of the same commit; an error of another commit starts at zero.
        Object.assign(curr, { retries: prevId === currId ? prevRetries : 0 });
      }
      if (currStatus === "SUBMITTING" && prevId === currId) {
        // Submitting the same commit again after an error counts as one more retry.
        Object.assign(curr, {
          retries: prevStatus === "ERROR" ? prevRetries + 1 : prevRetries,
        });
      }
      return curr;
    }),
    // Share the latest completed commit, so when a 'send' hook remounts the last send details are known.
    // This prevents the issue where loading a form page after a change would 'double' upload the previous commit.
    shareLatest()
  );

  /**
   * Indicate whether there are valid commits that could be sent to the server right now, but are blocked due to commit constraints.
   * @type {BehaviorSubject<boolean>}
   */
  const commitsAreBlocked$ = new BehaviorSubject(false);
  /**
   * Same as commitsAreBlocked$ but triggers only on changes to the status
   * @type {Observable<boolean>}
   */
  const uniqueCommitsAreBlocked$ = commitsAreBlocked$.pipe(distinctUntilChanged());
  /**
   * Observable of the enforceDeletionOfBlockedCommitsOnStoreDelete option.
   * @type {BehaviorSubject<boolean>}
   */
  const enforceCommitDeletion$ = new BehaviorSubject(options.enforceDeletionOfBlockedCommitsOnStoreDelete);
  const uniqueEnforceCommitDeletion$ = enforceCommitDeletion$.pipe(distinctUntilChanged());

  // Check if there are commits that must be sent to the server.
  // The emissions that match the predicate (retries exhausted) end up in problematicCommitsRaw$, all others in commitsThatCanBeSentToServer$.
  const [problematicCommitsRaw$, commitsThatCanBeSentToServer$] = partition(
    combineLatest([
      latestFormUpdate$,
      GeneralSyncHelper.isUserOnline$,
      getGenericFormHelper(storeType, storeId).isFormReadOnly$,
      // TODO: Add pause support for expired token. (also provide warning when that happens)
      enrichedCommitStatus$,
      extraCommitChecks$,
    ]).pipe(
      // Is the user online?
      filter(([, isOnline]) => isOnline),
      // Is the form writeable?
      filter(([, , readOnly]) => !readOnly),
      // Is there no commit being processed by the server currently?
      filter(([, , , { status }]) => ["COMPLETED", "ERROR"].includes(status)),
      // The commit is not processed by the server already?
      filter(([{ id: commitId }, , , { status, id: processedCommitId }]) => !(["COMPLETED"].includes(status) && commitId === processedCommitId)),
      // Report to the commitsAreBlocked$ observable whether we block the commit.
      tap(([, , , , extraCommitChecksResults]) => commitsAreBlocked$.next(extraCommitChecksResults.filter((v) => !v).length !== 0)),
      // All extra commit checks must be true
      filter(([, , , , extraCommitChecksResults]) => extraCommitChecksResults.filter((v) => !v).length === 0),
    ),
    // IMPORTANT TODO: if commitId has been completed, also ignore any follow up ERROR (because that indicates an error somewhere else that must be logged to Sentry)
    // Filter out the commits where the amount of retries is too high
    ([{ id: commitId, maxRetries }, , , { status, id: errorCommitId, retries = 0 }]) =>
      ["ERROR"].includes(status) && commitId === errorCommitId && retries >= maxRetries - 1
  );

  let lastKnownInSyncBoolean;
  let thisHelperStopped = false;
  const thisHelperInternalStop$ = new Subject();
  const thisHelperExternalStop$ = new ReplaySubject(1);

  /* Important future todo for edge cases:
      Casus: Say user was offline, added changes, then triggers an extra commit check, then interrupts the process (deleting store), then goes online
      Then what? First guess: let the last commit before the extra commit check be send. What is the isStoreInSyncWithServer$ in the meanwhile?
      Presumably todo: do something with an offline queue, instead of filtering above
   */
  const isStoreInSyncWithServer$ = combineLatest([storeHeadsPipe$, uniqueSyncedHeaders$, uniqueEnforceCommitDeletion$, uniqueCommitsAreBlocked$, isStopped$]).pipe(
    // Completes (including the last value) once the store is stopped and either in sync, or out of sync with blocked commits whose deletion is enforced.
    takeWhile(([storeHeads, syncedHeads, isDeletionEnforced, areCommitsBlocked, isStopped]) => !((isStopped && arrayIsTheSame(storeHeads, syncedHeads)) || (isStopped && !arrayIsTheSame(storeHeads, syncedHeads) && isDeletionEnforced && areCommitsBlocked)), true),
    map(([storeHeads, syncedHeads, isDeletionEnforced, areCommitsBlocked, isStopped]) => arrayIsTheSame(storeHeads, syncedHeads) || (isDeletionEnforced && areCommitsBlocked && isStopped) || (options.markStoreAsConceptOnBlockedCommits && areCommitsBlocked)),
    distinctUntilChanged(),
    tap((v) => lastKnownInSyncBoolean = v),
    // NOTE(review): the argument of startWith is evaluated once, when this pipeline is built, not on subscription.
    startWith(lastKnownInSyncBoolean ?? arrayIsTheSame(lastKnownStoreHeaders, lastKnownSyncedHeaders)),
    finalize(() => {
      thisHelperInternalStop$.next(true)
    }),
    share({
      connector: () => new ReplaySubject(1),
      resetOnRefCountZero: false, // Do not reset if ref count becomes zero
      resetOnComplete: false, // On complete keep the ReplaySubject (with its value)
    })
  );

  // TODO: check whether this keeps working if you add an element (the centralSyncHelper might 'forget' an error happened)
  // Every commit that exhausted its retries is reported once per id.
  const problematicCommits$ = problematicCommitsRaw$.pipe(
    map(([commit]) => commit),
    distinctUntilKeyChanged("id"),
    switchMap((commit) => of(commit)),
  );

  // If there are commits that must be sent to the server, the 'speed' is decided here.
  const serverCommits$ = merge(
    commitsThatCanBeSentToServer$.pipe(
      // High priorities are sent to the server right away
      filter(([obj]) => obj.priority === "HIGH"),
      withLatestFrom(serverCommitStreamPace$.pipe(startWith(DEFAULT_SERVER_COMMIT_PACE), shareLatest())),
      // INTERRUPT state will just like normal priority result into commits being 'thrown' away. Be aware of this!
      filter(([_, pace]) => pace !== 0),
      map(([[obj]]) => [obj])
    ),
    commitsThatCanBeSentToServer$.pipe(
      filter(([obj]) => obj.priority !== "HIGH"),
      // Important: sample does not trigger the last value if commitsThatCanBeSentToServer$ completes!
      // So an "INTERRUPT" must only be used if data loss is acceptable.
      sample(
        serverCommitStreamPace$.pipe(
          startWith(DEFAULT_SERVER_COMMIT_PACE),
          distinctUntilChanged(),
          // A falsy pace (0) stops the sampling completely.
          switchMap((val) => (val ? interval(val) : EMPTY))
        )
      )
    )
  ).pipe(
    map(([commit]) => ({
      ...commit,
      metaData: {
        // Add additional metaData that might have been inserted in the meantime.
        ...Store.metaData,
        ...commit.metaData,
      },
    }))
  );

  // Public API of the helper.
  const returnObject = {
    get latestFormUpdate$() {
      return latestFormUpdate$;
    },
    get deviatedHeaderAndValue$() {
      return deviatedHeaderAndValue$;
    },
    get syncedHeaders$() {
      return uniqueSyncedHeaders$;
    },
    // Assigning to syncedHeaders$ publishes the heads that were last synced with the server.
    set syncedHeaders$(heads) {
      updateSyncedHeaders(heads);
    },
    get isStoreInSyncWithServer$() {
      return isStoreInSyncWithServer$;
    },
    get serverCommits$() {
      return serverCommits$;
    },
    get areServerCommitsBlocked$() {
      return uniqueCommitsAreBlocked$;
    },
    get problematicCommits$() {
      return problematicCommits$;
    },
    get thisHelperExternalStop$() {
      return thisHelperExternalStop$;
    },
    // Accepts a number, a boolean, "PRIO" or "INTERRUPT"; the value is normalized by decideUponCommitPace.
    changeServerCommitPace: (paceOrBoolean) => {
      changeServerCommitPace(paceOrBoolean);
    },
    resetServerCommitPace: () => {
      changeServerCommitPace(DEFAULT_SERVER_COMMIT_PACE);
    },
    // Commits a form update; the store id, store type and the metaData of the store are added to the metaData of the update.
    commitFormUpdate: (update) => {
      commitFormUpdate({
        ...update,
        metaData: {
          storeId,
          storeType,
          ...Store.metaData,
          ...update.metaData,
        },
      });
    },
    // Registers one observable or an array of observables as extra commit check(s).
    addExtraCommitCheck: (observers) => {
      const currentSize = extraCommitChecks.size;
      if (Array.isArray(observers)) {
        observers.forEach(extraCommitChecks.add, extraCommitChecks);
      } else if (observers) {
        extraCommitChecks.add(observers);
      }
      // Renew when something was added, or when nothing was registered before (which initializes the checks).
      if (currentSize !== extraCommitChecks.size || currentSize === 0) {
        renewExtraCommitChecks();
      }
    },
    resetExtraCommitChecks: () => {
      extraCommitChecks.clear();
      renewExtraCommitChecks();
    },
    set enforceDeletionOfBlockedCommitsOnStoreDelete(value) {
      options.enforceDeletionOfBlockedCommitsOnStoreDelete = value;
      enforceCommitDeletion$.next(value);
    },
    get enforceDeletionOfBlockedCommitsOnStoreDelete() {
      return options.enforceDeletionOfBlockedCommitsOnStoreDelete;
    },
    resetEnforceDeletionOfBlockedCommitsOnStoreDelete: () => {
      options.enforceDeletionOfBlockedCommitsOnStoreDelete = defaultStoreSyncHelperOptions.enforceDeletionOfBlockedCommitsOnStoreDelete;
      // isStoreInSyncWithServer$ reads this option through enforceCommitDeletion$, so the observable must be updated
      // as well (just like the setter does); otherwise the reset would not reach the pipeline.
      enforceCommitDeletion$.next(options.enforceDeletionOfBlockedCommitsOnStoreDelete);
    },
    set markStoreAsConceptOnBlockedCommits(value) {
      options.markStoreAsConceptOnBlockedCommits = value;
    },
    get markStoreAsConceptOnBlockedCommits() {
      return options.markStoreAsConceptOnBlockedCommits;
    },
    resetMarkStoreAsConceptOnBlockedCommits: () => {
      options.markStoreAsConceptOnBlockedCommits = defaultStoreSyncHelperOptions.markStoreAsConceptOnBlockedCommits;
    },
    updateCommitStatus(commitId, status) {
      // Important note: for now only 3 statuses are accepted: SUBMITTING, ERROR, COMPLETED
      updateCommitStatus({
        status,
        id: commitId,
      });
    },
  };

  const subscriptions = {
    selfCommitChanges: undefined,
    reportInstanceToInternalStop: undefined,
  };
  if (options.selfCommitChanges && !subscriptions.selfCommitChanges) {
    // NOTE(review): deviatedHeaderAndValue$ emits three-element arrays, so `actions` is always undefined here.
    subscriptions.selfCommitChanges = [latestFormUpdate$.subscribe(), returnObject.deviatedHeaderAndValue$.subscribe(([, localHeads, remoteLastKnownHeads, actions]) => {
      const changes = Store.getChanges(remoteLastKnownHeads, "array");
      if (changes.length > 0) {
        returnObject.commitFormUpdate({
          commit: { changes, heads: localHeads, actions },
        });
      }
    })];
  }
  if (!subscriptions.reportInstanceToInternalStop) {
    // Publish this helper on thisHelperExternalStop$ the first time isStoreInSyncWithServer$ is finalized.
    subscriptions.reportInstanceToInternalStop = thisHelperInternalStop$.pipe(first()).subscribe(() => {
      thisHelperStopped = true;
      thisHelperExternalStop$.next(returnObject);
    });
  }

  return {
    instance: returnObject,
    subscriptions: Object.values(subscriptions),
  };
};

/**
 * Returns the sync helper instance that is registered for the given store type and id,
 * creating (and registering) it first when it does not exist yet and `opts.createInstance` is set.
 *
 * Options that are passed explicitly are also applied to an already existing instance.
 *
 * @param {string} type Store type; must be one of the keys of StoreSyncHelperInstances.
 * @param {*} id Id of the store.
 * @param {Object} [opts={}]
 * @param {boolean} [opts.createInstance=true] Create instance if type and id combination does not return an existing instance.
 * @param {boolean} [opts.selfCommitChanges=true] Only used when an instance is created.
 * @param {Array|Object} [opts.extraCommitChecks] Observable or array of observables
 * @param {boolean} [opts.enforceDeletionOfBlockedCommitsOnStoreDelete] If createInstance=true, default value is true.
 * @param {boolean} [opts.markStoreAsConceptOnBlockedCommits] If createInstance=true, default value is false
 * @returns {Object|undefined} The helper instance, or undefined when there is none and it was not allowed to be created.
 */
function getStoreSyncHelper(type, id, opts = {}) {
  const options = {
    ...defaultInstanceOptions,
    ...opts,
  };
  // Presumably change Store to Factory design pattern
  let retrievedStoreSyncHelper = StoreSyncHelperInstances[type].get(id);
  if (!retrievedStoreSyncHelper && options.createInstance) {
    retrievedStoreSyncHelper = StoreSyncHelper(type, id, {
      selfCommitChanges: options.selfCommitChanges,
      enforceDeletionOfBlockedCommitsOnStoreDelete: options.enforceDeletionOfBlockedCommitsOnStoreDelete,
      markStoreAsConceptOnBlockedCommits: options.markStoreAsConceptOnBlockedCommits,
    });
    StoreSyncHelperInstances[type].set(id, retrievedStoreSyncHelper);
    // TODO: decide default behaviour
    if (!options.extraCommitChecks) {
      // Calling without checks initializes the extra commit checks of the new instance.
      retrievedStoreSyncHelper.instance.addExtraCommitCheck();
    }
  }
  if (retrievedStoreSyncHelper) {
    if (opts.extraCommitChecks) {
      retrievedStoreSyncHelper.instance.addExtraCommitCheck(opts.extraCommitChecks);
    }
    // Check on type instead of truthiness: an explicit `false` must be able to switch the option off for an existing instance.
    if (typeof opts.enforceDeletionOfBlockedCommitsOnStoreDelete === "boolean") {
      retrievedStoreSyncHelper.instance.enforceDeletionOfBlockedCommitsOnStoreDelete = opts.enforceDeletionOfBlockedCommitsOnStoreDelete;
    }
    if (typeof opts.markStoreAsConceptOnBlockedCommits === "boolean") {
      retrievedStoreSyncHelper.instance.markStoreAsConceptOnBlockedCommits = opts.markStoreAsConceptOnBlockedCommits;
    }
  }
  // Without an instance (createInstance=false and nothing registered) there is nothing to dereference.
  return retrievedStoreSyncHelper?.instance;
}

/**
 * Hook: whether the store of the given type and id is in sync with the server. Default value: false.
 */
export const [useIsStoreInSyncWithServer] = bind(
  (type, id) => getStoreSyncHelper(type, id).isStoreInSyncWithServer$,
  false
);

/**
 * Hook: whether commits of the given store are currently blocked. Default value: false.
 */
export const [useAreServerCommitsBlocked] = bind(
  (type, id) => getStoreSyncHelper(type, id).areServerCommitsBlocked$,
  false
);

/**
 * Hook: becomes true as soon as the given store reports a problematic commit. Default value: false.
 */
export const [useAreThereProblematicCommits] = bind((type, id) => {
  const { problematicCommits$ } = getStoreSyncHelper(type, id);
  // One problematic commit is enough; unsubscribe from the source afterwards.
  return problematicCommits$.pipe(
    take(1),
    map(() => true)
  );
}, false);

export default getStoreSyncHelper;