2017-04-17 11:32:30 +09:00
import os from 'os' ;
import cluster from 'cluster' ;
2017-02-02 01:31:09 +01:00
import dotenv from 'dotenv'
import express from 'express'
2017-02-04 00:34:31 +01:00
import http from 'http'
2017-02-02 01:31:09 +01:00
import redis from 'redis'
import pg from 'pg'
2017-02-02 13:56:14 +01:00
import log from 'npmlog'
2017-02-04 00:34:31 +01:00
import url from 'url'
2017-05-07 00:05:38 +09:00
import WebSocket from 'uws'
2017-02-05 23:37:25 +01:00
import uuid from 'uuid'
2017-02-02 01:31:09 +01:00
2017-02-02 16:11:36 +01:00
const env = process . env . NODE _ENV || 'development'
dotenv . config ( {
path : env === 'production' ? '.env.production' : '.env'
} )
2017-02-02 01:31:09 +01:00
2017-05-03 23:18:13 +02:00
const dbUrlToConfig = ( dbUrl ) => {
if ( ! dbUrl ) {
return { }
}
const params = url . parse ( dbUrl )
2017-05-04 22:53:44 +09:00
const config = { }
if ( params . auth ) {
[ config . user , config . password ] = params . auth . split ( ':' )
}
if ( params . hostname ) {
config . host = params . hostname
}
if ( params . port ) {
config . port = params . port
2017-05-03 23:18:13 +02:00
}
2017-05-04 22:53:44 +09:00
if ( params . pathname ) {
2017-05-11 18:23:24 +09:00
config . database = params . pathname . split ( '/' ) [ 1 ]
2017-05-04 22:53:44 +09:00
}
const ssl = params . query && params . query . ssl
if ( ssl ) {
config . ssl = ssl === 'true' || ssl === '1'
}
return config
2017-05-03 23:18:13 +02:00
}
if ( cluster . isMaster ) {
// Cluster master
2017-04-22 22:11:56 +09:00
const core = + process . env . STREAMING _CLUSTER _NUM || ( env === 'development' ? 1 : Math . max ( os . cpus ( ) . length - 1 , 1 ) )
2017-05-03 23:18:13 +02:00
2017-04-17 11:32:30 +09:00
const fork = ( ) => {
const worker = cluster . fork ( ) ;
2017-05-03 23:18:13 +02:00
2017-04-17 11:32:30 +09:00
worker . on ( 'exit' , ( code , signal ) => {
log . error ( ` Worker died with exit code ${ code } , signal ${ signal } received. ` ) ;
setTimeout ( ( ) => fork ( ) , 0 ) ;
} ) ;
} ;
2017-05-03 23:18:13 +02:00
2017-04-17 11:32:30 +09:00
for ( let i = 0 ; i < core ; i ++ ) fork ( ) ;
2017-05-03 23:18:13 +02:00
log . info ( ` Starting streaming API server master with ${ core } workers ` )
2017-04-17 11:32:30 +09:00
} else {
2017-05-03 23:18:13 +02:00
// Cluster worker
2017-04-17 11:32:30 +09:00
const pgConfigs = {
development : {
database : 'mastodon_development' ,
host : '/var/run/postgresql' ,
max : 10
} ,
production : {
user : process . env . DB _USER || 'mastodon' ,
password : process . env . DB _PASS || '' ,
database : process . env . DB _NAME || 'mastodon_production' ,
host : process . env . DB _HOST || 'localhost' ,
port : process . env . DB _PORT || 5432 ,
max : 10
}
2017-02-02 01:31:09 +01:00
}
2017-04-17 11:32:30 +09:00
const app = express ( )
2017-05-04 22:53:44 +09:00
const pgPool = new pg . Pool ( Object . assign ( pgConfigs [ env ] , dbUrlToConfig ( process . env . DATABASE _URL ) ) )
2017-04-17 11:32:30 +09:00
const server = http . createServer ( app )
const wss = new WebSocket . Server ( { server } )
2017-05-07 13:42:32 -04:00
const redisNamespace = process . env . REDIS _NAMESPACE || null
2017-02-07 14:37:12 +01:00
2017-05-07 13:42:32 -04:00
const redisParams = {
2017-04-17 11:32:30 +09:00
host : process . env . REDIS _HOST || '127.0.0.1' ,
port : process . env . REDIS _PORT || 6379 ,
2017-05-03 23:18:13 +02:00
password : process . env . REDIS _PASSWORD ,
url : process . env . REDIS _URL || null
2017-05-07 13:42:32 -04:00
}
if ( redisNamespace ) {
redisParams . namespace = redisNamespace
}
const redisPrefix = redisNamespace ? ` ${ redisNamespace } : ` : ''
const redisClient = redis . createClient ( redisParams )
2017-02-07 14:37:12 +01:00
2017-04-17 11:32:30 +09:00
const subs = { }
2017-02-07 14:37:12 +01:00
2017-04-17 11:32:30 +09:00
redisClient . on ( 'pmessage' , ( _ , channel , message ) => {
const callbacks = subs [ channel ]
2017-02-07 14:37:12 +01:00
2017-04-17 11:32:30 +09:00
log . silly ( ` New message on channel ${ channel } ` )
2017-02-07 14:37:12 +01:00
2017-04-17 11:32:30 +09:00
if ( ! callbacks ) {
return
}
callbacks . forEach ( callback => callback ( message ) )
} )
2017-02-07 14:37:12 +01:00
2017-05-07 13:42:32 -04:00
redisClient . psubscribe ( ` ${ redisPrefix } timeline:* ` )
2017-02-07 14:37:12 +01:00
2017-04-17 11:32:30 +09:00
const subscribe = ( channel , callback ) => {
log . silly ( ` Adding listener for ${ channel } ` )
subs [ channel ] = subs [ channel ] || [ ]
subs [ channel ] . push ( callback )
}
2017-02-03 18:27:42 +01:00
2017-04-17 11:32:30 +09:00
const unsubscribe = ( channel , callback ) => {
log . silly ( ` Removing listener for ${ channel } ` )
subs [ channel ] = subs [ channel ] . filter ( item => item !== callback )
}
2017-02-03 18:27:42 +01:00
2017-04-17 11:32:30 +09:00
const allowCrossDomain = ( req , res , next ) => {
res . header ( 'Access-Control-Allow-Origin' , '*' )
res . header ( 'Access-Control-Allow-Headers' , 'Authorization, Accept, Cache-Control' )
res . header ( 'Access-Control-Allow-Methods' , 'GET, OPTIONS' )
2017-02-05 23:37:25 +01:00
2017-04-17 11:32:30 +09:00
next ( )
}
2017-02-05 23:37:25 +01:00
2017-04-17 11:32:30 +09:00
const setRequestId = ( req , res , next ) => {
req . requestId = uuid . v4 ( )
res . header ( 'X-Request-Id' , req . requestId )
2017-02-02 01:31:09 +01:00
2017-04-17 11:32:30 +09:00
next ( )
}
2017-02-02 01:31:09 +01:00
2017-04-17 11:32:30 +09:00
const accountFromToken = ( token , req , next ) => {
pgPool . connect ( ( err , client , done ) => {
2017-02-02 01:31:09 +01:00
if ( err ) {
2017-04-10 15:36:03 -07:00
next ( err )
return
2017-02-02 01:31:09 +01:00
}
2017-04-17 11:32:30 +09:00
client . query ( 'SELECT oauth_access_tokens.resource_owner_id, users.account_id FROM oauth_access_tokens INNER JOIN users ON oauth_access_tokens.resource_owner_id = users.id WHERE oauth_access_tokens.token = $1 LIMIT 1' , [ token ] , ( err , result ) => {
done ( )
2017-02-02 01:31:09 +01:00
2017-04-17 11:32:30 +09:00
if ( err ) {
next ( err )
return
}
2017-02-02 01:31:09 +01:00
2017-04-17 11:32:30 +09:00
if ( result . rows . length === 0 ) {
err = new Error ( 'Invalid access token' )
err . statusCode = 401
2017-02-04 00:34:31 +01:00
2017-04-17 11:32:30 +09:00
next ( err )
return
}
2017-02-04 00:34:31 +01:00
2017-04-17 11:32:30 +09:00
req . accountId = result . rows [ 0 ] . account _id
2017-02-04 00:34:31 +01:00
2017-04-17 11:32:30 +09:00
next ( )
} )
} )
2017-02-04 00:34:31 +01:00
}
2017-04-17 11:32:30 +09:00
const authenticationMiddleware = ( req , res , next ) => {
if ( req . method === 'OPTIONS' ) {
next ( )
return
}
2017-02-04 00:34:31 +01:00
2017-04-17 11:32:30 +09:00
const authorization = req . get ( 'Authorization' )
2017-02-04 00:34:31 +01:00
2017-04-17 11:32:30 +09:00
if ( ! authorization ) {
const err = new Error ( 'Missing access token' )
err . statusCode = 401
2017-02-02 01:31:09 +01:00
2017-04-17 11:32:30 +09:00
next ( err )
return
}
2017-02-02 17:10:59 +01:00
2017-04-17 11:32:30 +09:00
const token = authorization . replace ( /^Bearer / , '' )
2017-02-02 13:56:14 +01:00
2017-04-17 11:32:30 +09:00
accountFromToken ( token , req , next )
}
2017-02-05 03:19:04 +01:00
2017-04-17 11:32:30 +09:00
const errorMiddleware = ( err , req , res , next ) => {
log . error ( req . requestId , err )
res . writeHead ( err . statusCode || 500 , { 'Content-Type' : 'application/json' } )
res . end ( JSON . stringify ( { error : err . statusCode ? ` ${ err } ` : 'An unexpected error occurred' } ) )
}
2017-02-05 03:19:04 +01:00
2017-04-17 11:32:30 +09:00
const placeholders = ( arr , shift = 0 ) => arr . map ( ( _ , i ) => ` $ ${ i + 1 + shift } ` ) . join ( ', ' ) ;
2017-02-02 01:31:09 +01:00
2017-04-17 11:32:30 +09:00
const streamFrom = ( id , req , output , attachCloseHandler , needsFiltering = false ) => {
log . verbose ( req . requestId , ` Starting stream from ${ id } for ${ req . accountId } ` )
const listener = message => {
const { event , payload , queued _at } = JSON . parse ( message )
2017-02-02 13:56:14 +01:00
2017-04-17 11:32:30 +09:00
const transmit = ( ) => {
const now = new Date ( ) . getTime ( )
const delta = now - queued _at ;
2017-02-02 13:56:14 +01:00
2017-04-17 11:32:30 +09:00
log . silly ( req . requestId , ` Transmitting for ${ req . accountId } : ${ event } ${ payload } Delay: ${ delta } ms ` )
output ( event , payload )
}
2017-02-02 13:56:14 +01:00
2017-04-17 11:32:30 +09:00
// Only messages that may require filtering are statuses, since notifications
// are already personalized and deletes do not matter
if ( needsFiltering && event === 'update' ) {
pgPool . connect ( ( err , client , done ) => {
2017-02-02 13:56:14 +01:00
if ( err ) {
log . error ( err )
return
}
2017-04-17 11:32:30 +09:00
const unpackedPayload = JSON . parse ( payload )
const targetAccountIds = [ unpackedPayload . account . id ] . concat ( unpackedPayload . mentions . map ( item => item . id ) ) . concat ( unpackedPayload . reblog ? [ unpackedPayload . reblog . account . id ] : [ ] )
2017-04-30 21:49:24 +09:00
client . query ( ` SELECT target_account_id FROM blocks WHERE account_id = $ 1 AND target_account_id IN ( ${ placeholders ( targetAccountIds , 1 ) } ) UNION SELECT target_account_id FROM mutes WHERE account_id = $ 1 AND target_account_id IN ( ${ placeholders ( targetAccountIds , 1 ) } ) ` , [ req . accountId ] . concat ( targetAccountIds ) , ( err , result ) => {
2017-04-17 11:32:30 +09:00
done ( )
if ( err ) {
log . error ( err )
return
}
2017-02-02 13:56:14 +01:00
2017-04-17 11:32:30 +09:00
if ( result . rows . length > 0 ) {
return
}
transmit ( )
} )
2017-02-02 13:56:14 +01:00
} )
2017-04-17 11:32:30 +09:00
} else {
transmit ( )
}
2017-02-02 13:56:14 +01:00
}
2017-04-17 11:32:30 +09:00
2017-05-07 13:42:32 -04:00
subscribe ( ` ${ redisPrefix } ${ id } ` , listener )
attachCloseHandler ( ` ${ redisPrefix } ${ id } ` , listener )
2017-02-07 14:37:12 +01:00
}
2017-02-02 01:31:09 +01:00
2017-04-17 11:32:30 +09:00
// Setup stream output to HTTP
const streamToHttp = ( req , res ) => {
res . setHeader ( 'Content-Type' , 'text/event-stream' )
res . setHeader ( 'Transfer-Encoding' , 'chunked' )
2017-02-04 00:34:31 +01:00
2017-04-17 11:32:30 +09:00
const heartbeat = setInterval ( ( ) => res . write ( ':thump\n' ) , 15000 )
2017-02-04 00:34:31 +01:00
2017-04-17 11:32:30 +09:00
req . on ( 'close' , ( ) => {
log . verbose ( req . requestId , ` Ending stream for ${ req . accountId } ` )
clearInterval ( heartbeat )
} )
2017-02-02 15:20:31 +01:00
2017-04-17 11:32:30 +09:00
return ( event , payload ) => {
res . write ( ` event: ${ event } \n ` )
res . write ( ` data: ${ payload } \n \n ` )
}
}
2017-02-02 01:31:09 +01:00
2017-04-17 11:32:30 +09:00
// Setup stream end for HTTP
const streamHttpEnd = req => ( id , listener ) => {
req . on ( 'close' , ( ) => {
unsubscribe ( id , listener )
} )
2017-02-04 00:34:31 +01:00
}
2017-04-17 11:32:30 +09:00
// Setup stream output to WebSockets
const streamToWs = ( req , ws ) => {
2017-05-07 00:05:38 +09:00
const heartbeat = setInterval ( ( ) => {
// TODO: Can't add multiple listeners, due to the limitation of uws.
if ( ws . readyState !== ws . OPEN ) {
log . verbose ( req . requestId , ` Ending stream for ${ req . accountId } ` )
clearInterval ( heartbeat )
return
}
2017-02-07 14:37:12 +01:00
2017-05-07 00:05:38 +09:00
ws . ping ( )
} , 15000 )
2017-04-02 21:27:14 +02:00
2017-04-17 11:32:30 +09:00
return ( event , payload ) => {
if ( ws . readyState !== ws . OPEN ) {
log . error ( req . requestId , 'Tried writing to closed socket' )
return
}
2017-02-04 00:34:31 +01:00
2017-04-17 11:32:30 +09:00
ws . send ( JSON . stringify ( { event , payload } ) )
2017-02-05 23:37:25 +01:00
}
2017-02-04 00:34:31 +01:00
}
2017-02-02 01:31:09 +01:00
2017-04-17 11:32:30 +09:00
// Setup stream end for WebSockets
const streamWsEnd = ws => ( id , listener ) => {
ws . on ( 'close' , ( ) => {
unsubscribe ( id , listener )
} )
2017-04-02 21:27:14 +02:00
2017-04-17 11:32:30 +09:00
ws . on ( 'error' , e => {
unsubscribe ( id , listener )
} )
}
2017-02-04 00:34:31 +01:00
2017-04-17 11:32:30 +09:00
app . use ( setRequestId )
app . use ( allowCrossDomain )
app . use ( authenticationMiddleware )
app . use ( errorMiddleware )
2017-02-02 01:31:09 +01:00
2017-04-17 11:32:30 +09:00
app . get ( '/api/v1/streaming/user' , ( req , res ) => {
streamFrom ( ` timeline: ${ req . accountId } ` , req , streamToHttp ( req , res ) , streamHttpEnd ( req ) )
} )
2017-02-04 00:34:31 +01:00
2017-04-17 11:32:30 +09:00
app . get ( '/api/v1/streaming/public' , ( req , res ) => {
streamFrom ( 'timeline:public' , req , streamToHttp ( req , res ) , streamHttpEnd ( req ) , true )
} )
2017-02-04 00:34:31 +01:00
2017-04-17 11:32:30 +09:00
app . get ( '/api/v1/streaming/public/local' , ( req , res ) => {
streamFrom ( 'timeline:public:local' , req , streamToHttp ( req , res ) , streamHttpEnd ( req ) , true )
} )
2017-02-06 23:46:14 +01:00
2017-04-17 11:32:30 +09:00
app . get ( '/api/v1/streaming/hashtag' , ( req , res ) => {
2017-04-21 23:45:14 +09:00
streamFrom ( ` timeline:hashtag: ${ req . query . tag } ` , req , streamToHttp ( req , res ) , streamHttpEnd ( req ) , true )
2017-04-17 11:32:30 +09:00
} )
2017-02-02 13:56:14 +01:00
2017-04-17 11:32:30 +09:00
app . get ( '/api/v1/streaming/hashtag/local' , ( req , res ) => {
2017-04-21 23:45:14 +09:00
streamFrom ( ` timeline:hashtag: ${ req . query . tag } :local ` , req , streamToHttp ( req , res ) , streamHttpEnd ( req ) , true )
2017-04-17 11:32:30 +09:00
} )
2017-02-06 23:46:14 +01:00
2017-04-17 11:32:30 +09:00
wss . on ( 'connection' , ws => {
const location = url . parse ( ws . upgradeReq . url , true )
const token = location . query . access _token
const req = { requestId : uuid . v4 ( ) }
2017-02-02 01:31:09 +01:00
2017-04-17 11:32:30 +09:00
accountFromToken ( token , req , err => {
if ( err ) {
log . error ( req . requestId , err )
ws . close ( )
return
}
2017-02-04 00:34:31 +01:00
2017-04-17 11:32:30 +09:00
switch ( location . query . stream ) {
case 'user' :
streamFrom ( ` timeline: ${ req . accountId } ` , req , streamToWs ( req , ws ) , streamWsEnd ( ws ) )
break ;
case 'public' :
streamFrom ( 'timeline:public' , req , streamToWs ( req , ws ) , streamWsEnd ( ws ) , true )
break ;
case 'public:local' :
streamFrom ( 'timeline:public:local' , req , streamToWs ( req , ws ) , streamWsEnd ( ws ) , true )
break ;
case 'hashtag' :
streamFrom ( ` timeline:hashtag: ${ location . query . tag } ` , req , streamToWs ( req , ws ) , streamWsEnd ( ws ) , true )
break ;
case 'hashtag:local' :
streamFrom ( ` timeline:hashtag: ${ location . query . tag } :local ` , req , streamToWs ( req , ws ) , streamWsEnd ( ws ) , true )
break ;
default :
ws . close ( )
}
} )
2017-02-04 00:34:31 +01:00
} )
2017-04-17 11:32:30 +09:00
server . listen ( process . env . PORT || 4000 , ( ) => {
log . level = process . env . LOG _LEVEL || 'verbose'
2017-04-29 21:35:16 +09:00
log . info ( ` Starting streaming API server worker on ${ server . address ( ) . address } : ${ server . address ( ) . port } ` )
2017-04-17 11:32:30 +09:00
} )
2017-04-22 02:24:31 +09:00
process . on ( 'SIGINT' , exit )
process . on ( 'SIGTERM' , exit )
process . on ( 'exit' , exit )
function exit ( ) {
server . close ( )
}
2017-04-17 11:32:30 +09:00
}