import Automerge from "@/import/automerge";
import * as Sentry from "@sentry/nextjs";
import { createSignal } from "@react-rxjs/utils";
import { bind, shareLatest } from "@react-rxjs/core";
import { applyPatchOnObject, createJsonPathScope, generateCombinedPatchSet, getValueAtExactPath } from "@ogd-software/json-utils";
import { buffer, debounceTime, distinctUntilChanged, filter, map, startWith, tap, scan, switchMap, first, find, takeUntil } from "rxjs/operators";
import { arrayIsTheSame } from "@/shared/helper/general";
import {dateToSeconds, inputToDateTime} from "@/shared/helper/general/dateConverter";
import {BehaviorSubject, of} from "rxjs";
// Registry of live store instances: one Map (store id -> store object) per supported store type.
const storeInstances = Object.fromEntries(
  ["meeting", "task", "persons", "questionnaire", "investigation"].map((registeredType) => [registeredType, new Map()])
);
/**
 * Reduce a raw metadata update to the fields the store accepts.
 * @param {object} [update]
 * @param {object} [update.value] Raw metadata; only `ttl` and the allow-listed keys are copied.
 * @param {object} [update.options] When `setTtl` is truthy and no ttl was given, a ttl is generated.
 * @returns {object} New object holding the accepted metadata fields.
 */
