Skip to content

Commit 3fa1d11

Browse files
author
Aschen
committed
Add Observer class
1 parent 5221424 commit 3fa1d11

File tree

5 files changed

+293
-3
lines changed

5 files changed

+293
-3
lines changed

index.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@ export * from './src/core/searchResult/Profile';
1919
export * from './src/core/searchResult/Role';
2020
export * from './src/core/searchResult/Specifications';
2121
export * from './src/core/searchResult/User';
22+
export * from './src/core/Observer';
23+
export * from './src/core/RealtimeDocument';
2224

2325
export * from './src/types';
2426

src/controllers/Document.ts

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -860,7 +860,7 @@ export class DocumentController extends BaseController {
860860
*
861861
* @param index Index name
862862
* @param collection Collection name
863-
* @param query Search query
863+
* @param searchBody Search query
864864
* @param options Additional options
865865
* - `queuable` If true, queues the request during downtime, until connected to Kuzzle again
866866
* - `from` Offset of the first document to fetch
@@ -874,7 +874,7 @@ export class DocumentController extends BaseController {
874874
search (
875875
index: string,
876876
collection: string,
877-
query: JSONObject = {},
877+
searchBody: JSONObject = {},
878878
options: {
879879
queuable?: boolean;
880880
from?: number;
@@ -885,7 +885,7 @@ export class DocumentController extends BaseController {
885885
timeout?: number;
886886
} = {}
887887
): Promise<SearchResult<DocumentHit>> {
888-
return this._search(index, collection, query, options)
888+
return this._search(index, collection, searchBody, options)
889889
.then(({ response, request, opts }) => (
890890
new DocumentSearchResult(this.kuzzle, request, opts, response.result)
891891
));

src/core/Observer.ts

Lines changed: 231 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,231 @@
1+
import { Kuzzle } from '../Kuzzle';
2+
import { RealtimeDocument } from './RealtimeDocument';
3+
import { Document, DocumentNotification, JSONObject } from '../types';
4+
import { SearchResult } from './searchResult/SearchResultBase';
5+
import { RealtimeDocumentSearchResult } from './searchResult/RealtimeDocument';
6+
7+
class CollectionSubscription extends Set<string> {
8+
public roomId: string = null;
9+
10+
get filters () {
11+
return {
12+
ids: { values: Array.from(this.values()) }
13+
};
14+
}
15+
}
16+
17+
type DocumentUrn = string;
18+
type CollectionUrn = string;
19+
20+
function documentUrn (index: string, collection: string, id: string): DocumentUrn {
21+
return `${index}:${collection}:${id}`;
22+
}
23+
24+
function collectionUrn (index: string, collection: string): CollectionUrn {
25+
return `${index}:${collection}`;
26+
}
27+
28+
export class Observer {
29+
private collections = new Map<CollectionUrn, CollectionSubscription>();
30+
private documents = new Map<DocumentUrn, RealtimeDocument>();
31+
private sdk: Kuzzle;
32+
33+
constructor (sdk: Kuzzle) {
34+
this.sdk = sdk;
35+
}
36+
37+
stop (index: string, collection: string, document: { _id: string }) {
38+
const urn = documentUrn(index, collection, document._id);
39+
const rtDocument = this.documents.get(urn);
40+
41+
if (! rtDocument) {
42+
return;
43+
}
44+
45+
const subscription = this.collections.get(collectionUrn(index, collection));
46+
47+
subscription.delete(document._id);
48+
49+
return this.resubscribe(index, collection);
50+
}
51+
52+
dispose () {
53+
const promises = [];
54+
55+
for (const subscription of this.collections.values()) {
56+
if (subscription.roomId) {
57+
promises.push(this.sdk.realtime.unsubscribe(subscription.roomId));
58+
}
59+
}
60+
61+
this.collections.clear();
62+
this.documents.clear();
63+
64+
return Promise.all(promises);
65+
}
66+
67+
/**
68+
* Gets a realtime document
69+
*
70+
* @param index Index name
71+
* @param collection Collection name
72+
* @param _id Document ID
73+
*
74+
* @returns The realtime document
75+
*/
76+
get (index: string, collection: string, id: string): Promise<RealtimeDocument> {
77+
return this.sdk.document.get(index, collection, id)
78+
.then(document => this.observe(index, collection, document));
79+
}
80+
81+
/**
82+
*
83+
* Gets multiple realtime documents.
84+
*
85+
* @param index Index name
86+
* @param collection Collection name
87+
* @param ids Document IDs
88+
*
89+
* @returns An object containing 2 arrays: "successes" and "errors"
90+
*/
91+
mGet (
92+
index: string,
93+
collection: string,
94+
ids: string[]
95+
): Promise<{
96+
/**
97+
* Array of successfully retrieved documents
98+
*/
99+
successes: RealtimeDocument[];
100+
/**
101+
* Array of the IDs of not found documents.
102+
*/
103+
errors: string[];
104+
}> {
105+
const rtDocuments = [];
106+
let _errors;
107+
108+
return this.sdk.document.mGet(index, collection, ids)
109+
.then(({ successes, errors }) => {
110+
_errors = errors;
111+
112+
for (const document of successes) {
113+
rtDocuments.push(this.addDocument(index, collection, document));
114+
}
115+
116+
return this.resubscribe(index, collection);
117+
})
118+
.then(() => ({ successes: rtDocuments, errors: _errors }));
119+
}
120+
121+
search (
122+
index: string,
123+
collection: string,
124+
searchBody: JSONObject = {},
125+
options: {
126+
from?: number;
127+
size?: number;
128+
scroll?: string;
129+
lang?: string;
130+
verb?: string;
131+
timeout?: number;
132+
} = {}
133+
): Promise<SearchResult<RealtimeDocument>> {
134+
let result;
135+
136+
return this.sdk.document['_search'](index, collection, searchBody, options)
137+
.then(({ response, request, opts }) => {
138+
result = new RealtimeDocumentSearchResult(
139+
this.sdk,
140+
request,
141+
opts,
142+
response.result,
143+
this);
144+
145+
const rtDocuments = [];
146+
for (const hit of result.hits) {
147+
rtDocuments.push(this.addDocument(index, collection, hit));
148+
}
149+
result.hits = rtDocuments;
150+
151+
return this.resubscribe(index, collection)
152+
})
153+
.then(() => result);
154+
}
155+
156+
observe (
157+
index: string,
158+
collection: string,
159+
document: Document,
160+
): Promise<RealtimeDocument> {
161+
const rtDocument = this.addDocument(index, collection, document);
162+
163+
return this.resubscribe(index, collection)
164+
.then(() => rtDocument);
165+
}
166+
167+
addDocument (index: string, collection: string, document: Document): RealtimeDocument {
168+
const rtDocument = new RealtimeDocument(document);
169+
170+
const urn = collectionUrn(index, collection);
171+
172+
if (! this.collections.has(urn)) {
173+
this.collections.set(urn, new CollectionSubscription());
174+
}
175+
176+
const subscription = this.collections.get(urn);
177+
178+
subscription.add(document._id);
179+
180+
this.documents.set(documentUrn(index, collection, document._id), rtDocument);
181+
182+
return rtDocument;
183+
}
184+
185+
resubscribe (index: string, collection: string) {
186+
let _roomId;
187+
188+
const subscription = this.collections.get(collectionUrn(index, collection));
189+
190+
return this.sdk.realtime.subscribe(
191+
index,
192+
collection,
193+
subscription.filters,
194+
this.notificationHandler.bind(this)
195+
)
196+
.then(roomId => {
197+
_roomId = roomId;
198+
199+
if (subscription.roomId) {
200+
return this.sdk.realtime.unsubscribe(subscription.roomId);
201+
}
202+
203+
return null;
204+
})
205+
.then(() => {
206+
subscription.roomId = _roomId;
207+
});
208+
}
209+
210+
private notificationHandler (notification: DocumentNotification) {
211+
const { index, collection, result } = notification;
212+
213+
const urn = documentUrn(index, collection, result._id);
214+
const rtDocument = this.documents.get(urn);
215+
216+
// On "write", mutate document with changes
217+
// On "publish", nothing
218+
if (notification.event !== 'delete') {
219+
if (notification.event === 'write') {
220+
Object.assign(rtDocument._source, result._source);
221+
}
222+
223+
return Promise.resolve();
224+
}
225+
226+
rtDocument.deleted = true;
227+
this.documents.delete(rtDocument._id);
228+
229+
return this.resubscribe(index, collection);
230+
}
231+
}

src/core/RealtimeDocument.ts

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
import { JSONObject } from '../types';
2+
3+
export class RealtimeDocument {
4+
public _id: string;
5+
public _source: JSONObject;
6+
public deleted: boolean;
7+
8+
constructor ({ _id, _source }) {
9+
this._id = _id;
10+
this._source = _source;
11+
this.deleted = false;
12+
}
13+
}
Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
import { SearchResultBase } from './SearchResultBase';
2+
import { DocumentHit } from '../../types';
3+
import { Observer } from '../Observer';
4+
5+
export class RealtimeDocumentSearchResult extends SearchResultBase<DocumentHit> {
6+
private observer: Observer;
7+
8+
/**
9+
* @param {Kuzzle} kuzzle
10+
* @param {object} request
11+
* @param {object} options
12+
* @param {object} result
13+
*/
14+
constructor (kuzzle, request, options, result, observer: Observer) {
15+
super(kuzzle, request, options, result);
16+
17+
this._searchAction = 'search';
18+
this._scrollAction = 'scroll';
19+
20+
this.observer = observer;
21+
}
22+
23+
protected _buildNextSearchResult (result: RealtimeDocumentSearchResult) {
24+
const { index, collection } = this._request;
25+
26+
const nextSearchResult = new RealtimeDocumentSearchResult(
27+
this._kuzzle,
28+
this._request,
29+
this._options,
30+
result,
31+
this.observer);
32+
33+
nextSearchResult.fetched += this.fetched;
34+
35+
const rtDocuments = [];
36+
for (const hit of nextSearchResult.hits) {
37+
rtDocuments.push(this.observer.addDocument(index, collection, hit));
38+
}
39+
nextSearchResult.hits = rtDocuments;
40+
41+
return this.observer.resubscribe(index, collection)
42+
.then(() => nextSearchResult);
43+
}
44+
}

0 commit comments

Comments
 (0)