// pg-promise is initialized once per module; the resulting `pgp` factory
// creates database clients from connection URIs (see constructor below).
const pgp = require('pg-promise')();

// PostgreSQL error codes, per the PostgreSQL "Error Codes" appendix.
const PostgresRelationDoesNotExistError = '42P01'; // undefined_table
const PostgresDuplicateRelationError = '42P07';    // duplicate_table
const PostgresDuplicateColumnError = '42701';      // duplicate_column
const PostgresUniqueIndexViolationError = '23505'; // unique_violation
// Maps a Parse schema field type (e.g. { type: 'String' }) to the Postgres
// column type used to store it. Arrays of strings get a native text[] column;
// all other arrays are stored as jsonb. Throws (a string, matching the file's
// existing error style) for types this adapter does not support yet.
const parseTypeToPostgresType = type => {
  switch (type.type) {
    case 'String': return 'text';
    case 'Date': return 'timestamp';
    case 'Object': return 'jsonb';
    case 'Boolean': return 'boolean';
    case 'Pointer': return 'char(10)'; // stores the target objectId only
    case 'Number': return 'double precision';
    case 'Array':
      if (type.contents && type.contents.type === 'String') {
        return 'text[]';
      } else {
        return 'jsonb';
      }
    default: throw `no type for ${JSON.stringify(type)} yet`;
  }
};
// Translates a Parse query object into a pg-promise WHERE fragment.
// `index` is the 1-based position of the next query placeholder ($1, $2, ...).
// Returns { pattern, values }: `pattern` uses pg-promise's `$N:name` filter for
// identifiers and plain `$N` for values; `values` lines up with those
// placeholders and must be appended to the caller's values array.
const buildWhereClause = ({ schema, query, index }) => {
  let patterns = [];
  let values = [];
  for (let fieldName in query) {
    let fieldValue = query[fieldName];
    if (typeof fieldValue === 'string') {
      // Simple equality on a string value.
      patterns.push(`$${index}:name = $${index + 1}`);
      values.push(fieldName, fieldValue);
      index += 2;
    } else if (fieldValue.$ne) {
      // NOTE(review): truthiness check — $ne of 0, '' or null falls through
      // to the OPERATION_FORBIDDEN branch below. Confirm intended.
      patterns.push(`$${index}:name <> $${index + 1}`);
      values.push(fieldName, fieldValue.$ne);
      index += 2;
    } else if (fieldName === '$or') {
      // NOTE(review): `index` is not advanced past the sub-clauses'
      // placeholders, so a field after $or reuses the same $N slots — TODO confirm.
      fieldValue.map(subQuery => buildWhereClause({ schema, query: subQuery, index })).forEach(result => {
        patterns.push(result.pattern);
        values.push(...result.values);
      });
    } else if (Array.isArray(fieldValue.$in) && schema.fields[fieldName].type === 'Array') {
      // $in against an array column: overlap (&&) with the given elements.
      // A null element means "also match rows where the column is NULL".
      let inPatterns = [];
      let allowNull = false;
      values.push(fieldName);
      fieldValue.$in.forEach((listElem, listIndex) => {
        if (listElem === null) {
          allowNull = true;
        } else {
          values.push(listElem);
          inPatterns.push(`$${index + 1 + listIndex - (allowNull ? 1 : 0)}`);
        }
      });
      if (allowNull) {
        patterns.push(`($${index}:name IS NULL OR $${index}:name && ARRAY[${inPatterns.join(',')}])`);
      } else {
        patterns.push(`$${index}:name && ARRAY[${inPatterns.join(',')}]`);
      }
      index = index + 1 + inPatterns.length;
    } else if (Array.isArray(fieldValue.$in) && schema.fields[fieldName].type === 'String') {
      // $in against a scalar string column: ordinary SQL IN list.
      let inPatterns = [];
      values.push(fieldName);
      fieldValue.$in.forEach((listElem, listIndex) => {
        values.push(listElem);
        inPatterns.push(`$${index + 1 + listIndex}`);
      });
      patterns.push(`$${index}:name IN (${inPatterns.join(',')})`);
      index = index + 1 + inPatterns.length;
    } else if (fieldValue.__type === 'Pointer') {
      // Pointers are stored as the bare objectId (see parseTypeToPostgresType).
      patterns.push(`$${index}:name = $${index + 1}`);
      values.push(fieldName, fieldValue.objectId);
      index += 2;
    } else {
      throw new Parse.Error(Parse.Error.OPERATION_FORBIDDEN, `Postgres doesn't support this query type yet`);
    }
  }
  return { pattern: patterns.join(' AND '), values };
}
export class PostgresStorageAdapter {
// Private
_collectionPrefix : string ;
_client ;
constructor ( {
uri ,
collectionPrefix = '' ,
} ) {
this . _collectionPrefix = collectionPrefix ;
this . _client = pgp ( uri ) ;
}
_ensureSchemaCollectionExists ( ) {
return this . _client . query ( 'CREATE TABLE "_SCHEMA" ( "className" varChar(120), "schema" jsonb, "isParseClass" bool, PRIMARY KEY ("className") )' )
. catch ( error => {
if ( error . code === PostgresDuplicateRelationError ) {
// Table already exists, must have been created by a different request. Ignore error.
} else {
throw error ;
}
} ) ;
} ;
classExists ( name ) {
2016-06-16 19:34:00 -07:00
return Promise . reject ( 'Not implemented yet.' )
2016-06-12 16:35:13 -07:00
}
setClassLevelPermissions ( className , CLPs ) {
2016-06-16 19:34:00 -07:00
return Promise . reject ( 'Not implemented yet.' )
2016-06-12 16:35:13 -07:00
}
createClass ( className , schema ) {
2016-06-06 13:47:11 -07:00
let valuesArray = [ ] ;
let patternsArray = [ ] ;
Object . keys ( schema . fields ) . forEach ( ( fieldName , index ) => {
valuesArray . push ( fieldName ) ;
2016-06-10 14:09:48 -07:00
let parseType = schema . fields [ fieldName ] ;
if ( [ '_rperm' , '_wperm' ] . includes ( fieldName ) ) {
parseType . contents = { type : 'String' } ;
}
valuesArray . push ( parseTypeToPostgresType ( parseType ) ) ;
2016-06-06 13:47:11 -07:00
patternsArray . push ( ` $ ${ index * 2 + 2 } :name $ ${ index * 2 + 3 } :raw ` ) ;
} ) ;
2016-06-16 15:39:05 -07:00
return this . _ensureSchemaCollectionExists ( )
. then ( ( ) => this . _client . query ( ` CREATE TABLE $ 1:name ( ${ patternsArray . join ( ',' ) } ) ` , [ className , ... valuesArray ] ) )
. catch ( error => {
if ( error . code === PostgresDuplicateRelationError ) {
// Table already exists, must have been created by a different request. Ignore error.
} else {
throw error ;
}
} )
2016-06-12 16:35:13 -07:00
. then ( ( ) => this . _client . query ( 'INSERT INTO "_SCHEMA" ("className", "schema", "isParseClass") VALUES ($<className>, $<schema>, true)' , { className , schema } ) )
2016-06-17 11:09:42 -07:00
. then ( ( ) => schema ) ;
2016-06-12 16:35:13 -07:00
}
addFieldIfNotExists ( className , fieldName , type ) {
2016-06-17 08:04:58 +01:00
// TODO: Must be revised for invalid logic...
return this . _client . tx ( "addFieldIfNotExists" , t => {
return t . query ( 'ALTER TABLE $<className:name> ADD COLUMN $<fieldName:name> $<postgresType:raw>' , {
className ,
fieldName ,
postgresType : parseTypeToPostgresType ( type )
} )
. catch ( error => {
if ( error . code === PostgresRelationDoesNotExistError ) {
return this . createClass ( className , { fields : { [ fieldName ] : type } } )
} else if ( error . code === PostgresDuplicateColumnError ) {
// Column already exists, created by other request. Carry on to
// See if it's the right type.
} else {
throw error ;
}
} )
. then ( ( ) => t . query ( 'SELECT "schema" FROM "_SCHEMA" WHERE "className" = $<className>' , { className } ) )
. then ( result => {
if ( fieldName in result [ 0 ] . schema ) {
throw "Attempted to add a field that already exists" ;
} else {
result [ 0 ] . schema . fields [ fieldName ] = type ;
return t . query (
'UPDATE "_SCHEMA" SET "schema"=$<schema> WHERE "className"=$<className>' ,
{ schema : result [ 0 ] . schema , className }
) ;
}
} )
} ) ;
2016-06-12 16:35:13 -07:00
}
// Drops a collection. Resolves with true if it was a Parse Schema (eg. _User, Custom, etc.)
// and resolves with false if it wasn't (eg. a join table). Rejects if deletion was impossible.
deleteClass ( className ) {
2016-06-16 19:34:00 -07:00
return Promise . reject ( 'Not implemented yet.' )
2016-06-12 16:35:13 -07:00
}
2016-06-16 19:34:00 -07:00
// Delete all data known to this adapter. Used for testing.
2016-06-12 16:35:13 -07:00
deleteAllClasses ( ) {
return this . _client . query ( 'SELECT "className" FROM "_SCHEMA"' )
. then ( results => {
const classes = [ '_SCHEMA' , ... results . map ( result => result . className ) ] ;
2016-06-17 08:04:58 +01:00
return this . _client . tx ( t => t . batch ( classes . map ( className => t . none ( 'DROP TABLE $<className:name>' , { className } ) ) ) ) ;
2016-06-12 16:35:13 -07:00
} , error => {
if ( error . code === PostgresRelationDoesNotExistError ) {
// No _SCHEMA collection. Don't delete anything.
return ;
} else {
throw error ;
}
} )
}
// Remove the column and all the data. For Relations, the _Join collection is handled
// specially, this function does not delete _Join columns. It should, however, indicate
// that the relation fields does not exist anymore. In mongo, this means removing it from
// the _SCHEMA collection. There should be no actual data in the collection under the same name
// as the relation column, so it's fine to attempt to delete it. If the fields listed to be
// deleted do not exist, this function should return successfully anyways. Checking for
// attempts to delete non-existent fields is the responsibility of Parse Server.
// This function is not obligated to delete fields atomically. It is given the field
// names in a list so that databases that are capable of deleting fields atomically
// may do so.
// Returns a Promise.
deleteFields ( className , schema , fieldNames ) {
2016-06-16 19:34:00 -07:00
return Promise . reject ( 'Not implemented yet.' )
2016-06-12 16:35:13 -07:00
}
// Return a promise for all schemas known to this adapter, in Parse format. In case the
2016-06-16 19:34:00 -07:00
// schemas cannot be retrieved, returns a promise that rejects. Requirements for the
2016-06-12 16:35:13 -07:00
// rejection reason are TBD.
getAllClasses ( ) {
return this . _ensureSchemaCollectionExists ( )
2016-06-17 11:09:42 -07:00
. then ( ( ) => this . _client . map ( 'SELECT * FROM "_SCHEMA"' , null , row => ( { className : row . className , ... row . schema } ) ) ) ;
2016-06-12 16:35:13 -07:00
}
// Return a promise for the schema with the given name, in Parse format. If
// this adapter doesn't know about the schema, return a promise that rejects with
// undefined as the reason.
getClass ( className ) {
2016-07-08 22:52:14 -07:00
return this . _client . query ( 'SELECT * FROM "_SCHEMA" WHERE "className"=$<className>' , { className } )
. then ( result => {
if ( result . length === 1 ) {
return result [ 0 ] . schema ;
} else {
2016-06-12 16:35:13 -07:00
throw undefined ;
2016-07-08 22:52:14 -07:00
}
2016-06-12 16:35:13 -07:00
} ) ;
}
2016-06-11 00:43:02 -07:00
// TODO: remove the mongo format dependency in the return value
2016-06-12 16:35:13 -07:00
createObject ( className , schema , object ) {
2016-06-06 13:47:11 -07:00
let columnsArray = [ ] ;
let valuesArray = [ ] ;
Object . keys ( object ) . forEach ( fieldName => {
columnsArray . push ( fieldName ) ;
2016-06-10 14:09:48 -07:00
switch ( schema . fields [ fieldName ] . type ) {
case 'Date' :
valuesArray . push ( object [ fieldName ] . iso ) ;
break ;
case 'Pointer' :
valuesArray . push ( object [ fieldName ] . objectId ) ;
break ;
2016-06-17 09:59:16 -07:00
case 'Array' :
if ( [ '_rperm' , '_wperm' ] . includes ( fieldName ) ) {
valuesArray . push ( object [ fieldName ] ) ;
} else {
valuesArray . push ( JSON . stringify ( object [ fieldName ] ) ) ;
}
break ;
case 'Object' :
valuesArray . push ( object [ fieldName ] ) ;
break ;
case 'String' :
valuesArray . push ( object [ fieldName ] ) ;
break ;
case 'Number' :
valuesArray . push ( object [ fieldName ] ) ;
break ;
case 'Boolean' :
2016-06-10 14:09:48 -07:00
valuesArray . push ( object [ fieldName ] ) ;
break ;
2016-06-17 09:59:16 -07:00
default :
throw ` Type ${ schema . fields [ fieldName ] . type } not supported yet ` ;
break ;
2016-06-10 14:09:48 -07:00
}
2016-06-06 13:47:11 -07:00
} ) ;
2016-06-10 14:09:48 -07:00
let columnsPattern = columnsArray . map ( ( col , index ) => ` $ ${ index + 2 } :name ` ) . join ( ',' ) ;
2016-06-16 15:39:05 -07:00
let valuesPattern = valuesArray . map ( ( val , index ) => ` $ ${ index + 2 + columnsArray . length } ${ ( [ '_rperm' , '_wperm' ] . includes ( columnsArray [ index ] ) ) ? '::text[]' : '' } ` ) . join ( ',' ) ;
let qs = ` INSERT INTO $ 1:name ( ${ columnsPattern } ) VALUES ( ${ valuesPattern } ) `
let values = [ className , ... columnsArray , ... valuesArray ]
return this . _client . query ( qs , values )
2016-06-10 14:09:48 -07:00
. then ( ( ) => ( { ops : [ object ] } ) )
2016-06-16 15:39:05 -07:00
. catch ( error => {
if ( error . code === PostgresUniqueIndexViolationError ) {
throw new Parse . Error ( Parse . Error . DUPLICATE _VALUE , 'A duplicate value for a field with unique values was provided' ) ;
} else {
throw error ;
}
} )
2016-06-12 16:35:13 -07:00
}
// Remove all objects that match the given Parse Query.
// If no objects match, reject with OBJECT_NOT_FOUND. If objects are found and deleted, resolve with undefined.
// If there is some other error, reject with INTERNAL_SERVER_ERROR.
deleteObjectsByQuery ( className , schema , query ) {
2016-06-27 04:56:01 +01:00
return this . _client . one ( ` WITH deleted AS (DELETE FROM $ <className:name> RETURNING *) SELECT count(*) FROM deleted ` , { className } , res => parseInt ( res . count ) )
. then ( count => {
if ( count === 0 ) {
2016-06-10 14:09:48 -07:00
throw new Parse . Error ( Parse . Error . OBJECT _NOT _FOUND , 'Object not found.' ) ;
} else {
2016-06-27 04:56:01 +01:00
return count ;
2016-06-10 14:09:48 -07:00
}
} ) ;
2016-06-12 16:35:13 -07:00
}
// Apply the update to all objects that match the given Parse Query.
updateObjectsByQuery ( className , schema , query , update ) {
2016-06-16 19:34:00 -07:00
return Promise . reject ( 'Not implemented yet.' )
2016-06-12 16:35:13 -07:00
}
2016-06-11 00:43:02 -07:00
// Return value not currently well specified.
2016-06-12 16:35:13 -07:00
findOneAndUpdate ( className , schema , query , update ) {
2016-06-11 00:43:02 -07:00
let conditionPatterns = [ ] ;
let updatePatterns = [ ] ;
2016-06-16 15:39:05 -07:00
let values = [ className ]
2016-06-11 00:43:02 -07:00
let index = 2 ;
for ( let fieldName in update ) {
let fieldValue = update [ fieldName ] ;
if ( fieldValue . _ _op === 'Increment' ) {
updatePatterns . push ( ` $ ${ index } :name = COALESCE( $ ${ index } :name, 0) + $ ${ index + 1 } ` ) ;
values . push ( fieldName , fieldValue . amount ) ;
index += 2 ;
2016-06-17 09:59:16 -07:00
} else if ( fieldValue . _ _op === 'Add' ) {
updatePatterns . push ( ` $ ${ index } :name = COALESCE( $ ${ index } :name, '[]'::jsonb) || $ ${ index + 1 } ` ) ;
values . push ( fieldName , fieldValue . objects ) ;
index += 2 ;
} else if ( fieldValue . _ _op === 'Remove' ) {
return Promise . reject ( new Parse . Error ( Parse . Error . OPERATION _FORBIDDEN , 'Postgres does not support Remove operator.' ) ) ;
} else if ( fieldValue . _ _op === 'AddUnique' ) {
return Promise . reject ( new Parse . Error ( Parse . Error . OPERATION _FORBIDDEN , 'Postgres does not support AddUnique operator' ) ) ;
2016-06-11 00:43:02 -07:00
} else if ( fieldName === 'updatedAt' ) { //TODO: stop special casing this. It should check for __type === 'Date' and use .iso
updatePatterns . push ( ` $ ${ index } :name = $ ${ index + 1 } ` )
values . push ( fieldName , new Date ( fieldValue ) ) ;
index += 2 ;
2016-06-17 09:59:16 -07:00
} else if ( typeof fieldValue === 'string' ) {
updatePatterns . push ( ` $ ${ index } :name = $ ${ index + 1 } ` ) ;
values . push ( fieldName , fieldValue ) ;
index += 2 ;
2016-06-17 11:09:42 -07:00
} else if ( fieldValue . _ _type === 'Pointer' ) {
updatePatterns . push ( ` $ ${ index } :name = $ ${ index + 1 } ` ) ;
values . push ( fieldName , fieldValue . objectId ) ;
index += 2 ;
2016-06-11 00:43:02 -07:00
} else {
2016-06-17 09:59:16 -07:00
return Promise . reject ( new Parse . Error ( Parse . Error . OPERATION _FORBIDDEN , ` Postgres doesn't support update ${ JSON . stringify ( fieldValue ) } yet ` ) ) ;
2016-06-11 00:43:02 -07:00
}
}
2016-06-16 15:39:05 -07:00
let where = buildWhereClause ( { schema , index , query } )
values . push ( ... where . values ) ;
let qs = ` UPDATE $ 1:name SET ${ updatePatterns . join ( ',' ) } WHERE ${ where . pattern } RETURNING * ` ;
2016-06-11 00:43:02 -07:00
return this . _client . query ( qs , values )
2016-06-16 19:34:00 -07:00
. then ( val => val [ 0 ] ) ; // TODO: This is unsafe, verification is needed, or a different query method;
2016-06-12 16:35:13 -07:00
}
2016-06-16 19:34:00 -07:00
// Hopefully, we can get rid of this. It's only used for config and hooks.
2016-06-12 16:35:13 -07:00
upsertOneObject ( className , schema , query , update ) {
2016-06-16 19:34:00 -07:00
return Promise . reject ( 'Not implemented yet.' )
2016-06-12 16:35:13 -07:00
}
find ( className , schema , query , { skip , limit , sort } ) {
2016-06-16 15:39:05 -07:00
let values = [ className ] ;
let where = buildWhereClause ( { schema , query , index : 2 } )
values . push ( ... where . values ) ;
2016-06-11 00:43:02 -07:00
2016-06-17 11:09:42 -07:00
const wherePattern = where . pattern . length > 0 ? ` WHERE ${ where . pattern } ` : '' ;
const limitPattern = limit !== undefined ? ` LIMIT $ ${ values . length + 1 } ` : '' ;
const qs = ` SELECT * FROM $ 1:name ${ wherePattern } ${ limitPattern } ` ;
2016-06-16 15:39:05 -07:00
if ( limit !== undefined ) {
values . push ( limit ) ;
2016-06-11 00:43:02 -07:00
}
2016-06-16 15:39:05 -07:00
return this . _client . query ( qs , values )
2016-06-10 14:09:48 -07:00
. then ( results => results . map ( object => {
Object . keys ( schema . fields ) . filter ( field => schema . fields [ field ] . type === 'Pointer' ) . forEach ( fieldName => {
object [ fieldName ] = { objectId : object [ fieldName ] , _ _type : 'Pointer' , className : schema . fields [ fieldName ] . targetClass } ;
} ) ;
2016-06-11 00:43:02 -07:00
//TODO: remove this reliance on the mongo format. DB adapter shouldn't know there is a difference between created at and any other date field.
if ( object . createdAt ) {
object . createdAt = object . createdAt . toISOString ( ) ;
}
if ( object . updatedAt ) {
object . updatedAt = object . updatedAt . toISOString ( ) ;
}
if ( object . expiresAt ) {
object . expiresAt = { _ _type : 'Date' , iso : object . expiresAt . toISOString ( ) } ;
}
for ( let fieldName in object ) {
if ( object [ fieldName ] === null ) {
delete object [ fieldName ] ;
}
2016-06-17 09:59:16 -07:00
if ( object [ fieldName ] instanceof Date ) {
object [ fieldName ] = { _ _type : 'Date' , iso : object [ fieldName ] . toISOString ( ) } ;
}
2016-06-11 00:43:02 -07:00
}
2016-06-10 14:09:48 -07:00
return object ;
} ) )
2016-06-12 16:35:13 -07:00
}
// Create a unique index. Unique indexes on nullable fields are not allowed. Since we don't
// currently know which fields are nullable and which aren't, we ignore that criteria.
// As such, we shouldn't expose this function to users of parse until we have an out-of-band
// Way of determining if a field is nullable. Undefined doesn't count against uniqueness,
// which is why we use sparse indexes.
ensureUniqueness ( className , schema , fieldNames ) {
2016-06-16 15:39:05 -07:00
// Use the same name for every ensureUniqueness attempt, because postgres
// Will happily create the same index with multiple names.
const constraintName = ` unique_ ${ fieldNames . sort ( ) . join ( '_' ) } ` ;
const constraintPatterns = fieldNames . map ( ( fieldName , index ) => ` $ ${ index + 3 } :name ` ) ;
const qs = ` ALTER TABLE $ 1:name ADD CONSTRAINT $ 2:name UNIQUE ( ${ constraintPatterns . join ( ',' ) } ) ` ;
return this . _client . query ( qs , [ className , constraintName , ... fieldNames ] )
2016-06-17 09:59:16 -07:00
. catch ( error => {
if ( error . code === PostgresDuplicateRelationError && error . message . includes ( constraintName ) ) {
// Index already exists. Ignore error.
} else {
throw error ;
}
} ) ;
2016-06-12 16:35:13 -07:00
}
2016-06-16 19:34:00 -07:00
// Executes a count.
2016-06-12 16:35:13 -07:00
count ( className , schema , query ) {
2016-06-17 11:09:42 -07:00
let values = [ className ] ;
let where = buildWhereClause ( { schema , query , index : 2 } ) ;
values . push ( ... where . values ) ;
const wherePattern = where . pattern . length > 0 ? ` WHERE ${ where . pattern } ` : '' ;
const qs = ` SELECT COUNT(*) FROM $ 1:name ${ wherePattern } ` ;
return this . _client . query ( qs , values )
. then ( result => parseInt ( result [ 0 ] . count ) )
2016-06-12 16:35:13 -07:00
}
}
export default PostgresStorageAdapter ;
module . exports = PostgresStorageAdapter ; // Required for tests