When FMLE is stopped, how can I republish a remote RTMP stream to FMS 4.5 with RTMFP?
When FMLE is stopped, how can I republish a remote RTMP stream to FMS 4.5 with RTMFP?
Should I edit "applications/multicast/main.asc"?
Please help me! Thanks!
/**
* File: main.asc
* --------------
* The server-side portion of the multicast sample application.
* This app accepts publish and unpublish requests from FMLE, and republishes
* the live stream from FMLE into a target Flash Group.
*/
////////////////////////////////////////////////////////////////////////////////
//
// General Constants
//
////////////////////////////////////////////////////////////////////////////////
// "Constants" representing multicast event types. These are the values
// accepted for the 'fms.multicast.type' query string parameter and stored
// as the "type" member of a StreamContext.
// Fusion: a NetGroup is wired up and an IP multicast publish address is assigned.
var TYPE_FUSION = 1;
// IP multicast: an IP multicast publish address is assigned; no NetGroup is created.
var TYPE_IP = 2;
// Pure P2P: a NetGroup is wired up; no IP multicast publish address is assigned.
var TYPE_P2P = 3;
////////////////////////////////////////////////////////////////////////////////
//
// StreamContext Description, Constants and Functions
//
////////////////////////////////////////////////////////////////////////////////
/**
 * Type: StreamContext
 * -------------------
 * This application tracks the context for live streams published to the server
 * that are being republished into a Flash Group. The StreamContext "type" used
 * for this is just an Object containing the following members:
 *
 * client           - The encoding/publishing client.
 * streamName       - The source Stream name as published by the client.
 * type             - The multicast event type (one of the TYPE_* values).
 * groupspec        - The groupspec identifying the Flash Group and capabilities.
 * address          - IP multicast address (optional for pure P2P events).
 * interfaceAddress - Interface address passed along with the IP multicast
 *                    address when publishing (optional; taken from the
 *                    'fms.multicast.interface' query string parameter).
 * netConnection    - A loopback NetConnection used for the mcastNetStream.
 * mcastNetStream   - The NetStream used to republish the source Stream into
 *                    the Flash Group.
 * netGroup         - An (optional) NetGroup handle for the target Group.
 *                    Only present for Fusion or P2P events.
 * state            - One of the state constants defined immediately below
 *                    this comment.
 */
// The numeric order of the states is significant: other code compares states
// with < and <= (for example "state <= STATE_REPUBLISHING" is used to mean
// "tear-down has not started yet"), so new states must preserve the ordering.
var STATE_INIT = 0; // Starting state for a StreamContext.
var STATE_CONNECTING = 1; // Establishing loop-back connection.
var STATE_CONNECTED = 2; // Connection established.
var STATE_PUBLISH_PENDING = 3; // Attempting to publish.
var STATE_REPUBLISHING = 4; // Actively republishing to multicast.
var STATE_UNPUBLISHING = 5; // Shutting down multicast republish.
var STATE_UNPUBLISHED = 6; // Unpublished successfully.
var STATE_DISCONNECTING = 7; // Shutting down loopback connection.
var STATE_DISCONNECTED = 8; // Connection shut down. Done.
/**
 * Registers a source Stream published by the specified client, along with the
 * context for the multicast event, as a StreamContext Object.
 *
 * The new context always starts in STATE_INIT and is indexed through
 * updateStreamContextLookups() before being returned.
 *
 * @param client - The Client publishing the stream.
 * @param streamName - The source Stream name.
 * @param params - The parameters resulting from parsing the source Stream's
 *                 query string. Reads 'fms.multicast.type',
 *                 'fms.multicast.groupspec' and, when present,
 *                 'fms.multicast.interface' and 'fms.multicast.address'.
 * @return The new StreamContext Object for the registered Stream.
 */
