李勇

1.修改视频和音频模块的推流和播流接口

2.文档模块修改了获取pdf图片序列的接口
... ... @@ -549,10 +549,12 @@ export default class MessageEntrance extends Emiter {
//保存会态信息成功
_sassSaveClassStatusInfoSuccessHandler(_data) {
loger.log('保存会议状态信息成功.', _data);
loger.log('保存会议状态信息成功.');
console.log(_data);
}
_sassSaveClassRecordInfoSuccessHandler(_data){
loger.log('保存会议录制信息成功.', _data);
loger.log('保存会议录制信息成功.');
console.log(_data);
}
//Sass校验流程结束之后,开始加入MCU
... ...
... ... @@ -42,7 +42,11 @@ class EverSocket extends Emiter {
send(data) {
if (this._connected) {
loger.log('SEND MESSAGE---->');
if(data){
loger.log('SEND MESSAGE,byteLength---->',data.byteLength);
}else {
loger.log('SEND MESSAGE---->');
}
this.websocket.send(data);
} else {
loger.warn('WebSocket未建立连接.消息忽略');
... ...
... ... @@ -34,10 +34,14 @@ MessageTypes.CLASS_RECORD_START='class.record.start';//开始录制
MessageTypes.CHAT_RECEIVE = 'chat.receive';
//视频模块事件定义
MessageTypes.VIDEO_PLAY = 'video.play';//播放视频
MessageTypes.VIDEO_STOP = 'video.stop';//停止视频
MessageTypes.VIDEO_UPDATE = 'video.update';
MessageTypes.VIDEO_BROADCAST= 'video.broadcast';
//音频模块事件定义
MessageTypes.AUDIO_PLAY = 'audio.play';//播放
MessageTypes.AUDIO_STOP = 'audio.stop';//停止
MessageTypes.AUDIO_UPDATE = 'audio.update';
MessageTypes.AUDIO_BROADCAST= 'audio.broadcast';
... ...
... ... @@ -57,15 +57,21 @@ class AudioApe extends Ape {
return {"code": ApeConsts.RETURN_FAILED, "data": "已经断开连接"};
}
if (_param == null||_param.channelId == null||
_param.classId == null||_param.userId == null||
_param.siteId == null|| _param.timestamp==null)
if (_param == null||_param.publishUrl == null)
{
loger.warn('publishAudio,参数错误', _param);
this._emit(MessageTypes.MCU_ERROR, MessageTypes.ERR_APE_INTERFACE_PARAM_WRONG);
return {"code": ApeConsts.RETURN_FAILED, "data": "参数错误"};
}
//根据推流的地址获取对应的频道信息
let needPublishChannelInfo=this.mediaModule.getNeedPublishMediaChannel(_param.publishUrl);
if(needPublishChannelInfo==null){
loger.warn('publishVideo,推流数据已经无效', _param);
return {"code": ApeConsts.RETURN_FAILED, "data": "推流数据已经无效"};
}
//同一个nodeId只允许推一个流,如果已经推了就不能再推
if(this.mediaModule.getOpeningMediaChannel(GlobalConfig.nodeId)!=0){
loger.warn("publishAudio,已经存在一个流,不能再推");
... ... @@ -80,18 +86,19 @@ class AudioApe extends Ape {
}
//判断当前的频道是否已经占用
if(this.mediaModule.checkChannelIsOpening(_param.channelId)){
loger.warn(_param.channelId,"频道已经被占用");
if(this.mediaModule.checkChannelIsOpening(needPublishChannelInfo.channelId)){
loger.warn(needPublishChannelInfo.channelId,"频道已经被占用");
return {"code": ApeConsts.RETURN_FAILED, "data":"频道已经被占用!"};
}
let channelInfo={};
channelInfo.status=ApeConsts.CHANNEL_STATUS_OPENING;
channelInfo.fromNodeId=GlobalConfig.nodeId;
channelInfo.channelId=_param.channelId;//freeChannel
channelInfo.timestamp=_param.timestamp;//EngineUtils.creatTimestamp();
channelInfo.classId=_param.classId;//GlobalConfig.classId;
channelInfo.siteId=_param.siteId;//GlobalConfig.siteId;
channelInfo.channelId=needPublishChannelInfo.channelId;//freeChannel
channelInfo.streamId=needPublishChannelInfo.streamId;//按规则拼接的流名称
channelInfo.timestamp=needPublishChannelInfo.timestamp;//EngineUtils.creatTimestamp();
channelInfo.classId=GlobalConfig.classId;//GlobalConfig.classId;
channelInfo.siteId=GlobalConfig.siteId;//GlobalConfig.siteId;
channelInfo.toNodeId=0;
channelInfo.mediaType=ApeConsts.MEDIA_TYPE_AUDIO;
this.sendTableUpdateHandler(channelInfo);
... ... @@ -105,9 +112,9 @@ class AudioApe extends Ape {
loger.warn(GlobalConfig.getCurrentStatus());
return {"code": ApeConsts.RETURN_FAILED, "data": "已经断开连接"};
}
//_param如果为空,那么默认就是当前自己的nodeId,否则用_param
//_param如果为空或者0,那么默认就是当前自己的nodeId,否则用_param
let nodeId;
if(_param&&parseInt(_param.nodeId)>=0){
if(_param&&parseInt(_param.nodeId)>0){
nodeId=parseInt(_param.nodeId);
}else {
nodeId=GlobalConfig.nodeId;
... ... @@ -115,8 +122,8 @@ class AudioApe extends Ape {
let openingChannel = this.mediaModule.getOpeningMediaChannel(nodeId);
if (openingChannel == 0) {
loger.warn(nodeId,"stopPublishAudio,没有占用channel,不需要关闭");
return {"code": ApeConsts.RETURN_FAILED, "data": "没有占用channel,不需要关闭"};
loger.warn(nodeId,"没有占用channel不需要处理");
return {"code": ApeConsts.RETURN_FAILED, "data": "没有占用channel不需要处理"};
}
let channelInfo={};
... ... @@ -243,11 +250,47 @@ class AudioApe extends Ape {
tableUpdateHandler(owner, itemIdx, itemData) {
// debugger;
let updateChannelInfo = this.unPackPdu(owner, itemIdx, itemData);
this.mediaModule.mediaChannels[itemIdx] = updateChannelInfo;
/* let updateChannelInfo = this.unPackPdu(owner, itemIdx, itemData);
this.mediaModule.mediaChannels[itemIdx] = updateChannelInfo;
this._emit(MessageTypes.AUDIO_UPDATE, updateChannelInfo);*/
let unpackChannelInfo = this.unPackPdu(owner, itemIdx, itemData);
this.mediaModule.mediaChannels[itemIdx] = unpackChannelInfo;
if(unpackChannelInfo&&unpackChannelInfo.fromNodeId!=GlobalConfig.nodeId){
let receiveChannelInfo={};
receiveChannelInfo.mediaId=unpackChannelInfo.channelId;
//消息不是自己同步的,需要处理
if(unpackChannelInfo.status==ApeConsts.CHANNEL_STATUS_OPENING){
//正在推流
receiveChannelInfo.m3u8Url="";
receiveChannelInfo.rtmpUrl="";
let m3u8Stream=this.mediaModule.getMediaPlayPath({"type":"m3u8","streamId": unpackChannelInfo.streamId});
let rtmpStream=this.mediaModule.getMediaPlayPath({"type":"rtmp","streamId": unpackChannelInfo.streamId});
if(m3u8Stream.code==0){
receiveChannelInfo.m3u8Url=m3u8Stream.playUrl;
}
if(rtmpStream.code==0){
receiveChannelInfo.rtmpUrl=rtmpStream.playUrl;
}
loger.log("AUDIO_PLAY");
console.log(receiveChannelInfo);
//广播播放视频的消息
this._emit(MessageTypes.AUDIO_PLAY, receiveChannelInfo);
}else {
loger.log("AUDIO_STOP");
console.log(receiveChannelInfo);
//流已经停止
this._emit(MessageTypes.AUDIO_STOP, receiveChannelInfo);
}
}else {
loger.warn("消息是自己发送的或者是消息无效,不需要处理,消息内容如下:");
console.log(unpackChannelInfo);
this._emit(MessageTypes.AUDIO_UPDATE, updateChannelInfo);
}
}
///////数据的封包和解包/////////////////////////////////////////
... ... @@ -263,6 +306,7 @@ class AudioApe extends Ape {
let packPduModel = new pdu['RCAudioChannelInfoPdu'];
packPduModel.status = _param.status||ApeConsts.CHANNEL_STATUS_RELEASED;
packPduModel.channelId = _itemIdx;
packPduModel.streamId = _param.streamId||"";
packPduModel.siteId=_param.siteId||GlobalConfig.siteId;//GlobalConfig.siteId;
packPduModel.classId =parseInt(_param.classId)||parseInt(GlobalConfig.classId);
packPduModel.userId =_param.userId||"0";
... ... @@ -282,7 +326,7 @@ class AudioApe extends Ape {
}
try {
let packChannelInfo = pdu['RCAudioChannelInfoPdu'].decode(itemData);
loger.log("unPackPdu",packChannelInfo);
console.log(packChannelInfo);
return packChannelInfo;
} catch (err) {
loger.log("unPackPdu error,itemIdx=" + itemIdx + " err:" + err.message);
... ...
... ... @@ -171,7 +171,8 @@ class DocApe extends Ape {
if(lastIndex>0){
let newPath=fullPath.substr(0,lastIndex);
let pathArr=[];
for(let i=1;i<_param.pageNum;i++){
//页数从1开始
for(let i=1;i<=_param.pageNum;i++){
pathArr.push(newPath+"/"+i+"."+fileType);
}
return pathArr;
... ... @@ -210,6 +211,8 @@ class DocApe extends Ape {
//切换文档
documentSwitchDoc(paramInfo){
loger.log('切换文档,documentSwitchDoc');
console.log(paramInfo);
if(paramInfo==null||paramInfo.itemIdx==null){
loger.warn('documentSwitch失败,参数错误',paramInfo);
this._emit(MessageTypes.MCU_ERROR,MessageTypes.ERR_APE_INTERFACE_PARAM_WRONG);
... ... @@ -257,6 +260,8 @@ class DocApe extends Ape {
//文档翻页
documentSwitchPage(paramInfo){
loger.log('文档翻页,documentSwitchPage');
console.log(paramInfo);
//console.log(this.docList);
//获取已经存在的数据
let docDataModel= this.docList[paramInfo.itemIdx];
... ...
... ... @@ -12,6 +12,7 @@ let loger = Loger.getLoger('MediaModule');
class MediaModule {
constructor() {
this.needPublishMediaChannel={};//记录准备推流的频道信息
this.mediaChannels = {};
this.maxMediaChannel=0;
this.MEDIA_OBJ_TABLE_ID=0;
... ... @@ -20,9 +21,7 @@ class MediaModule {
//获取播流地址
getMediaPlayPath(_param) {
loger.log('getMediaPlayPath');
if (_param == null||_param.siteId == null||
_param.classId == null||_param.userId == null||
_param.channelId == null|| _param.timestamp==null)
if (_param == null||_param.streamId == null)
{
loger.warn('getMediaPlayPath,参数错误', _param);
//this._emit(MessageTypes.MCU_ERROR, MessageTypes.ERR_APE_INTERFACE_PARAM_WRONG);
... ... @@ -37,21 +36,13 @@ class MediaModule {
port = (GlobalConfig.RSServerPort == "" || GlobalConfig.RSServerPort == null) ? "":":" + GlobalConfig.RSServerPort;
path = "http://" + GlobalConfig.RSServerIP
+ port + "/live/"
+ _param.siteId
+ "_" + _param.classId
+ "_" + _param.userId
+ "_" + _param.channelId
+ "_" + _param.timestamp
+ _param.streamId
+ "/index.m3u8";
} else {
port = (GlobalConfig.MSServerPort == "" || GlobalConfig.MSServerPort == null) ? "":":" + GlobalConfig.MSServerPort;
path = "rtmp://" + GlobalConfig.MSServerIP
+ port + "/live/"
+ _param.siteId
+ "_" + _param.classId
+ "_" + _param.userId
+ "_" + _param.channelId
+ "_" + _param.timestamp;
+ _param.streamId;
}
return {"code": ApeConsts.RETURN_SUCCESS, "data": "","playUrl": path};
}
... ... @@ -77,18 +68,21 @@ class MediaModule {
//时间戳
let timestamp = EngineUtils.creatTimestamp();
//生成推流地址和推流数据(同步数据的时候用)
let publishUrl = "rtmp://" + GlobalConfig.MSServerIP
+ port + "/"+pubType+"/" +GlobalConfig.siteId+"_"
let streamId=GlobalConfig.siteId+"_"
+ GlobalConfig.classId + "_"+GlobalConfig.userId
+"_" + freeChannel + "_" + timestamp;
//生成推流地址和推流数据(同步数据的时候用)
let publishUrl = "rtmp://" + GlobalConfig.MSServerIP
+ port + "/"+pubType+"/" +streamId;
this.needPublishMediaChannel[publishUrl]={
"channelId":freeChannel,
"publishUrl":publishUrl,
"streamId":streamId
};
return {"code": ApeConsts.RETURN_SUCCESS,
"data":"",
"siteId":GlobalConfig.siteId,
"classId":GlobalConfig.classId,
"userId":GlobalConfig.userId,
"channelId": freeChannel,
"timestamp": timestamp,
"publishUrl": publishUrl
};
}
... ... @@ -111,6 +105,11 @@ class MediaModule {
return 0;//没有空闲的
}
//获取准备推流的频道信息
getNeedPublishMediaChannel(_publishUrl){
return this.needPublishMediaChannel[_publishUrl];
}
//获取当前属于nodeId的已经打开的的channel,返回值为0代表没有打开的,否则返回的就是打开的channelId
getOpeningMediaChannel(_nodeId){
if(_nodeId==null||_nodeId==0){
... ...
... ... @@ -58,16 +58,19 @@ class VideoApe extends Ape {
return {"code": ApeConsts.RETURN_FAILED, "data": "已经断开连接"};
}
if (_param == null||_param.channelId == null||
_param.classId == null||_param.userId == null||
_param.siteId == null|| _param.timestamp==null)
if (_param == null||_param.publishUrl == null)
{
loger.warn('publishVideo,参数错误', _param);
this._emit(MessageTypes.MCU_ERROR, MessageTypes.ERR_APE_INTERFACE_PARAM_WRONG);
return {"code": ApeConsts.RETURN_FAILED, "data": "参数错误"};
}
loger.log('publishVideo -> maxVideoChannels', GlobalConfig.maxVideoChannels);
//根据推流的地址获取对应的频道信息
let needPublishChannelInfo=this.mediaModule.getNeedPublishMediaChannel(_param.publishUrl);
if(needPublishChannelInfo==null){
loger.warn('publishVideo,推流数据已经无效', _param);
return {"code": ApeConsts.RETURN_FAILED, "data": "推流数据已经无效"};
}
//同一个nodeId只允许推一个流,如果已经推了就不能再推
if(this.mediaModule.getOpeningMediaChannel(GlobalConfig.nodeId)!=0){
... ... @@ -82,19 +85,20 @@ class VideoApe extends Ape {
return {"code": ApeConsts.RETURN_FAILED, "data": "不能再打开更多的设备"};
}
//判断当前的频道是否已经占用
if(this.mediaModule.checkChannelIsOpening(_param.channelId)){
loger.warn(_param.channelId,"频道已经被占用");
if(this.mediaModule.checkChannelIsOpening(needPublishChannelInfo.channelId)){
loger.warn(needPublishChannelInfo.channelId,"频道已经被占用");
return {"code": ApeConsts.RETURN_FAILED, "data":"频道已经被占用!"};
}
let channelInfo={};
channelInfo.status=ApeConsts.CHANNEL_STATUS_OPENING;
channelInfo.fromNodeId=GlobalConfig.nodeId;
channelInfo.channelId=_param.channelId;//freeChannel
channelInfo.timestamp=_param.timestamp;//EngineUtils.creatTimestamp();
channelInfo.classId=_param.classId;//GlobalConfig.classId;
channelInfo.siteId=_param.siteId;//GlobalConfig.siteId;
channelInfo.channelId=needPublishChannelInfo.channelId;
channelInfo.streamId=needPublishChannelInfo.streamId;//按规则拼接的流名称
channelInfo.classId=GlobalConfig.classId;
channelInfo.siteId=GlobalConfig.siteId;
channelInfo.toNodeId=0;
channelInfo.mediaType=ApeConsts.MEDIA_TYPE_VIDEO;
this.sendTableUpdateHandler(channelInfo);
... ... @@ -111,7 +115,7 @@ class VideoApe extends Ape {
loger.log('stopPublishVideo -> maxVideoChannels', GlobalConfig.maxVideoChannels);
//_param如果为空,那么默认就是当前自己的nodeId,否则用_param
let nodeId;
if(_param&&parseInt(_param.nodeId)>=0){
if(_param&&parseInt(_param.nodeId)>0){
nodeId=parseInt(_param.nodeId);
}else {
nodeId=GlobalConfig.nodeId;
... ... @@ -119,8 +123,8 @@ class VideoApe extends Ape {
let openingChannel = this.mediaModule.getOpeningMediaChannel(nodeId);
if (openingChannel == 0) {
loger.warn(nodeId,"stopPublishVideo,没有占用channel,不需要关闭");
return {"code": ApeConsts.RETURN_FAILED, "data": "没有占用channel,不需要关闭"};
loger.warn(nodeId,"没有占用channel不需要处理");
return {"code": ApeConsts.RETURN_FAILED, "data": "没有占用channel不需要处理"};
}
let channelInfo={};
... ... @@ -265,9 +269,44 @@ class VideoApe extends Ape {
tableUpdateHandler(owner, itemIdx, itemData) {
// debugger;
let videoChannelInfo = this.unPackPdu(owner, itemIdx, itemData);
this.mediaModule.mediaChannels[itemIdx] = videoChannelInfo;
this._emit(MessageTypes.VIDEO_UPDATE, videoChannelInfo);
let unpackChannelInfo = this.unPackPdu(owner, itemIdx, itemData);
this.mediaModule.mediaChannels[itemIdx] = unpackChannelInfo;
if(unpackChannelInfo&&unpackChannelInfo.fromNodeId!=GlobalConfig.nodeId){
let receiveChannelInfo={};
receiveChannelInfo.mediaId=unpackChannelInfo.channelId;
//消息不是自己同步的,需要处理
if(unpackChannelInfo.status==ApeConsts.CHANNEL_STATUS_OPENING){
//正在推流
receiveChannelInfo.m3u8Url="";
receiveChannelInfo.rtmpUrl="";
let m3u8Stream=this.mediaModule.getMediaPlayPath({"type":"m3u8","streamId": unpackChannelInfo.streamId});
let rtmpStream=this.mediaModule.getMediaPlayPath({"type":"rtmp","streamId": unpackChannelInfo.streamId});
if(m3u8Stream.code==0){
receiveChannelInfo.m3u8Url=m3u8Stream.playUrl;
}
if(rtmpStream.code==0){
receiveChannelInfo.rtmpUrl=rtmpStream.playUrl;
}
loger.log("VIDEO_PLAY");
console.log(receiveChannelInfo);
//广播播放视频的消息
this._emit(MessageTypes.VIDEO_PLAY, receiveChannelInfo);
}else {
loger.log("VIDEO_STOP");
console.log(receiveChannelInfo);
//流已经停止
this._emit(MessageTypes.VIDEO_STOP, receiveChannelInfo);
}
}else {
loger.warn("视频消息是自己发送的或者是视频消息无效,不需要处理,消息内容如下:");
console.log(unpackChannelInfo);
}
//this._emit(MessageTypes.VIDEO_UPDATE, videoChannelInfo);
}
///////数据的封包和解包/////////////////////////////////////////
... ... @@ -283,6 +322,7 @@ class VideoApe extends Ape {
let packPduModel = new pdu['RCVideoChannelInfoPdu'];
packPduModel.status = _param.status||ApeConsts.CHANNEL_STATUS_RELEASED;
packPduModel.channelId = _itemIdx;
packPduModel.streamId = _param.streamId||"";
packPduModel.siteId=_param.siteId||GlobalConfig.siteId;//GlobalConfig.siteId;
packPduModel.classId =parseInt(_param.classId)||parseInt(GlobalConfig.classId);
packPduModel.userId =_param.userId||"0";
... ... @@ -303,7 +343,7 @@ class VideoApe extends Ape {
try {
let videoChannelInfo = pdu['RCVideoChannelInfoPdu'].decode(itemData);
loger.log("unPackPdu",videoChannelInfo);
console.log(videoChannelInfo);
return videoChannelInfo;
} catch (err) {
loger.log("unPackPdu error,itemIdx=" + itemIdx + " err:" + err.message);
... ...
... ... @@ -776,6 +776,7 @@ message RCAudioChannelInfoPdu {
optional uint32 class_id = 7;//课堂号
optional string site_id = 8;//站点号
optional string user_id = 9;//用户的userId
optional string stream_id = 10;//流名称
}
message RCVideoChannelInfoPdu {
... ... @@ -788,6 +789,7 @@ message RCVideoChannelInfoPdu {
optional uint32 class_id = 7;//课堂号
optional string site_id = 8;//站点号
optional string user_id = 9;//用户的userId
optional string stream_id = 10;//流名称
}
message RCVideoChannelInfoRecordPdu {
... ...