This is an example of harnessing async reactive functional programming. This is an example pulled from a project of mine using Typescript and RxJS.
export const s3EntityCrudAdaptorPluginFactory = (platformId: Object, authFacade: AuthFacade, cognitoSettings: CognitoSettings, paramsEvaluatorService: ParamEvaluatorService, http: HttpClient, asyncApiCallHelperSvc: AsyncApiCallHelperService, hostName?: string, protocol?: string) => {
return new CrudAdaptorPlugin<string>({
id: 'aws_s3_entity',
title: 'AWS S3 Entity',
create: ({ object, identity, params }: CrudOperationInput) => of({ success: false }).pipe(
map(() => buildClient(authFacade, cognitoSettings)),
switchMap(s3 => identity({ object }).pipe(
map(({ identity }) => ({ s3, identity }))
)),
switchMap(({ s3, identity }) => params && Object.keys(params).length !== 0 ? forkJoin(Object.keys(params).map(name => paramsEvaluatorService.paramValue(params[name], new Map<string, any>()).pipe(map(v => ({ [name]: v }))))).pipe(
map(groups => groups.reduce((p, c) => ({ ...p, ...c }), {})), // default options go here instead of empty object.
map(options => ({ s3, identity, options }))
): of({ s3, identity, options: {} })),
map(({ s3, identity, options }) => {
const name = options.prefix + identity + '.json';
const command = new PutObjectCommand({
Bucket: options.bucket,
Key: name,
Body: JSON.stringify(object),
ContentType: 'application/json',
CacheControl: `ETag: ${uuid.v4()}` // cache could be part of adaptor options - for now KISS
});
return { s3, command };
}),
switchMap(({ s3, command }) => new Observable<CrudOperationResponse>(obs => {
s3.send(command).then(res => {
console.log('sent');
console.log(res);
obs.next({ success: true });
obs.complete();
}).catch(e => {
console.log('error')
console.log(e);
obs.next({ success: false })
obs.complete();
});
}))
),
read: ({ }: CrudOperationInput) => of<CrudOperationResponse>({ success: false }),
update: ({ object, identity, params }: CrudOperationInput) => of({ success: false }).pipe(
map(() => buildClient(authFacade, cognitoSettings)),
switchMap(s3 => identity({ object }).pipe(
map(({ identity }) => ({ s3, identity }))
)),
switchMap(({ s3, identity }) => params && Object.keys(params).length !== 0 ? forkJoin(Object.keys(params).map(name => paramsEvaluatorService.paramValue(params[name], new Map<string, any>()).pipe(map(v => ({ [name]: v }))))).pipe(
map(groups => groups.reduce((p, c) => ({ ...p, ...c }), {})), // default options go here instead of empty object.
map(options => ({ s3, identity, options }))
): of({ s3, identity, options: {} })),
map(({ s3, identity, options }) => {
const name = options.prefix + identity + '.json';
const command = new PutObjectCommand({
Bucket: options.bucket,
Key: name,
Body: JSON.stringify(object),
ContentType: 'application/json',
CacheControl: `ETag: ${uuid.v4()}` // cache could be part of adaptor options - for now KISS
});
return { s3, command };
}),
switchMap(({ s3, command }) => new Observable<CrudOperationResponse>(obs => {
s3.send(command).then(res => {
console.log('sent');
console.log(res);
obs.next({ success: true });
obs.complete();
}).catch(e => {
console.log('error')
console.log(e);
obs.next({ success: false })
obs.complete();
});
}))
),
delete: ({ }: CrudOperationInput) => of<CrudOperationResponse>({ success: false }),
query: ({ rule, params }: CrudCollectionOperationInput) => of({ entities: [], success: false }).pipe(
map(() => ({ identityCondition: (rule.conditions as AllConditions).all.map(c => (c as AnyConditions).any.find(c2 => (c2 as ConditionProperties).fact === 'identity')).find(c => !!c) })),
switchMap(({ identityCondition }) => iif(
() => identityCondition !== undefined && (identityCondition as ConditionProperties).fact === 'identity',
of({ entities: [], success: false }).pipe(
map(() => buildClient(authFacade, cognitoSettings)),
switchMap( s3 => params && Object.keys(params).length !== 0 ? forkJoin(Object.keys(params).map(name => paramsEvaluatorService.paramValue(params[name], new Map<string, any>()).pipe(map(v => ({ [name]: v }))))).pipe(
map(groups => groups.reduce((p, c) => ({ ...p, ...c }), {})), // default options go here instead of empty object.
map(options => ({ s3, options }))
): of({ s3, options: {} })),
/*map(({ s3, options }) => {
const name = options.prefix + (identityCondition as ConditionProperties).value + '.json';
const command = new GetObjectCommand({
Bucket: options.bucket,
Key: name
});
return { s3, command };
}),*/
/*switchMap(({ s3, command }) => new Observable<CrudCollectionOperationResponse>(obs => {
s3.send(command)
.then(res => new Response(res.Body as ReadableStream, {}))
.then(res => res.json())
.then(entity => {
console.log('sent');
// console.log(res);
obs.next({ success: true, entities: [ entity ] });
obs.complete();
}).catch(e => {
console.log('error')
console.log(e);
obs.next({ success: false, entities: [] })
obs.complete();
});
}))*/
switchMap(({ options }) => createSignedHttpRequest({
method: "GET",
headers: {
"Content-Type": "application/json",
host: `${options.bucket}.s3.amazonaws.com`,
},
hostname: `${options.bucket}.s3.amazonaws.com`,
path: `${options.prefix}${(identityCondition as ConditionProperties).value}.json`,
protocol: 'https:',
service: "s3",
cognitoSettings: cognitoSettings,
authFacade: authFacade
}).pipe(
map(signedHttpRequest => ({ signedHttpRequest, options }))
)
),
switchMap(( { signedHttpRequest, options }) => {
// if (!isPlatformServer(platformId)) {
delete signedHttpRequest.headers.host;
// }
// const url = `${ isPlatformServer(platformId) ? '' : '/opensearch' }${signedHttpRequest.path}`;
const url = `${ isPlatformServer(platformId) ? /*'http://localhost:4000'*/ `${protocol}://${hostName}` : '' }/awproxy/s3/${options.bucket}${signedHttpRequest.path}`;
console.log('url', url);
return asyncApiCallHelperSvc.doTask(http.get(url, { headers: signedHttpRequest.headers, withCredentials: true }).toPromise()).pipe(
catchError(() => of(undefined)),
map(res => ({ res, options }))
);
/*return http.get(url, { headers: signedHttpRequest.headers, withCredentials: true }).pipe(
map(res => ({ res, options }))
);*/
}),
tap(({ res }) => console.log(`panelpage id ${res ? (res as any).id : 'undefined'}`)),
map(({ res }) => ({ entities: res ? [ res ] : [], success: res ? true : false }))
),
// Only implemented for GetObject (single object by identity) at the moment.
of({ entities: [], success: false })
))
)
});
};
All of the functions return observables which is is basically supercharged promise compatible with all the RxJS operations. Therefore, the initial stream of data which is just a simple object literal can be manipulated, replaced, and transformed as it passes down the streams pipeline. No state is maintained and each operation is a small, discrete operation that either be synchronous like map or async like switchMap which replaced the observable with a brand new one. That brand new observable replaces the existing stream with a new values which advance down the pipeline and so on and so forth. Observables can be chained together just like a promise indefinately until the application ceases or stops.
With this mentality functions effectively become collections of sequential reactive, functional operations that further manipulate a stream or even replace or augment it.
This is an example of leveraging reactive programming to handle dom events.
const nav$ = fromEvent(this.el.nativeElement, 'click').pipe(
//filter(evt => (evt as any).target.closest('a') !== null),
tap(() => alert('Hello'))
);
https://rxjs.dev/api/index/function/fromEvent
Learning reactive programming is highly sought skill for many tech employers that could possibly separate someone from the rest of the heap that say they can write JavaScript. The same is true for Typescript many React and Vue applications even use it as part of their stack. I have seen considerable number of posting that use those ecosystems not Angular but desire Typescript and even reactive programming.