Skip to main content
Participant
November 13, 2011
Question

When FMLE stopped,Remote RTMP stream to FMS 4.5 with rtmfp?

  • November 13, 2011
  • 1 reply
  • 4866 views

When FMLE stopped,Remote RTMP stream to FMS 4.5 with rtmfp?

edit  "applications/multicast/main.asc" ?

HELP ME !!! THANKS!!!

/**
* File: main.asc
* --------------
* The server-side portion of the multicast sample application.
* This app accepts publish and unpublish requests from FMLE, and republishes
* the live stream from FMLE into a target Flash Group.
*/

////////////////////////////////////////////////////////////////////////////////
//
// General Constants
//
////////////////////////////////////////////////////////////////////////////////
// "Constants" representing multicast event types.
var TYPE_FUSION = 1;
var TYPE_IP = 2;
var TYPE_P2P = 3;
////////////////////////////////////////////////////////////////////////////////
//
// StreamContext Description, Constants and Functions
//
////////////////////////////////////////////////////////////////////////////////
/**
* Type: StreamContext
* -------------------
* This application tracks the context for live streams published to the server
* that are being republished into a Flash Group. The StreamContext "type" used
* for this is just an Object containing the following members:
*
*   client         - The encoding/publishing client.
*   streamName     - The source Stream name as published by the client.
*   type           - The multicast event type.
*   groupspec      - The groupspec identifying the Flash Group and capabilities.
*   address        - IP multicast address (optional for pure P2P events).
*   netConnection  - A loopback NetConnection used for the mcastNetStream.
*   mcastNetStream - The NetStream used to republish the source Stream into
*                    the Flash Group.
*   netGroup       - An (optional) NetGroup handle for the target Group.
*                    Only present for Fusion or P2P events.
*   state          - One of the state constants defined immediately below
*                    this comment.
*/

var STATE_INIT            = 0; // Starting state for a StreamContext.
var STATE_CONNECTING      = 1; // Establishing loop-back connection.
var STATE_CONNECTED       = 2; // Connection established.
var STATE_PUBLISH_PENDING = 3; // Attempting to publish.
var STATE_REPUBLISHING    = 4; // Actively republishing to multicast.
var STATE_UNPUBLISHING    = 5; // Shutting down multicast republish.
var STATE_UNPUBLISHED     = 6; // Unpublished successfully.
var STATE_DISCONNECTING   = 7; // Shutting down loopback connection.
var STATE_DISCONNECTED    = 8; // Connection shut down. Done.
/**
* Registers a source Stream published by the specified client, along with the
* context for the multicast event, as a StreamContext Object.
*
* @param client - The Client publishing the stream.
* @param streamName - The source Stream name.
* @param params - The parameters resulting from parsing the source Stream's
*                 query string.
* @return The new StreamContext Object for the registered Stream.
*/

function registerStream(client, streamName, params)
{
    var streamContext = { "client": client,
                          "streamName": streamName,
                          "type": params["fms.multicast.type"],
                          "groupspec": params["fms.multicast.groupspec"] };
if (params["fms.multicast.interface"])
  streamContext["interfaceAddress"] = params["fms.multicast.interface"];
    if (params["fms.multicast.address"])
        streamContext["address"] = params["fms.multicast.address"],
    streamContext.state = STATE_INIT;
    updateStreamContextLookups(streamContext);
    trace("Registered multicast context for source stream: " + streamName);
    return streamContext;
}
/**
* Updates the indexed lookups installed for the passed StreamContext Object
* with the application.
*
* @param streamContext - The StreamContext Object to (re)index.
*/

function updateStreamContextLookups(streamContext)
{
    application.streamTable[streamContext.streamName] = streamContext;
    if (streamContext.netConnection)
        application.netConnTable[streamContext.netConnection] = streamContext;
    if (streamContext.mcastNetStream)
        application.mcastNetStreamTable[streamContext.mcastNetStream] = streamContext;
    if (streamContext.netGroup)
        application.netGroupTable[streamContext.netGroup] = streamContext;
}
/**
* Provides access to the StreamContext Object for a registered source Stream
* by name.
*
* @param streamName - A registered source Stream name.
* @return The associated StreamContext Object; undefined if the source Stream
*         name is not registered.
*/

function getStreamContextForSourceStream(streamName)
{
    return application.streamTable[streamName];
}
/**
* Provides access to the StreamContext Object for a given server-side
* NetConnection hosting a multicast NetStream.
*
* @param netConnection - A server-side NetConnection.
* @return The associated StreamContext Object; undefined if the passed
*         NetConnection is not indexed to a StreamContext.
*/