function registerStream(client, streamName, params)
{
    var streamContext = { "client": client,
                          "streamName": streamName,
                          "type": params["fms.multicast.type"],
                          "groupspec": params["fms.multicast.groupspec"] };

    // Optional members are only added when the query string supplied them.
    if (params["fms.multicast.interface"])
    {
        streamContext["interfaceAddress"] = params["fms.multicast.interface"];
    }
    if (params["fms.multicast.address"])
    {
        streamContext["address"] = params["fms.multicast.address"];
    }

    // The initial state must be assigned unconditionally. It previously rode
    // on the address check via a stray comma operator, which left pure P2P
    // contexts (no address) without any state.
    streamContext.state = STATE_INIT;

    updateStreamContextLookups(streamContext);
    trace("Registered multicast context for source stream: " + streamName);
    return streamContext;
}
/**
 * (Re)indexes a StreamContext in the application-level lookup tables.
 *
 * The context is always indexed by its source stream name. It is additionally
 * indexed by its NetConnection, multicast NetStream and NetGroup, but only for
 * the members that are currently set, so this can be called again each time
 * one of those members is attached to the context.
 *
 * NOTE(review): the tables are plain Objects, so the NetConnection/NetStream/
 * NetGroup keys are converted to strings. This presumably relies on the host
 * objects converting to a distinct string per instance - confirm.
 *
 * @param streamContext - The StreamContext Object to (re)index.
 */
function updateStreamContextLookups(streamContext)
{
    var loopbackConn = streamContext.netConnection;
    var multicastStream = streamContext.mcastNetStream;
    var group = streamContext.netGroup;

    application.streamTable[streamContext.streamName] = streamContext;

    if (loopbackConn)
    {
        application.netConnTable[loopbackConn] = streamContext;
    }
    if (multicastStream)
    {
        application.mcastNetStreamTable[multicastStream] = streamContext;
    }
    if (group)
    {
        application.netGroupTable[group] = streamContext;
    }
}
/**
 * Looks up the StreamContext Object registered under a source Stream name.
 *
 * @param streamName - A registered source Stream name.
 * @return The StreamContext Object indexed under that name; undefined if the
 *         name has no entry in the by-name table.
 */
function getStreamContextForSourceStream(streamName)
{
    var byName = application.streamTable;
    return byName[streamName];
}
/**
 * Looks up the StreamContext Object indexed under a server-side NetConnection
 * (the loopback connection hosting a multicast NetStream).
 *
 * @param netConnection - A server-side NetConnection.
 * @return The StreamContext Object indexed under that NetConnection;
 *         undefined if it has no entry in the by-connection table.
 */
function getStreamContextForNetConnection(netConnection)
{
    var byConnection = application.netConnTable;
    return byConnection[netConnection];
}
/**
 * Looks up the StreamContext Object indexed under a multicast NetStream.
 *
 * @param netStream - A multicast NetStream.
 * @return The StreamContext Object indexed under that NetStream; undefined if
 *         it has no entry in the by-NetStream table.
 */
function getStreamContextForMulticastNetStream(netStream)
{
    var byNetStream = application.mcastNetStreamTable;
    return byNetStream[netStream];
}
/**
 * Looks up the StreamContext Object indexed under a NetGroup that is
 * associated with a multicast NetStream.
 *
 * @param netGroup - A NetGroup.
 * @return The StreamContext Object indexed under that NetGroup; undefined if
 *         it has no entry in the by-NetGroup table.
 */
function getStreamContextForNetGroup(netGroup)
{
    var byNetGroup = application.netGroupTable;
    return byNetGroup[netGroup];
}
/**
 * Removes a StreamContext from the NetConnection, multicast NetStream and
 * NetGroup lookup tables (for whichever of those members it has), then logs
 * the removal.
 *
 * The by-name table entry is not touched here.
 *
 * @param streamContext - The StreamContext Object to unregister.
 */
function unregisterStreamContext(streamContext)
{
    var loopbackConn = streamContext.netConnection;
    var multicastStream = streamContext.mcastNetStream;
    var group = streamContext.netGroup;

    if (loopbackConn)
    {
        delete application.netConnTable[loopbackConn];
    }
    if (multicastStream)
    {
        delete application.mcastNetStreamTable[multicastStream];
    }
    if (group)
    {
        delete application.netGroupTable[group];
    }

    var message = "Unregistered multicast context for source stream: " +
                  streamContext.streamName;
    trace(message);
}
////////////////////////////////////////////////////////////////////////////////
//
// Application callback functions
//
////////////////////////////////////////////////////////////////////////////////
/**
 * Application start hook: creates the four (initially empty) global
 * StreamContext lookup tables on the application object.
 */
application.onAppStart = function()
{
    var tableNames = ["streamTable",
                      "netConnTable",
                      "mcastNetStreamTable",
                      "netGroupTable"];
    for (var i = 0; i < tableNames.length; i++)
    {
        application[tableNames[i]] = {};
    }
};
/**
 * Publish hook. Parses and validates the query string of the published
 * stream, then starts bridging the stream into its target Flash Group.
 *
 * If validation fails, the publishing client is disconnected and nothing is
 * registered. If a context is still registered under the same stream name,
 * it is force closed before the new one is registered.
 *
 * @param client - The publishing client.
 * @param stream - The published stream.
 */
