背景
在实现Android平台RTSP、RTMP转GB28181网关之前,我们已经实现了Android平台GB28181的接入,可实现Android平台采集到的音视频数据,编码后,打包按需发到GB28181服务平台。此外,拉流端,我们已经有了成熟的RTSP和RTMP拉流播放方案。
今天,我们要做的是,把外部的RTSP或RTMP流,拉过来,然后对接到GB28181国标平台,实现媒体流数据的按需播放。
和我们之前实现的轻量级RTSP服务网关模块类似,我们要做的是,实现RTSP或RTMP流,按需打包对接到GB28181服务平台。
简单来说,Android平台RTSP、RTMP转GB28181网关平台,是GB28181设备接入模块的一个扩展,由拉流端、GB28181接入端两个模块组成。
轻量级RTSP服务模块、RTSP|RTMP转GB28181网关模块和内置RTSP网关模块的区别和联系:
内置轻量级RTSP服务模块和内置RTSP网关模块,核心痛点是避免用户或者开发者单独部署RTSP或者RTMP服务,数据汇聚到内置RTSP服务,对外提供可供拉流的RTSP URL,适用于内网环境下,对并发要求不高的场景,支持H.264/H.265,支持RTSP鉴权、单播、组播模式,考虑到单个服务承载能力,我们支持同时创建多个RTSP服务,并支持获取当前RTSP服务会话连接数。
RTSP|RTMP转GB28181网关模块,实现的是音视频数据的转发,类似于RTSP|RTMP转RTMP推送模块,把本地数据源,对接到GB28181服务平台或RTMP服务平台。
三者不同点:数据来源不同
1. 内置轻量级RTSP服务模块,数据源来自摄像头、屏幕、麦克风等编码前数据,或者本地编码后的对接数据,这点和GB28181的设备接入模块类似。
2. 内置RTSP网关模块,实际上是RTSP/RTMP拉流模块+内置轻量级RTSP服务模块组合出来的。数据源来自RTSP或RTMP网络流,拉流模块完成编码后的音视频数据回调,然后,汇聚到内置轻量级RTSP服务模块。RTSP|RTMP转GB28181网关模块,和内置RTSP网关模块数据源接入一样。
技术实现
本文以之前Android平台RTSP|RTMP转发demo为例,在这个基础上,加GB28181网关扩展。
拉流端音频数据回调:拉流端获取到编码后的数据后回调上来,通过SmartPublisherPostAudioEncodedData()发送到推送模块。
class PlayerAudioDataCallback implements NTAudioDataCallback { private int audio_buffer_size = 0; private int param_info_size = 0; private ByteBuffer audio_buffer_ = null; private ByteBuffer parameter_info_ = null; @Override public ByteBuffer getAudioByteBuffer(int size) { //Log.i("getAudioByteBuffer", "size: " + size); if( size < 1 ) { return null; } if ( size <= audio_buffer_size && audio_buffer_ != null ) { return audio_buffer_; } audio_buffer_size = size + 512; audio_buffer_size = (audio_buffer_size+0xf) & (~0xf); audio_buffer_ = ByteBuffer.allocateDirect(audio_buffer_size); // Log.i("getAudioByteBuffer", "size: " + size + " buffer_size:" + audio_buffer_size); return audio_buffer_; } @Override public ByteBuffer getAudioParameterInfo(int size) { //Log.i("getAudioParameterInfo", "size: " + size); if(size < 1) { return null; } if ( size <= param_info_size && parameter_info_ != null ) { return parameter_info_; } param_info_size = size + 32; param_info_size = (param_info_size+0xf) & (~0xf); parameter_info_ = ByteBuffer.allocateDirect(param_info_size); //Log.i("getAudioParameterInfo", "size: " + size + " buffer_size:" + param_info_size); return parameter_info_; } public void onAudioDataCallback(int ret, int audio_codec_id, int sample_size, int is_key_frame, long timestamp, int sample_rate, int channel, int parameter_info_size, long reserve) { //Log.i("onAudioDataCallback", "ret: " + ret + ", audio_codec_id: " + audio_codec_id + ", sample_size: " + sample_size + ", timestamp: " + timestamp + // ",sample_rate:" + sample_rate); if ( audio_buffer_ == null) return; audio_buffer_.rewind(); if ( ret == 0 && (isPushing || isRTSPPublisherRunning || isGB28181StreamRunning)) { libPublisher.SmartPublisherPostAudioEncodedData(publisherHandle, audio_codec_id, audio_buffer_, sample_size, is_key_frame, timestamp, parameter_info_, parameter_info_size); } // test /* byte[] test_buffer = new byte[16]; pcm_buffer_.get(test_buffer); Log.i(TAG, "onGetPcmFrame data:" + 
bytesToHexString(test_buffer)); */ } }
拉流端视频数据回调:拉流端获取到编码后的数据后回调上来,通过SmartPublisherPostVideoEncodedData()发送到推送模块。
class PlayerVideoDataCallback implements NTVideoDataCallback { private int video_buffer_size = 0; private ByteBuffer video_buffer_ = null; @Override public ByteBuffer getVideoByteBuffer(int size) { //Log.i("getVideoByteBuffer", "size: " + size); if( size < 1 ) { return null; } if ( size <= video_buffer_size && video_buffer_ != null ) { return video_buffer_; } video_buffer_size = size + 1024; video_buffer_size = (video_buffer_size+0xf) & (~0xf); video_buffer_ = ByteBuffer.allocateDirect(video_buffer_size); // Log.i("getVideoByteBuffer", "size: " + size + " buffer_size:" + video_buffer_size); return video_buffer_; } public void onVideoDataCallback(int ret, int video_codec_id, int sample_size, int is_key_frame, long timestamp, int width, int height, long presentation_timestamp) { //Log.i("onVideoDataCallback", "ret: " + ret + ", video_codec_id: " + video_codec_id + ", sample_size: " + sample_size + ", is_key_frame: "+ is_key_frame + ", timestamp: " + timestamp + // ",presentation_timestamp:" + presentation_timestamp); if ( video_buffer_ == null) return; video_buffer_.rewind(); if ( ret == 0 && (isPushing || isRTSPPublisherRunning || isGB28181StreamRunning) ) { libPublisher.SmartPublisherPostVideoEncodedData(publisherHandle, video_codec_id, video_buffer_, sample_size, is_key_frame, timestamp, presentation_timestamp); } } }
GB28181网关模块,初始化:
/* * gb28181 agent初始化 * Github: https://github.com/daniulive/SmarterStreaming */ private boolean initGB28181Agent() { if ( gb28181_agent_ != null ) return true; getLocation(myContext); String local_ip_addr = IPAddrUtils.getIpAddress(myContext); Log.i(TAG, "initGB28181Agent local ip addr: " + local_ip_addr); if ( local_ip_addr == null || local_ip_addr.isEmpty() ) { Log.e(TAG, "initGB28181Agent local ip is empty"); return false; } gb28181_agent_ = GBSIPAgentFactory.getInstance().create(); if ( gb28181_agent_ == null ) { Log.e(TAG, "initGB28181Agent create agent failed"); return false; } gb28181_agent_.addListener(this); // 必填信息 gb28181_agent_.setLocalAddressInfo(local_ip_addr, gb28181_sip_local_port_); gb28181_agent_.setServerParameter(gb28181_sip_server_addr_, gb28181_sip_server_port_, gb28181_sip_server_id_, gb28181_sip_server_domain_); gb28181_agent_.setUserInfo(gb28181_sip_username_, gb28181_sip_password_); // 可选参数 gb28181_agent_.setUserAgent(gb28181_sip_user_agent_filed_); gb28181_agent_.setTransportProtocol(gb28181_sip_trans_protocol_==0?"UDP":"TCP"); // GB28181配置 gb28181_agent_.config(gb28181_reg_expired_, gb28181_heartbeat_interval_, gb28181_heartbeat_count_); com.gb28181.ntsignalling.Device gb_device = new com.gb28181.ntsignalling.Device("34020000001380000001", "安卓测试设备", Build.MANUFACTURER, Build.MODEL, "宇宙","火星1","火星", true); if (mLongitude != null && mLatitude != null) { com.gb28181.ntsignalling.DevicePosition device_pos = new com.gb28181.ntsignalling.DevicePosition(); device_pos.setTime(mLocationTime); device_pos.setLongitude(mLongitude); device_pos.setLatitude(mLatitude); gb_device.setPosition(device_pos); gb_device.setSupportMobilePosition(true); // 设置支持移动位置上报 } gb28181_agent_.addDevice(gb_device); if (!gb28181_agent_.initialize()) { gb28181_agent_ = null; Log.e(TAG, "initGB28181Agent gb28181_agent_.initialize failed."); return false; } return true; }
GB28181网关模块按钮相关代码实现,整体和GB28181设备接入一致:
class ButtonGB28181AgentListener implements OnClickListener { public void onClick(View v) { stopGB28181Stream(); destoryRTPSender(); if (null == gb28181_agent_ ) { if( !initGB28181Agent() ) return; } if (gb28181_agent_.isRunning()) { gb28181_agent_.terminateAllPlays(true);// 目前测试下来,发送BYE之后,有些服务器会立即发送INVITE,是否发送BYE根据实际情况看 gb28181_agent_.stop(); btnGB28181Agent.setText("启动GB28181"); } else { if ( gb28181_agent_.start() ) { btnGB28181Agent.setText("停止GB28181"); } } } } //停止GB28181 媒体流 private void stopGB28181Stream() { if(!isGB28181StreamRunning) return; if (libPublisher != null) { libPublisher.StopGB28181MediaStream(publisherHandle); } if (!isRecording && !isRTSPPublisherRunning && !isPushing) { if (publisherHandle != 0) { if (libPublisher != null) { libPublisher.SmartPublisherClose(publisherHandle); publisherHandle = 0; } } } isGB28181StreamRunning = false; } private void destoryRTPSender() { if (gb28181_rtp_sender_handle_ != 0) { libPublisher.DestoryRTPSender(gb28181_rtp_sender_handle_); gb28181_rtp_sender_handle_ = 0; } } GB28181网关模块,事件回调处理: @Override public void ntsRegisterOK(String dateString) { Log.i(TAG, "ntsRegisterOK Date: " + (dateString!= null? 
dateString : "")); } @Override public void ntsRegisterTimeout() { Log.e(TAG, "ntsRegisterTimeout"); } @Override public void ntsRegisterTransportError(String errorInfo) { Log.e(TAG, "ntsRegisterTransportError error:" + (errorInfo != null?errorInfo :"")); } @Override public void ntsOnHeartBeatException(int exceptionCount, String lastExceptionInfo) { Log.e(TAG, "ntsOnHeartBeatException heart beat timeout count reached, count:" + exceptionCount+ ", exception info:" + (lastExceptionInfo!=null?lastExceptionInfo:"")); // 10毫秒后,停止信令, 然后重启 handler.postDelayed(new Runnable() { @Override public void run() { Log.i(TAG, "gb28281_heart_beart_timeout"); stopGB28181Stream(); destoryRTPSender(); if (gb28181_agent_ != null) { Log.i(TAG, "gb28281_heart_beart_timeout sip stop"); gb28181_agent_.stop(); Log.i(TAG, "gb28281_heart_beart_timeout sip start"); gb28181_agent_.start(); } } },10); } @Override public void ntsOnInvitePlay(String deviceId, InvitePlaySessionDescription session_des) { handler.postDelayed(new Runnable() { @Override public void run() { Log.i(TAG,"ntsInviteReceived, device_id:" +device_id_+", is_tcp:" + session_des_.isRTPOverTCP() + " rtp_port:" + session_des_.getMediaPort() + " ssrc:" + session_des_.getSSRC() + " address_type:" + session_des_.getAddressType() + " address:" + session_des_.getAddress()); // 可以先给信令服务器发送临时振铃响应 //sip_stack_android.respondPlayInvite(180, device_id_); long rtp_sender_handle = libPublisher.CreateRTPSender(0); if ( rtp_sender_handle == 0 ) { gb28181_agent_.respondPlayInvite(488, device_id_); Log.i(TAG, "ntsInviteReceived CreateRTPSender failed, response 488, device_id:" + device_id_); return; } gb28181_rtp_payload_type_ = session_des_.getPSRtpMapAttribute().getPayloadType(); libPublisher.SetRTPSenderTransportProtocol(rtp_sender_handle, session_des_.isRTPOverUDP()?0:1); libPublisher.SetRTPSenderIPAddressType(rtp_sender_handle, session_des_.isIPv4()?0:1); libPublisher.SetRTPSenderLocalPort(rtp_sender_handle, 0); 
libPublisher.SetRTPSenderSSRC(rtp_sender_handle, session_des_.getSSRC()); libPublisher.SetRTPSenderSocketSendBuffer(rtp_sender_handle, 2*1024*1024); // 设置到2M libPublisher.SetRTPSenderClockRate(rtp_sender_handle, session_des_.getPSRtpMapAttribute().getClockRate()); libPublisher.SetRTPSenderDestination(rtp_sender_handle, session_des_.getAddress(), session_des_.getMediaPort()); if ( libPublisher.InitRTPSender(rtp_sender_handle) != 0 ) { gb28181_agent_.respondPlayInvite(488, device_id_); libPublisher.DestoryRTPSender(rtp_sender_handle); return; } int local_port = libPublisher.GetRTPSenderLocalPort(rtp_sender_handle); if (local_port == 0) { gb28181_agent_.respondPlayInvite(488, device_id_); libPublisher.DestoryRTPSender(rtp_sender_handle); return; } Log.i(TAG,"get local_port:" + local_port); String local_ip_addr = IPAddrUtils.getIpAddress(myContext); gb28181_agent_.respondPlayInviteOK(device_id_,local_ip_addr, local_port); gb28181_rtp_sender_handle_ = rtp_sender_handle; } private String device_id_; private InvitePlaySessionDescription session_des_; public Runnable set(String device_id, InvitePlaySessionDescription session_des) { this.device_id_ = device_id; this.session_des_ = session_des; return this; } }.set(deviceId, session_des),0); } @Override public void ntsOnCancelPlay(String deviceId) { // 这里取消Play会话 handler.postDelayed(new Runnable() { @Override public void run() { Log.i(TAG, "ntsOnCancelPlay, deviceId=" + device_id_); destoryRTPSender(); } private String device_id_; public Runnable set(String device_id) { this.device_id_ = device_id; return this; } }.set(deviceId),0); } @Override public void ntsOnAckPlay(String deviceId) { handler.postDelayed(new Runnable() { @Override public void run() { Log.i(TAG,"ntsOnACKPlay, device_id:" +device_id_); if (!isRecording && !isRTSPPublisherRunning && !isPushing) { OpenPushHandle(); } libPublisher.SetGB28181RTPSender(publisherHandle, gb28181_rtp_sender_handle_, gb28181_rtp_payload_type_); int startRet = 
libPublisher.StartGB28181MediaStream(publisherHandle); if (startRet != 0) { if (!isRecording && !isRTSPPublisherRunning && !isPushing) { if (publisherHandle != 0) { libPublisher.SmartPublisherClose(publisherHandle); publisherHandle = 0; } } destoryRTPSender(); Log.e(TAG, "Failed to start GB28181 service.."); return; } isGB28181StreamRunning = true; } private String device_id_; public Runnable set(String device_id) { this.device_id_ = device_id; return this; } }.set(deviceId),0); } @Override public void ntsOnPlayInviteResponseException(String deviceId, int statusCode, String errorInfo) { // 这里要释放掉响应的资源 Log.i(TAG, "ntsOnPlayInviteResponseException, deviceId=" + deviceId + " statusCode=" +statusCode + " errorInfo:" + errorInfo); handler.postDelayed(new Runnable() { @Override public void run() { Log.i(TAG, "ntsOnPlayInviteResponseException, deviceId=" + device_id_); destoryRTPSender(); } private String device_id_; public Runnable set(String device_id) { this.device_id_ = device_id; return this; } }.set(deviceId),0); } @Override public void ntsOnByePlay(String deviceId) { handler.postDelayed(new Runnable() { @Override public void run() { Log.i(TAG, "ntsOnByePlay, stop GB28181 media stream, deviceId=" + device_id_); stopGB28181Stream(); destoryRTPSender(); } private String device_id_; public Runnable set(String device_id) { this.device_id_ = device_id; return this; } }.set(deviceId),0); } @Override public void ntsOnPlayDialogTerminated(String deviceId) { /* Play会话对应的对话终止, 一般不会出发这个回调,目前只有在响应了200K, 但在64*T1时间后还没收到ACK,才可能会出发 收到这个请做相关清理处理 */ handler.postDelayed(new Runnable() { @Override public void run() { Log.i(TAG, "ntsOnPlayDialogTerminated, deviceId=" + device_id_); stopGB28181Stream(); destoryRTPSender(); } private String device_id_; public Runnable set(String device_id) { this.device_id_ = device_id; return this; } }.set(deviceId),0); } @Override public void ntsOnDevicePositionRequest(String deviceId, int interval) { handler.postDelayed(new Runnable() { @Override 
public void run() { getLocation(myContext); Log.v(TAG, "ntsOnDevicePositionRequest, deviceId:" + this.device_id_ + ", Longitude:" + mLongitude + ", Latitude:" + mLatitude + ", Time:" + mLocationTime); if (mLongitude != null && mLatitude != null) { com.gb28181.ntsignalling.DevicePosition device_pos = new com.gb28181.ntsignalling.DevicePosition(); device_pos.setTime(mLocationTime); device_pos.setLongitude(mLongitude); device_pos.setLatitude(mLatitude); if (gb28181_agent_ != null ) { gb28181_agent_.updateDevicePosition(device_id_, device_pos); } } } private String device_id_; private int interval_; public Runnable set(String device_id, int interval) { this.device_id_ = device_id; this.interval_ = interval; return this; } }.set(deviceId, interval),0); }
onDestroy():停掉GB28181媒体流,释放资源。
/**
 * Activity teardown: shuts down GB28181 signalling, stops the media stream,
 * releases the RTP sender, and finally exits the process via
 * {@code System.exit(0)}.
 */
@Override
protected void onDestroy() {
    Log.i(TAG, "Run into activity destory++");

    // Signalling goes first: end running plays, stop, then un-initialise the agent.
    if (null != gb28181_agent_) {
        final boolean signallingActive = gb28181_agent_.isRunning();
        if (signallingActive) {
            gb28181_agent_.terminateAllPlays(false);
            gb28181_agent_.stop();
        }

        Log.i(TAG, " gb28181_agent_.unInitialize++");
        gb28181_agent_.unInitialize();
        Log.i(TAG, " gb28181_agent_.unInitialize--");

        gb28181_agent_ = null;
    }

    // Then the media path.
    stopGB28181Stream();
    destoryRTPSender();

    super.onDestroy();

    finish();
    System.exit(0);
}
以上是大概流程,感兴趣的开发者,可酌情参考。