public class StreamDemultiplexer
extends java.lang.Object
Diese Klasse empfängt Nutzdatenpakete, die vom StreamMultiplexer über Streams versandt wurden. Dabei werden die Daten in streams aufgeteilt. Jeder stream stellt dabei der Applikation Nutzdaten zur Verfügung, die sie mit einer Methode abrufen kann. Der StreamMultiplexer wird benachrichtigt falls die Applikation Nutzdaten benötigt. Dieser sorgt dann dafür, dass neue Daten auf dem dafür vorgesehenen Stream bereitgestellt werden. Das Speichern der Nutzdaten geschieht in einem Puffer, unterschreitet die Anzahl der vorgehaltenen Daten einen bestimmten Wert, so fordert der StreamDemultiplexer neue Nutzdaten vom StreamMultiplexer an. Dies gewährleistet einen ständigen Vorrat von Nutzdaten, auf die die Applikation zugreifen kann.
Modifier and Type | Class and Description |
---|---|
private static class |
StreamDemultiplexer.DemultiplexerStreaminformations
Diese Objekt beinhaltet alle Informationen, die für einen Stream, auf Empfängerseite, wichtig sind.
|
private static class |
StreamDemultiplexer.ReferenceDataPacket
Objekte dieser Klasse speichern die Nutzdaten(Byte-Array) und den Index der Nutzdaten.
|
Modifier and Type | Field and Description |
---|---|
private int |
_abortedStreamReceivedData
Debug Wenn ein stream aborted wird, dann darf er keine Daten mehr empfangen.
|
private StreamDemultiplexer.DemultiplexerStreaminformations[] |
_arrayOfStreams
Alle Streams bekommen einen Eintrag in diesem Array.
|
private int |
_blockingFactor
Der _blockingFactor bestimmt die größe des Empfängerpuffers.
|
private static Debug |
_debug |
private StreamDemultiplexerDirector |
_director
Objekt, das das versenden von Tickets (Bestätigungen, dass der Sender Datenpakete senden darf) zur Verfügung stellen
|
private int |
_numberOfOverchargesReceiver
Debug Wie oft überlastet der Sender den Empfänger (der Sender schickt mehr Nutzdatenpakete als die dataQueue des Empfängers aufnehmen darf (_blockingFactor)).
|
private int |
_numberOfPacketsReceived
Debug, hier wird gezählt wie viele Nutzdatenpackete empfangen wurden
|
private int[] |
_numberOfReceivedPackets
Debug Anzahl der empfangenen Pakete je stream
|
private int |
_numberOfStreams
Anzahl der Streams auf denen Datensätze verschickt werden können
|
private int[] |
_numberOfTakes
Debug Anzahl der Takes, die auch Nutzdaten enthielten (take mit einem Datensatz, der null enthält wird nicht gezählt)
|
private int |
_ticketBlockingFactor
Der _blockingFactor wird durch 2 geteilt.
|
Constructor and Description |
---|
StreamDemultiplexer(int numberOfStreams,
int blockingFactor,
StreamDemultiplexerDirector director) |
Modifier and Type | Method and Description |
---|---|
void |
abort(int indexOfStream)
Eine Methode zum beenden eines Streams.
|
void |
killAllStreams()
Die physische Verbindung zum Sender ist zusammengebrochen und alle Streams werden beendet.
|
private void |
printByteArrayScreen(byte[] data) |
private void |
printDebugVariables() |
void |
receivedDataFromSender(byte[] streamDataPacket)
Ein streamDataPacket, das der Multiplexer verschickt hat, wird entgegen genommen.
|
private void |
sendNewTicketIndexToSender(int indexOfStream,
int maximumStreamTicketsIndex)
Der Sender wird benachrichtigt, dass er auf einem Stream weitere Nutzdatenpakete schicken darf.
|
byte[] |
take(int indexOfStream)
Diese Methode gibt die Nutzdaten eines bestimmten Streams an die Empfängerapplikation zurück.
|
private void |
unpackBigPacket(StreamDemultiplexer.DemultiplexerStreaminformations stream) |
private final int _numberOfStreams
Anzahl der Streams auf denen Datensätze verschickt werden können
private final int _blockingFactor
Der _blockingFactor bestimmt die größe des Empfängerpuffers. Ist der Puffer nur noch halb voll, wird der Sender benachrichtigt, dass er neue Nutzdatensätze verschicken soll.
private final int _ticketBlockingFactor
Der _blockingFactor wird durch 2 geteilt. Dieser Wert wird für die Ermittlung des neuen Indizes, an dem ein neues Ticket verschickt wird, benötigt. Im Konstruktor wird diese Variable auf _blockingFactor/2 gesetzt, wurde als _blockingFactor der Wert 1 gewählt, dann wird diese Variable ebenfalls auf 1 gesetzt.
private final StreamDemultiplexerDirector _director
Objekt, das das versenden von Tickets (Bestätigungen, dass der Sender Datenpakete senden darf) zur Verfügung stellen
private final StreamDemultiplexer.DemultiplexerStreaminformations[] _arrayOfStreams
Alle Streams bekommen einen Eintrag in diesem Array. Der Platz an dem sie gespeichert werden entspricht der Nummer des Streams.
private int _numberOfPacketsReceived
Debug, hier wird gezählt wie viele Nutzdatenpackete empfangen wurden
private int _abortedStreamReceivedData
Debug Wenn ein stream aborted wird, dann darf er keine Daten mehr empfangen. Diese Variable zählt wie oft dies doch geschieht (durch verzahnung von Threads). Dieses Verhalten löst keinen Fehler aus.
private int _numberOfOverchargesReceiver
Debug Wie oft überlastet der Sender den Empfänger (der Sender schickt mehr Nutzdatenpakete als die dataQueue des Empfängers aufnehmen darf (_blockingFactor)). Für einen fehlerfreien Betrieb muß dieser Wert bei 0 bleiben !
private final int[] _numberOfReceivedPackets
Debug Anzahl der empfangenen Pakete je stream
private final int[] _numberOfTakes
Debug Anzahl der Takes, die auch Nutzdaten enthielten (take mit einem Datensatz, der null enthält wird nicht gezählt)
private static Debug _debug
public StreamDemultiplexer(int numberOfStreams, int blockingFactor, StreamDemultiplexerDirector director)
numberOfStreams
- Anzahl von Streams, die Datenpakete versenden sollenblockingFactor
- Puffer des Empfängers (dieser Wert muß größer gleich 1 sein)director
- Schnittstelle, die eine Methode zum verschicken von Informationen an den Sender bereitstellt (siehe Interface Beschreibung)java.lang.IllegalArgumentException
- Der blockingFactor war kleiner als 1public void abort(int indexOfStream)
Eine Methode zum beenden eines Streams. Die restlichen Daten, die sich im _arrayOfStreams befinden, werden gelöscht. An den Sender der Daten wird ein “-1” geschickt, dieser wird darauf hin keine Daten mehr auf diesem Stream schicken. Der Stream wird über seinen Index identifiziert.
indexOfStream
- Index des Streams, der beendet werden sollpublic byte[] take(int indexOfStream) throws java.lang.InterruptedException, java.lang.IllegalStateException, java.net.ProtocolException, java.nio.channels.ClosedChannelException
Diese Methode gibt die Nutzdaten eines bestimmten Streams an die Empfängerapplikation zurück. Wenn alle Nutzdatenpakete von der Senderapplikation mit take angefordert wurden, gibt diese Methode nur noch null zurück, im Fehlerfall werden entsprechende Exceptions geworfen.
indexOfStream
- Eindeutiger Index des Streams, der Daten zurückgeben solljava.lang.InterruptedException
- Ein Thread, der Nutzdaten mit take anfordert hat, wird mit Interrupt abgebrochen.java.lang.IllegalStateException
- Ein Stream wurde mit abort durch die Empfängerapplikation beendet und anschließend führte die Empfängerapplikation erneut ein take auf diesen Stream aus.java.net.ProtocolException
- Es wurde ein Paket mit einem falschen Index bearbeitet. Das deutet darauf hin, dass das Paket entweder schon einmal empfangen wurde (doppelt vorhanden) oder das ein Paket fehlt. Der Stream wird automatisch abgebrochen. Die Applikation kann weiter “take” aufrufen, wird aber immer für diesen Stream, diese Exception bekommen.java.nio.channels.ClosedChannelException
- Die physische Verbindung zum Sender wurde unterbrochen. Jeder take Aufruf wird für alle Streams diese Exception ausgeben, da alle Streams betroffen sindprivate void unpackBigPacket(StreamDemultiplexer.DemultiplexerStreaminformations stream) throws java.lang.InterruptedException
java.lang.InterruptedException
public void receivedDataFromSender(byte[] streamDataPacket) throws java.io.IOException
Ein streamDataPacket, das der Multiplexer verschickt hat, wird entgegen genommen. Es hat als Inhalt den Index des Streams (die ersten 4 Bytes), den Index des Pakets (die nächsten 4 Bytes), die Größe des Byte-Arrays in dem die Nutzdaten gespeichert waren und die Nutzdaten selber (der Rest).
Diese Methode erzeugt dann aus dem Byte-Array die benötigten Objekte und legt diese in den dafür vorgesehenen Datenstrukturen ab.
Der Empfang neuer Nutzdaten wird von der Applikation an den StreamDemultiplexer geleitet (dadurch hat die Applikation den Datentransfer als Aufgabe).
streamDataPacket
- Ein Byte-Array in dem verschlüsselt der Index des Streams, der Index des Pakets, die Größe des Byte-Arrays in dem die Nutzdaten gespeichert sind und die Nutzdaten selber stehen.java.io.IOException
- Es ist ein Fehler beim deserialisieren der Daten aufgetretenprivate void sendNewTicketIndexToSender(int indexOfStream, int maximumStreamTicketsIndex) throws java.io.IOException
Der Sender wird benachrichtigt, dass er auf einem Stream weitere Nutzdatenpakete schicken darf.
Dies ist ein Vorgang, der intern zwischen dem Multiplexer/Demultiplexer statt findet, darauf soll die Applikation keinen Zugriff haben.
indexOfStream
- Die eindeutige Nummer des Stream, der neue Datenpakete schicken darfmaximumStreamTicketsIndex
- Bis zu diesem Wert darf der Sender auf dem Stream neue Nutzdatenpakete schicken. Der Sender benutzt dafür den streamPacketIndex als Anzahl wie viele Datenpakete verschickt wurden.java.io.IOException
- Es ist ein Fehler beim serialisieren der Daten aufgetretenpublic void killAllStreams()
Die physische Verbindung zum Sender ist zusammengebrochen und alle Streams werden beendet. Allen Streams wird die Erlaubnis zum empfangen/senden entzogen. Diese Methode wird von der übergeordneten Applikation aufgerufen, diese bemerkt den Fehler.
private void printByteArrayScreen(byte[] data)
private void printDebugVariables()