function getStreamContextForNetConnection(netConnection)
{
    return application.netConnTable[netConnection];
}
/**
* Provides access to the StreamContext Object for a given multicast NetStream.
*
* @param netStream - A multicast NetStream.
* @return The associated StreamContext Object; undefined if the passed
*         NetStream is not indexed to a StreamContext.
*/

function getStreamContextForMulticastNetStream(netStream)
{
    return application.mcastNetStreamTable[netStream];
}
/**
* Provides access to the StreamContext Object for a given NetGroup associated
* with a multicast NetStream.
*
* @param netGroup - A NetGroup.
* @return The associated StreamContext Object; undefined if the passed
*         NetGroup is not indexed to a StreamContext.
*/

function getStreamContextForNetGroup(netGroup)
{
    return application.netGroupTable[netGroup];
}
/**
* Unregisters the StreamContext from the application.
*
* @param streamContext - The StreamContext Object to unregister.
*/

function unregisterStreamContext(streamContext)
{
    if (streamContext.netConnection)
        delete application.netConnTable[streamContext.netConnection];
    if (streamContext.mcastNetStream)
        delete application.mcastNetStreamTable[streamContext.mcastNetStream];
    if (streamContext.netGroup)
        delete application.netGroupTable[streamContext.netGroup];
    trace("Unregistered multicast context for source stream: " +
          streamContext.streamName);
}
////////////////////////////////////////////////////////////////////////////////
//
// Application callback functions
//
////////////////////////////////////////////////////////////////////////////////
/**
* Initializes global StreamContext lookup tables.
*/

application.onAppStart = function()
{
    application.streamTable = {};
    application.netConnTable = {};
    application.mcastNetStreamTable = {};
    application.netGroupTable = {};
}
/**
* Handles a publish event for the application by validating the request
* and bridging the published stream into a target Flash Group. Invalid
* publish requests are ignored and the publishing client's connection
* is closed.
*
* @param client - The publishing client.
* @param stream - The published stream.
*/

application.onPublish = function(client, stream)
{
    //trace("Handling publish request for source stream: " + stream.name);
    var params = parseQueryString(stream.publishQueryString);
    if (!validateStreamParams(params))
    {
        application.disconnect(client);
        return;
    }
    var prevContext = getStreamContextForSourceStream(stream.name);
    if (prevContext)
    {
        forceCloseStreamContext(prevContext);
    }
    // Register source Stream, and kick off the async process that will
    // eventually wire-up the associated multicast NetStream.
    var streamContext = registerStream(client, stream.name, params);
    openMulticastConnection(streamContext);
}
/**
* Handles an unpublish event for the application by shutting down
* any associated multicast NetStream.
*
* @param client - The unpublishing client.
* @param stream - The source stream being unpublished.
*/

application.onUnpublish = function(client, stream)
{
    trace("Handling unpublish request for source stream: " + stream.name);
    var streamContext = getStreamContextForSourceStream(stream.name);
    if (streamContext && (streamContext.state <= STATE_REPUBLISHING))
        destroyStreamContext(streamContext);
}
////////////////////////////////////////////////////////////////////////////////
//
// Callback functions for NetConnection and multicast NetStream/NetGroup wiring.
//
////////////////////////////////////////////////////////////////////////////////
/**
* First step in setting up a republished multicast NetStream; open the loopback
* connection it requires.
*
* @param streamContext - The StreamContext Object for the publish event.
*/

function openMulticastConnection(streamContext)
{
    var nc = new NetConnection();
    nc.onStatus = netConnectionStatusHandler;
    streamContext.netConnection = nc;
    updateStreamContextLookups(streamContext);
    streamContext.state = STATE_CONNECTING;
    nc.connect(resetUriProtocol(streamContext.client.uri, "rtmfp"));
}
/**
* Status event handler for the loopback NetConnection used by the multicast
* NetStream. Advances setup upon successful connection, or triggers or advances
* tear-down as a result of connection loss or an unpublish and clean shutdown.
*
* @param info - The status info Object.
*/

