
// 523 lines
// 14 KiB

import buffers from './testing/test-buffers'
import BufferList from './testing/buffer-list'
import { parse } from '.'
import assert from 'assert'
import { PassThrough } from 'stream'
import { BackendMessage } from './messages'
// Pre-built backend messages shared by the test cases below.
const authOkBuffer = buffers.authenticationOk()
const paramStatusBuffer = buffers.parameterStatus('client_encoding', 'UTF8')
const readyForQueryBuffer = buffers.readyForQuery()
const backendKeyDataBuffer = buffers.backendKeyData(1, 2)
const commandCompleteBuffer = buffers.commandComplete('SELECT 3')

// Extended-query protocol acknowledgements.
const parseCompleteBuffer = buffers.parseComplete()
const bindCompleteBuffer = buffers.bindComplete()
const portalSuspendedBuffer = buffers.portalSuspended()
/**
 * Appends one row-description field to `bufferList`.
 *
 * The numeric attributes are filled with consecutive values starting at
 * `offset`, so a caller can predict every value from a single number.
 *
 * @param bufferList - list the field is appended to
 * @param name - field name, written as a C string
 * @param offset - first of the five consecutive attribute values
 * @returns whatever the final `addInt16` call returns (the chained list)
 */
var addRow = function (bufferList: BufferList, name: string, offset: number) {
  return bufferList
    .addCString(name) // field name
    .addInt32(offset) // table id
    .addInt16(offset + 1) // attribute of column number
    .addInt32(offset + 2) // objectId of field's data type
    .addInt16(offset + 3) // datatype size
    .addInt32(offset + 4) // type modifier
    .addInt16(0) // format code, 0 => text
}
// Field description used by the single-row rowDescription fixture.
const row1 = {
  name: 'id',
  tableID: 1,
  attributeNumber: 2,
  dataTypeID: 3,
  dataTypeSize: 4,
  typeModifier: 5,
  formatCode: 0,
}

// rowDescription message with exactly one field ('id').
const oneRowDescBuff = buffers.rowDescription([row1])

// rowDescription message with two fields. The first reuses row1's attributes
// under the name 'bang'; a copy is used so the one-row fixture is unaffected.
const twoRowBuf = buffers.rowDescription([
  { ...row1, name: 'bang' },
  {
    name: 'whoah',
    tableID: 10,
    attributeNumber: 11,
    dataTypeID: 12,
    dataTypeSize: 13,
    typeModifier: 14,
    formatCode: 0,
  },
])
// dataRow fixtures, built with the shared test-buffers helper.
// emptyRowFieldBuf carries no fields; oneFieldBuf carries the single text field 'test'.
const emptyRowFieldBuf = buffers.dataRow([])
const oneFieldBuf = buffers.dataRow(['test'])
// Expected parse results for the simple backend messages. Only the keys
// listed here are compared against the parsed message.
const expectedAuthenticationOkayMessage = {
  name: 'authenticationOk',
  length: 8,
}

const expectedParameterStatusMessage = {
  name: 'parameterStatus',
  parameterName: 'client_encoding',
  parameterValue: 'UTF8',
  length: 25,
}

const expectedBackendKeyDataMessage = {
  name: 'backendKeyData',
  processID: 1,
  secretKey: 2,
}

const expectedReadyForQueryMessage = {
  name: 'readyForQuery',
  length: 5,
  status: 'I',
}

const expectedCommandCompleteMessage = {
  name: 'commandComplete',
  length: 13,
  text: 'SELECT 3',
}
// rowDescription ('T') message that declares zero fields.
const emptyRowDescriptionBuffer = new BufferList().addInt16(0).join(true, 'T')
// Expected parse results for the rowDescription fixtures (zero, one and two fields).
const expectedEmptyRowDescriptionMessage = {
  name: 'rowDescription',
  length: 6,
  fieldCount: 0,
  fields: [],
}

const expectedOneRowMessage = {
  name: 'rowDescription',
  length: 27,
  fieldCount: 1,
  fields: [
    {
      name: 'id',
      tableID: 1,
      columnID: 2,
      dataTypeID: 3,
      dataTypeSize: 4,
      dataTypeModifier: 5,
      format: 'text',
    },
  ],
}

