转载:Advanced Concepts of Gstreamer Plugin

Request and Sometimes pads

Some pads are only created in certain cases, or only if the application requests them. The first kind is called a sometimes pad; the second is called a request pad. The availability of a pad (always, sometimes or request) can be seen in a pad's template.

Sometimes pads

A “sometimes” pad is a pad that is created under certain conditions, but not in all cases. This mostly depends on stream content: demuxers will generally parse the stream header, decide what elementary (video, audio, subtitle, etc.) streams are embedded inside the system stream, and will then create a sometimes pad for each of those elementary streams. At its own choice, it can also create more than one instance of each of those per element instance. The only limitation is that each newly created pad should have a unique name. Sometimes pads are disposed when the stream data is disposed, too (i.e. when going from PAUSED to the READY state). You should not dispose the pad on EOS, because someone might re-activate the pipeline and seek back to before the end-of-stream point. The stream should still stay valid after EOS, at least until the stream data is disposed. In any case, the element is always the owner of such a pad.

The example code below will parse a text file, where the first line is a number (n). The next lines all start with a number (0 to n-1), which is the number of the source pad over which the data should be sent.

3
0: foo
1: bar
0: boo
2: bye

The code to parse this file and create the dynamic “sometimes” pads, looks like this:

/* Instance data of the example element; members not relevant here are elided. */
typedef struct _GstMyFilter {
[..]
  gboolean firstrun;   /* set to TRUE in _init (); tested before parsing the header line */
  GList *srcpadlist;   /* source pads created by the element, in creation order */
} GstMyFilter;

/* Template for the dynamically created source pads: named "src_%u",
 * SOMETIMES presence, accepting ANY caps. */
static GstStaticPadTemplate src_factory =
GST_STATIC_PAD_TEMPLATE (
  "src_%u",
  GST_PAD_SRC,
  GST_PAD_SOMETIMES,
  GST_STATIC_CAPS ("ANY")
);

/* Class initialisation: registers the source pad template (src_factory)
 * with the element class. Other class setup is elided. */
static void
gst_my_filter_class_init (GstMyFilterClass *klass)
{
  GstElementClass *element_class = GST_ELEMENT_CLASS (klass);
[..]
  gst_element_class_add_pad_template (element_class,
    gst_static_pad_template_get (&src_factory));
[..]
}

/* Instance initialisation: marks the header as not yet parsed and starts
 * with an empty list of source pads. Other setup is elided. */
static void
gst_my_filter_init (GstMyFilter *filter)
{
[..]
  filter->firstrun = TRUE;
  filter->srcpadlist = NULL;
}

/*
 * Get one line of data - without newline.
 *
 * Returns a newly allocated buffer of n + 1 bytes holding the n bytes of
 * the line followed by a terminating '\0', or NULL when no complete line
 * could be read: not enough data is available, no newline was found within
 * the first 512 bytes, or the buffer could not be allocated. The line and
 * its newline are flushed from the bytestream only when a buffer is
 * returned. The caller owns the returned buffer.
 */

static GstBuffer *
gst_my_filter_getline (GstMyFilter *filter)
{
  guint8 *data;
  gint n, num;

  /* max. line length is 512 characters - for safety */
  for (n = 0; n < 512; n++) {
    num = gst_bytestream_peek_bytes (filter->bs, &data, n + 1);
    if (num != n + 1)
      return NULL;

    /* newline? */
    if (data[n] == '\n') {
      GstBuffer *buf = gst_buffer_new_allocate (NULL, n + 1, NULL);

      if (buf == NULL)
        return NULL;

      /* 'data' comes from the peek just above, which covered n + 1 bytes,
       * so the n bytes of the line can be copied without peeking again
       * (a second peek would be a zero-length peek for an empty line) */
      gst_buffer_fill (buf, 0, data, n);
      gst_buffer_memset (buf, n, '\0', 1);
      gst_bytestream_flush_fast (filter->bs, n + 1);

      return buf;
    }
  }

  /* no newline within the maximum line length: report "no line" instead
   * of falling off the end of a non-void function */
  return NULL;
}