function netConnectionStatusHandler(info)
{
    var streamContext = getStreamContextForNetConnection(this);
    trace("Multicast NetConnection Status: " + info.code +
          (streamContext ? ", Source stream: " + streamContext.streamName : ", Not associated with a source stream."));
    if (streamContext)
    {
        switch (info.code)
        {
        case "NetConnection.Connect.Success":
            streamContext.state = STATE_CONNECTED;
            // If event type is Fusion or P2p, wire up a NetGroup for neighbor
            // bootstrapping and maintenance ahead of (re)publishing the stream.
            var type = streamContext.type;
            if (type == TYPE_FUSION || type == TYPE_P2P)
                initNetGroup(streamContext);
            else
                initMulticastNetStream(streamContext);
            break;
        case "NetConnection.Connect.Failed":
        case "NetConnection.Connect.Rejected":
        case "NetConnection.Connect.AppShutdown":
            trace("MULTICAST PUBLISH ERROR: Failed to establish server-side NetConnection for use by multicast NetStream. " +
                  "Status code: " + info.code + ", description: " + info.description + ", Source stream: " +
                  streamContext.streamName);
            streamContext.state = STATE_DISCONNECTED;
            destroyStreamContext(streamContext);
            break;
        case "NetConnection.Connect.Closed":
            if (streamContext.state < STATE_DISCONNECTING)
            {
                trace("MULTICAST PUBLISH ERROR: Unexpected server-side NetConnection close. " +
                     "Status code: " + info.code + ", description: " + info.description + ", Source stream: " +
                     streamContext.streamName);
            }
            streamContext.state = STATE_DISCONNECTED;
            destroyStreamContext(streamContext);
            break;
        default:
            // Ignore.
        }
    }
}
/**
* Initializes the multicast NetGroup following a successful connection of its
* underlying loopback NetConnection. This hook is optional and only runs for
* event types of Fusion and pure P2P.
*
* @param streamContext - The StreamContext Object for the multicast publish.
*/

function initNetGroup(streamContext)
{
    var ng = null;
    try
    {
        ng = new NetGroup(streamContext.netConnection, streamContext.groupspec);
    }
    catch (e)
    {
        trace("MULTICAST PUBLISH ERROR: Failed to construct NetGroup. Error: "
              + e.name + (e.message ? " " + e.message : "") +
              ", Source stream: " + streamContext.streamName);
        destroyStreamContext(streamContext);
        return;
    }
    ng.onStatus = netGroupStatusHandler;
    streamContext.netGroup = ng;
    updateStreamContextLookups(streamContext);
}
/**
* Status event handler for the multicast NetGroup. Advances to initializing the
* multicast NetStream upon successful NetGroup connect. Otherwise, triggers
* shut down.
*
* @param info - The status info Object.
*/

function netGroupStatusHandler(info)
{
    var streamContext = getStreamContextForNetGroup(this);
    trace("Multicast NetGroup Status: " + info.code +
          (streamContext ? ", Source stream: " + streamContext.streamName : ", Not associated with a source stream."))
    if (streamContext)
    {
        switch (info.code)
        {
        case "NetGroup.Connect.Success":
            initMulticastNetStream(streamContext);
            break;
        case "NetGroup.Connect.Failed":
        case "NetGroup.Connect.Rejected":
            trace("MULTICAST PUBLISH ERROR: Failed to connect multicast NetGroup. " +
                  "Status code: " + info.code + ", description: " + info.description +
                  ", Source stream: " + streamContext.streamName);
            destroyStreamContext(streamContext);
            break;
        case "NetGroup.MulticastStream.UnpublishNotify":
            // At this point, multicast publishers will be notified;
            // continue shut down.
            destroyStreamContext(streamContext);
            break;
        default:
            // Ignore.
        }
    }
}
/**
* Initializes the multicast NetStream following a successful connection of its
* underlying loopback NetConnection.
*
* @param streamContext - The StreamContext Object for the multicast publish.
*/