application.onPublish = function(client, stream)
{
    var params = parseQueryString(stream.publishQueryString);
    if (!validateStreamParams(params))
    {
        application.disconnect(client);
        return;
    }

    // A previous incarnation of this stream may not have been torn down yet.
    var staleContext = getStreamContextForSourceStream(stream.name);
    if (staleContext)
    {
        forceCloseStreamContext(staleContext);
    }

    // Register the source Stream, then kick off the async process that will
    // eventually wire up the associated multicast NetStream.
    openMulticastConnection(registerStream(client, stream.name, params));
};
/**
 * Unpublish hook. Starts tear-down of the multicast context registered for
 * the unpublished source stream, unless there is none or its state is
 * already past STATE_REPUBLISHING.
 *
 * @param client - The unpublishing client.
 * @param stream - The source stream being unpublished.
 */
application.onUnpublish = function(client, stream)
{
    trace("Handling unpublish request for source stream: " + stream.name);

    var streamContext = getStreamContextForSourceStream(stream.name);
    if (!streamContext)
    {
        return;
    }
    // Deliberately phrased as a negated "<=" so that a context whose state is
    // not a number is skipped, exactly as a plain "<=" test would skip it.
    if (!(streamContext.state <= STATE_REPUBLISHING))
    {
        return;
    }
    destroyStreamContext(streamContext);
};
////////////////////////////////////////////////////////////////////////////////
//
// Callback functions for NetConnection and multicast NetStream/NetGroup wiring.
//
////////////////////////////////////////////////////////////////////////////////
/**
 * First step in setting up a republished multicast NetStream: creates the
 * loopback NetConnection, attaches it to the StreamContext, indexes the
 * context by that connection, moves the context to STATE_CONNECTING and
 * connects to the publishing client's URI with its scheme swapped to "rtmfp".
 *
 * Subsequent steps are driven by netConnectionStatusHandler.
 *
 * @param streamContext - The StreamContext Object for the publish event.
 */
function openMulticastConnection(streamContext)
{
    var loopback = new NetConnection();
    loopback.onStatus = netConnectionStatusHandler;

    streamContext.netConnection = loopback;
    updateStreamContextLookups(streamContext);
    streamContext.state = STATE_CONNECTING;

    var loopbackUri = resetUriProtocol(streamContext.client.uri, "rtmfp");
    loopback.connect(loopbackUri);
}
/**
 * Status event handler for the loopback NetConnection used by the multicast
 * NetStream; "this" is the NetConnection the event belongs to.
 *
 * Every event is logged. Events for a NetConnection that is not indexed to a
 * StreamContext are otherwise ignored. For an indexed connection:
 *  - Connect.Success moves the context to STATE_CONNECTED and continues setup
 *    with a NetGroup (Fusion and P2P) or directly with the multicast NetStream.
 *  - Connect.Failed / Rejected / AppShutdown log an error, mark the context
 *    STATE_DISCONNECTED and destroy it.
 *  - Connect.Closed marks the context STATE_DISCONNECTED and destroys it; an
 *    error is logged first if the close arrived before STATE_DISCONNECTING.
 *  - Any other code is ignored.
 *
 * @param info - The status info Object.
 */