/*
 * Task function of the element.
 *
 * On the first run it reads the header line, interprets it as the number
 * of streams and creates one "sometimes" source pad ("src_0", "src_1", ...)
 * per stream. On every run it then reads one data line of the form
 * "<pad number>:<payload>" and pushes everything after the ':' over the
 * source pad with that number. Lines naming a pad that does not exist, or
 * lacking a ':', are dropped. When no further line is available, EOS is
 * sent over all source pads.
 */
static void
gst_my_filter_loopfunc (GstElement *element)
{
  GstMyFilter *filter = GST_MY_FILTER (element);
  GstBuffer *buf;
  GstPad *pad;
  GstMapInfo map;
  gint64 value;
  gint num, n;

  /* parse header */
  if (filter->firstrun) {
    gchar *padname;

    if (!(buf = gst_my_filter_getline (filter))) {
      GST_ELEMENT_ERROR (element, STREAM, READ, (NULL),
             ("Stream contains no header"));
      return;
    }
    if (!gst_buffer_map (buf, &map, GST_MAP_READ)) {
      gst_buffer_unref (buf);
      GST_ELEMENT_ERROR (element, STREAM, READ, (NULL),
             ("Stream header could not be read"));
      return;
    }

    /* gst_my_filter_getline () NUL-terminates the line, so it can be
     * parsed as a string; negative or oversized values are clamped.
     * NOTE(review): a real element should also enforce a sane upper
     * bound on the number of pads an input file may request. */
    value = g_ascii_strtoll ((const gchar *) map.data, NULL, 10);
    num = (gint) CLAMP (value, 0, G_MAXINT);
    gst_buffer_unmap (buf, &map);
    gst_buffer_unref (buf);

    /* for each of the streams, create a pad */
    for (n = 0; n < num; n++) {
      padname = g_strdup_printf ("src_%u", (guint) n);
      pad = gst_pad_new_from_static_template (&src_factory, padname);
      g_free (padname);

      /* here, you would set _event () and _query () functions */

      /* need to activate the pad before adding */
      gst_pad_set_active (pad, TRUE);

      gst_element_add_pad (element, pad);
      filter->srcpadlist = g_list_append (filter->srcpadlist, pad);
    }

    /* the header has been consumed; later runs must treat every line as
     * data instead of creating the pads again */
    filter->firstrun = FALSE;
  }

  /* and now, simply parse each line and push over */
  if (!(buf = gst_my_filter_getline (filter))) {
    GstEvent *event = gst_event_new_eos ();
    GList *padlist;

    for (padlist = filter->srcpadlist;
         padlist != NULL; padlist = g_list_next (padlist)) {
      pad = GST_PAD (padlist->data);
      /* each push consumes one reference, so hand every pad its own */
      gst_pad_push_event (pad, gst_event_ref (event));
    }
    gst_event_unref (event);
    /* pause the task here */
    return;
  }

  /* parse stream number and go beyond the ':' in the data */
  if (!gst_buffer_map (buf, &map, GST_MAP_READ)) {
    gst_buffer_unref (buf);
    return;
  }
  value = g_ascii_strtoll ((const gchar *) map.data, NULL, 10);
  if (value >= 0 && value < (gint64) g_list_length (filter->srcpadlist)) {
    gsize size = map.size;
    gboolean have_payload;

    pad = GST_PAD (g_list_nth_data (filter->srcpadlist, (guint) value));

    /* find the ':' separator; the trailing '\0' bounds the search */
    for (n = 0; map.data[n] != ':' &&
                map.data[n] != '\0'; n++) ;
    have_payload = (map.data[n] != '\0');

    /* done inspecting the bytes: release the mapping before the data is
     * handed downstream */
    gst_buffer_unmap (buf, &map);

    if (have_payload) {
      GstBuffer *sub;

      /* create region copy that starts right past the ':'. The reason
       * that we don't just forward the data pointer is because the
       * pointer is no longer the start of an allocated block of memory,
       * but just a pointer to a position somewhere in the middle of it.
       * That cannot be freed upon disposal, so we'd either crash or have
       * a memleak. Creating a region copy is a simple way to solve that. */
      sub = gst_buffer_copy_region (buf, GST_BUFFER_COPY_ALL,
          n + 1, size - n - 1);
      gst_pad_push (pad, sub);
    }
  } else {
    gst_buffer_unmap (buf, &map);
  }
  gst_buffer_unref (buf);
}