function initMulticastNetStream(streamContext)
{
    var ns = null;
    try
    {
        ns = new NetStream(streamContext.netConnection, streamContext.groupspec);
    }
    catch (e)
    {
        trace("MULTICAST PUBLISH ERROR: Failed to construct multicast NetStream. Error: " +
              e.name + (e.message ? " " + e.message : "") +
              ", Source stream: " + streamContext.streamName);
        destroyStreamContext(streamContext);
        return;
    }
    var type = streamContext.type;
    if (type == TYPE_FUSION || type == TYPE_IP)
    {
  var iAddr = (streamContext.interfaceAddress) ? streamContext.interfaceAddress : null;
 
        try
        {
            trace("Multicast NetStream will publish to IP address: " + streamContext.address +
      " on interface address: " + ((iAddr) ? iAddr : "default") +
                  ", Source stream: " + streamContext.streamName);
            ns.setIPMulticastPublishAddress(streamContext.address, iAddr);
        }
        catch (e2)
        {
            trace("MULTICAST PUBLISH ERROR: Failed to assign IP multicast address and port for publishing. Address: "
                  + streamContext.address + " on interface address: " + ((iAddr) ? iAddr : "default") +
      ", Source stream: " + streamContext.streamName);
            destroyStreamContext(streamContext);
            return;
        }
    }
    ns.onStatus = netStreamStatusHandler;
    streamContext.mcastNetStream = ns;
    updateStreamContextLookups(streamContext);
    streamContext.state = STATE_PUBLISH_PENDING;
}
/**
* Status event handler for the multicast NetStream. Advances state upon successful
* connect and publish, or upon successful unpublish. Triggers tear-down if we fail
* to attach to a source Stream to republish.
*
* @param info - The status info Object.
*/

function netStreamStatusHandler(info)
{
    var streamContext = getStreamContextForMulticastNetStream(this);
    trace("Multicast NetStream Status: " + info.code +
          (streamContext ? ", Source stream: " + streamContext.streamName : ", Not associated with a source stream."))
    if (streamContext)
    {
        switch (info.code)
        {
        case "NetStream.Connect.Success":
            if (!this.attach(Stream.get(streamContext.streamName)))
            {
                trace("MULTICAST PUBLISH ERROR: Failed to attach multicast NetStream to source. Source stream: " +
                      streamContext.streamName);
                destroyStreamContext(streamContext);
    //var stream;
            //stream = Stream.get("liveStream");
                //return;
            }else{
            this.publish(streamContext.streamName, "live");
   }
            break;
        case "NetStream.Publish.Start":
            streamContext.state = STATE_REPUBLISHING;
            break;
        case "NetStream.Unpublish.Success":
            streamContext.state = STATE_UNPUBLISHED;
            // Wait for unpublish notify event if the context has a NetGroup;
            // otherwise continue shut down now.
            if (!streamContext.netGroup)
            {
                destroyStreamContext(streamContext);
                break;
            }
        default:
            // Ignore.
        }
    }
}
/**
* The common tear-down hook. Other functions that manage or shut down
* the StreamContext Object delegate to this function upon detecting a fatal
* error or during shut down.
*
* @param streamContext - The StreamContext Object for the source Stream and
*                        (potentially wired-up) multicast NetStream.
*/

function destroyStreamContext(streamContext)
{
    // Unregister by Stream name immediately; lookups by NetConnection, NetGroup
    // and multicast NetStream remain in place until tear-down is complete.
    delete application.streamTable[streamContext.streamName];
    switch (streamContext.state)
    {
    case STATE_REPUBLISHING:
        streamContext.mcastNetStream.attach(false);
        streamContext.mcastNetStream.publish(false);
        streamContext.state = STATE_UNPUBLISHING;
        return;
    case STATE_CONNECTING:
    case STATE_CONNECTED:
    case STATE_PUBLISH_PENDING:
    case STATE_UNPUBLISHED:
        // Delete status handler callbacks and cleanup in case we arrived here
        // as a result of a force close.
        if (streamContext.netGroup)
            delete streamContext.netGroup.onStatus;
        if (streamContext.mcastNetStream)
        {
            streamContext.mcastNetStream.attach(false);
            delete streamContext.mcastNetStream.onStatus;
        }
        streamContext.netConnection.close();
        streamContext.state = STATE_DISCONNECTING;
        return;
    default:
        // Fall-through.
    }
    // At this point, we either never got to the republishing state or we've
    // proceeded through the clean shut down steps above. Everything for this
    // StreamContext can go away.
    unregisterStreamContext(streamContext);
}
/**
* Utility function used to force close a StreamContext in the event that we
* start handling a republish of a Source stream before the context for its
* prior incarnation has been torn down.
*
* @param streamContext - The StreamContext Object for the source Stream.
*/

function forceCloseStreamContext(streamContext)
{
    trace("Force closing previous multicast context for source stream: " + stream.name);
    prevContext.state = STATE_UNPUBLISHED;
    destroyStreamContext(prevContext);
}
////////////////////////////////////////////////////////////////////////////////
//
// Client callback functions
//
////////////////////////////////////////////////////////////////////////////////
/**
* A no-op. Answers the RPC in the fashion expected by encoders, but the real
* work happens in application.onPublish.
*
* @param streamName - The name of the stream being published.
*/

