李勇

修改音视频模块的channel占用释放,更新table数据RCRegistryTableItemPdu的时候,owner的值非常重要,占用频道的时候设置为自己的…

…nodeId,释放占用的时候必须要设置为0,否则其他人无法使用;正在占用channel的人异常掉线之后,MCU服务器会把owner设置为0,其他用户同步的消息中如果owner为0就可以使用
... ... @@ -92,16 +92,12 @@ class AudioApe extends Ape {
return {"code": ApeConsts.RETURN_FAILED, "data":"频道已经被占用!"};
}
let channelInfo={};
let channelInfo=this.mediaModule.getDefaultChannelInfo();
channelInfo.owner=GlobalConfig.nodeId;
channelInfo.status=ApeConsts.CHANNEL_STATUS_OPENING;
channelInfo.fromNodeId=GlobalConfig.nodeId;
channelInfo.channelId=needPublishChannelInfo.channelId;//freeChannel
channelInfo.streamId=needPublishChannelInfo.streamId;//按规则拼接的流名称
channelInfo.timestamp=needPublishChannelInfo.timestamp;//EngineUtils.creatTimestamp();
channelInfo.classId=GlobalConfig.classId;//GlobalConfig.classId;
channelInfo.siteId=GlobalConfig.siteId;//GlobalConfig.siteId;
channelInfo.toNodeId=0;
channelInfo.userId=GlobalConfig.userId;
channelInfo.mediaType=ApeConsts.MEDIA_TYPE_AUDIO;
this.sendTableUpdateHandler(channelInfo);
return {"code": ApeConsts.RETURN_SUCCESS, "data":"推流成功!","mediaId":needPublishChannelInfo.channelId};
... ... @@ -143,15 +139,11 @@ class AudioApe extends Ape {
let channelInfo=this.mediaModule.mediaChannels[channelId];
if(channelInfo&&channelInfo.status==ApeConsts.CHANNEL_STATUS_OPENING){
if(channelInfo.fromNodeId==nodeId){
let channelInfo={};
let channelInfo=this.mediaModule.getDefaultChannelInfo();
channelInfo.status=ApeConsts.CHANNEL_STATUS_RELEASED;
channelInfo.fromNodeId=0;
channelInfo.channelId=channelId;
channelInfo.timestamp=0;
channelInfo.classId=GlobalConfig.classId;
channelInfo.toNodeId=0;
channelInfo.userId=GlobalConfig.userId;
channelInfo.mediaType=ApeConsts.MEDIA_TYPE_DEFAULT;
this.sendTableUpdateHandler(channelInfo);
}else {
loger.warn(channelId,"不属于nodeId",nodeId,"不能释放",channelInfo);
... ... @@ -174,15 +166,10 @@ class AudioApe extends Ape {
return {"code": ApeConsts.RETURN_FAILED, "data": "没有占用channel不需要处理"};
}
let channelInfo={};
let channelInfo=this.mediaModule.getDefaultChannelInfo();
channelInfo.status=ApeConsts.CHANNEL_STATUS_RELEASED;
channelInfo.fromNodeId=0;
channelInfo.channelId=openingChannel;
channelInfo.timestamp=0;
channelInfo.classId=GlobalConfig.classId;
channelInfo.toNodeId=0;
channelInfo.userId=GlobalConfig.userId;
channelInfo.mediaType=ApeConsts.MEDIA_TYPE_DEFAULT;
this.sendTableUpdateHandler(channelInfo);
//递归检查,800毫秒之后执行
setTimeout(function(){
... ... @@ -254,14 +241,12 @@ class AudioApe extends Ape {
let tableItemPdu = new pdu['RCRegistryTableItemPdu'];
tableItemPdu.itemIdx = _channelInfo.channelId;
tableItemPdu.owner = 0;//收到flash的是这个值,不清楚先写固定
tableItemPdu.owner = _channelInfo.owner;//0收到flash的是这个值,MCU做了了用户掉线处理,30秒之后会清理owner为0
tableItemPdu.itemData = updateModelPdu.toArrayBuffer();
//insert
let tableInsertItemPdu = new pdu['RCRegistryTableUpdateItemPdu'];
//optional RCPduType_E type = 1 [default = RCPDU_REG_TABLE_UPDATE_PDU];
//repeated RCRegistryTableItemPdu items = 2;
tableInsertItemPdu.type = pdu.RCPDU_REG_TABLE_UPDATE_PDU;//
tableInsertItemPdu.type = pdu.RCPDU_REG_TABLE_UPDATE_PDU;
tableInsertItemPdu.items.push(tableItemPdu);
let updateObjPdu = new pdu['RCRegistryUpdateObjPdu'];
... ... @@ -303,13 +288,16 @@ class AudioApe extends Ape {
}
tableUpdateHandler(owner, itemIdx, itemData) {
// debugger;
/* let updateChannelInfo = this.unPackPdu(owner, itemIdx, itemData);
let unpackChannelInfo = this.unPackPdu(owner, itemIdx, itemData);
loger.log("tableUpdateHandler,channel",itemIdx);
this.mediaModule.mediaChannels[itemIdx] = updateChannelInfo;
//****很重要********
//如果owner的值为0,代表的是这个歌频道已经被释放了(mcu服务端对于占用channel的掉线用户,就是把owner设置为0)
if(owner==0){
loger.log("释放占用的频道,channel",itemIdx);
unpackChannelInfo.status=ApeConsts.CHANNEL_STATUS_RELEASED;
}
this._emit(MessageTypes.AUDIO_UPDATE, updateChannelInfo);*/
let unpackChannelInfo = this.unPackPdu(owner, itemIdx, itemData);
this.mediaModule.mediaChannels[itemIdx] = unpackChannelInfo;
if(unpackChannelInfo&&unpackChannelInfo.fromNodeId!=GlobalConfig.nodeId){
... ... @@ -330,13 +318,11 @@ class AudioApe extends Ape {
if(rtmpStream.code==0){
receiveChannelInfo.rtmpUrl=rtmpStream.playUrl;
}
loger.log("AUDIO_PLAY");
console.log(receiveChannelInfo);
loger.log("AUDIO_PLAY",receiveChannelInfo);
//广播播放视频的消息
this._emit(MessageTypes.AUDIO_PLAY, receiveChannelInfo);
}else {
loger.log("AUDIO_STOP");
console.log(receiveChannelInfo);
loger.log("AUDIO_STOP",receiveChannelInfo);
//流已经停止
this._emit(MessageTypes.AUDIO_STOP, receiveChannelInfo);
}
... ...
... ... @@ -481,9 +481,9 @@ class ConferApe extends Ape {
rosterInsertHandler(nodeId, nodeData) {
if (GlobalConfig.nodeId == nodeId) {
loger.log("自己加入 rosterInsertHandler");
// loger.log("自己加入 rosterInsertHandler");
} else {
loger.log("有人加入 rosterInsertHandler");
// loger.log("有人加入 rosterInsertHandler");
this.rosterUpdateHandler(nodeId, nodeData);
}
}
... ... @@ -532,8 +532,16 @@ class ConferApe extends Ape {
let newNodeData = nodeData;
newNodeData.userData = userDataObj;
//如果是监课,不告诉其他人
if (nodeData.role == ApeConsts.NR_INVISIBLE) {
loger.log("NR_INVISIBLE");
return;
}
this._emit(MessageTypes.CLASS_INSERT_ROSTER, {"nodeId": nodeId, "nodeData": newNodeData});
this.emitRosterChange();
} else {
//loger.log("更新人员列表数据,rosterExists已经存在",rosterExists);
}
... ... @@ -547,20 +555,11 @@ class ConferApe extends Ape {
//视频模块发生更新,人员状态需要更新
updaterRosterStatus(_param) {
if (_param) {
//loger.log("媒体模块发生更新,人员状态需要更新,fromNodeId->",_param.fromNodeId);
//loger.log(_param.status,_param.fromNodeId);
//console.log(_param.fromNodeId);
//如果是自己。改变自己的状态同步到MCU
//if(_param.fromNodeId==GlobalConfig.nodeId){
//
//}
//如果视频消息中channel的占用人 fromNodeId在人员列表中不存在,需要释放这channel,因为这个有可能是之前没释放成功的
if (_param.status == ApeConsts.CHANNEL_STATUS_OPENING && this.rosters[_param.fromNodeId] == null) {
loger.log("媒体模块被占用,占有人已经不存在课堂中,释放Channel,_param->", _param);
this._emit(MessageTypes.CLASS_NONENTITY_ROSTER, {"nodeId": _param.fromNodeId});
}
//loger.log("媒体模块发生更新,人员状态需要更新,fromNodeId->",_param.fromNodeId);
//如果视频消息中channel的占用人 fromNodeId在人员列表中不存在,需要释放这channel,因为这个有可能是之前没释放成功的
if (_param && _param.status == ApeConsts.CHANNEL_STATUS_OPENING && this.rosters[_param.fromNodeId] == null) {
loger.log("媒体模块被占用,占有人已经不存在课堂中,释放Channel,_param->", _param);
this._emit(MessageTypes.CLASS_NONENTITY_ROSTER, {"nodeId": _param.fromNodeId});
}
}
... ...
... ... @@ -137,6 +137,22 @@ class MediaModule {
}
return true;
}
//获取默认的频道信息
getDefaultChannelInfo(){
let channelInfo={};
channelInfo.owner=0;//这个很重要,释放的时候必须设置为0,占用的时候设置为自己的nodeId
channelInfo.status=ApeConsts.CHANNEL_STATUS_RELEASED;
channelInfo.fromNodeId=GlobalConfig.nodeId;
channelInfo.channelId=0;
channelInfo.streamId=""
channelInfo.classId=GlobalConfig.classId;
channelInfo.siteId=GlobalConfig.siteId;
channelInfo.toNodeId=0;
channelInfo.userId=GlobalConfig.userId;
channelInfo.mediaType=ApeConsts.MEDIA_TYPE_DEFAULT;
return channelInfo;
}
}
export default MediaModule;
... ...
... ... @@ -72,7 +72,6 @@ class VideoApe extends Ape {
return {"code": ApeConsts.RETURN_FAILED, "data": "推流数据已经无效"};
}
/* //20170302 修改频道占用规则,同一个人可以推多路流,暂停下面的限制
//同一个nodeId只允许推一个流,如果已经推了就不能再推
... ... @@ -81,7 +80,6 @@ class VideoApe extends Ape {
return {"code": ApeConsts.RETURN_FAILED, "data": "已经存在一个流,不能再推"};
}*/
//判断当前是否还有空闲的channle
let freeChannel = this.mediaModule.getFreeMediaChannel();
if (freeChannel == 0) {
... ... @@ -96,17 +94,14 @@ class VideoApe extends Ape {
return {"code": ApeConsts.RETURN_FAILED, "data":"频道已经被占用!","mediaChannels":this.mediaModule.mediaChannels};
}
let channelInfo={};
let channelInfo=this.mediaModule.getDefaultChannelInfo();
channelInfo.owner=GlobalConfig.nodeId;
channelInfo.status=ApeConsts.CHANNEL_STATUS_OPENING;
channelInfo.fromNodeId=GlobalConfig.nodeId;
channelInfo.channelId=needPublishChannelInfo.channelId;
channelInfo.streamId=needPublishChannelInfo.streamId;//按规则拼接的流名称
channelInfo.classId=GlobalConfig.classId;
channelInfo.siteId=GlobalConfig.siteId;
channelInfo.toNodeId=0;
channelInfo.userId=GlobalConfig.userId;
channelInfo.mediaType=ApeConsts.MEDIA_TYPE_VIDEO;
this.sendTableUpdateHandler(channelInfo);
return {"code": ApeConsts.RETURN_SUCCESS, "data":"推流成功!","mediaId":needPublishChannelInfo.channelId};
}
... ... @@ -145,15 +140,11 @@ class VideoApe extends Ape {
let channelInfo=this.mediaModule.mediaChannels[channelId];
if(channelInfo&&channelInfo.status==ApeConsts.CHANNEL_STATUS_OPENING){
if(channelInfo.fromNodeId==nodeId){
let channelInfo={};
let channelInfo=this.mediaModule.getDefaultChannelInfo();
channelInfo.status=ApeConsts.CHANNEL_STATUS_RELEASED;
channelInfo.fromNodeId=0;
channelInfo.channelId=channelId;
channelInfo.timestamp=0;
channelInfo.classId=GlobalConfig.classId;
channelInfo.toNodeId=0;
channelInfo.userId=GlobalConfig.userId;
channelInfo.mediaType=ApeConsts.MEDIA_TYPE_DEFAULT;
this.sendTableUpdateHandler(channelInfo);
}else {
loger.warn(channelId,"不属于nodeId",nodeId,"不能释放",channelInfo);
... ... @@ -176,15 +167,10 @@ class VideoApe extends Ape {
return {"code": ApeConsts.RETURN_FAILED, "data": "没有占用channel不需要处理"};
}
let channelInfo={};
let channelInfo=this.mediaModule.getDefaultChannelInfo();
channelInfo.status=ApeConsts.CHANNEL_STATUS_RELEASED;
channelInfo.fromNodeId=0;
channelInfo.channelId=openingChannel;
channelInfo.timestamp=0;
channelInfo.classId=GlobalConfig.classId;
channelInfo.toNodeId=0;
channelInfo.userId=GlobalConfig.userId;
channelInfo.mediaType=ApeConsts.MEDIA_TYPE_DEFAULT;
this.sendTableUpdateHandler(channelInfo);
//递归检查,800毫秒之后执行
setTimeout(function(){
... ... @@ -223,12 +209,6 @@ class VideoApe extends Ape {
return {"code": ApeConsts.RETURN_FAILED, "data": "不能再打开更多的设备","mediaChannels":this.mediaModule.mediaChannels};
}
}
/* message RCVideoSendDataRequestPdu {
required uint32 from_node_id = 1;//发起人
optional uint32 to_node_id = 2;//接收人,如果是0就是所有人都接收
optional uint32 actionType = 3;//消息指令类型;
optional bytes data = 4;//其他数据,这个根据actionType来确定数据的结构
}*/
let videoSendPdu = new pdu['RCVideoSendDataRequestPdu'];
videoSendPdu.type = pdu.RCPDU_SEND_VIDEO_DATA_REQUEST;
... ... @@ -255,6 +235,7 @@ class VideoApe extends Ape {
sendTableUpdateHandler(_channelInfo) {
loger.log("video===sendTableUpdateHandler ");
let updateModelPdu = this.packPdu(_channelInfo, _channelInfo.channelId);//let updateModelPdu=this.packPdu({},ApeConsts.VIDEO_OBJ_TABLE_ID+2);
if(updateModelPdu==null){
loger.warn("sendTableUpdateHandler error,updateModelPdu=null");
return;
... ... @@ -262,13 +243,11 @@ class VideoApe extends Ape {
let tableItemPdu = new pdu['RCRegistryTableItemPdu'];
tableItemPdu.itemIdx = _channelInfo.channelId;//tableItemPdu.itemIdx=ApeConsts.VIDEO_OBJ_TABLE_ID+2;
tableItemPdu.owner = 0;//收到flash的是这个值,不清楚先写固定
tableItemPdu.owner = _channelInfo.owner;//0收到flash的是这个值,MCU做了了用户掉线处理,30秒之后会清理owner为0
tableItemPdu.itemData = updateModelPdu.toArrayBuffer();
//insert
let tableInsertItemPdu = new pdu['RCRegistryTableUpdateItemPdu'];
//optional RCPduType_E type = 1 [default = RCPDU_REG_TABLE_UPDATE_PDU];
//repeated RCRegistryTableItemPdu items = 2;
tableInsertItemPdu.type = pdu.RCPDU_REG_TABLE_UPDATE_PDU;//
tableInsertItemPdu.items.push(tableItemPdu);
... ... @@ -296,11 +275,11 @@ class VideoApe extends Ape {
receiveVideoCommandHandler(_data) {
let videoReceivePdu = pdu['RCVideoSendDataRequestPdu'].decode(_data);
if (videoReceivePdu == null) {
loger.warn("视频消息处理,收到的消息为null,不做处理");
loger.warn("视频控制消息处理,收到的消息为null,不做处理");
return;
}
videoReceivePdu.data = this._rCArrayBufferUtil.uint8ArrayToStr(videoReceivePdu.data, 2);//开头两个字会乱码
loger.log('视频消息处理 receiveVideoCommandHandler.');
loger.log('视频控制消息处理 .',videoReceivePdu);
console.log(videoReceivePdu);
//判断接收者的id,如果不是0,并且也不是自己的nodeId,那么消息不做处理
... ... @@ -314,6 +293,15 @@ class VideoApe extends Ape {
tableUpdateHandler(owner, itemIdx, itemData) {
// debugger;
let unpackChannelInfo = this.unPackPdu(owner, itemIdx, itemData);
loger.log("tableUpdateHandler,channel",itemIdx);
//****很重要********
//如果owner的值为0,代表的是这个歌频道已经被释放了(mcu服务端对于占用channel的掉线用户,就是把owner设置为0)
if(owner==0){
loger.log("释放占用的频道,channel",itemIdx);
unpackChannelInfo.status=ApeConsts.CHANNEL_STATUS_RELEASED;
}
this.mediaModule.mediaChannels[itemIdx] = unpackChannelInfo;
if(unpackChannelInfo&&unpackChannelInfo.fromNodeId!=GlobalConfig.nodeId){
... ... @@ -334,13 +322,11 @@ class VideoApe extends Ape {
if(rtmpStream.code==0){
receiveChannelInfo.rtmpUrl=rtmpStream.playUrl;
}
loger.log("VIDEO_PLAY");
console.log(receiveChannelInfo);
loger.log("VIDEO_PLAY",receiveChannelInfo);
//广播播放视频的消息
this._emit(MessageTypes.VIDEO_PLAY, receiveChannelInfo);
}else {
loger.log("VIDEO_STOP");
console.log(receiveChannelInfo);
loger.log("VIDEO_STOP",receiveChannelInfo);
//流已经停止
this._emit(MessageTypes.VIDEO_STOP, receiveChannelInfo);
}
... ... @@ -374,18 +360,17 @@ class VideoApe extends Ape {
packPduModel.timestamp =_param.timestamp||EngineUtils.creatTimestamp();
packPduModel.fromNodeId = GlobalConfig.nodeId;
packPduModel.toNodeId = 0;
console.log("packPdu",packPduModel);
console.log(packPduModel);
return packPduModel;
}
unPackPdu(owner, itemIdx, itemData) {
loger.log("unPackPdu ");
loger.log("unPackPdu->owner:",owner,"itemIdx->",itemIdx);
if (owner == null || itemIdx == null || itemData == null) {
this._emit(MessageTypes.MCU_ERROR, MessageTypes.ERR_APE_INTERFACE_PARAM_WRONG);
return null;
}
try {
let videoChannelInfo = pdu['RCVideoChannelInfoPdu'].decode(itemData);
console.log(videoChannelInfo);
return videoChannelInfo;
... ...