Note that we use a lot of checks everywhere to make sure that the content in the file is valid. This has two purposes: first, the file could be erroneous, in which case we prevent a crash. The second and most important reason is that - in extreme cases - the file could be used maliciously to cause undefined behaviour in the plugin, which might lead to security issues. Always assume that the file could be used to do bad things.

Request pads

“Request” pads are similar to sometimes pads, except that request pads are created on demand of something outside of the element rather than something inside the element. This concept is often used in muxers, where - for each elementary stream that is to be placed in the output system stream - one sink pad will be requested. It can also be used in elements with a variable number of input or output pads, such as the tee (multi-output) or input-selector (multi-input) elements.

To implement request pads, you need to provide a padtemplate with a GST_PAD_REQUEST presence and implement the request_new_pad virtual method in GstElement. To clean up, you will need to implement the release_pad virtual method.

/* GstElement virtual methods implemented for request pads */
static GstPad *gst_my_filter_request_new_pad (GstElement *element,
    GstPadTemplate *templ, const gchar *name, const GstCaps *caps);

static void gst_my_filter_release_pad (GstElement *element, GstPad *pad);

/* Template for the sink pads created on request: named "sink_%u",
 * REQUEST presence, accepting ANY caps. */
static GstStaticPadTemplate sink_factory =
GST_STATIC_PAD_TEMPLATE (
  "sink_%u",
  GST_PAD_SINK,
  GST_PAD_REQUEST,
  GST_STATIC_CAPS ("ANY")
);

/* Class initialisation: registers the request sink pad template
 * (sink_factory) and installs the request_new_pad / release_pad virtual
 * methods on the element class. Other class setup is elided. */
static void
gst_my_filter_class_init (GstMyFilterClass *klass)
{
  GstElementClass *element_class = GST_ELEMENT_CLASS (klass);
[..]
  /* the pad template must be added to the GstElementClass, not to the
   * derived class pointer */
  gst_element_class_add_pad_template (element_class,
    gst_static_pad_template_get (&sink_factory));
[..]
  element_class->request_new_pad = gst_my_filter_request_new_pad;
  element_class->release_pad = gst_my_filter_release_pad;
}

/*
 * request_new_pad implementation: creates a pad from the given template
 * and name, attaches a freshly zero-allocated GstMyFilterInputContext to
 * it as element-private data, adds the pad to the element and returns it.
 * The caps argument is not used.
 */
static GstPad *
gst_my_filter_request_new_pad (GstElement     *element,
                               GstPadTemplate *templ,
                               const gchar    *name,
                               const GstCaps  *caps)
{
  GstMyFilterInputContext *ctx = g_new0 (GstMyFilterInputContext, 1);
  GstPad *new_pad = gst_pad_new_from_template (templ, name);

  /* per-pad state travels with the pad until release_pad frees it */
  gst_pad_set_element_private (new_pad, ctx);

  /* normally, you would set _chain () and _event () functions here */

  gst_element_add_pad (element, new_pad);
  return new_pad;
}

/*
 * release_pad implementation: frees the element-private context stored on
 * the pad and removes the pad from the element.
 */
static void
gst_my_filter_release_pad (GstElement *element,
                           GstPad *pad)
{
  /* the context was attached with gst_pad_set_element_private () */
  g_free (gst_pad_get_element_private (pad));

  gst_element_remove_pad (element, pad);
}

Different scheduling modes

The scheduling mode of a pad defines how data is retrieved from (source) or given to (sink) pads. GStreamer can operate in two scheduling modes, called push- and pull-mode. GStreamer supports elements with pads in any of the scheduling modes, where not all pads need to be operating in the same mode.

So far, we have only discussed _chain ()-operating elements, i.e. elements that have a chain-function set on their sink pad and push buffers on their source pad(s). We call this the push-mode because a peer element will use gst_pad_push () on a srcpad, which will cause our _chain ()-function to be called, which in turn causes our element to push out a buffer on the source pad. The initiative to start the dataflow happens somewhere upstream when it pushes out a buffer and all downstream elements get scheduled when their _chain ()-functions are called in turn.

