import {
distinctUntilChanged,
map,
scan,
filter,
startWith,
withLatestFrom,
tap,
sample,
pairwise,
distinctUntilKeyChanged,
take,
switchMap,
share, takeWhile, finalize, first
} from "rxjs/operators";
import { bind, shareLatest } from "@react-rxjs/core";
import { createSignal } from "@react-rxjs/utils";
import { arrayIsTheSame } from "@/shared/helper/general";
import { generateId } from "@/shared/helper/general/id";
import getStore from "@/shared/store/index";
import {combineLatest, merge, of, partition, interval, EMPTY, BehaviorSubject, ReplaySubject, Subject} from "rxjs";
import GeneralSyncHelper from "@/shared/helper/sync/general";
import getGenericFormHelper from "@/shared/helper/form/generic";
const StoreSyncHelperInstances = {
meeting: new Map(),
task: new Map(),
persons: new Map(),
questionnaire: new Map(),
investigation: new Map(),
};
const DEFAULT_MAX_AMOUNT_OF_RETRIES = 3;
// By default, non-high priorities are sent every 0,25 second to the server
const DEFAULT_SERVER_COMMIT_PACE = 250; // Future todo: read the internet connection type (if available) or whether the reduce-data is set, to set the default speed.
function enrichCommit(obj) {
if (typeof obj === "object") {
if (typeof obj.priority !== "string") {
Object.assign(obj, {
priority: obj.commit?.actions?.includes("sign") ? "HIGH" : "NORMAL",
});
}
if (typeof obj.id !== "string") {
Object.assign(obj, { id: generateId() });
}
if (typeof obj.maxRetries !== "number") {
Object.assign(obj, { maxRetries: DEFAULT_MAX_AMOUNT_OF_RETRIES });
}
}
return obj;
}
function decideUponCommitPace(input) {
if (typeof input === "boolean" && input) {
return 1;
}
if (typeof input === "string") {
if (input === "PRIO") {
return 1;
} else if (input === "INTERRUPT") {
return 0;
}
}
if (typeof input === "number") {
return input;
}
return DEFAULT_SERVER_COMMIT_PACE;
}
const defaultInstanceOptions = {
createInstance: true,
/**
* When an instance is created and this boolean is set, the instance subscribes to it's deviatedHeaderAndValue$ observer, retrieves changes and commits these through the commitFormUpdate function.
* Besides making the developer live easy, it's also handy for changes after related machines and hooks are unmounted.
*/
selfCommitChanges: true,
/**
* Indicate that when a store is cleaned up/deleted and pending commits are blocked based on extra commit checks,
* the commits are dropped and the store status is set to 'in sync'.
* @type boolean
* @default true
*/
enforceDeletionOfBlockedCommitsOnStoreDelete: true,
/**
* If blocked commits are detected, mark the store as concept. This implies that the store is visually shown as in sync, when it is not.
* @type boolean
* @default false
*/
markStoreAsConceptOnBlockedCommits: false,
};
const defaultStoreSyncHelperOptions = {
/**
* @see defaultInstanceOptions.selfCommitChanges
*/
selfCommitChanges: defaultInstanceOptions.selfCommitChanges,
/**
* @see defaultInstanceOptions.enforceDeletionOfBlockedCommitsOnStoreDelete
*/
enforceDeletionOfBlockedCommitsOnStoreDelete: defaultInstanceOptions.enforceDeletionOfBlockedCommitsOnStoreDelete,
/**
* @see defaultInstanceOptions.markStoreAsConceptOnBlockedCommits
*/
markStoreAsConceptOnBlockedCommits: defaultInstanceOptions.enforceDeletionOfBlockedCommitsOnStoreDelete,
};
const StoreSyncHelper = (type, id, opts = defaultStoreSyncHelperOptions) => {
const options = {
...defaultStoreSyncHelperOptions,
...opts,
};
let storeId = id;
const storeType = type;
const extraCommitChecks = new Set();
let extraCommitChecksIsInitialized = false;
const [rawExtraCommitChecks$, updateExtraCommitChecks] = createSignal();
const [formUpdateSyncToServer$, commitFormUpdate] = createSignal(enrichCommit);
const [syncedHeaders$, updateSyncedHeaders] = createSignal();
// TODO: presumably this might be better to replace with BehaviourSubject.
const [commitStatus$, updateCommitStatus] = createSignal();
let lastCommitStatus = { status: "COMPLETED", id: -1, retries: 0 };
// Again: mobx might be better here
commitStatus$.subscribe((commitSt) => {
lastCommitStatus = commitSt;
});
const [serverCommitStreamPace$, changeServerCommitPace] = createSignal(decideUponCommitPace);
const Store = getStore(storeType, storeId);
const { storePipe$, storeHeadsPipe$, storeId$, formStateStorePipe$, isStopped$ } = Store.getStore();
// Super careful release, so only fix for tasks until we tested a bit more
// TODO: set this as default for all storeTypes
const defaultExtraCommitCheck = storeType === "task" ? formStateStorePipe$.pipe(
map((v) => !!v),
distinctUntilChanged(),
startWith(false),
switchMap((v) => of(v)),
) : of(true);
const extraCommitChecks$ = rawExtraCommitChecks$.pipe(
startWith(extraCommitChecksIsInitialized ? Array.from(extraCommitChecks) : null),
map((v) => {
if (v === null) {
return extraCommitChecksIsInitialized ? Array.from(extraCommitChecks) : null;
}
return v;
}),
filter((v) => Array.isArray(v)),
switchMap((arr) => combineLatest(arr.length === 0 ? [defaultExtraCommitCheck] : [...arr])),
shareLatest()
);
const renewExtraCommitChecks = () => {
extraCommitChecksIsInitialized = true;
// Subscription is added for cold starts, where extraCommitChecks$ has no subscribers and therefore the value is 'lost'.
// Adding a short subscription for a second prevents that.
// New note: subscription could be removed.
const tempSubscription = extraCommitChecks$.subscribe();
updateExtraCommitChecks(Array.from(extraCommitChecks));
setTimeout(() => {
tempSubscription.unsubscribe();
}, 1000);
};
storeId$.subscribe((currentStoreId) => {
if (storeId !== currentStoreId) {
// Current workaround to add 'this' also as the other identifier.
const originalStore = StoreSyncHelperInstances[storeType].get(id);
if (originalStore) {
StoreSyncHelperInstances[storeType].set(currentStoreId, originalStore);
}
}
storeId = currentStoreId;
});
const latestFormUpdate$ = formUpdateSyncToServer$.pipe(
filter((obj) => typeof obj === "object"),
scan((prev, curr) => {
// Not expecting to need more than 10 updates to compare.
return [...prev.slice(-10), curr];
}, []),
filter((arr) => {
if (arr.length <= 1) {
return arr.length === 1;
}
// Timestamp must be higher (or the same in case of errors) than last 10 commits.
return (
arr[arr.length - 1].id >=
arr
.slice(0, -1)
.map(({ id }) => id)
.sort()
.slice(-1)[0]
);
}),
map((arr) => arr.slice(-1)[0]),
shareLatest()
);
let lastKnownSyncedHeaders = [];
const uniqueSyncedHeaders$ = syncedHeaders$.pipe(
distinctUntilChanged(arrayIsTheSame),
tap((v) => lastKnownSyncedHeaders),
shareLatest()
);
let lastKnownStoreHeaders = [];
const storeAndSyncedHeaders$ = storeHeadsPipe$.pipe(
startWith([]), // The first value is an empty array, to ensure that distinctUntilChanged does not pass through the first synced heads from the server itself.
tap((v) => lastKnownStoreHeaders = v),
withLatestFrom(uniqueSyncedHeaders$)
);
const deviatedHeaderAndValue$ = storeAndSyncedHeaders$.pipe(
distinctUntilChanged((_, curr) => arrayIsTheSame(curr[0], curr[1])),
filter(([localHeads]) => localHeads.length > 0), // Stop here the first value (empty array), unless of course there is a head.
withLatestFrom(storePipe$), // Sidenote: the store pipe must have been subscribed to
map(([[localHeads, lastReceivedRemoteHeads], store]) => [store, localHeads, lastReceivedRemoteHeads]), // Combine the store value with the heads, so everything is in one place.
shareLatest()
);
const enrichedCommitStatus$ =
// To ensure the pipeline starts with two start values because of pairwise fn
commitStatus$.pipe(
startWith(null),
map((v) => v ?? lastCommitStatus),
).pipe(
// In case the mutation is called twice (by react dev mode for example), filter out the same error.
distinctUntilChanged((prev, curr) => JSON.stringify(prev) === JSON.stringify(curr)),
// Start with an empty value to ensure pairwise can process the first actual value
startWith({ status: "COMPLETED", id: 0, retries: 0 }),
// Get the last value and the current one
pairwise(),
// Increase the amount of retries with the commit of it was an error.
map(([{ status: prevStatus, id: prevId, retries: prevRetries = 0 }, curr]) => {
const { status: currStatus, id: currId } = curr;
if (currStatus === "ERROR") {
Object.assign(curr, { retries: prevId === currId ? prevRetries : 0 });
}
if (currStatus === "SUBMITTING" && prevId === currId) {
Object.assign(curr, {
retries: prevStatus === "ERROR" ? prevRetries + 1 : prevRetries,
});
}
return curr;
}),
// Share the latest completed commit, so when a 'send' hook remounts the last send details are known.
// This prevents the issue where loading a form page after a change would 'double' upload the previous commit.
shareLatest()
);
/**
* Indicate whether there are valid commits that could be sent to the server right now, but are blocked duo to commit constraints.
* @type {BehaviorSubject<boolean>}
*/
const commitsAreBlocked$ = new BehaviorSubject(false);
/**
* Same as commitsAreBlocked$ but triggers only on changes to the status
* @type {Observable<boolean>}
*/
const uniqueCommitsAreBlocked$ = commitsAreBlocked$.pipe(distinctUntilChanged());
/**
* Observer of the enforceDeletionOfBlockedCommitsOnStoreDelete variable.
* @type {BehaviorSubject<boolean>}
*/
const enforceCommitDeletion$ = new BehaviorSubject(options.enforceDeletionOfBlockedCommitsOnStoreDelete);
const uniqueEnforceCommitDeletion$ = enforceCommitDeletion$.pipe(distinctUntilChanged());
// Check if there are commits that must be sent to the server.
const [problematicCommitsRaw$, commitsThatCanBeSentToServer$] = partition(
combineLatest([
latestFormUpdate$,
GeneralSyncHelper.isUserOnline$,
getGenericFormHelper(storeType, storeId).isFormReadOnly$,
// TODO: Add pause support for expired token. (also provide warning when that happens)
enrichedCommitStatus$,
extraCommitChecks$,
]).pipe(
// Is the user online?
filter(([, isOnline]) => isOnline),
// Is the form writeable?
filter(([, , readOnly]) => !readOnly),
// Is there no commit being processed by the server currently?
filter(([, , , { status }]) => ["COMPLETED", "ERROR"].includes(status)),
// The commit is not processed by the server already?
filter(([{ id: commitId }, , , { status, id: processedCommitId }]) => !(["COMPLETED"].includes(status) && commitId === processedCommitId)),
// Report to the commitsAreBlocked$ observable whether we block the commit.
tap(([, , , , extraCommitChecksResults]) => commitsAreBlocked$.next(extraCommitChecksResults.filter((v) => !v).length !== 0)),
// All extra commit checks must be true
filter(([, , , , extraCommitChecksResults]) => extraCommitChecksResults.filter((v) => !v).length === 0),
),
// IMPORTANT TODO: if commitId has been completed, also ignore any follow up ERROR (because that indicates an error somewhere else that must be logged to Sentry)
// Filter out the commits where the amount of retries is too high
([{ id: commitId, maxRetries }, , , { status, id: errorCommitId, retries = 0 }]) =>
["ERROR"].includes(status) && commitId === errorCommitId && retries >= maxRetries - 1
);
let lastKnownInSyncBoolean;
let thisHelperStopped = false;
const thisHelperInternalStop$ = new Subject();
const thisHelperExternalStop$ = new ReplaySubject(1);
/* Important future todo for edge cases:
Casus: Say user was offline, added changes, then triggers an extra commit check, then interrupts the process (deleting store), then goes online
Then what? First guess: let the last commit before the extra commit check be send. What is the isStoreInSyncWithServer$ in the meanwhile?
Presumably todo: do something with an offline queue, instead of filtering above
*/
const isStoreInSyncWithServer$ = combineLatest([storeHeadsPipe$, uniqueSyncedHeaders$, uniqueEnforceCommitDeletion$, uniqueCommitsAreBlocked$, isStopped$]).pipe(
takeWhile(([storeHeads, syncedHeads, isDeletionEnforced, areCommitsBlocked, isStopped]) => !((isStopped && arrayIsTheSame(storeHeads, syncedHeads)) || (isStopped && !arrayIsTheSame(storeHeads, syncedHeads) && isDeletionEnforced && areCommitsBlocked)), true),
map(([storeHeads, syncedHeads, isDeletionEnforced, areCommitsBlocked, isStopped]) => arrayIsTheSame(storeHeads, syncedHeads) || (isDeletionEnforced && areCommitsBlocked && isStopped) || (options.markStoreAsConceptOnBlockedCommits && areCommitsBlocked)),
distinctUntilChanged(),
tap((v) => lastKnownInSyncBoolean = v),
startWith(lastKnownInSyncBoolean ?? arrayIsTheSame(lastKnownStoreHeaders, lastKnownSyncedHeaders)),
finalize(() => {
thisHelperInternalStop$.next(true)
}),
share({
connector: () => new ReplaySubject(1),
resetOnRefCountZero: false, // Do not reset if ref count becomes zero
resetOnComplete: false, // On complete keep the ReplaySubject (with its value)
})
);
// TODO: check whether this keeps working if you add an element (the centralSyncHelper might 'forget' an error happend)
const problematicCommits$ = problematicCommitsRaw$.pipe(
map(([commit]) => commit),
distinctUntilKeyChanged("id"),
switchMap((commit) => of(commit)),
);
// If there are commits that must be sent to the server, the 'speed' is decided here.
const serverCommits$ = merge(
commitsThatCanBeSentToServer$.pipe(
// High priorities are send to the server right away
filter(([obj]) => obj.priority === "HIGH"),
withLatestFrom(serverCommitStreamPace$.pipe(startWith(DEFAULT_SERVER_COMMIT_PACE), shareLatest())),
// INTERRUPT state will just like normal priority result into commits being 'thrown' away. Be aware of this!
filter(([_, pace]) => pace !== 0),
map(([[obj]]) => [obj])
),
commitsThatCanBeSentToServer$.pipe(
filter(([obj]) => obj.priority !== "HIGH"),
// Important: sample does not trigger the last value if commitsThatCanBeSentToServer$ completes!
// So an "INTERRUPT" must only be used if data loss is acceptable.
sample(
serverCommitStreamPace$.pipe(
startWith(DEFAULT_SERVER_COMMIT_PACE),
distinctUntilChanged(),
switchMap((val) => (val ? interval(val) : EMPTY))
)
)
)
).pipe(
map(([commit]) => ({
...commit,
metaData: {
// Add additional metaData that might have been inserted in the meantime.
...Store.metaData,
...commit.metaData,
},
}))
);
const returnObject = {
get latestFormUpdate$() {
return latestFormUpdate$;
},
get deviatedHeaderAndValue$() {
return deviatedHeaderAndValue$;
},
get syncedHeaders$() {
return uniqueSyncedHeaders$;
},
set syncedHeaders$(heads) {
updateSyncedHeaders(heads);
},
get isStoreInSyncWithServer$() {
return isStoreInSyncWithServer$;
},
get serverCommits$() {
return serverCommits$;
},
get areServerCommitsBlocked$() {
return uniqueCommitsAreBlocked$;
},
get problematicCommits$() {
return problematicCommits$;
},
get thisHelperExternalStop$() {
return thisHelperExternalStop$;
},
changeServerCommitPace: (paceOrBoolean) => {
changeServerCommitPace(paceOrBoolean);
},
resetServerCommitPace: () => {
changeServerCommitPace(DEFAULT_SERVER_COMMIT_PACE);
},
commitFormUpdate: (update) => {
commitFormUpdate({
...update,
metaData: {
storeId,
storeType,
...Store.metaData,
...update.metaData,
},
});
},
addExtraCommitCheck: (observers) => {
const currentSize = extraCommitChecks.size;
if (Array.isArray(observers)) {
observers.forEach(extraCommitChecks.add, extraCommitChecks);
} else if (observers) {
extraCommitChecks.add(observers);
}
if (currentSize !== extraCommitChecks.size || currentSize === 0) {
renewExtraCommitChecks();
}
},
resetExtraCommitChecks: () => {
extraCommitChecks.clear();
renewExtraCommitChecks();
},
set enforceDeletionOfBlockedCommitsOnStoreDelete(value) {
options.enforceDeletionOfBlockedCommitsOnStoreDelete = value;
enforceCommitDeletion$.next(value);
},
get enforceDeletionOfBlockedCommitsOnStoreDelete() {
return options.enforceDeletionOfBlockedCommitsOnStoreDelete;
},
resetEnforceDeletionOfBlockedCommitsOnStoreDelete: () => {
options.enforceDeletionOfBlockedCommitsOnStoreDelete = defaultStoreSyncHelperOptions.enforceDeletionOfBlockedCommitsOnStoreDelete;
},
set markStoreAsConceptOnBlockedCommits(value) {
options.markStoreAsConceptOnBlockedCommits = value;
},
get markStoreAsConceptOnBlockedCommits() {
return options.markStoreAsConceptOnBlockedCommits;
},
resetMarkStoreAsConceptOnBlockedCommits: () => {
options.markStoreAsConceptOnBlockedCommits = defaultStoreSyncHelperOptions.markStoreAsConceptOnBlockedCommits;
},
updateCommitStatus(commitId, status) {
// Important note: for now only 3 statuses are accepted: SUBMITTING, ERROR, COMPLETED
updateCommitStatus({
status,
id: commitId,
});
},
};
const subscriptions = {
selfCommitChanges: undefined,
reportInstanceToInternalStop: undefined,
};
if (options.selfCommitChanges && !subscriptions.selfCommitChanges) {
subscriptions.selfCommitChanges = [latestFormUpdate$.subscribe(), returnObject.deviatedHeaderAndValue$.subscribe(([, localHeads, remoteLastKnownHeads, actions]) => {
const changes = Store.getChanges(remoteLastKnownHeads, "array");
if (changes.length > 0) {
returnObject.commitFormUpdate({
commit: { changes, heads: localHeads, actions },
});
}
})];
}
if (!subscriptions.reportInstanceToInternalStop) {
subscriptions.reportInstanceToInternalStop = thisHelperInternalStop$.pipe(first()).subscribe(() => {
thisHelperStopped = true;
thisHelperExternalStop$.next(returnObject);
});
}
return {
instance: returnObject,
subscriptions: Object.values(subscriptions),
};
};
/**
*
* @param type
* @param id
* @param {Object} [opts={}]
* @param {Array} [opts.createInstance=true] Create instance if type and id combination does not return an existing instance.
* @param {Array} [opts.extraCommitChecks] Observables
* @param {boolean} [opts.enforceDeletionOfBlockedCommitsOnStoreDelete] If createInstance=true, default value is true.
* @param {boolean} [opts.markStoreAsConceptOnBlockedCommits] If createInstance=true, default value is false
* @returns Object
*/
function getStoreSyncHelper(type, id, opts = {}) {
const options = {
...defaultInstanceOptions,
...opts,
};
// Presumably change Store to Factory design pattern
let retrievedStoreSyncHelper = StoreSyncHelperInstances[type].get(id);
if (!retrievedStoreSyncHelper && options.createInstance) {
retrievedStoreSyncHelper = StoreSyncHelper(type, id, {
selfCommitChanges: options.selfCommitChanges,
enforceDeletionOfBlockedCommitsOnStoreDelete: options.enforceDeletionOfBlockedCommitsOnStoreDelete,
markStoreAsConceptOnBlockedCommits: options.markStoreAsConceptOnBlockedCommits,
});
StoreSyncHelperInstances[type].set(id, retrievedStoreSyncHelper);
// TODO: decide default behaviour
if (!options.extraCommitChecks) {
retrievedStoreSyncHelper.instance.addExtraCommitCheck();
}
}
if (retrievedStoreSyncHelper) {
if (opts.extraCommitChecks) {
retrievedStoreSyncHelper.instance.addExtraCommitCheck(opts.extraCommitChecks);
}
if (opts.enforceDeletionOfBlockedCommitsOnStoreDelete) {
retrievedStoreSyncHelper.instance.enforceDeletionOfBlockedCommitsOnStoreDelete = opts.enforceDeletionOfBlockedCommitsOnStoreDelete;
}
if (opts.markStoreAsConceptOnBlockedCommits) {
retrievedStoreSyncHelper.instance.markStoreAsConceptOnBlockedCommits = opts.markStoreAsConceptOnBlockedCommits;
}
}
return retrievedStoreSyncHelper.instance;
}
export const useIsStoreInSyncWithServer = bind((type, id) => getStoreSyncHelper(type, id).isStoreInSyncWithServer$, false)[0];
export const useAreServerCommitsBlocked = bind((type, id) => getStoreSyncHelper(type, id).areServerCommitsBlocked$, false)[0];
export const useAreThereProblematicCommits = bind(
(type, id) =>
getStoreSyncHelper(type, id).problematicCommits$.pipe(
take(1), // unsubscribe after getting one problematic commit
map(() => true)
),
false
)[0];
export default getStoreSyncHelper;