function netConnectionStatusHandler(info)
{
    var streamContext = getStreamContextForNetConnection(this);
    var sourceNote = streamContext
        ? ", Source stream: " + streamContext.streamName
        : ", Not associated with a source stream.";
    trace("Multicast NetConnection Status: " + info.code + sourceNote);

    if (!streamContext)
    {
        return;
    }

    var code = info.code;
    if (code === "NetConnection.Connect.Success")
    {
        streamContext.state = STATE_CONNECTED;
        // If event type is Fusion or P2P, wire up a NetGroup for neighbor
        // bootstrapping and maintenance ahead of (re)publishing the stream.
        var needsNetGroup = (streamContext.type == TYPE_FUSION ||
                             streamContext.type == TYPE_P2P);
        if (needsNetGroup)
        {
            initNetGroup(streamContext);
        }
        else
        {
            initMulticastNetStream(streamContext);
        }
    }
    else if (code === "NetConnection.Connect.Failed" ||
             code === "NetConnection.Connect.Rejected" ||
             code === "NetConnection.Connect.AppShutdown")
    {
        trace("MULTICAST PUBLISH ERROR: Failed to establish server-side NetConnection for use by multicast NetStream. " +
              "Status code: " + info.code + ", description: " + info.description + ", Source stream: " +
              streamContext.streamName);
        streamContext.state = STATE_DISCONNECTED;
        destroyStreamContext(streamContext);
    }
    else if (code === "NetConnection.Connect.Closed")
    {
        var closeWasExpected = !(streamContext.state < STATE_DISCONNECTING);
        if (!closeWasExpected)
        {
            trace("MULTICAST PUBLISH ERROR: Unexpected server-side NetConnection close. " +
                  "Status code: " + info.code + ", description: " + info.description + ", Source stream: " +
                  streamContext.streamName);
        }
        streamContext.state = STATE_DISCONNECTED;
        destroyStreamContext(streamContext);
    }
    // Any other status code is ignored.
}
/**
 * Creates the multicast NetGroup on the context's loopback NetConnection,
 * using the context's groupspec. Called after the loopback connection
 * succeeds, for Fusion and pure P2P event types only.
 *
 * On success the NetGroup gets netGroupStatusHandler as its status handler,
 * is stored on the context and the context is re-indexed. If the NetGroup
 * constructor throws, the error is logged and the context is destroyed.
 *
 * @param streamContext - The StreamContext Object for the multicast publish.
 */
function initNetGroup(streamContext)
{
    var group;
    try
    {
        group = new NetGroup(streamContext.netConnection, streamContext.groupspec);
    }
    catch (ctorError)
    {
        var detail = ctorError.name + (ctorError.message ? " " + ctorError.message : "");
        trace("MULTICAST PUBLISH ERROR: Failed to construct NetGroup. Error: "
              + detail +
              ", Source stream: " + streamContext.streamName);
        destroyStreamContext(streamContext);
        return;
    }

    group.onStatus = netGroupStatusHandler;
    streamContext.netGroup = group;
    updateStreamContextLookups(streamContext);
}
/**
 * Status event handler for the multicast NetGroup; "this" is the NetGroup the
 * event belongs to.
 *
 * Every event is logged. Events for a NetGroup that is not indexed to a
 * StreamContext are otherwise ignored. For an indexed NetGroup:
 *  - Connect.Success continues setup with the multicast NetStream.
 *  - Connect.Failed / Rejected log an error and destroy the context.
 *  - MulticastStream.UnpublishNotify continues shut down by destroying the
 *    context.
 *  - Any other code is ignored.
 *
 * @param info - The status info Object.
 */
function netGroupStatusHandler(info)
{
    var streamContext = getStreamContextForNetGroup(this);
    var sourceNote = streamContext
        ? ", Source stream: " + streamContext.streamName
        : ", Not associated with a source stream.";
    trace("Multicast NetGroup Status: " + info.code + sourceNote);

    if (!streamContext)
    {
        return;
    }

    var code = info.code;
    if (code === "NetGroup.Connect.Success")
    {
        initMulticastNetStream(streamContext);
    }
    else if (code === "NetGroup.Connect.Failed" ||
             code === "NetGroup.Connect.Rejected")
    {
        trace("MULTICAST PUBLISH ERROR: Failed to connect multicast NetGroup. " +
              "Status code: " + info.code + ", description: " + info.description +
              ", Source stream: " + streamContext.streamName);
        destroyStreamContext(streamContext);
    }
    else if (code === "NetGroup.MulticastStream.UnpublishNotify")
    {
        // At this point, multicast publishers will be notified;
        // continue shut down.
        destroyStreamContext(streamContext);
    }
    // Any other status code is ignored.
}
/**
 * Creates the multicast NetStream on the context's loopback NetConnection,
 * using the context's groupspec.
 *
 * For Fusion and IP event types the context's IP multicast address (and
 * interface address, or null when none was given) is assigned as the publish
 * address. On success the NetStream gets netStreamStatusHandler as its status
 * handler, is stored on the context, the context is re-indexed and moved to
 * STATE_PUBLISH_PENDING. If constructing the NetStream or assigning the
 * address throws, the failure is logged and the context is destroyed.
 *
 * @param streamContext - The StreamContext Object for the multicast publish.
 */