Before we explain pull-mode scheduling, let's first understand how the different scheduling modes are selected and activated on a pad.

The pad activation stage

During the element state change of READY->PAUSED, the pads of an element will be activated. This happens first on the source pads and then on the sink pads of the element. GStreamer calls the _activate () of a pad. By default this function will activate the pad in push-mode by calling gst_pad_activate_mode () with the GST_PAD_MODE_PUSH scheduling mode. It is possible to override the _activate () of a pad and decide on a different scheduling mode. You can know in what scheduling mode a pad is activated by overriding the _activate_mode ()-function.

GStreamer allows the different pads of an element to operate in different scheduling modes. This allows for many different possible use-cases. What follows is an overview of some typical use-cases.

  • If all pads of an element are activated in push-mode scheduling, the element as a whole is operating in push-mode. For source elements this means that they will have to start a task that pushes out buffers on the source pad to the downstream elements. Downstream elements will have data pushed to them by upstream elements using the sinkpads _chain ()-function which will push out buffers on the source pads. Prerequisites for this scheduling mode are that a chain-function was set for each sinkpad using gst_pad_set_chain_function () and that all downstream elements operate in the same mode.

  • Alternatively, sinkpads can be the driving force behind a pipeline by operating in pull-mode, while the sourcepads of the element still operate in push-mode. In order to be the driving force, those pads start a GstTask when they are activated. This task is a thread, which will call a function specified by the element. When called, this function will have random data access (through gst_pad_pull_range ()) over all sinkpads, and can push data over the sourcepads, which effectively means that this element controls data flow in the pipeline. Prerequisites for this mode are that all downstream elements can act in push mode, and that all upstream elements operate in pull-mode (see below).

    Source pads can be activated in PULL mode by a downstream element when they return GST_PAD_MODE_PULL from the GST_QUERY_SCHEDULING query. Prerequisites for this scheduling mode are that a getrange-function was set for the source pad using gst_pad_set_getrange_function ().

  • Lastly, all pads in an element can be activated in PULL-mode. However, contrary to the above, this does not mean that they start a task on their own. Rather, it means that they are pull slave for the downstream element, and have to provide random data access to it from their _get_range ()-function. Requirements are that a _get_range ()-function was set on this pad using the function gst_pad_set_getrange_function (). Also, if the element has any sinkpads, all those pads (and thereby their peers) need to operate in PULL access mode, too.

    When a sink element is activated in PULL mode, it should start a task that calls gst_pad_pull_range () on its sinkpad. It can only do this when the upstream SCHEDULING query returns support for the GST_PAD_MODE_PULL scheduling mode.

In the next two sections, we will go closer into pull-mode scheduling (elements/pads driving the pipeline, and elements/pads providing random access), and some specific use cases will be given.

Pads driving the pipeline

Sinkpads operating in pull-mode, with the sourcepads operating in push-mode (or it has no sourcepads when it is a sink), can start a task that will drive the pipeline data flow. Within this task function, you have random access over all of the sinkpads, and push data over the sourcepads. This can come in useful for several different kinds of elements:

  • Demuxers, parsers and certain kinds of decoders where data comes in unparsed (such as MPEG-audio or video streams), since those will prefer byte-exact (random) access from their input. If possible, however, such elements should be prepared to operate in push-mode, too.

  • Certain kinds of audio outputs, which require control over their input data flow, such as the Jack sound server.

First you need to perform a SCHEDULING query to check if the upstream element(s) support pull-mode scheduling. If that is possible, you can activate the sinkpad in pull-mode. Inside the activate_mode function you can then start the task.

#include "filter.h"
#include <string.h>

/* pad activation callbacks and the task function used in pull-mode */
static gboolean gst_my_filter_activate (GstPad * pad, GstObject * parent);
static gboolean gst_my_filter_activate_mode (GstPad * pad, GstObject * parent,
    GstPadMode mode, gboolean active);
static void gst_my_filter_loop (GstMyFilter * filter);

/* registers the GstMyFilter type, derived from GstElement */
G_DEFINE_TYPE (GstMyFilter, gst_my_filter, GST_TYPE_ELEMENT);


/* Instance initialisation: installs the activate and activate-mode
 * callbacks on the sink pad. Pad creation and other setup are elided. */
