李勇

1.Emiter.js 中eimt的时候增加参数

2.录制回放中音视频消息增加seek时间(秒),在APE处理中增加这个seek字段
... ... @@ -23,13 +23,13 @@ export default class Emiter {
}
}
}
_emit(eid, data) {
_emit(eid, data,data2) {
if (eid) {
//eid=* broadcast
let asteriskStub =this.MAPS['*'];
if (asteriskStub && asteriskStub.length) {
asteriskStub.forEach(function (elistener) {
elistener(eid, data);
elistener(eid, data,data2);
})
}
... ... @@ -37,7 +37,7 @@ export default class Emiter {
let stub = this.MAPS[eid];
if (stub && stub.length) {
stub.forEach(function (elistener) {
elistener(data);
elistener(data,data2);
});
}
}
... ...
... ... @@ -34,7 +34,7 @@ class RecordPlayBackParse extends Emiter {
this._recordPlaybackMaxTime = 0;//录制回放的总时间
this._isReady = false;//录制回放是否已经准备完成
this._apes = {};
this._messages = {};
//this._messages = {};
this._conferApeMssages = {};//会议数据
this._chatApeMssages = {};//聊天数据
... ... @@ -53,10 +53,8 @@ class RecordPlayBackParse extends Emiter {
this._apes[ape._session_id] = ape;
}
// 录制回放EverSocket底层消息处理 data-数据;timestamp-数据对应的时间戳
// 1.如果第二个参数timestamp不为空,数据就不往外发送,只做解析和储存
// 2.如果第二个参数timestamp为空,数据就发送给各个ape处理;
_everSocketMsgReceivedHandler(data, timestamp) {
//发送数据个各个APE模块处理,data是数据,seekTime是当前数据需要seek的时间长度(针对音视频的seek)
_everSocketMsgReceivedHandler(data, seekTime) {
let pduMsg = pdu.decode_pdu(data);
let pduType = pduMsg.get("type");
let pduData = pduMsg.get("data");
... ... @@ -67,7 +65,7 @@ class RecordPlayBackParse extends Emiter {
pduMsg.type = PduType.RCPDU_SEND_DATA_REQUEST;
pduType = PduType.RCPDU_SEND_DATA_REQUEST;
}
loger.log('pduType', pduType);
loger.log('_everSocketMsgReceivedHandler->pduType', pduType,"seekTime->",seekTime);
switch (pduType) {
case PduType.RCPDU_CONNECT_PROVIDER_RESPONSE:
//加入课堂请求返回数据处理
... ... @@ -77,7 +75,7 @@ class RecordPlayBackParse extends Emiter {
switch (pduResultCode) {
case PduConsts.RET_SUCCESS:
//加入成功
this._updateMCUConfInfoDescription(joinConfPdu.classDescription);
//this._updateMCUConfInfoDescription(joinConfPdu.classDescription);
this._emit(MessageTypes.CLASS_JOIN_MCU_SUCCESS, this.classInfo);
break;
case PduConsts.RET_FULL_CAPACITY:
... ... @@ -92,41 +90,14 @@ class RecordPlayBackParse extends Emiter {
//先判断当前消息属于哪个APE 根据 sessionId来判断
let ape = this._apes[pduMsg.sessionId];
let sessionLabel = ApeConsts(pduMsg.sessionId);
if (timestamp) {
//只做解析存储,不对外发送
loger.log('解析数据-timestamp->', timestamp, 'sessionId->', pduMsg.sessionId, 'sessionLabel->', sessionLabel);
switch (pduMsg.sessionId) {
case ApeConsts.CONFERENCE_SESSION_ID:
this.saveParseData(data, timestamp, this._conferApeMssages);
break;
case ApeConsts.CHAT_SESSION_ID:
this.saveParseData(data, timestamp, this._chatApeMssages);
break;
case ApeConsts.DOCSHARING_SESSION_ID:
this.saveParseData(data, timestamp, this._docApeMssages);
break;
case ApeConsts.WHITEBOARD_SESSION_ID:
this.saveParseData(data, timestamp, this._whiteApeMssages);
break;
case ApeConsts.VIDEO_SESSION_ID:
this.saveParseData(data, timestamp, this._videoApeMssages);
break;
case ApeConsts.AUDIO_SESSION_ID:
this.saveParseData(data, timestamp, this._audioApeMssages);
break;
default:
break;
}
//对方发送消息
if (ape) {
let subTypeLabel = pdu.id2type(pduMsg.subType);
//loger.log('MCU-SecondLayer封装消息', 'sessionId', sessionLabel, pduMsg.sessionId, 'subtype', subTypeLabel, pduMsg.subType);
//ape广播事件,只要ape中监听就能收到
ape._emit(pduMsg.subType, pduMsg.data,seekTime);//seekTime是音视频模块seek的时间长度
} else {
//对方发送消息
if (ape) {
let subTypeLabel = pdu.id2type(pduMsg.subType);
//loger.log('MCU-SecondLayer封装消息', 'sessionId', sessionLabel, pduMsg.sessionId, 'subtype', subTypeLabel, pduMsg.subType);
//ape广播事件,只要ape中监听就能收到
ape._emit(pduMsg.subType, pduMsg.data);
} else {
loger.warn(sessionLabel + '尚未注册');
}
loger.warn(sessionLabel + '尚未注册');
}
break;
default:
... ... @@ -134,6 +105,74 @@ class RecordPlayBackParse extends Emiter {
}
}
//解析和储存,录制回放EverSocket底层消息处理 data-数据;timestamp-数据对应的时间戳
_parseSaveSocketMsgReceivedHandler(data, timestamp) {
let pduMsg = pdu.decode_pdu(data);
let pduType = pduMsg.get("type");
let pduData = pduMsg.get("data");
//*************非常重要******************
//客户端发送的所有125消息,MCU收到之后会痛120把消息返回给客户端,
//所以需要把125消息type转换为120,因为MCU在录制的时候是直接录制客户端发送的消息而不是MCU转换之后的
if (pduType == PduType.RCPDU_UNIFORM_SEND_DATA_REQUEST) {
pduMsg.type = PduType.RCPDU_SEND_DATA_REQUEST;
pduType = PduType.RCPDU_SEND_DATA_REQUEST;
}
loger.log('pduType', pduType);
switch (pduType) {
case PduType.RCPDU_CONNECT_PROVIDER_RESPONSE:
//加入课堂请求返回数据处理
let joinConfPdu = pdu['RCConferenceJoinResponsePdu'].decode(pduData);
let pduResultCode = joinConfPdu.result;
loger.warn('RCPDU_CONNECT_PROVIDER_RESPONSE ->pduResultCode:' + pduResultCode);
switch (pduResultCode) {
case PduConsts.RET_SUCCESS:
//加入成功
//this._updateMCUConfInfoDescription(joinConfPdu.classDescription);
this._emit(MessageTypes.CLASS_JOIN_MCU_SUCCESS, this.classInfo);
break;
case PduConsts.RET_FULL_CAPACITY:
this._emit(MessageTypes.MCU_ERROR, MessageTypes.ERR_CLASS_JOIN_FULL);
break;
default:
loger.warn('JoinConfPdu-未知类型-等待处理.', pduResultCode);
break
}
break;
case PduType.RCPDU_SEND_DATA_REQUEST:
//先判断当前消息属于哪个APE 根据 sessionId来判断
let ape = this._apes[pduMsg.sessionId];
let sessionLabel = ApeConsts(pduMsg.sessionId);
//只做解析存储,不对外发送
loger.log('解析数据-timestamp->', timestamp, 'sessionId->', pduMsg.sessionId, 'sessionLabel->', sessionLabel);
switch (pduMsg.sessionId) {
case ApeConsts.CONFERENCE_SESSION_ID:
this.saveParseData(data, timestamp, this._conferApeMssages);
break;
case ApeConsts.CHAT_SESSION_ID:
this.saveParseData(data, timestamp, this._chatApeMssages);
break;
case ApeConsts.DOCSHARING_SESSION_ID:
this.saveParseData(data, timestamp, this._docApeMssages);
break;
case ApeConsts.WHITEBOARD_SESSION_ID:
this.saveParseData(data, timestamp, this._whiteApeMssages);
break;
case ApeConsts.VIDEO_SESSION_ID:
this.saveParseData(data, timestamp, this._videoApeMssages);
break;
case ApeConsts.AUDIO_SESSION_ID:
this.saveParseData(data, timestamp, this._audioApeMssages);
break;
default:
break;
}
break;
default:
loger.warn('PDU-未知类型-等待处理.', pduType);
}
}
//保存数据
saveParseData(data, timestamp, apeMessages) {
let messageItem = apeMessages[timestamp];
... ... @@ -223,7 +262,7 @@ class RecordPlayBackParse extends Emiter {
//解析数据
parseArrayBuf() {
this._messages = {};
//this._messages = {};
let byteLength = parseBuffer.offset;
parseBuffer.byteOffset = 0;
var position = 0;
... ... @@ -236,14 +275,14 @@ class RecordPlayBackParse extends Emiter {
position += byteLen;
console.log(timestamp, byteLen, byteData);
let messageItem = this._messages[timestamp];
if (!messageItem) {
this._messages[timestamp] = [];//数组存数据,因为有1秒内收到多个消息的情况,timestamp是按秒记录的
messageItem = this._messages[timestamp];
}
messageItem.push({"timestamp": timestamp, "byteData": byteData});
/* let messageItem = this._messages[timestamp];
if (!messageItem) {
this._messages[timestamp] = [];//数组存数据,因为有1秒内收到多个消息的情况,timestamp是按秒记录的
messageItem = this._messages[timestamp];
}
messageItem.push({"timestamp": timestamp, "byteData": byteData});*/
this._everSocketMsgReceivedHandler(byteData, timestamp);
this._parseSaveSocketMsgReceivedHandler(byteData, timestamp);
//记录最后一个数据的时间戳作为整个录制回放的总时间戳
this._recordPlaybackMaxTime = timestamp;
}
... ... @@ -253,7 +292,7 @@ class RecordPlayBackParse extends Emiter {
GlobalConfig.recordPlaybackMaxTime = this._recordPlaybackMaxTime;
loger.log("录制回放数据解析完成,录制回放的总时间长为->", this._recordPlaybackMaxTime);
console.log("_messages", this._messages);
//console.log("_messages", this._messages);
console.log("_conferApeMssages", this._conferApeMssages);
console.log("_chatApeMssages", this._chatApeMssages);
console.log("_docApeMssages", this._docApeMssages);
... ... @@ -272,7 +311,7 @@ class RecordPlayBackParse extends Emiter {
} else {
//把时间点对应的数据发送,同一秒内有存在多个数据的情况
for (let i = 0; i < msgDataArr.length; i++) {
this._everSocketMsgReceivedHandler(msgDataArr[i].byteData);
this._everSocketMsgReceivedHandler(msgDataArr[i].byteData,0);
}
}
}
... ... @@ -331,14 +370,14 @@ class RecordPlayBackParse extends Emiter {
_searchKeyfram() {
//查找关键帧,找到关键帧后再继续播放
this._searchApeMessageKeyfram(this._conferApeMssages,ApeConsts.CONFERENCE_SESSION_ID);
this._searchApeMessageKeyfram(this._docApeMssages,ApeConsts.DOCSHARING_SESSION_ID);
this._searchApeMessageKeyfram(this._whiteApeMssages,ApeConsts.WHITEBOARD_SESSION_ID);
this._searchApeMessageKeyfram(this._videoApeMssages,ApeConsts.VIDEO_SESSION_ID);
this._searchApeMessageKeyfram(this._audioApeMssages,ApeConsts.AUDIO_SESSION_ID);
this._searchApeMessageKeyfram(this._conferApeMssages, ApeConsts.CONFERENCE_SESSION_ID);
this._searchApeMessageKeyfram(this._docApeMssages, ApeConsts.DOCSHARING_SESSION_ID);
this._searchApeMessageKeyfram(this._whiteApeMssages, ApeConsts.WHITEBOARD_SESSION_ID);
this._searchApeMessageKeyfram(this._videoApeMssages, ApeConsts.VIDEO_SESSION_ID);
this._searchApeMessageKeyfram(this._audioApeMssages, ApeConsts.AUDIO_SESSION_ID);
//聊天模块的比较特殊,消息是累计的
this._searchChatApeMessageKeyfram(this._chatApeMssages,ApeConsts.CHAT_SESSION_ID);
this._searchChatApeMessageKeyfram(this._chatApeMssages, ApeConsts.CHAT_SESSION_ID);
//各个ape模块无论有没有找到关键帧数据,都继续播放
... ... @@ -346,25 +385,25 @@ class RecordPlayBackParse extends Emiter {
}
//查找ape关键帧数据
_searchApeMessageKeyfram(_apeMessages,_apeId) {
_searchApeMessageKeyfram(_apeMessages, _apeId) {
let messageItem;
let keyFrameSeek = 0;
let keyFrameSeekTime = 0;
for (let i = this._recordPlaybackTimestamp; i > 0; i--) {
messageItem = _apeMessages[i];
if (messageItem) {
keyFrameSeek = (this._recordPlaybackTimestamp - i)
loger.log("SEEK->APE",_apeId, this._recordPlaybackTimestamp, "查找到相连的数据, messageItem.timestamp->",i, this._recordPlaybackTimestamp,keyFrameSeek, "秒");
keyFrameSeekTime = (this._recordPlaybackTimestamp - i)
loger.log("SEEK->APE", ApeConsts(_apeId), this._recordPlaybackTimestamp, "查找到相连的timestamp->", i, '需要seek->', keyFrameSeekTime, "秒");
//把时间点对应的数据发送,同一秒内有存在多个数据的情况
for (let k = 0; k < messageItem.length; k++) {
this._everSocketMsgReceivedHandler(messageItem[k].byteData);
this._everSocketMsgReceivedHandler(messageItem[k].byteData,keyFrameSeekTime);
}
if(_apeId==ApeConsts.AUDIO_SESSION_ID||_apeId==ApeConsts.VIDEO_SESSION_ID){
this._emit(MessageTypes.RECORD_PLAYBACK_UPDATE, {"status": SEEK, "keyFrameSeek": keyFrameSeek});
if (_apeId == ApeConsts.AUDIO_SESSION_ID || _apeId == ApeConsts.VIDEO_SESSION_ID) {
this._emit(MessageTypes.RECORD_PLAYBACK_UPDATE, {"status": SEEK, "keyFrameSeekTime": keyFrameSeekTime});
}
return;
}
}
loger.log("SEEK->APE",_apeId, this._recordPlaybackTimestamp, "没有查找到相连的数据");
loger.log("SEEK->APE", ApeConsts(_apeId), this._recordPlaybackTimestamp, "没有查找到相连的数据");
//this._emit(MessageTypes.RECORD_PLAYBACK_UPDATE,{"status":SEEK,"keyFrameSeek":keyFrameSeek});
}
... ... @@ -377,7 +416,7 @@ class RecordPlayBackParse extends Emiter {
if (messageItem) {
//把时间点对应的数据发送,同一秒内有存在多个数据的情况
for (let i = 0; i < messageItem.length; i++) {
this._everSocketMsgReceivedHandler(messageItem[i].byteData);
this._everSocketMsgReceivedHandler(messageItem[i].byteData,0);
}
}
}
... ...
... ... @@ -73,46 +73,27 @@ export default class Ape extends Emiter {
}
// 消息处理
_pduMessageHandler(regBuffer) {
//loger.log("RCPDU_REG_ADAPTER==============================");
_pduMessageHandler(regBuffer,_seekTime) {
let seekTime=_seekTime||0;//这个只有在录制回放的时候才有
//loger.log("RCPDU_REG_ADAPTER============seekTime",seekTime);
if (this._apeDelayed) {
// this._apeDelayedMsgs.push(regBuffer);
// this._apeDelayedStart();
setTimeout(() => {
this._pduRegAdapterHandler(regBuffer);
this._pduRegAdapterHandler(regBuffer,seekTime);
}, GlobalConfig.mcuDelay || 2000);
return;
}
this._pduRegAdapterHandler(regBuffer);
this._pduRegAdapterHandler(regBuffer,seekTime);
}
// _apeDelayedStart() {
// if (this._apeDelayed && !this._apeDelayedTimer) {
// this._apeDelayedTimer = setInterval(this._delayedMsgHandler.bind(this), this._classInfo['mcuDelay'] || 10000);
// }
// }
// _apeDelayedStop() {
// clearInterval(this._apeDelayedTimer);
// this._apeDelayedTimer = 0;
// }
// // 延迟消息处理
// _delayedMsgHandler() {
// if (this._apeDelayedMsgs.length) {
// this._pduRegAdapterHandler(this._apeDelayedMsgs.pop());
// if (!this._apeDelayedMsgs.length) this._apeDelayedStop();
// }
// }
// 数据同步处理
_pduRegAdapterHandler(regBuffer) {
_pduRegAdapterHandler(regBuffer,seekTime) {
let regPdu = pdu['RCAdapterPdu'].decode(regBuffer);
let regItems = regPdu.item;
let regItemSize = regItems.length;
//loger.log(this._session_name + '数据同步消息');
loger.log(this._session_name + '数据同步消息.同步条数', regItemSize);
//console.log(regPdu);
loger.log(this._session_name + '数据同步消息.同步条数', regItemSize,"seekTime->",seekTime);
for (var i = 0; i < regItemSize; ++i) {
let regItem = regItems[i];
... ... @@ -188,7 +169,7 @@ export default class Ape extends Emiter {
for (let i = 0; i < tableUpdateItemsLen; ++i) {
let tableItem = tableUpdateItems[i];
this.tableUpdateHandler(tableItem.owner, tableItem.itemIdx, tableItem.itemData);
this.tableUpdateHandler(tableItem.owner, tableItem.itemIdx, tableItem.itemData,seekTime);
}
break;
case pdu.RCPDU_REG_QUEUE_UPDATE_PDU:
... ... @@ -217,7 +198,7 @@ export default class Ape extends Emiter {
loger.warn(this._session_name + ' tableInsertHandler 应有子类具体覆盖处理.');
}
tableUpdateHandler(ownerId, recordId, recordData) {
tableUpdateHandler(ownerId, recordId, recordData,seekTime) {
loger.warn(this._session_name + ' tableUpdateHandler 应有子类具体覆盖处理.');
}
... ...
... ... @@ -299,10 +299,9 @@ class AudioApe extends Ape {
}
}
tableUpdateHandler(owner, itemIdx, itemData) {
tableUpdateHandler(owner, itemIdx, itemData,seek) {
let unpackChannelInfo = this.unPackPdu(owner, itemIdx, itemData);
loger.log("tableUpdateHandler,channel",itemIdx);
loger.log("tableUpdateHandler->channel",itemIdx,'status->',unpackChannelInfo.status,"seek->",seek);
//****很重要********
//如果owner的值为0,代表的是这个歌频道已经被释放了(mcu服务端对于占用channel的掉线用户,就是把owner设置为0)
if(owner==0){
... ... @@ -324,6 +323,9 @@ class AudioApe extends Ape {
receiveChannelInfo.m3u8Url="";
receiveChannelInfo.rtmpUrl="";
receiveChannelInfo.replay="";
receiveChannelInfo.seek=seek||0;//这个是录制回放时使用的seek
let m3u8Stream=this.mediaModule.getMediaPlayPath({"type":"m3u8","streamId": unpackChannelInfo.streamId});
let rtmpStream=this.mediaModule.getMediaPlayPath({"type":"rtmp","streamId": unpackChannelInfo.streamId});
let replay=this.mediaModule.getMediaRecordPlaybackPath({"type":"m3u8","streamId": unpackChannelInfo.streamId});
... ...
... ... @@ -302,10 +302,10 @@ class VideoApe extends Ape {
}
}
tableUpdateHandler(owner, itemIdx, itemData) {
tableUpdateHandler(owner, itemIdx, itemData,seek) {
// debugger;
let unpackChannelInfo = this.unPackPdu(owner, itemIdx, itemData);
loger.log("tableUpdateHandler,channel",itemIdx);
loger.log("tableUpdateHandler->channel",itemIdx,'status->',unpackChannelInfo.status,"seek->",seek);
//****很重要********
//如果owner的值为0,代表的是这个歌频道已经被释放了(mcu服务端对于占用channel的掉线用户,就是把owner设置为0)
... ... @@ -328,6 +328,8 @@ class VideoApe extends Ape {
receiveChannelInfo.rtmpUrl="";
receiveChannelInfo.replay="";
receiveChannelInfo.seek=seek||0;//这个是录制回放时使用的seek
let m3u8Stream=this.mediaModule.getMediaPlayPath({"type":"m3u8","streamId": unpackChannelInfo.streamId});
let rtmpStream=this.mediaModule.getMediaPlayPath({"type":"rtmp","streamId": unpackChannelInfo.streamId});
let replay=this.mediaModule.getMediaRecordPlaybackPath({"type":"m3u8","streamId": unpackChannelInfo.streamId});
... ...