const expectedTwoRowMessage = {
  name: 'rowDescription',
  length: 53,
  fieldCount: 2,
  fields: [
    {
      name: 'bang',
      tableID: 1,
      columnID: 2,
      dataTypeID: 3,
      dataTypeSize: 4,
      dataTypeModifier: 5,
      format: 'text',
    },
    {
      name: 'whoah',
      tableID: 10,
      columnID: 11,
      dataTypeID: 12,
      dataTypeSize: 13,
      dataTypeModifier: 14,
      format: 'text',
    },
  ],
}
/**
 * Registers a test case that parses `buffer` and compares the first parsed
 * message against `expectedMessage`.
 *
 * Only the keys present on `expectedMessage` are compared, so the parsed
 * message may carry additional properties.
 *
 * @param buffer - raw bytes handed to the parser as a single chunk
 * @param expectedMessage - expected subset of the parsed message; its `name`
 *   is used in the test title
 */
var testForMessage = function (buffer: Buffer, expectedMessage: any) {
  it('recieves and parses ' + expectedMessage.name, async () => {
    const messages = await parseBuffers([buffer])
    const [firstMessage] = messages

    // Fail with a readable message instead of a TypeError when nothing was parsed.
    assert.ok(firstMessage, 'expected at least one parsed message')

    for (const key of Object.keys(expectedMessage)) {
      assert.deepEqual((firstMessage as any)[key], expectedMessage[key])
    }
  })
}
// Authentication request fixtures.
const plainPasswordBuffer = buffers.authenticationCleartextPassword()
const md5PasswordBuffer = buffers.authenticationMD5Password()
const SASLBuffer = buffers.authenticationSASL()
const SASLContinueBuffer = buffers.authenticationSASLContinue()
const SASLFinalBuffer = buffers.authenticationSASLFinal()

// Expected parse results for the authentication fixtures above.
const expectedPlainPasswordMessage = {
  name: 'authenticationCleartextPassword',
}

const expectedMD5PasswordMessage = {
  name: 'authenticationMD5Password',
  salt: Buffer.from([1, 2, 3, 4]),
}

const expectedSASLMessage = {
  name: 'authenticationSASL',
  mechanisms: ['SCRAM-SHA-256'],
}

const expectedSASLContinueMessage = {
  name: 'authenticationSASLContinue',
  data: 'data',
}

const expectedSASLFinalMessage = {
  name: 'authenticationSASLFinal',
  data: 'data',
}

// Notification fixture and its expected parse result.
const notificationResponseBuffer = buffers.notification(4, 'hi', 'boom')
const expectedNotificationResponseMessage = {
  name: 'notification',
  processId: 4,
  channel: 'hi',
  payload: 'boom',
}
/**
 * Feeds the given chunks through the parser and collects every message it emits.
 *
 * Each chunk is written to a PassThrough stream in order, the stream is then
 * ended, and the returned promise resolves once `parse` has finished.
 *
 * @param buffers - chunks to write, in order; chunk boundaries need not align
 *   with message boundaries
 * @returns the messages passed to the parse callback, in emission order
 */
