Files
kami-parse-server/src/LiveQuery/ParseLiveQueryServer.js

581 lines
21 KiB
JavaScript
Raw Normal View History

2016-03-10 14:27:00 -08:00
import tv4 from 'tv4';
import Parse from 'parse/node';
import { Subscription } from './Subscription';
import { Client } from './Client';
import { ParseWebSocketServer } from './ParseWebSocketServer';
import logger from '../logger';
2016-03-10 14:27:00 -08:00
import RequestSchema from './RequestSchema';
import { matchesQuery, queryHash } from './QueryTools';
import { ParsePubSub } from './ParsePubSub';
import { SessionTokenCache } from './SessionTokenCache';
import _ from 'lodash';
import uuid from 'uuid';
import { runLiveQueryEventHandlers } from '../triggers';
2016-03-10 14:27:00 -08:00
class ParseLiveQueryServer {
clients: Map;
2016-03-10 14:27:00 -08:00
// className -> (queryHash -> subscription)
subscriptions: Object;
parseWebSocketServer: Object;
keyPairs : any;
// The subscriber we use to get object update from publisher
subscriber: Object;
constructor(server: any, config: any) {
this.server = server;
2016-03-10 14:27:00 -08:00
this.clients = new Map();
this.subscriptions = new Map();
config = config || {};
2016-03-10 14:27:00 -08:00
// Store keys, convert obj to map
2016-12-07 15:17:05 -08:00
const keyPairs = config.keyPairs || {};
2016-03-10 14:27:00 -08:00
this.keyPairs = new Map();
2016-12-07 15:17:05 -08:00
for (const key of Object.keys(keyPairs)) {
2016-03-10 14:27:00 -08:00
this.keyPairs.set(key, keyPairs[key]);
}
logger.verbose('Support key pairs', this.keyPairs);
2016-03-10 14:27:00 -08:00
// Initialize Parse
Parse.Object.disableSingleInstance();
2016-12-07 15:17:05 -08:00
const serverURL = config.serverURL || Parse.serverURL;
2016-03-10 14:27:00 -08:00
Parse.serverURL = serverURL;
2016-12-07 15:17:05 -08:00
const appId = config.appId || Parse.applicationId;
const javascriptKey = Parse.javaScriptKey;
const masterKey = config.masterKey || Parse.masterKey;
2016-03-10 14:27:00 -08:00
Parse.initialize(appId, javascriptKey, masterKey);
// Initialize websocket server
this.parseWebSocketServer = new ParseWebSocketServer(
server,
(parseWebsocket) => this._onConnect(parseWebsocket),
config.websocketTimeout
);
// Initialize subscriber
this.subscriber = ParsePubSub.createSubscriber(config);
2016-11-28 12:15:21 -02:00
this.subscriber.subscribe(Parse.applicationId + 'afterSave');
this.subscriber.subscribe(Parse.applicationId + 'afterDelete');
2016-03-10 14:27:00 -08:00
// Register message handler for subscriber. When publisher get messages, it will publish message
// to the subscribers and the handler will be called.
this.subscriber.on('message', (channel, messageStr) => {
logger.verbose('Subscribe messsage %j', messageStr);
let message;
try {
message = JSON.parse(messageStr);
} catch(e) {
logger.error('unable to parse message', messageStr, e);
return;
}
2016-03-10 14:27:00 -08:00
this._inflateParseObject(message);
2016-11-28 12:15:21 -02:00
if (channel === Parse.applicationId + 'afterSave') {
2016-03-10 14:27:00 -08:00
this._onAfterSave(message);
2016-11-28 12:15:21 -02:00
} else if (channel === Parse.applicationId + 'afterDelete') {
2016-03-10 14:27:00 -08:00
this._onAfterDelete(message);
} else {
logger.error('Get message %s from unknown channel %j', message, channel);
2016-03-10 14:27:00 -08:00
}
});
// Initialize sessionToken cache
this.sessionTokenCache = new SessionTokenCache(config.cacheTimeout);
}
// Message is the JSON object from publisher. Message.currentParseObject is the ParseObject JSON after changes.
// Message.originalParseObject is the original ParseObject JSON.
_inflateParseObject(message: any): void {
// Inflate merged object
2016-12-07 15:17:05 -08:00
const currentParseObject = message.currentParseObject;
2016-03-10 14:27:00 -08:00
let className = currentParseObject.className;
let parseObject = new Parse.Object(className);
parseObject._finishFetch(currentParseObject);
message.currentParseObject = parseObject;
// Inflate original object
2016-12-07 15:17:05 -08:00
const originalParseObject = message.originalParseObject;
2016-03-10 14:27:00 -08:00
if (originalParseObject) {
className = originalParseObject.className;
parseObject = new Parse.Object(className);
parseObject._finishFetch(originalParseObject);
message.originalParseObject = parseObject;
}
}
// Message is the JSON object from publisher after inflated. Message.currentParseObject is the ParseObject after changes.
// Message.originalParseObject is the original ParseObject.
_onAfterDelete(message: any): void {
2016-11-28 12:15:21 -02:00
logger.verbose(Parse.applicationId + 'afterDelete is triggered');
2016-03-10 14:27:00 -08:00
2016-12-07 15:17:05 -08:00
const deletedParseObject = message.currentParseObject.toJSON();
const className = deletedParseObject.className;
logger.verbose('ClassName: %j | ObjectId: %s', className, deletedParseObject.id);
logger.verbose('Current client number : %d', this.clients.size);
2016-03-10 14:27:00 -08:00
2016-12-07 15:17:05 -08:00
const classSubscriptions = this.subscriptions.get(className);
2016-03-10 14:27:00 -08:00
if (typeof classSubscriptions === 'undefined') {
logger.debug('Can not find subscriptions under this class ' + className);
2016-03-10 14:27:00 -08:00
return;
}
2016-12-07 15:17:05 -08:00
for (const subscription of classSubscriptions.values()) {
const isSubscriptionMatched = this._matchesSubscription(deletedParseObject, subscription);
2016-03-10 14:27:00 -08:00
if (!isSubscriptionMatched) {
continue;
}
2016-12-07 15:17:05 -08:00
for (const [clientId, requestIds] of _.entries(subscription.clientRequestIds)) {
const client = this.clients.get(clientId);
2016-03-10 14:27:00 -08:00
if (typeof client === 'undefined') {
continue;
}
2016-12-07 15:17:05 -08:00
for (const requestId of requestIds) {
const acl = message.currentParseObject.getACL();
2016-03-10 14:27:00 -08:00
// Check ACL
this._matchesACL(acl, client, requestId).then((isMatched) => {
if (!isMatched) {
return null;
}
client.pushDelete(requestId, deletedParseObject);
}, (error) => {
logger.error('Matching ACL error : ', error);
2016-03-10 14:27:00 -08:00
});
}
}
}
}
// Message is the JSON object from publisher after inflated. Message.currentParseObject is the ParseObject after changes.
// Message.originalParseObject is the original ParseObject.
_onAfterSave(message: any): void {
2016-11-28 12:15:21 -02:00
logger.verbose(Parse.applicationId + 'afterSave is triggered');
2016-03-10 14:27:00 -08:00
let originalParseObject = null;
if (message.originalParseObject) {
originalParseObject = message.originalParseObject.toJSON();
}
2016-12-07 15:17:05 -08:00
const currentParseObject = message.currentParseObject.toJSON();
const className = currentParseObject.className;
logger.verbose('ClassName: %s | ObjectId: %s', className, currentParseObject.id);
logger.verbose('Current client number : %d', this.clients.size);
2016-03-10 14:27:00 -08:00
2016-12-07 15:17:05 -08:00
const classSubscriptions = this.subscriptions.get(className);
2016-03-10 14:27:00 -08:00
if (typeof classSubscriptions === 'undefined') {
logger.debug('Can not find subscriptions under this class ' + className);
2016-03-10 14:27:00 -08:00
return;
}
2016-12-07 15:17:05 -08:00
for (const subscription of classSubscriptions.values()) {
const isOriginalSubscriptionMatched = this._matchesSubscription(originalParseObject, subscription);
const isCurrentSubscriptionMatched = this._matchesSubscription(currentParseObject, subscription);
for (const [clientId, requestIds] of _.entries(subscription.clientRequestIds)) {
const client = this.clients.get(clientId);
2016-03-10 14:27:00 -08:00
if (typeof client === 'undefined') {
continue;
}
2016-12-07 15:17:05 -08:00
for (const requestId of requestIds) {
2016-03-10 14:27:00 -08:00
// Set orignal ParseObject ACL checking promise, if the object does not match
// subscription, we do not need to check ACL
let originalACLCheckingPromise;
if (!isOriginalSubscriptionMatched) {
originalACLCheckingPromise = Promise.resolve(false);
2016-03-10 14:27:00 -08:00
} else {
let originalACL;
if (message.originalParseObject) {
originalACL = message.originalParseObject.getACL();
}
originalACLCheckingPromise = this._matchesACL(originalACL, client, requestId);
}
// Set current ParseObject ACL checking promise, if the object does not match
// subscription, we do not need to check ACL
let currentACLCheckingPromise;
if (!isCurrentSubscriptionMatched) {
currentACLCheckingPromise = Promise.resolve(false);
2016-03-10 14:27:00 -08:00
} else {
2016-12-07 15:17:05 -08:00
const currentACL = message.currentParseObject.getACL();
2016-03-10 14:27:00 -08:00
currentACLCheckingPromise = this._matchesACL(currentACL, client, requestId);
}
Promise.all(
[
originalACLCheckingPromise,
currentACLCheckingPromise
]
).then(([isOriginalMatched, isCurrentMatched]) => {
logger.verbose('Original %j | Current %j | Match: %s, %s, %s, %s | Query: %s',
2016-03-10 14:27:00 -08:00
originalParseObject,
currentParseObject,
isOriginalSubscriptionMatched,
isCurrentSubscriptionMatched,
isOriginalMatched,
isCurrentMatched,
subscription.hash
);
// Decide event type
let type;
if (isOriginalMatched && isCurrentMatched) {
type = 'Update';
} else if (isOriginalMatched && !isCurrentMatched) {
type = 'Leave';
} else if (!isOriginalMatched && isCurrentMatched) {
if (originalParseObject) {
type = 'Enter';
} else {
type = 'Create';
}
} else {
return null;
}
2016-12-07 15:17:05 -08:00
const functionName = 'push' + type;
2016-03-10 14:27:00 -08:00
client[functionName](requestId, currentParseObject);
}, (error) => {
logger.error('Matching ACL error : ', error);
2016-03-10 14:27:00 -08:00
});
}
}
}
}
_onConnect(parseWebsocket: any): void {
parseWebsocket.on('message', (request) => {
if (typeof request === 'string') {
try {
request = JSON.parse(request);
} catch(e) {
logger.error('unable to parse request', request, e);
return;
}
2016-03-10 14:27:00 -08:00
}
logger.verbose('Request: %j', request);
2016-03-10 14:27:00 -08:00
// Check whether this request is a valid request, return error directly if not
if (!tv4.validate(request, RequestSchema['general']) || !tv4.validate(request, RequestSchema[request.op])) {
Client.pushError(parseWebsocket, 1, tv4.error.message);
logger.error('Connect message error %s', tv4.error.message);
2016-03-10 14:27:00 -08:00
return;
}
switch(request.op) {
case 'connect':
this._handleConnect(parseWebsocket, request);
break;
case 'subscribe':
this._handleSubscribe(parseWebsocket, request);
break;
case 'update':
this._handleUpdateSubscription(parseWebsocket, request);
break;
case 'unsubscribe':
this._handleUnsubscribe(parseWebsocket, request);
break;
default:
Client.pushError(parseWebsocket, 3, 'Get unknown operation');
logger.error('Get unknown operation', request.op);
2016-03-10 14:27:00 -08:00
}
});
parseWebsocket.on('disconnect', () => {
logger.info(`Client disconnect: ${parseWebsocket.clientId}`);
2016-12-07 15:17:05 -08:00
const clientId = parseWebsocket.clientId;
2016-03-10 14:27:00 -08:00
if (!this.clients.has(clientId)) {
runLiveQueryEventHandlers({
event: 'ws_disconnect_error',
clients: this.clients.size,
subscriptions: this.subscriptions.size,
error: `Unable to find client ${clientId}`
});
logger.error(`Can not find client ${clientId} on disconnect`);
2016-03-10 14:27:00 -08:00
return;
}
// Delete client
2016-12-07 15:17:05 -08:00
const client = this.clients.get(clientId);
2016-03-10 14:27:00 -08:00
this.clients.delete(clientId);
// Delete client from subscriptions
2016-12-07 15:17:05 -08:00
for (const [requestId, subscriptionInfo] of _.entries(client.subscriptionInfos)) {
const subscription = subscriptionInfo.subscription;
2016-03-10 14:27:00 -08:00
subscription.deleteClientSubscription(clientId, requestId);
// If there is no client which is subscribing this subscription, remove it from subscriptions
2016-12-07 15:17:05 -08:00
const classSubscriptions = this.subscriptions.get(subscription.className);
2016-03-10 14:27:00 -08:00
if (!subscription.hasSubscribingClient()) {
classSubscriptions.delete(subscription.hash);
}
// If there is no subscriptions under this class, remove it from subscriptions
if (classSubscriptions.size === 0) {
this.subscriptions.delete(subscription.className);
}
}
logger.verbose('Current clients %d', this.clients.size);
logger.verbose('Current subscriptions %d', this.subscriptions.size);
runLiveQueryEventHandlers({
event: 'ws_disconnect',
clients: this.clients.size,
subscriptions: this.subscriptions.size
});
});
runLiveQueryEventHandlers({
event: 'ws_connect',
clients: this.clients.size,
subscriptions: this.subscriptions.size
2016-03-10 14:27:00 -08:00
});
}
_matchesSubscription(parseObject: any, subscription: any): boolean {
// Object is undefined or null, not match
if (!parseObject) {
return false;
}
return matchesQuery(parseObject, subscription.query);
}
_matchesACL(acl: any, client: any, requestId: number): any {
// Return true directly if ACL isn't present, ACL is public read, or client has master key
if (!acl || acl.getPublicReadAccess() || client.hasMasterKey) {
return Promise.resolve(true);
2016-03-10 14:27:00 -08:00
}
// Check subscription sessionToken matches ACL first
2016-12-07 15:17:05 -08:00
const subscriptionInfo = client.getSubscriptionInfo(requestId);
2016-03-10 14:27:00 -08:00
if (typeof subscriptionInfo === 'undefined') {
return Promise.resolve(false);
2016-03-10 14:27:00 -08:00
}
2016-12-07 15:17:05 -08:00
const subscriptionSessionToken = subscriptionInfo.sessionToken;
2016-03-10 14:27:00 -08:00
return this.sessionTokenCache.getUserId(subscriptionSessionToken).then((userId) => {
return acl.getReadAccess(userId);
}).then((isSubscriptionSessionTokenMatched) => {
if (isSubscriptionSessionTokenMatched) {
return Promise.resolve(true);
2016-03-10 14:27:00 -08:00
}
// Check if the user has any roles that match the ACL
return new Promise((resolve, reject) => {
// Resolve false right away if the acl doesn't have any roles
const acl_has_roles = Object.keys(acl.permissionsById).some(key => key.startsWith("role:"));
if (!acl_has_roles) {
return resolve(false);
}
this.sessionTokenCache.getUserId(subscriptionSessionToken)
.then((userId) => {
// Pass along a null if there is no user id
if (!userId) {
return Promise.resolve(null);
}
// Prepare a user object to query for roles
// To eliminate a query for the user, create one locally with the id
var user = new Parse.User();
user.id = userId;
return user;
})
.then((user) => {
// Pass along an empty array (of roles) if no user
if (!user) {
return Promise.resolve([]);
}
// Then get the user's roles
var rolesQuery = new Parse.Query(Parse.Role);
rolesQuery.equalTo("users", user);
return rolesQuery.find({useMasterKey:true});
}).
then((roles) => {
// Finally, see if any of the user's roles allow them read access
for (const role of roles) {
if (acl.getRoleReadAccess(role)) {
return resolve(true);
}
}
resolve(false);
})
.catch((error) => {
reject(error);
});
});
}).then((isRoleMatched) => {
if(isRoleMatched) {
return Promise.resolve(true);
}
2016-03-10 14:27:00 -08:00
// Check client sessionToken matches ACL
2016-12-07 15:17:05 -08:00
const clientSessionToken = client.sessionToken;
2016-03-10 14:27:00 -08:00
return this.sessionTokenCache.getUserId(clientSessionToken).then((userId) => {
return acl.getReadAccess(userId);
});
}).then((isMatched) => {
return Promise.resolve(isMatched);
}, () => {
return Promise.resolve(false);
2016-03-10 14:27:00 -08:00
});
}
_handleConnect(parseWebsocket: any, request: any): any {
if (!this._validateKeys(request, this.keyPairs)) {
Client.pushError(parseWebsocket, 4, 'Key in request is not valid');
logger.error('Key in request is not valid');
2016-03-10 14:27:00 -08:00
return;
}
const hasMasterKey = this._hasMasterKey(request, this.keyPairs);
const clientId = uuid();
const client = new Client(clientId, parseWebsocket, hasMasterKey);
parseWebsocket.clientId = clientId;
2016-03-10 14:27:00 -08:00
this.clients.set(parseWebsocket.clientId, client);
logger.info(`Create new client: ${parseWebsocket.clientId}`);
2016-03-10 14:27:00 -08:00
client.pushConnect();
runLiveQueryEventHandlers({
event: 'connect',
clients: this.clients.size,
subscriptions: this.subscriptions.size
});
2016-03-10 14:27:00 -08:00
}
_hasMasterKey(request: any, validKeyPairs: any): boolean {
if(!validKeyPairs || validKeyPairs.size == 0 ||
!validKeyPairs.has("masterKey")) {
return false;
}
if(!request || !request.hasOwnProperty("masterKey")) {
return false;
}
return request.masterKey === validKeyPairs.get("masterKey");
}
2016-03-10 14:27:00 -08:00
_validateKeys(request: any, validKeyPairs: any): boolean {
if (!validKeyPairs || validKeyPairs.size == 0) {
return true;
}
let isValid = false;
2016-12-07 15:17:05 -08:00
for (const [key, secret] of validKeyPairs) {
2016-03-10 14:27:00 -08:00
if (!request[key] || request[key] !== secret) {
continue;
}
isValid = true;
break;
}
return isValid;
}
_handleSubscribe(parseWebsocket: any, request: any): any {
// If we can not find this client, return error to client
if (!parseWebsocket.hasOwnProperty('clientId')) {
Client.pushError(parseWebsocket, 2, 'Can not find this client, make sure you connect to server before subscribing');
logger.error('Can not find this client, make sure you connect to server before subscribing');
2016-03-10 14:27:00 -08:00
return;
}
2016-12-07 15:17:05 -08:00
const client = this.clients.get(parseWebsocket.clientId);
2016-03-10 14:27:00 -08:00
// Get subscription from subscriptions, create one if necessary
2016-12-07 15:17:05 -08:00
const subscriptionHash = queryHash(request.query);
2016-03-10 14:27:00 -08:00
// Add className to subscriptions if necessary
2016-12-07 15:17:05 -08:00
const className = request.query.className;
2016-03-10 14:27:00 -08:00
if (!this.subscriptions.has(className)) {
this.subscriptions.set(className, new Map());
}
2016-12-07 15:17:05 -08:00
const classSubscriptions = this.subscriptions.get(className);
2016-03-10 14:27:00 -08:00
let subscription;
if (classSubscriptions.has(subscriptionHash)) {
subscription = classSubscriptions.get(subscriptionHash);
} else {
subscription = new Subscription(className, request.query.where, subscriptionHash);
classSubscriptions.set(subscriptionHash, subscription);
}
// Add subscriptionInfo to client
2016-12-07 15:17:05 -08:00
const subscriptionInfo = {
2016-03-10 14:27:00 -08:00
subscription: subscription
};
// Add selected fields and sessionToken for this subscription if necessary
if (request.query.fields) {
subscriptionInfo.fields = request.query.fields;
}
if (request.sessionToken) {
subscriptionInfo.sessionToken = request.sessionToken;
}
client.addSubscriptionInfo(request.requestId, subscriptionInfo);
// Add clientId to subscription
subscription.addClientSubscription(parseWebsocket.clientId, request.requestId);
client.pushSubscribe(request.requestId);
logger.verbose(`Create client ${parseWebsocket.clientId} new subscription: ${request.requestId}`);
logger.verbose('Current client number: %d', this.clients.size);
runLiveQueryEventHandlers({
event: 'subscribe',
clients: this.clients.size,
subscriptions: this.subscriptions.size
});
2016-03-10 14:27:00 -08:00
}
_handleUpdateSubscription(parseWebsocket: any, request: any): any {
this._handleUnsubscribe(parseWebsocket, request, false);
this._handleSubscribe(parseWebsocket, request);
}
_handleUnsubscribe(parseWebsocket: any, request: any, notifyClient: bool = true): any {
2016-03-10 14:27:00 -08:00
// If we can not find this client, return error to client
if (!parseWebsocket.hasOwnProperty('clientId')) {
Client.pushError(parseWebsocket, 2, 'Can not find this client, make sure you connect to server before unsubscribing');
logger.error('Can not find this client, make sure you connect to server before unsubscribing');
2016-03-10 14:27:00 -08:00
return;
}
2016-12-07 15:17:05 -08:00
const requestId = request.requestId;
const client = this.clients.get(parseWebsocket.clientId);
2016-03-10 14:27:00 -08:00
if (typeof client === 'undefined') {
Client.pushError(parseWebsocket, 2, 'Cannot find client with clientId ' + parseWebsocket.clientId +
'. Make sure you connect to live query server before unsubscribing.');
logger.error('Can not find this client ' + parseWebsocket.clientId);
2016-03-10 14:27:00 -08:00
return;
}
2016-12-07 15:17:05 -08:00
const subscriptionInfo = client.getSubscriptionInfo(requestId);
2016-03-10 14:27:00 -08:00
if (typeof subscriptionInfo === 'undefined') {
Client.pushError(parseWebsocket, 2, 'Cannot find subscription with clientId ' + parseWebsocket.clientId +
' subscriptionId ' + requestId + '. Make sure you subscribe to live query server before unsubscribing.');
logger.error('Can not find subscription with clientId ' + parseWebsocket.clientId + ' subscriptionId ' + requestId);
2016-03-10 14:27:00 -08:00
return;
}
// Remove subscription from client
client.deleteSubscriptionInfo(requestId);
// Remove client from subscription
2016-12-07 15:17:05 -08:00
const subscription = subscriptionInfo.subscription;
const className = subscription.className;
2016-03-10 14:27:00 -08:00
subscription.deleteClientSubscription(parseWebsocket.clientId, requestId);
// If there is no client which is subscribing this subscription, remove it from subscriptions
2016-12-07 15:17:05 -08:00
const classSubscriptions = this.subscriptions.get(className);
2016-03-10 14:27:00 -08:00
if (!subscription.hasSubscribingClient()) {
classSubscriptions.delete(subscription.hash);
}
// If there is no subscriptions under this class, remove it from subscriptions
if (classSubscriptions.size === 0) {
this.subscriptions.delete(className);
}
runLiveQueryEventHandlers({
event: 'unsubscribe',
clients: this.clients.size,
subscriptions: this.subscriptions.size
});
if (!notifyClient) {
return;
}
2016-03-10 14:27:00 -08:00
client.pushUnsubscribe(request.requestId);
logger.verbose(`Delete client: ${parseWebsocket.clientId} | subscription: ${request.requestId}`);
2016-03-10 14:27:00 -08:00
}
}
export {
ParseLiveQueryServer
}