function initMulticastNetStream(streamContext)
{
    var multicastStream;
    try
    {
        multicastStream = new NetStream(streamContext.netConnection, streamContext.groupspec);
    }
    catch (ctorError)
    {
        trace("MULTICAST PUBLISH ERROR: Failed to construct multicast NetStream. Error: " +
              ctorError.name + (ctorError.message ? " " + ctorError.message : "") +
              ", Source stream: " + streamContext.streamName);
        destroyStreamContext(streamContext);
        return;
    }

    var usesIpMulticast = (streamContext.type == TYPE_FUSION ||
                           streamContext.type == TYPE_IP);
    if (usesIpMulticast)
    {
        var ifaceAddr = streamContext.interfaceAddress || null;
        var ifaceLabel = ifaceAddr || "default";
        try
        {
            trace("Multicast NetStream will publish to IP address: " + streamContext.address +
                  " on interface address: " + ifaceLabel +
                  ", Source stream: " + streamContext.streamName);
            multicastStream.setIPMulticastPublishAddress(streamContext.address, ifaceAddr);
        }
        catch (addressError)
        {
            trace("MULTICAST PUBLISH ERROR: Failed to assign IP multicast address and port for publishing. Address: "
                  + streamContext.address + " on interface address: " + ifaceLabel +
                  ", Source stream: " + streamContext.streamName);
            destroyStreamContext(streamContext);
            return;
        }
    }

    multicastStream.onStatus = netStreamStatusHandler;
    streamContext.mcastNetStream = multicastStream;
    updateStreamContextLookups(streamContext);
    streamContext.state = STATE_PUBLISH_PENDING;
}
/**
 * Status event handler for the multicast NetStream; "this" is the NetStream
 * the event belongs to.
 *
 * Every event is logged. Events for a NetStream that is not indexed to a
 * StreamContext are otherwise ignored. For an indexed NetStream:
 *  - Connect.Success attaches the source Stream (looked up by name) and, if
 *    that works, publishes it as "live" under the source stream name; if the
 *    attach fails, an error is logged and the context is destroyed.
 *  - Publish.Start moves the context to STATE_REPUBLISHING.
 *  - Unpublish.Success moves the context to STATE_UNPUBLISHED and, when the
 *    context has no NetGroup, destroys it right away.
 *  - Any other code is ignored.
 *
 * @param info - The status info Object.
 */
function netStreamStatusHandler(info)
{
    var streamContext = getStreamContextForMulticastNetStream(this);
    var sourceNote = streamContext
        ? ", Source stream: " + streamContext.streamName
        : ", Not associated with a source stream.";
    trace("Multicast NetStream Status: " + info.code + sourceNote);

    if (!streamContext)
    {
        return;
    }

    var code = info.code;
    if (code === "NetStream.Connect.Success")
    {
        var attached = this.attach(Stream.get(streamContext.streamName));
        if (attached)
        {
            this.publish(streamContext.streamName, "live");
        }
        else
        {
            trace("MULTICAST PUBLISH ERROR: Failed to attach multicast NetStream to source. Source stream: " +
                  streamContext.streamName);
            destroyStreamContext(streamContext);
        }
    }
    else if (code === "NetStream.Publish.Start")
    {
        streamContext.state = STATE_REPUBLISHING;
    }
    else if (code === "NetStream.Unpublish.Success")
    {
        streamContext.state = STATE_UNPUBLISHED;
        // Wait for unpublish notify event if the context has a NetGroup;
        // otherwise continue shut down now.
        if (!streamContext.netGroup)
        {
            destroyStreamContext(streamContext);
        }
    }
    // Any other status code is ignored.
}
/**
 * The common tear-down hook. Other functions that manage or shut down
 * the StreamContext Object delegate to this function upon detecting a fatal
 * error or during shut down.
 *
 * Tear-down is staged according to the context's current state:
 *  - STATE_REPUBLISHING: detach and unpublish the multicast NetStream, move to
 *    STATE_UNPUBLISHING and return.
 *  - STATE_CONNECTING, STATE_CONNECTED, STATE_PUBLISH_PENDING,
 *    STATE_UNPUBLISHED: drop the NetGroup/NetStream status handlers, detach
 *    the NetStream, close the loopback NetConnection, move to
 *    STATE_DISCONNECTING and return.
 *  - Any other state: unregister the context from the remaining lookups.
 *
 * @param streamContext - The StreamContext Object for the source Stream and
 *                        (potentially wired-up) multicast NetStream.
 */