static void
gst_my_filter_init (GstMyFilter * filter)
{

[..]

  /* called when the pad is activated; chooses the scheduling mode */
  gst_pad_set_activate_function (filter->sinkpad, gst_my_filter_activate);
  /* called when the pad is (de)activated in a specific scheduling mode */
  gst_pad_set_activatemode_function (filter->sinkpad,
      gst_my_filter_activate_mode);


[..]
}

[..]

/*
 * Activate callback of the sink pad: decides on the scheduling mode.
 *
 * Asks the upstream peer which scheduling modes it supports. The pad is
 * activated in pull-mode when the peer answers the query and reports
 * seekable pull-mode support; in every other case it falls back to
 * push-mode. Returns the result of gst_pad_activate_mode ().
 */
static gboolean
gst_my_filter_activate (GstPad * pad, GstObject * parent)
{
  GstPadMode chosen = GST_PAD_MODE_PUSH;
  GstQuery *sched = gst_query_new_scheduling ();

  /* the query result is only inspected when the peer handled the query */
  if (gst_pad_peer_query (pad, sched) &&
      gst_query_has_scheduling_mode_with_flags (sched,
          GST_PAD_MODE_PULL, GST_SCHEDULING_FLAG_SEEKABLE)) {
    /* GStreamer will also activate the upstream peer in pull-mode */
    chosen = GST_PAD_MODE_PULL;
  }
  gst_query_unref (sched);

  return gst_pad_activate_mode (pad, chosen, TRUE);
}

/*
 * Activate-mode callback of the sink pad.
 *
 * Named gst_my_filter_activate_mode so that it matches the forward
 * declaration and the function passed to
 * gst_pad_set_activatemode_function () in gst_my_filter_init ().
 *
 * - GST_PAD_MODE_PUSH: nothing to do, returns TRUE.
 * - GST_PAD_MODE_PULL: on activation resets the read offset to 0 and
 *   starts gst_my_filter_loop () as the pad's task; on deactivation stops
 *   that task. Returns the result of starting/stopping the task.
 * - any other mode: returns FALSE.
 */
static gboolean
gst_my_filter_activate_mode (GstPad    * pad,
                             GstObject * parent,
                             GstPadMode  mode,
                             gboolean    active)
{
  gboolean res;
  GstMyFilter *filter = GST_MY_FILTER (parent);

  switch (mode) {
    case GST_PAD_MODE_PUSH:
      res = TRUE;
      break;
    case GST_PAD_MODE_PULL:
      if (active) {
        /* always start reading from the beginning of the stream */
        filter->offset = 0;
        res = gst_pad_start_task (pad,
            (GstTaskFunction) gst_my_filter_loop, filter, NULL);
      } else {
        res = gst_pad_stop_task (pad);
      }
      break;
    default:
      /* unknown scheduling mode */
      res = FALSE;
      break;
  }
  return res;
}

Once started, your task has full control over input and output. The most simple case of a task function is one that reads input and pushes that over its source pad. It's not all that useful, but provides some more flexibility than the old push-mode case that we've been looking at so far.