const parseBuffers = async (buffers: Buffer[]): Promise<BackendMessage[]> => {
  const stream = new PassThrough()
  for (const chunk of buffers) {
    stream.write(chunk)
  }
  // Without ending the stream the awaited parse below would have no reason to finish.
  stream.end()

  const msgs: BackendMessage[] = []
  await parse(stream, (msg) => msgs.push(msg))
  return msgs
}
// End-to-end parser tests: every case feeds raw bytes through parseBuffers and
// checks the resulting message objects.
describe('PgPacketStream', function () {
  /**
   * Splits `buffer` into two independent copies so that the second chunk
   * holds the final `split` bytes and the first chunk holds the rest.
   */
  const splitBuffer = (buffer: Buffer, split: number): [Buffer, Buffer] => {
    const boundary = buffer.length - split
    return [Buffer.from(buffer.subarray(0, boundary)), Buffer.from(buffer.subarray(boundary))]
  }

  testForMessage(authOkBuffer, expectedAuthenticationOkayMessage)
  testForMessage(plainPasswordBuffer, expectedPlainPasswordMessage)
  testForMessage(md5PasswordBuffer, expectedMD5PasswordMessage)
  testForMessage(SASLBuffer, expectedSASLMessage)

  testForMessage(SASLContinueBuffer, expectedSASLContinueMessage)

  // Regression test for a parser bug: extra bytes follow the SASL message in
  // the same chunk, and the message must still parse to the same result.
  // This is deterministic rather than relying on network packet chunking.
  const extendedSASLContinueBuffer = Buffer.concat([SASLContinueBuffer, Buffer.from([1, 2, 3, 4])])
  testForMessage(extendedSASLContinueBuffer, expectedSASLContinueMessage)

  testForMessage(SASLFinalBuffer, expectedSASLFinalMessage)

  // Same regression test as above, for the SASL final message.
  const extendedSASLFinalBuffer = Buffer.concat([SASLFinalBuffer, Buffer.from([1, 2, 4, 5])])
  testForMessage(extendedSASLFinalBuffer, expectedSASLFinalMessage)

  testForMessage(paramStatusBuffer, expectedParameterStatusMessage)
  testForMessage(backendKeyDataBuffer, expectedBackendKeyDataMessage)
  testForMessage(readyForQueryBuffer, expectedReadyForQueryMessage)
  testForMessage(commandCompleteBuffer, expectedCommandCompleteMessage)
  testForMessage(notificationResponseBuffer, expectedNotificationResponseMessage)
  testForMessage(buffers.emptyQuery(), {
    name: 'emptyQuery',
    length: 4,
  })

  // 0x6e is the ASCII code for 'n'; the message has no body.
  testForMessage(Buffer.from([0x6e, 0, 0, 0, 4]), {
    name: 'noData',
  })

  describe('rowDescription messages', function () {
    testForMessage(emptyRowDescriptionBuffer, expectedEmptyRowDescriptionMessage)
    testForMessage(oneRowDescBuff, expectedOneRowMessage)
    testForMessage(twoRowBuf, expectedTwoRowMessage)
  })

  describe('parsing rows', function () {
    describe('parsing empty row', function () {
      testForMessage(emptyRowFieldBuf, {
        name: 'dataRow',
        fieldCount: 0,
      })
    })

    describe('parsing data row with fields', function () {
      testForMessage(oneFieldBuf, {
        name: 'dataRow',
        fieldCount: 1,
        fields: ['test'],
      })
    })
  })

  describe('notice message', function () {
    // this uses the same logic as error message
    const buff = buffers.notice([{ type: 'C', value: 'code' }])
    testForMessage(buff, {
      name: 'notice',
      code: 'code',
    })
  })

  testForMessage(buffers.error([]), {
    name: 'error',
  })

  describe('with all the fields', function () {
    const buffer = buffers.error([
      { type: 'S', value: 'ERROR' },
      { type: 'C', value: 'code' },
      { type: 'M', value: 'message' },
      { type: 'D', value: 'details' },
      { type: 'H', value: 'hint' },
      { type: 'P', value: '100' },
      { type: 'p', value: '101' },
      { type: 'q', value: 'query' },
      { type: 'W', value: 'where' },
      { type: 'F', value: 'file' },
      { type: 'L', value: 'line' },
      { type: 'R', value: 'routine' },
      {
        type: 'Z', // ignored
        value: 'alsdkf',
      },
    ])

    testForMessage(buffer, {
      name: 'error',
      severity: 'ERROR',
      code: 'code',
      message: 'message',
      detail: 'details',
      hint: 'hint',
      position: '100',
      internalPosition: '101',
      internalQuery: 'query',
      where: 'where',
      file: 'file',
      line: 'line',
      routine: 'routine',
    })
  })

  testForMessage(parseCompleteBuffer, {
    name: 'parseComplete',
  })

  testForMessage(bindCompleteBuffer, {
    name: 'bindComplete',
  })

  testForMessage(buffers.closeComplete(), {
    name: 'closeComplete',
  })

  describe('parses portal suspended message', function () {
    testForMessage(portalSuspendedBuffer, {
      name: 'portalSuspended',
    })
  })

  describe('parses replication start message', function () {
    // 0x57 is the ASCII code for 'W'; the message has no body.
    testForMessage(Buffer.from([0x57, 0x00, 0x00, 0x00, 0x04]), {
      name: 'replicationStart',
      length: 4,
    })
  })

  describe('copy', () => {
    testForMessage(buffers.copyIn(0), {
      name: 'copyInResponse',
      length: 7,
      binary: false,
      columnTypes: [],
    })

    testForMessage(buffers.copyIn(2), {
      name: 'copyInResponse',
      length: 11,
      binary: false,
      columnTypes: [0, 1],
    })

    testForMessage(buffers.copyOut(0), {
      name: 'copyOutResponse',
      length: 7,
      binary: false,
      columnTypes: [],
    })

    testForMessage(buffers.copyOut(3), {
      name: 'copyOutResponse',
      length: 13,
      binary: false,
      columnTypes: [0, 1, 2],
    })

    testForMessage(buffers.copyDone(), {
      name: 'copyDone',
      length: 4,
    })

    testForMessage(buffers.copyData(Buffer.from([5, 6, 7])), {
      name: 'copyData',
      length: 7,
      chunk: Buffer.from([5, 6, 7]),
    })
  })

  // Since the data message on a stream can divide the incoming TCP packets
  // anywhere, we need to make sure we can parse a message regardless of
  // where it is split.
  describe('split buffer, single message parsing', function () {
    const fullBuffer = buffers.dataRow([null, 'bang', 'zug zug', null, '!'])

    // Checks the five fields encoded in fullBuffer on the first parsed message.
    const verifyFields = function (messages: any[]) {
      const message = messages[0] as any
      assert.equal(message.fields.length, 5)
      assert.equal(message.fields[0], null)
      assert.equal(message.fields[1], 'bang')
      assert.equal(message.fields[2], 'zug zug')
      assert.equal(message.fields[3], null)
      assert.equal(message.fields[4], '!')
    }

    it('parses when full buffer comes in', async function () {
      const messages = await parseBuffers([fullBuffer])
      verifyFields(messages)
    })

    // Parses fullBuffer delivered as two chunks, the second holding the last
    // `split` bytes, and verifies the reassembled message.
    const testMessageReceivedAfterSplitAt = async function (split: number) {
      const [firstBuffer, secondBuffer] = splitBuffer(fullBuffer, split)
      const messages = await parseBuffers([firstBuffer, secondBuffer])
      verifyFields(messages)
    }

    // Every test returns its promise so a failed assertion fails the test
    // instead of surfacing as an unhandled rejection.
    it('parses when split in the middle', function () {
      return testMessageReceivedAfterSplitAt(Math.floor(fullBuffer.length / 2))
    })

    it('parses when split at end', function () {
      return testMessageReceivedAfterSplitAt(2)
    })

    it('parses when split at beginning', function () {
      return Promise.all([
        testMessageReceivedAfterSplitAt(fullBuffer.length - 2),
        testMessageReceivedAfterSplitAt(fullBuffer.length - 1),
        testMessageReceivedAfterSplitAt(fullBuffer.length - 5),
      ])
    })
  })

  describe('split buffer, multiple message parsing', function () {
    const dataRowBuffer = buffers.dataRow(['!'])
    const readyForQueryBuffer = buffers.readyForQuery()
    // A dataRow immediately followed by a readyForQuery in one contiguous buffer.
    const fullBuffer = Buffer.concat([dataRowBuffer, readyForQueryBuffer])

    // Asserts that exactly the dataRow and the readyForQuery were parsed, in order.
    const verifyMessages = function (messages: any[]) {
      assert.strictEqual(messages.length, 2)
      assert.deepEqual(messages[0], {
        name: 'dataRow',
        fieldCount: 1,
        length: 11,
        fields: ['!'],
      })
      assert.equal(messages[0].fields[0], '!')
      assert.deepEqual(messages[1], {
        name: 'readyForQuery',
        length: 5,
        status: 'I',
      })
    }

    // sanity check
    it('recieves both messages when packet is not split', async function () {
      const messages = await parseBuffers([fullBuffer])
      verifyMessages(messages)
    })

    // Parses fullBuffer delivered as two chunks, the second holding the last
    // `split` bytes, and verifies both messages.
    const splitAndVerifyTwoMessages = async function (split: number) {
      const [firstBuffer, secondBuffer] = splitBuffer(fullBuffer, split)
      const messages = await parseBuffers([firstBuffer, secondBuffer])
      verifyMessages(messages)
    }

    describe('recieves both messages when packet is split', function () {
      it('in the middle', function () {
        return splitAndVerifyTwoMessages(11)
      })

      it('at the front', function () {
        return Promise.all([
          splitAndVerifyTwoMessages(fullBuffer.length - 1),
          splitAndVerifyTwoMessages(fullBuffer.length - 4),
          splitAndVerifyTwoMessages(fullBuffer.length - 6),
        ])
      })

      it('at the end', function () {
        return Promise.all([splitAndVerifyTwoMessages(8), splitAndVerifyTwoMessages(1)])
      })
    })
  })
})