Skip to content

Commit 7bdaf56

Browse files
committed
user and magin ws
1 parent d1c1738 commit 7bdaf56

10 files changed

Lines changed: 530 additions & 204 deletions

File tree

src/http-client.js

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -543,11 +543,7 @@ export default opts => {
543543
keepDataStream: payload => privCall('/api/v3/userDataStream', payload, 'PUT', false, true),
544544
closeDataStream: payload =>
545545
privCall('/api/v3/userDataStream', payload, 'DELETE', false, true),
546-
marginGetDataStream: () => privCall('/sapi/v1/userDataStream', null, 'POST', true),
547-
marginKeepDataStream: payload =>
548-
privCall('/sapi/v1/userDataStream', payload, 'PUT', false, true),
549-
marginCloseDataStream: payload =>
550-
privCall('/sapi/v1/userDataStream', payload, 'DELETE', false, true),
546+
marginGetListenToken: payload => privCall('/sapi/v1/userListenToken', payload, 'POST'),
551547
futuresGetDataStream: () => privCall('/fapi/v1/listenKey', null, 'POST', true),
552548
futuresKeepDataStream: payload =>
553549
privCall('/fapi/v1/listenKey', payload, 'PUT', false, true),

src/websocket.js

Lines changed: 187 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -882,10 +882,15 @@ const getStreamMethods = (opts, variator = '') => {
882882
export const keepStreamAlive = (method, listenKey) => method({ listenKey })
883883

884884
const userWebSocketApi = opts => (cb, transform) => {
885-
const isTestnet = opts.testnet || (opts.httpBase && opts.httpBase.includes('testnet'))
886-
const wsApiUrl = isTestnet
887-
? opts.wsApiTestnet || 'wss://ws-api.testnet.binance.vision/ws-api/v3'
888-
: opts.wsApi || 'wss://ws-api.binance.com:443/ws-api/v3'
885+
const isDemo = opts.testnet && !(opts.httpBase && opts.httpBase.includes('testnet'))
886+
const isTestnet = !isDemo && opts.httpBase && opts.httpBase.includes('testnet')
887+
const wsApiUrl =
888+
opts.wsApi ||
889+
(isDemo
890+
? 'wss://demo-ws-api.binance.com/ws-api/v3'
891+
: isTestnet
892+
? opts.wsApiTestnet || 'wss://ws-api.testnet.binance.vision/ws-api/v3'
893+
: 'wss://ws-api.binance.com:443/ws-api/v3')
889894

890895
let requestId = 1
891896
const errorHandler = userErrorHandler(cb, transform)
@@ -982,6 +987,177 @@ const userWebSocketApi = opts => (cb, transform) => {
982987
})
983988
}
984989

990+
const marginUserWebSocketApi = opts => (cb, transform, marginOpts = {}) => {
991+
const isTestnet = opts.testnet || (opts.httpBase && opts.httpBase.includes('testnet'))
992+
const wsApiUrl = isTestnet
993+
? opts.wsApiTestnet || 'wss://ws-api.testnet.binance.vision/ws-api/v3'
994+
: opts.wsApi || 'wss://ws-api.binance.com:443/ws-api/v3'
995+
996+
const methods = httpMethods(opts)
997+
let requestId = 1
998+
let renewalTimeout = null
999+
let w = null
1000+
let keepClosed = false
1001+
const errorHandler = userErrorHandler(cb, transform)
1002+
const RENEWAL_BUFFER_MS = 5 * 60 * 1000
1003+
1004+
const cleanup = (options = {}, internal = false) => {
1005+
if (!internal) keepClosed = true
1006+
if (renewalTimeout) {
1007+
clearTimeout(renewalTimeout)
1008+
renewalTimeout = null
1009+
}
1010+
if (w) {
1011+
try {
1012+
w.send(
1013+
JSONbig.stringify({
1014+
id: requestId++,
1015+
method: 'userDataStream.unsubscribe',
1016+
}),
1017+
)
1018+
} catch (e) {
1019+
// Ignore send errors during close
1020+
}
1021+
w.close(1000, 'Close handle was called', {
1022+
keepClosed: !internal,
1023+
...options,
1024+
})
1025+
w = null
1026+
}
1027+
}
1028+
1029+
const scheduleRenewal = expirationTime => {
1030+
if (renewalTimeout) clearTimeout(renewalTimeout)
1031+
const delay = Math.max(expirationTime - Date.now() - RENEWAL_BUFFER_MS, 60000)
1032+
renewalTimeout = setTimeout(() => renewToken(), delay)
1033+
}
1034+
1035+
const renewToken = () => {
1036+
if (keepClosed || !w) return
1037+
methods
1038+
.marginGetListenToken(marginOpts)
1039+
.then(({ token, expirationTime }) => {
1040+
if (keepClosed || !w) return
1041+
w.send(
1042+
JSONbig.stringify({
1043+
id: requestId++,
1044+
method: 'userDataStream.subscribe.listenToken',
1045+
params: { listenToken: token },
1046+
}),
1047+
)
1048+
scheduleRenewal(expirationTime)
1049+
})
1050+
.catch(err => {
1051+
if (opts.emitStreamErrors) errorHandler(err)
1052+
if (!keepClosed) {
1053+
renewalTimeout = setTimeout(() => renewToken(), 30e3)
1054+
}
1055+
})
1056+
}
1057+
1058+
const makeStream = isReconnecting => {
1059+
if (keepClosed) return Promise.resolve()
1060+
1061+
return methods
1062+
.marginGetListenToken(marginOpts)
1063+
.then(({ token, expirationTime }) => {
1064+
if (keepClosed) return
1065+
1066+
w = openWebSocket(wsApiUrl)
1067+
1068+
return new Promise((resolve, reject) => {
1069+
let resolved = false
1070+
1071+
w.onopen = () => {
1072+
w.send(
1073+
JSONbig.stringify({
1074+
id: requestId++,
1075+
method: 'userDataStream.subscribe.listenToken',
1076+
params: { listenToken: token },
1077+
}),
1078+
)
1079+
if (opts.emitSocketOpens) {
1080+
userOpenHandler(cb, transform)()
1081+
}
1082+
}
1083+
1084+
w.onmessage = msg => {
1085+
const data = JSONbig.parse(msg.data)
1086+
1087+
// Control response (subscription/unsubscription)
1088+
if ('id' in data) {
1089+
if (data.error) {
1090+
const err = new Error(data.error.msg || 'WebSocket API error')
1091+
err.code = data.error.code
1092+
if (!resolved) {
1093+
resolved = true
1094+
reject(err)
1095+
} else if (opts.emitStreamErrors) {
1096+
errorHandler(err)
1097+
}
1098+
} else if (!resolved) {
1099+
resolved = true
1100+
scheduleRenewal(expirationTime)
1101+
resolve(options => cleanup(options))
1102+
}
1103+
return
1104+
}
1105+
1106+
// User data event - unwrap if in wrapped format
1107+
let eventData = data
1108+
if (data.event && typeof data.event === 'object') {
1109+
eventData = data.event
1110+
}
1111+
1112+
// Handle eventStreamTerminated - token expired
1113+
if (eventData.e === 'eventStreamTerminated') {
1114+
cleanup({}, true)
1115+
if (!keepClosed) {
1116+
setTimeout(() => makeStream(true), 5e3)
1117+
}
1118+
return
1119+
}
1120+
1121+
if (eventData.e) {
1122+
userEventHandler(cb, transform)({
1123+
data: JSONbig.stringify(eventData),
1124+
})
1125+
}
1126+
}
1127+
1128+
w.onerror = event => {
1129+
const error =
1130+
event.error || event.message || new Error('WebSocket error')
1131+
if (opts.emitSocketErrors) {
1132+
errorHandler(typeof error === 'string' ? new Error(error) : error)
1133+
}
1134+
}
1135+
1136+
w.onclose = () => {
1137+
if (!keepClosed && resolved) {
1138+
if (renewalTimeout) clearTimeout(renewalTimeout)
1139+
renewalTimeout = null
1140+
w = null
1141+
setTimeout(() => makeStream(true), 30e3)
1142+
}
1143+
}
1144+
})
1145+
})
1146+
.catch(err => {
1147+
if (isReconnecting) {
1148+
if (!keepClosed) {
1149+
setTimeout(() => makeStream(true), 30e3)
1150+
}
1151+
if (opts.emitStreamErrors) errorHandler(err)
1152+
} else {
1153+
throw err
1154+
}
1155+
})
1156+
}
1157+
1158+
return makeStream(false)
1159+
}
1160+
9851161
const user = (opts, variator) => (cb, transform) => {
9861162
const [getDataStream, keepDataStream, closeDataStream] = getStreamMethods(opts, variator)
9871163

@@ -1133,7 +1309,13 @@ export default opts => {
11331309
customSubStream,
11341310
user: userWebSocketApi(opts),
11351311

1136-
marginUser: user(opts, 'margin'),
1312+
marginUser: marginUserWebSocketApi(opts),
1313+
isolatedMarginUser: (payload, cb, transform) =>
1314+
marginUserWebSocketApi(opts)(cb, transform, {
1315+
isIsolated: true,
1316+
symbol: payload.symbol,
1317+
validity: payload.validity,
1318+
}),
11371319

11381320
futuresDepth: (payload, cb, transform) => depth(payload, cb, transform, 'futures'),
11391321
deliveryDepth: (payload, cb, transform) => depth(payload, cb, transform, 'delivery'),

test/proxy.js

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -266,7 +266,15 @@ const main = () => {
266266
clean()
267267
t.pass('User data stream connected successfully through proxy')
268268
} catch (e) {
269-
if (notAvailable(e) || e.message.includes('WebSocket')) {
269+
if (
270+
notAvailable(e) ||
271+
e.message.includes('WebSocket') ||
272+
e.message.includes('ENOTFOUND') ||
273+
e.message.includes('ECONNREFUSED') ||
274+
e.code === -1022 ||
275+
e.code === -2015 ||
276+
e.code === -2008
277+
) {
270278
t.pass('User data stream or proxy not available on testnet')
271279
} else {
272280
throw e

0 commit comments

Comments
 (0)