Skip to content

Commit

Permalink
feat(merge-from): add mergeFrom (#259)
Browse files Browse the repository at this point in the history
Closes #221
  • Loading branch information
nartc authored Jan 29, 2024
1 parent 8dcc78d commit 9ed73cf
Show file tree
Hide file tree
Showing 8 changed files with 283 additions and 0 deletions.
42 changes: 42 additions & 0 deletions docs/src/content/docs/utilities/Signals/merge-from.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
---
title: mergeFrom
description: ngxtension/merge-from
entryPoint: merge-from
badge: stable
contributors: ['chau-tran']
---

`mergeFrom` is a helper function that merges the values of `Observable`s or `Signal`s and emits the latest emitted value.
It also gives us the possibility to change the emitted value before emitting it using RxJS operators.

It is similar to `merge()`, but it also takes `Signals` into consideration.

From `ngxtension` perspective, `mergeFrom` is similar to [`computedFrom`](./computed-from.md), but it doesn't emit the combined value, but the latest emitted value by using the `merge` operator instead of `combineLatest`.

```ts
import { mergeFrom } from 'ngxtension/merge-from';
```

## Usage

`mergeFrom` accepts an array of `Observable`s or `Signal`s and returns a `Signal` that emits the latest value of the `Observable`s or `Signal`s.
By default, it needs to be called in an injection context, but it can also be called outside of it by passing the `Injector` in the third argument `options` object.
If your Observable doesn't emit synchronously, you can use the `startWith` operator to change the starting value, or pass an `initialValue` in the third argument `options` object.

```ts
const a = signal(1);
const b$ = new BehaviorSubject(2);

// array type
const merged = mergeFrom([a, b$]);
// both sources are sync, so emits the last emitted value
console.log(merged()); // 2
```

It can be used in multiple ways:

1. Merge multiple `Signal`s
2. Merge multiple `Observable`s
3. Merge multiple `Signal`s and `Observable`s
4. Using initialValue param
5. Use it outside of an injection context
3 changes: 3 additions & 0 deletions libs/ngxtension/merge-from/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
# ngxtension/merge-from

Secondary entry point of `ngxtension`. It can be used by importing from `ngxtension/merge-from`.
5 changes: 5 additions & 0 deletions libs/ngxtension/merge-from/ng-package.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
{
"lib": {
"entryFile": "src/index.ts"
}
}
20 changes: 20 additions & 0 deletions libs/ngxtension/merge-from/project.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
{
"name": "ngxtension/merge-from",
"$schema": "../../../node_modules/nx/schemas/project-schema.json",
"projectType": "library",
"sourceRoot": "libs/ngxtension/merge-from/src",
"targets": {
"test": {
"executor": "@nx/jest:jest",
"outputs": ["{workspaceRoot}/coverage/{projectRoot}"],
"options": {
"jestConfig": "libs/ngxtension/jest.config.ts",
"testPathPattern": ["merge-from"]
}
},
"lint": {
"executor": "@nx/eslint:lint",
"outputs": ["{options.outputFile}"]
}
}
}
Empty file.
94 changes: 94 additions & 0 deletions libs/ngxtension/merge-from/src/lib/merge-from.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
import { effect, signal } from '@angular/core';
import {
TestBed,
discardPeriodicTasks,
fakeAsync,
tick,
} from '@angular/core/testing';
import { ReplaySubject, map, timer } from 'rxjs';
import { mergeFrom } from './merge-from';

describe(mergeFrom.name, () => {
it('should work with signals', () => {
const values: any[] = [];
const value = signal(1);
const valueTwo = signal(2);

TestBed.runInInjectionContext(() => {
const merged = mergeFrom([value, valueTwo]);

effect(() => {
values.push(merged());
});
TestBed.flushEffects();

value.set(-1);
TestBed.flushEffects();

valueTwo.set(3);
TestBed.flushEffects();

expect(values).toEqual([2, -1, 3]);
});
});

it('should work with observables', () => {
const values: any[] = [];
const value = new ReplaySubject<number>(1);
const valueTwo = new ReplaySubject<number>(1);

TestBed.runInInjectionContext(() => {
const merged = mergeFrom([value, valueTwo], { initialValue: 1 });

effect(() => {
values.push(merged());
});
TestBed.flushEffects();

value.next(-1);
TestBed.flushEffects();

valueTwo.next(3);
TestBed.flushEffects();

expect(values).toEqual([1, -1, 3]);
});
});

it('should work with both and timing', fakeAsync(() => {
const values: any[] = [];
const value = signal(1);
const value$ = timer(0, 1000).pipe(map((val) => val * 2));

TestBed.runInInjectionContext(() => {
const merged = mergeFrom(
[value, value$],
map((val) => val + 10),
);

effect(() => {
values.push(merged());
});
TestBed.flushEffects();

value.set(-1);
TestBed.flushEffects();

tick(1000);
TestBed.flushEffects();

value.set(3);
TestBed.flushEffects();

tick(1000);
TestBed.flushEffects();

tick(1000);
TestBed.flushEffects();

discardPeriodicTasks();

expect(values).toEqual([11, 9, 10, 12, 13, 14, 16]);
});
}));
});
118 changes: 118 additions & 0 deletions libs/ngxtension/merge-from/src/lib/merge-from.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
import { Injector, isSignal, untracked, type Signal } from '@angular/core';
import { toObservable, toSignal } from '@angular/core/rxjs-interop';
import { assertInjector } from 'ngxtension/assert-injector';
import {
distinctUntilChanged,
from,
identity,
isObservable,
merge,
startWith,
type ObservableInput,
type OperatorFunction,
} from 'rxjs';

export type ObservableSignalInput<T> = ObservableInput<T> | Signal<T>;

/**
* So that we can have `fn([Observable<A>, Signal<B>]): Observable<[A, B]>`
*/
type ObservableSignalInputTuple<T> = {
[K in keyof T]: ObservableSignalInput<T[K]>;
};

export type MergeFromOptions<T> = {
readonly injector?: Injector;
readonly initialValue?: T | null;
};

export function mergeFrom<
Inputs extends readonly unknown[],
Output = Inputs[number],
>(
inputs: readonly [...ObservableSignalInputTuple<Inputs>],
operator?: OperatorFunction<Inputs[number], Output>,
options?: MergeFromOptions<Output>,
): Signal<Output>;
export function mergeFrom<
Inputs extends readonly unknown[],
Output = Inputs[number],
>(
inputs: readonly [...ObservableSignalInputTuple<Inputs>],
options?: MergeFromOptions<Output>,
): Signal<Output>;

export function mergeFrom<
Inputs extends readonly unknown[],
Output = Inputs[number],
>(...args: unknown[]) {
const [sources, operator = identity, options = {}] = parseArgs<
Inputs,
Output
>(args);
const normalizedSources = sources.map((source) => {
if (isSignal(source)) {
return toObservable(source, { injector: options.injector }).pipe(
startWith(untracked(source)),
);
}

if (!isObservable(source)) {
source = from(source);
}

return source.pipe(distinctUntilChanged());
});

const merged = merge(...normalizedSources).pipe(
operator as OperatorFunction<Inputs[number], Output>,
);

return assertInjector(mergeFrom, options.injector, () => {
if (options.initialValue !== undefined) {
return toSignal(merged, { initialValue: options.initialValue as Output });
}
return toSignal(merged, { requireSync: true });
});
}

function parseArgs<Inputs extends readonly unknown[], Output = Inputs[number]>(
args: unknown[],
) {
if (args.length === 0) {
throw new Error(
`[ngxtension] mergeFrom: Expected at least one argument, got none.`,
);
}

if (args.length === 1) {
return [
args[0] as readonly [...ObservableSignalInputTuple<Inputs>],
undefined,
undefined,
] as const;
}

if (args.length === 2) {
const isOperator = typeof args[1] === 'function';
if (isOperator) {
return [
args[0] as readonly [...ObservableSignalInputTuple<Inputs>],
args[1] as OperatorFunction<Inputs[number], Output>,
undefined,
] as const;
}

return [
args[0] as readonly [...ObservableSignalInputTuple<Inputs>],
undefined,
args[1] as MergeFromOptions<Output>,
] as const;
}

return args as unknown as [
readonly [...ObservableSignalInputTuple<Inputs>],
OperatorFunction<Inputs[number], Output>,
MergeFromOptions<Output>,
];
}
1 change: 1 addition & 0 deletions tsconfig.base.json
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@
"ngxtension/map-skip-undefined": [
"libs/ngxtension/map-skip-undefined/src/index.ts"
],
"ngxtension/merge-from": ["libs/ngxtension/merge-from/src/index.ts"],
"ngxtension/navigation-end": [
"libs/ngxtension/navigation-end/src/index.ts"
],
Expand Down

0 comments on commit 9ed73cf

Please sign in to comment.