public class StreamMultiplexer
extends java.lang.Object
Diese Klasse verschickt Nutzdatenpakete mit Streams an einen StreamDemultiplexer. Die Applikation, die ein Objekt dieser Klasse erzeugt hat, stellt ihrerseits Nutzdaten für jeden Stream zur Verfügung. Auf der Gegenseite kann der StreamDemultiplexer Nutzdaten auf jedem Stream anfordern und verarbeiten. Der StreamMultiplexer sendet seinerseits nur dann Nutzdatenpakete, wenn ihn der StreamDemultiplexer dazu auffordert. Die Nutzdaten werden auch erst dann erzeugt, wenn diese verschickt werden sollen. Der StreamMultiplexer verschickt die Nutzdatenpakete nicht einzeln, sondern bündelt diese in einem großen Paket. Diese großen Pakete werden dann vom StreamDemultiplexer entgegen genommen und ausgepackt. Diese Bündelung findet für jeden Stream einzeln statt, in jedem großen Paket befinden sich also nur Nutzdaten für diesen einen Stream, nicht die Nutzdaten anderer Streams.
Modifier and Type | Class and Description |
---|---|
private static class |
StreamMultiplexer.IndexOfStreamAndMaxSendPackets
Diese Klasse erzeugt ein Objekt für die Warteschlange "_queueWithStreamsPermitedToSendData“.
|
private static class |
StreamMultiplexer.MultiplexerStreaminformations
Diese Objekt beinhaltet alle Informationen, die für einen Stream, auf Senderseiteseite, wichtig sind.
|
Modifier and Type | Field and Description |
---|---|
private int |
_blockingFactor
Der blockingFactor bestimmt die Größe des Empfangspuffers.
|
private int |
_bufferSizeStream
Diese Variable bestimmt die Größe des Puffers, den jeder Stream zur Verfügung hat.
|
private int |
_bufferSizeStreamMultiplexer
Diese Variable bestimmt die gesamte Größe des Puffers, der zum StreamMultiplexer gehört.
|
private static Debug |
_debug |
private StreamMultiplexerDirector |
_director
Objekt, das das Anfordern von Daten (von der Applikation) und versenden von Datenpaketen an den Empfänger ermöglicht
|
private int |
_numberOfFalseMaxTickets |
private int |
_numberOfPacketsSend |
private int |
_numberOfStreams
Anzahl der Streams auf denen Nutzdatenpakete verschickt werden können
|
private int |
_numberTerminatedStreams
Anzahl der Streams, die mit “Abort” abgebrochen wurden oder die ein Nutzdatenpaket verschickt haben, in dem die Nutzdaten
null waren. |
private UnboundedQueue |
_queueWithStreamsPermitedToSendData
Eine Warteschlange, in ihr werden alle Streams gespeichert, die Nutzdatenpakete verschicken können.
|
private int |
_serializerVersion
Mit dieser Version wird der Serializer Nutzdaten verpacken und der Deserializer Tickets auspacken.
|
private StreamMultiplexer.MultiplexerStreaminformations[] |
_streams
In diesem Array werden alle Informationen aller Streams gespeichert.
|
Constructor and Description |
---|
StreamMultiplexer(int numberOfStreams,
int blockingFactor,
int bufferSizeStreamMultiplexer,
int serializerVersion,
StreamMultiplexerDirector director) |
Modifier and Type | Method and Description |
---|---|
void |
killAllStreams()
Alle Streams werden beendet, da die Verbindung zum Empfänger unterbrochen wurde.
|
private void |
printDebugVariables() |
void |
sendAllStreamData()
Diese Methode verschickt Nutzdaten, die die Senderapplikation erzeugt hat, an den Empfänger (StreamDemultiplexer).
|
private void |
sendDataToReceiver(int indexOfStream,
int streamPacketIndex,
byte[] data)
Diese Methode verschickt die Nutzdaten über einen bestimmten Stream an den Empfänger.
|
void |
setMaximumStreamTicketIndexForStream(byte[] streamTicketPacket)
Diese Methode setzt den “maximumStreamTicketIndex” eines Streams herauf.
|
private byte[] |
take(int indexOfStream)
Diese Methode fordert von einer Application, auf einem bestimmten Stream, neue Nutzdaten an.
|
private final int _blockingFactor
Der blockingFactor bestimmt die Größe des Empfangspuffers. Dieser Wert bestimmt beim Sender nur, wie viele Pakete der Sender ohne Bestätigung des Empfängers, beim ersten Versenden, verschicken darf (danach muß der Empfänger dem Sender eine Sendeerlaubnis(Ticket) schicken). Dieser Wert muß beim Sender und beim Empfänger gleich sein.
private final int _bufferSizeStreamMultiplexer
Diese Variable bestimmt die gesamte Größe des Puffers, der zum StreamMultiplexer gehört. Jeder Stream besitzt einen eigenen Puffer. Sobald dieser gefüllt ist, werden alle Nutzdatenpakete als ein großes Paket verschickt.
private final int _bufferSizeStream
Diese Variable bestimmt die Größe des Puffers, den jeder Stream zur Verfügung hat. Im Konstruktor wird dieser Wert mit _bufferSizeStreamMultiplexer /_numberOfStreams
gesetzt.
private final UnboundedQueue _queueWithStreamsPermitedToSendData
Eine Warteschlange, in ihr werden alle Streams gespeichert, die Nutzdatenpakete verschicken können. In der Warteschlange werden Objekte vom Type “IndexOfStreamAndMaxSendPackets” gespeichert. Diese Objekte enthalten den “indexOfStream”, welcher Stream kann Nutzdatenpakete verschicken, und “maxSendPackets”. Dieser Wert drückt aus, wie viele Nutzdatenpakete verschickt werden dürfen (für diesen Stream). Die Warteschlange erspart somit das “Suchen” eines sendebereiten Streams.
private final int _numberOfStreams
Anzahl der Streams auf denen Nutzdatenpakete verschickt werden können
private int _numberTerminatedStreams
Anzahl der Streams, die mit “Abort” abgebrochen wurden oder die ein Nutzdatenpaket verschickt haben, in dem die Nutzdaten null
waren.
private final StreamMultiplexer.MultiplexerStreaminformations[] _streams
In diesem Array werden alle Informationen aller Streams gespeichert.
private final StreamMultiplexerDirector _director
Objekt, das das Anfordern von Daten (von der Applikation) und versenden von Datenpaketen an den Empfänger ermöglicht
private final int _serializerVersion
Mit dieser Version wird der Serializer Nutzdaten verpacken und der Deserializer Tickets auspacken. Auf der Gegenseite wird der StreamDemultiplexer die Datensätze mit dieser Version deserialisieren und die Tickets serialisieren.
private int _numberOfPacketsSend
private int _numberOfFalseMaxTickets
private static Debug _debug
public StreamMultiplexer(int numberOfStreams, int blockingFactor, int bufferSizeStreamMultiplexer, int serializerVersion, StreamMultiplexerDirector director)
numberOfStreams
- Anzahl von Streams, die Datenpakete versenden sollenblockingFactor
- Anzahl der Pakete, die initial am Anfang versendet werdenbufferSizeStreamMultiplexer
- Diese Variable bestimmt die gesamte Größe des Puffers, der zum StreamMultiplexer gehörtserializerVersion
- Diese Variable legt die Versionsnummer des Deserializer/Serializer fest, der benutzt wird. Sowohl der StreamMultiplexer als auch der StreamDemultiplexer müssen die selbe Version benutzendirector
- Schnittstelle, die eine Methode zum verschicken von Informationen an den Sender bereitstellt (siehe Interface Beschreibung)Serializer
,
Deserializer
private void sendDataToReceiver(int indexOfStream, int streamPacketIndex, byte[] data)
Diese Methode verschickt die Nutzdaten über einen bestimmten Stream an den Empfänger.
indexOfStream
- : Der eindeutige Index des Streams auf dem gesendet werden sollstreamPacketIndex
- : Jedes Datenpacket bekommt eine laufende Nummerdata
- : Nutzdaten, die versendet werden sollenprivate byte[] take(int indexOfStream)
Diese Methode fordert von einer Application, auf einem bestimmten Stream, neue Nutzdaten an.
Die Applikation wird von dem StreamMultiplexer angesprochen, darum private.
indexOfStream
- Index des Streams, auf dem neue Daten angefordert werden sollenpublic void sendAllStreamData() throws java.lang.InterruptedException
Diese Methode verschickt Nutzdaten, die die Senderapplikation erzeugt hat, an den Empfänger (StreamDemultiplexer). Ein Problem das sich dabei ergibt ist, welcher Stream ist gerade sendebereit ? Das durchsuchen aller Streams nach einem sendebereiten Stream kann dabei sehr ungeschickt sein, da im “worst Case” immer alle Streams betrachtet werden müßten und das für jede Nachricht. Da jeder Stream mindestens eine Nachricht verschickt wäre somit eine quadratische Laufzeit erreicht.
Die Grundidee ist, dass alle Tickets an einer zentralen Stelle gesammelt werden. Dafür wurde als Datenstruktur eine Warteschlange gewählt. Diese synchronisiert sich selbständig und schickt wartende Threads automatisch in den wait Modus, gleichzeitig werden diese Threads wieder aufgeweckt, wenn neue Daten zur Verfügung stehen.
Sobald der Empfänger dem Sender eine Sendeerlaubnis erteilt wird der Stream mit seinem Index und der Anzahl Pakete, die er senden darf, in die Warteschlange eingetragen.
Wenn nun ein Stream gesucht wird, der senden soll, dann wird von der Warteschlange das vorderste Element angefordert. Es steht somit sofort ein Stream zur Verfügung. Gleichzeitig ist bekannt, wieviele Pakete dieser Stream verschicken darf.
java.lang.InterruptedException
- Ein Thread, der auf ein Objekt in der Warteschlange gewartet hat, wurde mit Interrupt unterbrochen.public void setMaximumStreamTicketIndexForStream(byte[] streamTicketPacket) throws java.io.IOException
Diese Methode setzt den “maximumStreamTicketIndex” eines Streams herauf. Dadurch kann der Stream Datenpakete bis zu diesem neuen Index versenden. Wird der “maximumStreamTicketIndex” erreicht, stellt der Stream seine Sendetätigkeiten ein, bis der “maximumStreamTicketIndex” wieder erhöht wird. Verschickt der Empfänger eine “-1”, dann will er die Empfangstätigkeiten auf diesem Stream einstellen.
Die ersten 4 Byte enthalten den Index des Streams. Die letzen vier Bytes enthalten den neuen maximalen Index. Dies ist eine Steuerung des einen Multis(Sender) durch den anderen(Receiver), kein Zugriff von außen.
streamTicketPacket
- Dieses Byte-Array enthält verschlüsselt den Index des Streams und den maximalen Index, bis zu dem der StreamMultiplexer senden darf.java.io.IOException
- Ein Fehler beim deserialisieren von Datenpublic void killAllStreams()
Alle Streams werden beendet, da die Verbindung zum Empfänger unterbrochen wurde.
Wenn das Objekt, dem der StreamMultiplexer gehört, einen Fehler des DaV gemeldet bekommt (die Leitung zur Empfängerapplikation wurde unterbrochen, als Beispiel) wird mit dieser Methode jeder Stream abgebrochen. Gleichzeitig wird die Sendeapplikation darauf hingewiesen, dass sie alle Nutzdaten für die Streams verwerfen kann. Auf der Gegenseite wird dem StreamDemultiplexer ebenfalls gemeldet, dass etwas mit der Verbindung nicht stimmt (dies übernimmt dort das Objekt, das den StreamDemultiplexer erzeugt hat). Der StreamDemultiplexer wird daraufhin ebenfalls alle Streams beenden. Der beidseitige Abbruch geschieht automatisch.
private void printDebugVariables()