#define BLOCKSIZE 2048

    /*
     * Task function for pull-mode operation: reads up to BLOCKSIZE bytes
     * from the sink pad at filter->offset and pushes them over the source
     * pad, then advances the offset by the number of bytes actually read.
     * Sends EOS downstream once the end of the input is reached. On EOS or
     * on any error the task is paused.
     */
    static void
    gst_my_filter_loop (GstMyFilter * filter)
    {
      GstFlowReturn ret;
      gint64 len;               /* the duration query reports a signed value */
      gsize size;
      GstBuffer *buf = NULL;

      /* the length of the input is known upstream, so ask the peer of the
       * sink pad rather than the sink pad itself */
      if (!gst_pad_peer_query_duration (filter->sinkpad, GST_FORMAT_BYTES, &len)) {
        GST_DEBUG_OBJECT (filter, "failed to query duration, pausing");
        goto stop;
      }

      /* a negative length means "unknown"; in that case rely on the flow
       * return of gst_pad_pull_range () to detect the end of the input */
      if (len >= 0 && filter->offset >= (guint64) len) {
        GST_DEBUG_OBJECT (filter, "at end of input, sending EOS, pausing");
        gst_pad_push_event (filter->srcpad, gst_event_new_eos ());
        goto stop;
      }

      /* now, read BLOCKSIZE bytes from byte offset filter->offset */
      ret = gst_pad_pull_range (filter->sinkpad, filter->offset,
          BLOCKSIZE, &buf);

      if (ret != GST_FLOW_OK) {
        GST_DEBUG_OBJECT (filter, "pull_range failed: %s", gst_flow_get_name (ret));
        /* upstream ran out of data: let downstream know as well */
        if (ret == GST_FLOW_EOS)
          gst_pad_push_event (filter->srcpad, gst_event_new_eos ());
        goto stop;
      }

      /* remember how much was read; the buffer may be shorter than
       * BLOCKSIZE and must not be touched after it has been pushed */
      size = gst_buffer_get_size (buf);

      /* now push buffer downstream */
      ret = gst_pad_push (filter->srcpad, buf);

      buf = NULL; /* gst_pad_push() took ownership of buffer */

      if (ret != GST_FLOW_OK) {
        GST_DEBUG_OBJECT (filter, "pad_push failed: %s", gst_flow_get_name (ret));
        goto stop;
      }

      /* everything is fine, increase offset and wait for us to be called again */
      filter->offset += size;
      return;

    stop:
      GST_DEBUG_OBJECT (filter, "pausing task");
      gst_pad_pause_task (filter->sinkpad);
    }

 

 

 

 

 

#define BLOCKSIZE 2048

    /*
     * Task function for pull-mode operation: reads up to BLOCKSIZE bytes
     * from the sink pad at filter->offset and pushes them over the source
     * pad, then advances the offset by the number of bytes actually read.
     * Sends EOS downstream once the end of the input is reached. On EOS or
     * on any error the task is paused.
     */
    static void
    gst_my_filter_loop (GstMyFilter * filter)
    {
      GstFlowReturn ret;
      gint64 len;               /* the duration query reports a signed value */
      gsize size;
      GstBuffer *buf = NULL;

      /* the length of the input is known upstream, so ask the peer of the
       * sink pad rather than the sink pad itself */
      if (!gst_pad_peer_query_duration (filter->sinkpad, GST_FORMAT_BYTES, &len)) {
        GST_DEBUG_OBJECT (filter, "failed to query duration, pausing");
        goto stop;
      }

      /* a negative length means "unknown"; in that case rely on the flow
       * return of gst_pad_pull_range () to detect the end of the input */
      if (len >= 0 && filter->offset >= (guint64) len) {
        GST_DEBUG_OBJECT (filter, "at end of input, sending EOS, pausing");
        gst_pad_push_event (filter->srcpad, gst_event_new_eos ());
        goto stop;
      }

      /* now, read BLOCKSIZE bytes from byte offset filter->offset */
      ret = gst_pad_pull_range (filter->sinkpad, filter->offset,
          BLOCKSIZE, &buf);

      if (ret != GST_FLOW_OK) {
        GST_DEBUG_OBJECT (filter, "pull_range failed: %s", gst_flow_get_name (ret));
        /* upstream ran out of data: let downstream know as well */
        if (ret == GST_FLOW_EOS)
          gst_pad_push_event (filter->srcpad, gst_event_new_eos ());
        goto stop;
      }

      /* remember how much was read; the buffer may be shorter than
       * BLOCKSIZE and must not be touched after it has been pushed */
      size = gst_buffer_get_size (buf);

      /* now push buffer downstream */
      ret = gst_pad_push (filter->srcpad, buf);

      buf = NULL; /* gst_pad_push() took ownership of buffer */

      if (ret != GST_FLOW_OK) {
        GST_DEBUG_OBJECT (filter, "pad_push failed: %s", gst_flow_get_name (ret));
        goto stop;
      }

      /* everything is fine, increase offset and wait for us to be called again */
      filter->offset += size;
      return;

    stop:
      GST_DEBUG_OBJECT (filter, "pausing task");
      gst_pad_pause_task (filter->sinkpad);
    }
上一篇:编译gstreamer for android


下一篇:【Nvidia DeepStream. 001】DeepStream-test1样例 逐行讲解版,原来竟如此简单