VideoPusher.h
#pragma once #include <iostream> #include <gst/gst.h> #include <gst/app/gstappsink.h> #include <glib.h> #include <boost/shared_ptr.hpp> #include <mutex> #ifndef INT64_C #define INT64_C(c) (c ## LL) #define UINT64_C(c) (c ## ULL) #endif class VideoPusher { struct Exception : std::exception{}; template<typename T> T* chk(T* pointer) { if (pointer == nullptr) { throw Exception(); } return pointer; } template<typename T> void delptr(T* pointer) { if (pointer != nullptr) { delete pointer; pointer = nullptr; } } public: VideoPusher(); VideoPusher(std::string strlocation, std::string strCode); virtual ~VideoPusher(); bool Run(std::string strlocation, std::string strCode, std::string strrequest); protected: static void _onPadAdded(GstElement *src, GstPad *src_pad, gpointer user_data); void SetElesNull(); protected: GMainLoop *_loop; GstBus* _bus; GstElement* _pipeline; GstElement* _source; GstElement* _depay; GstElement* _parse; GstElement* _capsfilter; GstElement* _queue1; GstElement* _rtmpsink; GstElement* _flvmux; std::mutex _mutex; bool _bStopPush; }; typedef boost::shared_ptr<VideoPusher> VideoPusherPtr;
VideoPusher.cpp
void VideoPusher::_onPadAdded(GstElement *src, GstPad *src_pad, gpointer user_data) { GstPad* sink_pad = (GstPad*)user_data; gst_pad_link(src_pad, sink_pad); } void VideoPusher::SetElesNull() { _loop = nullptr; _bus = nullptr; _pipeline = nullptr; _source = nullptr; _depay = nullptr; _parse = nullptr; _capsfilter = nullptr; _queue1 = nullptr; _rtmpsink = nullptr; _flvmux = nullptr; } bool VideoPusher::Run(std::string strLocation, std::string strCode, std::string strrequest) { _bStopPush = false; while (!_bStopPush) { SetElesNull(); gboolean terminate = FALSE; gst_init(nullptr, nullptr); std::stringstream stream; stream << "pipeline" << g_pipelinenum++; std::string strname; strname = stream.str(); _pipeline = chk(gst_pipeline_new(strname.c_str())); _source = chk(gst_element_factory_make("rtspsrc", "src")); _depay = chk(gst_element_factory_make(("rtp" + strCode + "depay").c_str(), "depay")); _parse = chk(gst_element_factory_make((strCode + "parse").c_str(), "parse")); _flvmux = chk(gst_element_factory_make("flvmux", "flvmux")); _queue1 = chk(gst_element_factory_make("queue", "queue")); _capsfilter = chk(gst_element_factory_make("capsfilter", "filter")); _rtmpsink = chk(gst_element_factory_make("rtmpsink", "sink")); //g_object_set(_source, "protocols", 0x00000004, NULL); g_object_set(_source, "latency", 0, NULL); g_object_set(_capsfilter, "caps-change-mode", 1, NULL); g_object_set(_rtmpsink, "location", strLocation.c_str(), NULL); gst_bin_add_many(GST_BIN(_pipeline), _source, _depay, _parse, _flvmux, _capsfilter, _queue1, _rtmpsink, NULL); g_signal_connect(_source, "pad-added", G_CALLBACK(&_onPadAdded), gst_element_get_static_pad(_depay, "sink")); gboolean bsuccess = gst_element_link_many(_depay, _parse, _flvmux, _capsfilter, _queue1, _rtmpsink, NULL); if (!bsuccess) { g_print("Failed to link one or more elements!\n"); gst_element_unlink_many(_depay, _parse, _flvmux, _capsfilter, _queue1, _rtmpsink, NULL); Sleep(1000); continue; } g_object_set(_source, "location", strrequest.c_str(), NULL); GstCaps* caps = gst_caps_new_simple( "video/x-raw", "format", G_TYPE_STRING, "rgb", "width", G_TYPE_INT, 426, "height", G_TYPE_INT, 240, "framerate", GST_TYPE_FRACTION, 25, 1, NULL); g_object_set(_capsfilter, "caps", caps, NULL); gst_caps_unref(caps); GstStateChangeReturn res = gst_element_set_state(_pipeline, GST_STATE_PLAYING); if (res == GST_STATE_CHANGE_FAILURE) { g_printerr("Unable to set the pipeline to the playing state.\n"); gst_object_unref(_pipeline); Sleep(1000); continue; } GstMessage *msg; /* Listen to the bus */ //_bus = gst_element_get_bus(_pipeline); _bus = gst_pipeline_get_bus(GST_PIPELINE(_pipeline)); do { msg = gst_bus_timed_pop_filtered(_bus, GST_CLOCK_TIME_NONE, GstMessageType(GST_MESSAGE_STATE_CHANGED | GST_MESSAGE_ERROR | GST_MESSAGE_EOS)); /* Parse message */ if (msg != NULL) { GError *err; gchar *debug_info; switch (GST_MESSAGE_TYPE(msg)) { case GST_MESSAGE_ERROR: { gst_message_parse_error(msg, &err, &debug_info); g_printerr("Error received from element %s: %s\n", GST_OBJECT_NAME(msg->src), err->message); g_printerr("Debugging information: %s\n", debug_info ? debug_info : "none"); g_clear_error(&err); g_free(debug_info); terminate = TRUE; } break; case GST_MESSAGE_EOS: { g_print("End-Of-Streamreached.\n"); terminate = TRUE; } break; case GST_MESSAGE_STATE_CHANGED: { /* We are onlyinterested in state-changed messages from the pipeline */ if (GST_MESSAGE_SRC(msg) == GST_OBJECT(_pipeline)) { GstState old_state, new_state, pending_state; gst_message_parse_state_changed(msg, &old_state, &new_state, &pending_state); g_print("Pipeline state changed from %s to %s:\n", gst_element_state_get_name(old_state), gst_element_state_get_name(new_state)); if (pending_state == GST_STATE_NULL) { terminate = TRUE; } } } break; default: { /* We shouldnot reach here */ g_printerr("Unexpected message received.\n"); break; } } gst_message_unref(msg); // std::this_thread::sleep_for(std::chrono::milliseconds(200)); } } while (!terminate); /* Free resources */ try { std::lock_guard<std::mutex> lock(_mutex); gst_object_unref(_bus); gst_element_set_state(_pipeline, GST_STATE_PAUSED); gst_element_set_state(_pipeline, GST_STATE_READY); gst_element_set_state(_pipeline, GST_STATE_NULL); gst_object_unref(_pipeline); } catch (std::exception &e) { cout << e.what(); return true; } catch (...) { return true; } } return true; }
main.cpp
#include "VideoPusher.h" int main() { VideoPusher pusher; std::string strRtmp = "rtmp://localhost:1945/live/room"; std::string strRtsp = "rtsp://admin:HuaWei123@59.51.115.31/LiveMedia/ch1/Media1"; std::string strCode = "h264"; bool bSuccess = pusher.Run(strRtmp, strCode, strRtsp); getchar(); return 0; }