function destroyStreamContext(streamContext)
{
    // Unregister by Stream name immediately; lookups by NetConnection, NetGroup
    // and multicast NetStream remain in place until tear-down is complete.
    // Only remove the by-name entry if it still refers to THIS context: when
    // the stream was republished while an older context was still shutting
    // down, the name now belongs to the newer context and must stay findable
    // for its own unpublish.
    if (application.streamTable[streamContext.streamName] === streamContext)
    {
        delete application.streamTable[streamContext.streamName];
    }

    switch (streamContext.state)
    {
        case STATE_REPUBLISHING:
            streamContext.mcastNetStream.attach(false);
            streamContext.mcastNetStream.publish(false);
            streamContext.state = STATE_UNPUBLISHING;
            return;
        case STATE_CONNECTING:
        case STATE_CONNECTED:
        case STATE_PUBLISH_PENDING:
        case STATE_UNPUBLISHED:
            // Delete status handler callbacks and cleanup in case we arrived here
            // as a result of a force close.
            if (streamContext.netGroup)
            {
                delete streamContext.netGroup.onStatus;
            }
            if (streamContext.mcastNetStream)
            {
                streamContext.mcastNetStream.attach(false);
                delete streamContext.mcastNetStream.onStatus;
            }
            streamContext.netConnection.close();
            streamContext.state = STATE_DISCONNECTING;
            return;
        default:
            // Fall-through.
    }

    // At this point, we either never got to the republishing state or we've
    // proceeded through the clean shut down steps above. Everything for this
    // StreamContext can go away.
    unregisterStreamContext(streamContext);
}
/**
 * Utility function used to force close a StreamContext in the event that we
 * start handling a republish of a Source stream before the context for its
 * prior incarnation has been torn down.
 *
 * The context is marked STATE_UNPUBLISHED before being handed to
 * destroyStreamContext, so the multicast unpublish step is skipped.
 *
 * @param streamContext - The StreamContext Object for the source Stream.
 */
function forceCloseStreamContext(streamContext)
{
    // Use the parameter throughout: the names "stream" and "prevContext" only
    // exist in the caller's scope and raised a ReferenceError here.
    trace("Force closing previous multicast context for source stream: " + streamContext.streamName);
    streamContext.state = STATE_UNPUBLISHED;
    destroyStreamContext(streamContext);
}
////////////////////////////////////////////////////////////////////////////////
//
// Client callback functions
//
////////////////////////////////////////////////////////////////////////////////
/**
 * Answers the FCPublish RPC in the fashion expected by encoders by calling
 * "onFCPublish" back on the client with a "NetStream.Publish.Start" status
 * whose description is the stream name. No other work is done here; the real
 * work happens in application.onPublish.
 *
 * @param streamName - The name of the stream being published.
 */
Client.prototype.FCPublish = function(streamName)
{
    var reply = { code: "NetStream.Publish.Start", description: streamName };
    this.call("onFCPublish", null, reply);
};
/**
 * Answers the FCUnpublish RPC in the fashion expected by encoders by calling
 * "onFCUnpublish" back on the client with a "NetStream.Unpublish.Success"
 * status whose description is the stream name. No other work is done here;
 * the real work happens in application.onUnpublish.
 *
 * @param streamName - The name of the stream being unpublished.
 */
Client.prototype.FCUnpublish = function(streamName)
{
    var reply = { code: "NetStream.Unpublish.Success", description: streamName };
    this.call("onFCUnpublish", null, reply);
};
/**
 * Handles a releaseStream request. If a context is registered for the named
 * stream, the invoking client's ip equals the ip of the client captured in
 * that context, and the context's state is not past STATE_REPUBLISHING, the
 * context is torn down. In every other case the request is ignored.
 *
 * @param streamName - The name of the stream being released.
 */
Client.prototype.releaseStream = function(streamName)
{
    var streamContext = getStreamContextForSourceStream(streamName);
    if (!streamContext)
    {
        return;
    }
    // Assume it is the same client only when the ip addresses match.
    if (streamContext.client.ip != this.ip)
    {
        return;
    }
    // Only tear down an orphaned stream if it is not already shutting down.
    // Kept as a negated "<=" so a non-numeric state is skipped, as before.
    if (!(streamContext.state <= STATE_REPUBLISHING))
    {
        return;
    }
    destroyStreamContext(streamContext);
};
////////////////////////////////////////////////////////////////////////////////
//
// Helper functions
//
////////////////////////////////////////////////////////////////////////////////
/**
 * Checks that a newly published stream carries the query string parameters
 * needed to republish it into a Flash Group. The first failed check is
 * written to the application log and ends validation.
 *
 * Checks, in order: the parameter set is not empty; 'fms.multicast.type' is
 * present; its value is 1, 2 or 3; 'fms.multicast.groupspec' is present; and,
 * for types 1 and 2, 'fms.multicast.address' is present.
 *
 * @param params - The query string parameters for the source Stream.
 * @return true if valid; otherwise false.
 */