const processNewMetaData = ({ value = {}, options = {} } = {}) => {
  const allowedSimpleKeys = ["category", "municipalityCode", "coc", "storeId", "storeType", "originatesFrom"];
  const accepted = {};
  if (typeof value?.ttl !== "undefined") {
    // An explicitly provided ttl always wins.
    accepted.ttl = value.ttl;
  } else if (options?.setTtl) {
    // For safety the generated ttl lies 5 minutes in the future before it is sent to the server,
    // so an undo operation can still take place.
    accepted.ttl = dateToSeconds(inputToDateTime("now").plus({ minute: 5 }));
  }
  for (const key of Object.keys(value || {})) {
    if (allowedSimpleKeys.includes(key)) {
      accepted[key] = value[key];
    }
  }
  return accepted;
};
/**
* Store returning a CRDT document by using automerge. Singleton pattern is used, as automerge does not support multiple pointers to an object.
*/
const Store = (type, id, { isClone = false } = {}) => {
// Current identifier of this store; starts as the id passed to Store() and is reassigned when the id signal emits.
let storeId = id;
const storeType = type;
// Accumulated metadata of the store, exposed through the `metaData` getter.
const metaData = {};
// Subscriptions created by the store itself; unsubscribed in dangerousDismountStore().
const internalSubscriptions = [];
// Last plain-JS copy of the document, read by getSnapshot().
let storeSnapshot = null;
// When this subject emits a truthy value, stop$ emits once and every pipe using takeUntil(stop$) completes.
// See https://stackoverflow.com/a/67802732
const isStopped$ = new BehaviorSubject(false);
const stop$ = isStopped$.pipe(filter((v) => v), first());
// Synchronized queue to ensure store updates are applied in order, while the command insertion is async.
const [storeUpdateQueue, queueStoreUpdate] = createSignal();
// Promise chain used to queue store updates, so they run in order.
let storeUpdate = Promise.resolve();
// Run queued store updates strictly one after another by chaining them onto `storeUpdate`.
// Each queue item is a `[resolver, callback]` pair: the callback performs the update and its
// result is handed to the resolver.
storeUpdateQueue.subscribe(([resolver, callback]) => {
  storeUpdate = storeUpdate.then(async (storeWasSignaled) => {
    let result;
    try {
      result = await callback(storeWasSignaled);
    } catch (error) {
      // A failing update must not leave the chain rejected: a rejected chain would skip every
      // later queued update and leave its caller waiting forever. Resolving with a rejected
      // promise makes the promise owned by `resolver` reject, so the caller still sees the error.
      return resolver(Promise.reject(error));
    }
    return resolver(result);
  });
});
// A JS copy of the store, including its heads, sent to any listener as a `[storeJS, heads]` tuple.
// Note: it's a copy because the current automerge version does not allow to have multiple object pointers to a store.
// This also prevents unmanaged store updates.
const [unguardedStore$, updateStore] = createSignal();
// Signal carrying the store id; pushed to when the document itself reports a different id.
const [storeId$, updateStoreId] = createSignal();
/**
* Push a fresh JS copy of the store (plus its heads) into the store pipeline when the heads changed.
* Also refreshes the snapshot used by getSnapshot() and, for non-clone stores, signals a new store id
* when the document carries an `id` that differs from the current one.
* @param {boolean} [forceStoreUpdateSignal=false] Emit even when the heads did not change.
* @returns {Promise<boolean>} Always resolves with true.
*/
async function signalAndUpdateStorePipeline(forceStoreUpdateSignal = false) {
const newStoreHeads = Automerge.getHeads(store);
// Only emit when the document heads differ from the last emitted heads (or when forced).
// NOTE(review): arrayIsTheSame is presumably an element-wise comparison - confirm in the helper.
if (forceStoreUpdateSignal || !arrayIsTheSame(storeHeads, newStoreHeads)) {
storeHeads = newStoreHeads;
const justTheStore = Automerge.toJS(store);
storeSnapshot = justTheStore;
let waitForIt = Promise.resolve();
// TODO: set an option to manually override this behaviour (do not do anything if isClone), to prevent issues with storeId in the future.
// Setting storeId within the metaDataStore is simply a bad idea. (because you lose control if you want to steer the data to the server)
// Example: persons store is searched and updated through coc and municipalityCode, it has its own id, but that one is not used for updates. Putting that into metaData would not be wise (regression bug)
if (justTheStore?.id && storeId !== justTheStore.id && !isClone) {
waitForIt = new Promise((resolve) => {
// Subscribe first, then emit: the promise resolves once the id signal has delivered the new id.
storeId$.pipe(find((v) => v === justTheStore.id)).subscribe((value) => void resolve(value));
updateStoreId(justTheStore.id);
});
}
await waitForIt;
await new Promise((resolve) => {
// Same subscribe-then-emit order: resolve as soon as the store signal has emitted the new value.
unguardedStore$.pipe(first()).subscribe(() => void resolve());
updateStore([justTheStore, newStoreHeads]);
});
// Now we are sure that the store value is at least being propagated. We do not know if everyone received the new value yet, but the async pipe has started.
initialStoreIsLoaded = true;
}
return true;
}
// Signal pair to send update commands to the store; `receivedUpdate` is the observable side.
const [receivedUpdate, sendUpdateToStore] = createSignal();
// Metadata signal; every value passed to setStoreMetaData is first mapped through processNewMetaData.
const [newStoreMetaData$, setStoreMetaData] = createSignal(processNewMetaData);
// Change the 'unguarded' store into a pipe that completes when the stop signal is given.
const store$ = unguardedStore$.pipe(
takeUntil(stop$),
);
// Accumulated store metadata: every incoming update is shallow-merged over the previous state,
// states that serialize identically are dropped, and each new state is mirrored into `metaData`.
// Completes when the stop signal fires.
const storeMetaData$ = newStoreMetaData$.pipe(
  scan((accumulated, incoming) => ({ ...accumulated, ...incoming }), {}),
  distinctUntilChanged((previous, next) => JSON.stringify(previous) === JSON.stringify(next)),
  tap((latest) => void Object.assign(metaData, latest)),
  takeUntil(stop$),
  shareLatest()
);
// Store id stream: starts with the initial id and only emits when the id actually changes.
const currentStoreId$ = storeId$.pipe(startWith(storeId), distinctUntilChanged(), shareLatest());
internalSubscriptions.push(
currentStoreId$.subscribe((currentStoreId) => {
if (storeId !== currentStoreId) {
// Current workaround: also register this store instance under the new identifier,
// so it can be found by both the original id and the new one.
const originalStore = storeInstances[storeType].get(id);
if (originalStore) {
storeInstances[storeType].set(currentStoreId, originalStore);
}
}
storeId = currentStoreId;
})
);
// The actual store (Automerge document); stays undefined until initiate(), mergeStore() or applyChanges() assigns it.
let store;
// Simple boolean to indicate that the store has been loaded, so instead the merge command must be used.
// Also being used to enable a store pipeline trigger when the predicted client side state is exactly the same as server side (case: questionnaire defaults) while no initial pipeline was fired
let initialStoreIsLoaded = false;
// Keeping track of which store heads there have been, to prevent unneeded observable triggers.
let storeHeads = [];
// Pipe of the JS copy of the store; shareLatest ensures all observers share one subscription and receive the latest value.
const storePipe$ = store$.pipe(
map(([storeJS]) => storeJS),
shareLatest()
);
// Create a pipe that only contains the `formState` part of the store (the items that must be pasted into the element machines).
const formStateStorePipe$ = storePipe$.pipe(map((storeValue) => storeValue?.formState));
// Separate pipe to store the current heads of the store.
const storeHeadsPipe$ = store$.pipe(
map(([, heads]) => heads),
shareLatest()
);
// Workaround against re-running the same default patch over and over: messages of defaults that were already applied.
const defaultsCache = [];
// Subscriptions that keep the pipes active, to ensure the latest store value is always available
let activeStoreObserver;
// Handle DESTROY_STORE and SET_TTL commands: both set a ttl in the store metadata and
// record an empty "TTL_CHANGED" change in the document.
internalSubscriptions.push(
receivedUpdate
.pipe(
// Normalize a single command or (nested) arrays of commands into one flat array.
map((arr) => (Array.isArray(arr) ? arr.flat(Infinity) : [arr])),
map((arr) => arr.filter((obj) => ["DESTROY_STORE", "SET_TTL"].includes(obj.type))),
filter((arr) => arr.length > 0)
)
.subscribe(async (arr) => {
await Promise.all(
arr.map(
async (obj) =>
// The command value is used as ttl; with setTtl a ttl is generated by processNewMetaData when the value is undefined.
void setStoreMetaData({
value: { ttl: obj?.value },
options: { setTtl: true },
})
)
);
// TODO: code below should be wrapped in a subscribe on newStoreMetaData$ with a filter current metadata, when changed execute once and remove.
// Too much work for one person in one municipality, so putting it on the bill for the next sprint
store = Automerge.emptyChange(store, {
message: JSON.stringify({
event: "TTL_CHANGED",
// TODO: insert new ttl here
}),
time: Date.now(),
});
// The empty change moves the heads, so the pipeline is signaled (result intentionally not awaited).
void signalAndUpdateStorePipeline();
})
);
// Forward every SET_METADATA command to the metadata signal.
internalSubscriptions.push(
  receivedUpdate
    .pipe(
      // Accept a single command as well as (nested) arrays of commands.
      map((update) => (Array.isArray(update) ? update.flat(Infinity) : [update])),
      map((commands) => commands.filter((command) => command.type === "SET_METADATA")),
      filter((commands) => commands.length > 0)
    )
    .subscribe(async (commands) => {
      for (const command of commands) {
        setStoreMetaData(command);
      }
    })
);
// Processing any update: all commands whose type starts with "UPDATE_" are collected and applied
// to the document as one Automerge change.
internalSubscriptions.push(
receivedUpdate
.pipe(
// Collect updates until no new update has arrived for 150 ms, then emit them as one batch.
buffer(receivedUpdate.pipe(debounceTime(150))),
filter((arr) => arr.length > 0)
)
.subscribe((valueArr) => {
const addedOptions = {}; // DO NOT USE if you don't know what you're doing, this is a very dirty hacky workaround to make setValue work for signing.
// Convert each update command into a [path, value, weight, options] tuple.
const adjustedArr = valueArr
.flat()
.filter((obj) => obj.type.startsWith("UPDATE_"))
.map((obj) => {
// Form state and declaration state updates are addressed relative to their own root key.
let prefix = "";
if (obj.type === "UPDATE_FORM_STATE_VALUE") {
prefix = "formState.";
} else if (obj.type === "UPDATE_DECLARATION_STATE_VALUE") {
prefix = "declarationState.";
}
if (obj.options) {
// Options of all commands in the batch are merged; on conflicting keys the later command wins.
Object.assign(addedOptions, obj.options);
}
return [`${prefix}${obj.ref}`, obj.value, obj.weight, obj.options];
});
if (adjustedArr.length === 0) {
return;
}
const nonProxyDoc = Automerge.toJS(store);
// Generate combined patches that search for any available path within the given path, so any patch does not override root objects (preventing 'weird' conflicts merges between simultaneous users)
let patches = generateCombinedPatchSet(nonProxyDoc, adjustedArr, {
applyNullAtUndecidedArrayItems: true, // Already adding null instead of undefined in processed arrays.
keepDataOnForcedTypeCasting: false, // Let's not make things complicated.
forceDigitsToBeTypeArray: true, // Force devs to use digit keys for arrays only
mergeWithPreviousData: "deep", // With current setup a safe default :)
...addedOptions,
});
store = Automerge.change(
store,
{
// The change message lists the patched paths.
message: JSON.stringify({
patch: patches.map(([path]) => path),
// TODO: ADD USER!
}),
time: Date.now(),
},
(doc) => {
patches.forEach(([path, value]) => {
if (["undefined", "object"].includes(typeof value)) {
if (typeof value === "undefined") {
// An undefined value means: delete the property at the end of the path from its parent object.
const fullPathArray = createJsonPathScope(path, "array");
const objectPath = createJsonPathScope(fullPathArray.slice(0, -1), "string");
const deleteProp = fullPathArray.slice(-1)[0];
// Only support objects for now, deletion of a property is not yet supported by applyPatchOnObject
// NOTE(review): no guard for a missing parent object here - confirm what getValueAtExactPath returns on a miss.
delete getValueAtExactPath(objectPath, doc)[deleteProp];
return;
} else {
// Automerge only supports values that are JSON parsable. Therefore, objects must be parsed to JSON.
value = JSON.parse(JSON.stringify(value));
}
}
// A non-empty path is patched in place; an empty path addresses the document root.
if (path !== "") {
// Do magic replace to ensure the commit history is as clean as possible.
applyPatchOnObject(doc, path, value, {
treatInputAsImmutable: false, // We must apply the patch within the object of this function
});
} else if (typeof value === typeof doc) {
// Very important note: the root type cannot be changed currently, much deeper changes are therefore needed.
Object.assign(doc, value);
} else {
throw Error("Provided patch is currently unsupported by our automerge implementation");
}
});
}
);
// Propagate the changed document to all listeners (result intentionally not awaited).
void signalAndUpdateStorePipeline();
})
);
return {
/**
* Initiate the document store (automerge) for a form. If already initiated, only the return value is sent back.
* @param {[Uint8Array]} [defaults] Any default value for the store. Do not use this if the document might have been edited and the defaults are not those edits.
* @param {boolean} [activateObserver=true] If set to true, the returned store pipe will be active (and therefore when submitInitialStoreToPipe = true a value).
* @param {boolean} [submitInitialStoreToPipe=false] If set to true, the returned store pipe will have the initial store. Not advised to set to true if document is expected to have edits!
* @returns {{storeHeadsPipe$: Observable<*>, formStateStorePipe$: Observable<*>, sendUpdateToStore: () => void, storePipe$: Observable<*>}}
*/
initiate: function ({ defaults, defaultMetaData, activateObserver = true, submitInitialStoreToPipe = false } = {}) {
if (activateObserver && !activeStoreObserver) {
// By subscribing to the observer, the initial store is not being 'thrown away' if others subscribe 'late'.
activeStoreObserver = [storePipe$.subscribe(), formStateStorePipe$.subscribe(), storeHeadsPipe$.subscribe(), storeMetaData$.subscribe()];
}
if (!store) {
store = Automerge.init();
}
if (Array.isArray(defaults) && defaults.length > 0) {
if (Array.isArray(defaults[0])) {
defaults.forEach(([msg, patch]) => {
if (!defaultsCache.includes(msg)) {
const message =
typeof msg === "object"
? {
time: Date.now(),
...msg,
}
: {
time: Date.now(),
message: msg,
};
store = Automerge.change(store, message, patch);
defaultsCache.push(msg);
}
});
} else {
store = Automerge.applyChanges(store, defaults)[0];
}
}
if (defaultMetaData && Object.keys(metaData).length === 0) {
setStoreMetaData({ value: defaultMetaData });
}
if (submitInitialStoreToPipe) {
void signalAndUpdateStorePipeline();
}
return this.getStore();
},
/**
* Update the store value without any checks.
* Only call this function if you really know what you're doing
* @param {function} callback Function that executes the inner change Automerge function.
* @param {string} message Message or JSON object
* @returns {Promise<void>}
*/
rawUpdateStoreValue: async function (callback, message) {
if (store === null) {
throw Error("Store has not been initialized yet");
}
if (typeof callback !== "function" || typeof message !== "string") {
throw Error("Developer: rawUpdateStoreValue requires both a callback function and message");
}
return new Promise((resolve) => {
const fn = async () => {
store = Automerge.change(store, {
message,
time: Date.now(),
}, callback);
return signalAndUpdateStorePipeline();
}
queueStoreUpdate([resolve, fn]);
});
},
/**
* Update a path within the store.
* @param {string} ref Path notated like 'someObj.foo[bar]'
* @param {*} value Any value that must be inserted within the path.
* @param {number} [weight=0] Optional patch weight
* @param {object} [options]
*/
updateStoreValue: async function (ref, value, weight = 0, options) {
if (store === null) {
throw Error("Store has not been initialized yet");
}
return sendUpdateToStore({
type: "UPDATE_STORE_VALUE",
ref,
value,
weight,
options,
});
},
/**
* Get the store observers and update function.
* @returns {{storeHeadsPipe$: Observable<*>, formStateStorePipe$: Observable<*>, sendUpdateToStore: () => void, storePipe$: Observable<*>, rawUpdateStoreValue: () => Promise<void>, updateStoreValue: () => Promise<void>}}
*/
getStore: function () {
return {
storePipe$,
sendUpdateToStore,
updateStoreValue: this.updateStoreValue,
rawUpdateStoreValue: this.rawUpdateStoreValue,
formStateStorePipe$,
storeHeadsPipe$,
storeId$: currentStoreId$,
isStopped$,
stop$,
};
},
/**
* Merge external changes (possibly from others) with the current store.
* @param {object} doc Automerge document
* @param {function} [storeHeadsCallback] Provide a callback function to retrieve the heads of the merged store (getHeads()) before signaling the observer.
* @returns {{storeHeadsPipe$: Observable<*>, formStateStorePipe$: Observable<*>, sendUpdateToStore: () => void, storePipe$: Observable<*>}}
*/
mergeStore: async function (doc, storeHeadsCallback) {
if (!initialStoreIsLoaded) {
if (!activeStoreObserver) {
// By subscribing to the observer, the external loaded store is not being 'thrown away' if others subscribe 'late'.
activeStoreObserver = [storePipe$.subscribe(), formStateStorePipe$.subscribe(), storeHeadsPipe$.subscribe(), storeMetaData$.subscribe()];
}
try {
store = Automerge.load(doc);
} catch (e) {
Sentry.withScope(function (scope) {
scope.setContext("StoreInfo", { storeType, storeId, activity: "loading" });
Sentry.captureException(e);
});
}
} else {
try {
store = Automerge.merge(store, Automerge.load(doc));
} catch (e) {
Sentry.withScope(function (scope) {
scope.setContext("StoreInfo", { storeType, storeId, activity: "merging" });
Sentry.captureException(e);
});
}
}
if (storeHeadsCallback) {
storeHeadsCallback(this.getHeads(store));
}
void signalAndUpdateStorePipeline();
return this.getStore();
},
/**
* Export store to be synced remotely.
* @returns {Uint8Array} Store in serializable format.
*/
exportStore: function (heads) {
if (!store) {
Sentry.withScope(function (scope) {
scope.setContext("StoreInfo", { storeType, storeId, activity: "export" });
Sentry.captureMessage("Store export was called without loaded store", "fatal");
});
throw Error("Store has not been initialized yet");
}
if (heads) {
const viewAtSpecificPointInTime = Automerge.clone(Automerge.view(store, heads));
return Automerge.save(viewAtSpecificPointInTime);
}
return Automerge.save(store);
},
/**
* Get current heads of the store, including an observable.
* @returns {[Hash[],Observable<*>]}
*/
getHeads: function () {
return [Automerge.getHeads(store), storeHeadsPipe$];
},
/**
* Update metadata
* @param {object} metaData New meta data
* @param {object} [metaDataOptions] Options
* @returns {Promise<void>}
*/
updateMetaData: async function (metaData, metaDataOptions = {}) {
void setStoreMetaData({ value: metaData, options: metaDataOptions });
},
/**
* Get all changes since {heads}
* @param {[string]} heads Reference heads from which point the changes must be generated from
* @param {"Uint8Array"|"array"} [changeEncoding="Uint8Array"] By default Automerge spits out Uint8Array formatted changes, but those are converted to objects by JSON.stringify. When "array" is used, the type is removed.
* @returns {number[][]|Change[]}
*/
getChanges: function (heads, changeEncoding = "Uint8Array") {
if (!store) {
Sentry.withScope(function (scope) {
scope.setContext("StoreInfo", { storeType, storeId, activity: "getChanges" });
Sentry.captureMessage("Store changes was called without loaded store", "error");
});
throw Error("Store has not been initialized yet");
}
let changes;
if (!heads || (Array.isArray(heads) && heads.length === 0)) {
// Only in weird edge cases where the server does not know which head it is on (e.g. deleted heads but remained store). Should only happen at initial migrations.
changes = Automerge.getAllChanges(store);
} else {
const viewAtSpecificPointInTime = Automerge.view(store, heads);
changes = Automerge.getChanges(viewAtSpecificPointInTime, store);
}
return changeEncoding === "Uint8Array" ? changes : changes.map((change) => Array.from(change));
},
applyChanges: async function (changes) {
// If called through invocations the changes array contains only 'normal' arrays with integers.
// In those cases convert them into Uint8Array
const TypedArray = Object.getPrototypeOf(Uint8Array);
const convertedChanges = changes.map((arr) => {
if (arr instanceof TypedArray) {
return arr;
}
if (Array.isArray(arr)) {
return new Uint8Array(arr);
}
// Otherwise the assumption is made the Uint8Array got converted into an Object with keyed indexes.
return new Uint8Array(Object.values(arr));
});
[store] = Automerge.applyChanges(store, convertedChanges);
return new Promise((resolve) => {
setTimeout(() => {
// To increase performance of the main thread
void signalAndUpdateStorePipeline();
}, 0);
resolve(this.getStore());
});
},
get isLoaded() {
return initialStoreIsLoaded;
},
get id() {
return storeId;
},
get metaData() {
return metaData;
},
// Warning: it's up to the developer to handle a ttl trigger. But keep it mind that it's an action meant to delete all copies of it!
get ttlOnStore$() {
return storeMetaData$.pipe(map((obj) => obj.ttl));
},
get isLoaded$() {
return initialStoreIsLoaded ? of(initialStoreIsLoaded) : formStateStorePipe$.pipe(first(), switchMap(() => of(true)));
},
get metaData$() {
return storeMetaData$;
},
deActivateStoreObservers: function () {
if (activeStoreObserver) {
activeStoreObserver.forEach((subscription) => void subscription.unsubscribe());
activeStoreObserver = undefined;
}
},
/**
* Remove internal subscriptions of store to free-up as much memory as possible for garbage collection.
*/
dangerousDismountStore: async function () {
// Trigger a complete on the store observer.
isStopped$.next(true);
// Remove all internal subscriptions, should be obsolete with the trigger above, but just in case.
internalSubscriptions.forEach((subscription) => void subscription.unsubscribe());
// Free up all storage within Automerge.
Automerge.free(store);
delete this;
},
/**
* Force all store value and header observers to signal the store its value.
* Do not use this by default, as it might trigger a lot of subscribers.
* Useful for edge cases, e.g. a modal containing a store but only want to sync it after specific conditions have met (and therefore having 'late' sync subscribers).
*/
forceStoreUpdateSignal: async function () {
return signalAndUpdateStorePipeline(true);
},
getSnapshot: function (path = "", { defaultValueOnMiss, defaultValueCallback } = {}) {
return getValueAtExactPath(path, storeSnapshot, {
defaultValueOnMiss,
defaultValueCallback,
});
},
};
};
/**
 * Get the store instance registered for the given type and id, creating it when needed.
 * @param {"meeting"|"task"|"persons"|"questionnaire"|"investigation"} type
 * @param {string|symbol} [id] Store id; a fresh symbol is used when the store is not synced with the server yet.
 * @param {object} [options]
 * @param {boolean} [options.isClone=false] Passed on to the store when a new instance is created.
 * @param {boolean} [options.createInstance=true] When false, no store is created if none is registered.
 * @returns {*} The store instance, or undefined when none exists and createInstance is false.
 */