Client.prototype.FCPublish = function(streamName)
{
    this.call("onFCPublish",
              null,
              {code:"NetStream.Publish.Start", description:streamName});
}
/**
* A no-op. Answers the RPC in the fashion expected by encoders, but the real
* work happens in application.onUnpublish.
*
* @param streamName - The name of the stream being unpublished.
*/

Client.prototype.FCUnpublish = function(streamName)
{
    this.call("onFCUnpublish",
              null,
              {code:"NetStream.Unpublish.Success", description:streamName});
}
/**
* If the client invoker's ip matches what was captured for a currently publishing
* stream, assume it's the same client and reset the stream. Otherwise, ignore.
*
* @param streamName - The name of the stream being released.
*/

Client.prototype.releaseStream = function(streamName)
{
    var streamContext = getStreamContextForSourceStream(streamName);
    if (streamContext &&
        (streamContext.client.ip == this.ip) &&
        (streamContext.state <= STATE_REPUBLISHING))
    {
        // Only tear-down an orphaned stream if it's not
        // already shutting down (see state check above).
        destroyStreamContext(streamContext);
    }
}
////////////////////////////////////////////////////////////////////////////////
//
// Helper functions
//
////////////////////////////////////////////////////////////////////////////////
/**
* Validates that a newly published stream has correct metadata (e.g. query
* string parameters) to republish into a Flash Group. This function also
* writes a message to the application log for any validation failures.
*
* @param params - The quiery string parameters for the source Stream.
* @return true if valid; otherwise false.
*/

function validateStreamParams(params)
{
    var empty = true;
    for (var param in params)
    {
       empty = false;
       break;
    }
    if (empty)
    {
        trace("MULTICAST PUBLISH ERROR: Stream query string is empty.");
        return false;
    }
    if (!params["fms.multicast.type"])
    {
trace("MULTICAST PUBLISH ERROR: Stream query string does not specify a 'fms.multicast.type'.");
        return false;
    }
    var type = params["fms.multicast.type"];
    if (type != 1 && type != 2 && type != 3)
    {
        trace("MULTICAST PUBLISH ERROR: 'fms.multicast.type' has invalid value: " + type);
        return false;
    }
    if (!params["fms.multicast.groupspec"])
    {
        trace("MULTICAST PUBLISH ERROR: Stream query string does not specify a 'fms.multicast.groupspec'.");
        return false;
    }
    // Fusion and IP require an address:port.
    if ((type == 1 || type == 2) &&
        !params["fms.multicast.address"])
    {
        trace("MULTICAST PUBLISH ERROR: Stream query string does not specify a 'fms.multicast.address'.");
        return false;
    }
    // No obvious validation issues.
    return true;
}
/**
* Parses the supplied query string, and if valid, returns an Object populated
* with the name-value pairs contained in the query string. The simple processing
* here does not preserve multiple name-value pairings having the same name; the
* last value seen wins. Parameters with no value are mapped to "" (empty String)
* in the returned Object.
*
* @param queryString - A query string portion of a URI, not including the leading
*                     '?' character.
* @return An Object containing a key-value mapping for each name-value parameter
*         defined in the query string; Object is empty if the query string is
*         invalid.
*/

function parseQueryString(queryString)
{
    var result = {};
    var decoded = "";
    try
    {
        decoded = decodeURIComponent(queryString);
    }
    catch (e) // Invalid URI component; return empty result.
    {
        return result;
    }
    if (decoded.length)
    {
        var params = decoded.split('&');
        for (var i in params)
        {
            var pair = params;
     var sepIndex = pair.indexOf('=');
            if (sepIndex != -1)
            {
                var name = pair.substr(0, sepIndex);
                result[name] = pair.substr(sepIndex + 1);
            }
            else
            {
                result[pair] = "";
            }
        }
    }
    return result;
}
/**
* Utility function used to swap out the protocol (scheme) portion
* of a given URI with an alternate.
*
* @param uri - The full URI.
* @param desiredProtocol - The replacement protocol.
* @return The URI with its protocol replaced.
*/

function resetUriProtocol(uri, desiredProtocol)
{
    var sepIndex = uri.indexOf("://");
    return desiredProtocol + uri.substr(sepIndex);
}

    This topic has been closed for replies.

    1 reply

    Participant
    November 15, 2011

    HELP ME !!! THANKS!!!