function validateStreamParams(params)
{
    // Logs the reason and yields the "invalid" result in one step.
    var reject = function(reason)
    {
        trace(reason);
        return false;
    };

    var hasAnyParam = false;
    for (var key in params)
    {
        hasAnyParam = true;
        break;
    }
    if (!hasAnyParam)
    {
        return reject("MULTICAST PUBLISH ERROR: Stream query string is empty.");
    }

    var type = params["fms.multicast.type"];
    if (!type)
    {
        return reject("MULTICAST PUBLISH ERROR: Stream query string does not specify a 'fms.multicast.type'.");
    }

    // Loose comparison on purpose: the value arrives as a string.
    var isKnownType = (type == 1 || type == 2 || type == 3);
    if (!isKnownType)
    {
        return reject("MULTICAST PUBLISH ERROR: 'fms.multicast.type' has invalid value: " + type);
    }

    if (!params["fms.multicast.groupspec"])
    {
        return reject("MULTICAST PUBLISH ERROR: Stream query string does not specify a 'fms.multicast.groupspec'.");
    }

    // Fusion and IP require an address:port.
    var needsAddress = (type == 1 || type == 2);
    if (needsAddress && !params["fms.multicast.address"])
    {
        return reject("MULTICAST PUBLISH ERROR: Stream query string does not specify a 'fms.multicast.address'.");
    }

    // No obvious validation issues.
    return true;
}
/**
 * Parses the supplied query string, and if valid, returns an Object populated
 * with the name-value pairs contained in the query string. The simple processing
 * here does not preserve multiple name-value pairings having the same name; the
 * last value seen wins. Parameters with no value are mapped to "" (empty String)
 * in the returned Object.
 *
 * The whole string is URI-decoded before it is split, so an encoded '&' or '='
 * inside a value is treated as a separator. A name-value pair is split at its
 * first '='; any further '=' characters stay in the value.
 *
 * @param queryString - A query string portion of a URI, not including the leading
 *                      '?' character.
 * @return An Object containing a key-value mapping for each name-value parameter
 *         defined in the query string; Object is empty if the query string is
 *         missing (null/undefined), empty or invalid.
 */
function parseQueryString(queryString)
{
    var result = {};

    // Nothing to parse; without this guard a missing value would be decoded
    // as the literal text "undefined"/"null" and show up as a parameter name.
    if (queryString == null)
    {
        return result;
    }

    var decoded = "";
    try
    {
        decoded = decodeURIComponent(queryString);
    }
    catch (e) // Invalid URI component; return empty result.
    {
        return result;
    }

    if (decoded.length)
    {
        var params = decoded.split('&');
        // Index loop rather than for-in: params is an Array.
        for (var i = 0; i < params.length; i++)
        {
            var pair = params[i];
            var sepIndex = pair.indexOf('=');
            if (sepIndex != -1)
            {
                var name = pair.substring(0, sepIndex);
                result[name] = pair.substring(sepIndex + 1);
            }
            else
            {
                result[pair] = "";
            }
        }
    }
    return result;
}
/**
 * Utility function used to swap out the protocol (scheme) portion
 * of a given URI with an alternate.
 *
 * Everything from the first "://" onward is kept as is. If the URI contains no
 * "://" at all, it is treated as scheme-less and the desired protocol is
 * prepended together with "://".
 *
 * @param uri - The full URI.
 * @param desiredProtocol - The replacement protocol, without "://".
 * @return The URI with its protocol replaced.
 */
function resetUriProtocol(uri, desiredProtocol)
{
    var sepIndex = uri.indexOf("://");
    if (sepIndex == -1)
    {
        // No scheme separator: slicing from -1 would keep only the last
        // character of the URI, so prepend the scheme instead.
        return desiredProtocol + "://" + uri;
    }
    return desiredProtocol + uri.substring(sepIndex);
}