export default function getStore(type, id = Symbol("Temp id when not synced with server"), { isClone = false, createInstance = true } = {}) {
  // Presumably change Store to Factory design pattern
  const registry = storeInstances[type];
  const existing = registry.get(id);
  if (existing || !createInstance) {
    return existing;
  }
  const created = Store(type, id, { isClone });
  registry.set(id, created);
  return created;
}
/**
 * Dismount a store and remove it from the registry.
 * A store can be registered under more than one key (its original id plus an alias for a newer id);
 * every key pointing to the dismounted instance is removed, so no lookup can return a dead store.
 * @param {"meeting"|"task"|"persons"|"questionnaire"|"investigation"} type
 * @param {string|symbol} id Any id the store is registered under.
 * @returns {Promise<void>} Resolves when done; does nothing when no such store is registered.
 */
export async function removeStore(type, id) {
  const registry = storeInstances[type];
  const store = registry?.get(id);
  if (!store) {
    return;
  }
  try {
    store.deActivateStoreObservers();
    await store.dangerousDismountStore();
  } finally {
    // Unregister the instance even when dismounting failed: a half-dismounted store must not be handed out again.
    for (const [key, instance] of [...registry]) {
      if (instance === store) {
        registry.delete(key);
      }
    }
  }
}
// React hooks created with bind(); both use null as default value. Only the hook (first tuple item) is exported.
// Hook for the ttl found in the metadata of the store with the given type and id.
export const [useTtlOnStore] = bind((type, id) => getStore(type, id).ttlOnStore$, null);
// Hook for the current id of the store with the given type and id.
export const [useStoreId] = bind((type, id) => getStore(type, id).getStore().storeId$, null);