de.bsvrz.sys.funclib.communicationStreams
Class StreamDemultiplexer

java.lang.Object
  extended by de.bsvrz.sys.funclib.communicationStreams.StreamDemultiplexer

public class StreamDemultiplexer
extends Object

Diese Klasse empfängt Nutzdatenpakete, die vom StreamMultiplexer über Streams versandt wurden. Dabei werden die Daten in streams aufgeteilt. Jeder stream stellt dabei der Applikation Nutzdaten zur Verfügung, die sie mit einer Methode abrufen kann. Der StreamMultiplexer wird benachrichtigt falls die Applikation Nutzdaten benötigt. Dieser sorgt dann dafür, dass neue Daten auf dem dafür vorgesehenen Stream bereitgestellt werden. Das Speichern der Nutzdaten geschieht in einem Puffer, unterschreitet die Anzahl der vorgehaltenen Daten einen bestimmten Wert, so fordert der StreamDemultiplexer neue Nutzdaten vom StreamMultiplexer an. Dies gewährleistet einen ständigen Vorrat von Nutzdaten, auf die die Applikation zugreifen kann.

Author:
Kappich Systemberatung

Nested Class Summary
private static class StreamDemultiplexer.DemultiplexerStreaminformations
          Diese Objekt beinhaltet alle Informationen, die für einen Stream, auf Empfängerseite, wichtig sind.
private static class StreamDemultiplexer.ReferenceDataPacket
          Objekte dieser Klasse speichern die Nutzdaten(Byte-Array) und den Index der Nutzdaten.
 
Field Summary
private  int _abortedStreamReceivedData
          Debug Wenn ein stream aborted wird, dann darf er keine Daten mehr empfangen.
private  StreamDemultiplexer.DemultiplexerStreaminformations[] _arrayOfStreams
          Alle Streams bekommen einen Eintrag in diesem Array.
private  int _blockingFactor
          Der _blockingFactor bestimmt die größe des Empfängerpuffers.
private static Debug _debug
           
private  StreamDemultiplexerDirector _director
          Objekt, das das versenden von Tickets (Bestätigungen, dass der Sender Datenpakete senden darf) zur Verfügung stellen
private  int _numberOfOverchargesReceiver
          Debug Wie oft überlastet der Sender den Empfänger (der Sender schickt mehr Nutzdatenpakete als die dataQueue des Empfängers aufnehmen darf (_blockingFactor)).
private  int _numberOfPacketsReceived
          Debug, hier wird gezählt wie viele Nutzdatenpackete empfangen wurden
private  int[] _numberOfReceivedPackets
          Debug Anzahl der empfangenen Pakete je stream
private  int _numberOfStreams
          Anzahl der Streams auf denen Datensätze verschickt werden können
private  int[] _numberOfTakes
          Debug Anzahl der Takes, die auch Nutzdaten enthielten (take mit einem Datensatz, der null enthält wird nicht gezählt)
private  int _ticketBlockingFactor
          Der _blockingFactor wird durch 2 geteilt.
 
Constructor Summary
StreamDemultiplexer(int numberOfStreams, int blockingFactor, StreamDemultiplexerDirector director)
           
 
Method Summary
 void abort(int indexOfStream)
          Eine Methode zum beenden eines Streams.
 void killAllStreams()
          Die physische Verbindung zum Sender ist zusammengebrochen und alle Streams werden beendet.
private  void printByteArrayScreen(byte[] data)
           
private  void printDebugVariables()
           
 void receivedDataFromSender(byte[] streamDataPacket)
          Ein streamDataPacket, das der Multiplexer verschickt hat, wird entgegen genommen.
private  void sendNewTicketIndexToSender(int indexOfStream, int maximumStreamTicketsIndex)
          Der Sender wird benachrichtigt, dass er auf einem Stream weitere Nutzdatenpakete schicken darf.
 byte[] take(int indexOfStream)
          Diese Methode gibt die Nutzdaten eines bestimmten Streams an die Empfängerapplikation zurück.
private  void unpackBigPacket(StreamDemultiplexer.DemultiplexerStreaminformations stream)
           
 
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
 

Field Detail

_numberOfStreams

private final int _numberOfStreams
Anzahl der Streams auf denen Datensätze verschickt werden können


_blockingFactor

private final int _blockingFactor
Der _blockingFactor bestimmt die größe des Empfängerpuffers. Ist der Puffer nur noch halb voll, wird der Sender benachrichtigt, dass er neue Nutzdatensätze verschicken soll.


_ticketBlockingFactor

private final int _ticketBlockingFactor
Der _blockingFactor wird durch 2 geteilt. Dieser Wert wird für die Ermittlung des neuen Indizes, an dem ein neues Ticket verschickt wird, benötigt. Im Konstruktor wird diese Variable auf _blockingFactor/2 gesetzt, wurde als _blockingFactor der Wert 1 gewählt, dann wird diese Variable ebenfalls auf 1 gesetzt.


_director

private final StreamDemultiplexerDirector _director
Objekt, das das versenden von Tickets (Bestätigungen, dass der Sender Datenpakete senden darf) zur Verfügung stellen


_arrayOfStreams

private final StreamDemultiplexer.DemultiplexerStreaminformations[] _arrayOfStreams
Alle Streams bekommen einen Eintrag in diesem Array. Der Platz an dem sie gespeichert werden entspricht der Nummer des Streams.


_numberOfPacketsReceived

private int _numberOfPacketsReceived
Debug, hier wird gezählt wie viele Nutzdatenpackete empfangen wurden


_abortedStreamReceivedData

private int _abortedStreamReceivedData
Debug Wenn ein stream aborted wird, dann darf er keine Daten mehr empfangen. Diese Variable zählt wie oft dies doch geschieht (durch verzahnung von Threads). Dieses Verhalten löst keinen Fehler aus.


_numberOfOverchargesReceiver

private int _numberOfOverchargesReceiver
Debug Wie oft überlastet der Sender den Empfänger (der Sender schickt mehr Nutzdatenpakete als die dataQueue des Empfängers aufnehmen darf (_blockingFactor)). Für einen fehlerfreien Betrieb muß dieser Wert bei 0 bleiben !


_numberOfReceivedPackets

private final int[] _numberOfReceivedPackets
Debug Anzahl der empfangenen Pakete je stream


_numberOfTakes

private final int[] _numberOfTakes
Debug Anzahl der Takes, die auch Nutzdaten enthielten (take mit einem Datensatz, der null enthält wird nicht gezählt)


_debug

private static Debug _debug
Constructor Detail

StreamDemultiplexer

public StreamDemultiplexer(int numberOfStreams,
                           int blockingFactor,
                           StreamDemultiplexerDirector director)
Parameters:
numberOfStreams - Anzahl von Streams, die Datenpakete versenden sollen
blockingFactor - Puffer des Empfängers (dieser Wert muß größer gleich 1 sein)
director - Schnittstelle, die eine Methode zum verschicken von Informationen an den Sender bereitstellt (siehe Interface Beschreibung)
Throws:
IllegalArgumentException - Der blockingFactor war kleiner als 1
Method Detail

abort

public void abort(int indexOfStream)
Eine Methode zum beenden eines Streams. Die restlichen Daten, die sich im _arrayOfStreams befinden, werden gelöscht. An den Sender der Daten wird ein "-1" geschickt, dieser wird darauf hin keine Daten mehr auf diesem Stream schicken. Der Stream wird über seinen Index identifiziert.

Parameters:
indexOfStream - Index des Streams, der beendet werden soll

take

public byte[] take(int indexOfStream)
            throws InterruptedException,
                   IllegalStateException,
                   ProtocolException,
                   ClosedChannelException
Diese Methode gibt die Nutzdaten eines bestimmten Streams an die Empfängerapplikation zurück. Wenn alle Nutzdatenpakete von der Senderapplikation mit take angefordert wurden, gibt diese Methode nur noch null zurück, im Fehlerfall werden entsprechende Exceptions geworfen.

Parameters:
indexOfStream - Eindeutiger Index des Streams, der Daten zurückgeben soll
Returns:
Die Nutzdaten werden als Byte-Array geliefert
Throws:
InterruptedException - Ein Thread, der Nutzdaten mit take anfordert hat, wird mit Interrupt abgebrochen.
IllegalStateException - Ein Stream wurde mit abort durch die Empfängerapplikation beendet und anschließend führte die Empfängerapplikation erneut ein take auf diesen Stream aus.
ProtocolException - Es wurde ein Paket mit einem falschen Index bearbeitet. Das deutet darauf hin, dass das Paket entweder schon einmal empfangen wurde (doppelt vorhanden) oder das ein Paket fehlt. Der Stream wird automatisch abgebrochen. Die Applikation kann weiter "take" aufrufen, wird aber immer für diesen Stream, diese Exception bekommen.
ClosedChannelException - Die physische Verbindung zum Sender wurde unterbrochen. Jeder take Aufruf wird für alle Streams diese Exception ausgeben, da alle Streams betroffen sind

unpackBigPacket

private void unpackBigPacket(StreamDemultiplexer.DemultiplexerStreaminformations stream)
                      throws InterruptedException
Throws:
InterruptedException

receivedDataFromSender

public void receivedDataFromSender(byte[] streamDataPacket)
                            throws IOException
Ein streamDataPacket, das der Multiplexer verschickt hat, wird entgegen genommen. Es hat als Inhalt den Index des Streams (die ersten 4 Bytes), den Index des Pakets (die nächsten 4 Bytes), die Größe des Byte-Arrays in dem die Nutzdaten gespeichert waren und die Nutzdaten selber (der Rest).

Diese Methode erzeugt dann aus dem Byte-Array die benötigten Objekte und legt diese in den dafür vorgesehenen Datenstrukturen ab.

Der Empfang neuer Nutzdaten wird von der Applikation an den StreamDemultiplexer geleitet (dadurch hat die Applikation den Datentransfer als Aufgabe).

Parameters:
streamDataPacket - Ein Byte-Array in dem verschlüsselt der Index des Streams, der Index des Pakets, die Größe des Byte-Arrays in dem die Nutzdaten gespeichert sind und die Nutzdaten selber stehen.
Throws:
IOException - Es ist ein Fehler beim deserialisieren der Daten aufgetreten

sendNewTicketIndexToSender

private void sendNewTicketIndexToSender(int indexOfStream,
                                        int maximumStreamTicketsIndex)
                                 throws IOException
Der Sender wird benachrichtigt, dass er auf einem Stream weitere Nutzdatenpakete schicken darf.

Dies ist ein Vorgang, der intern zwischen dem Multiplexer/Demultiplexer statt findet, darauf soll die Applikation keinen Zugriff haben.

Parameters:
indexOfStream - Die eindeutige Nummer des Stream, der neue Datenpakete schicken darf
maximumStreamTicketsIndex - Bis zu diesem Wert darf der Sender auf dem Stream neue Nutzdatenpakete schicken. Der Sender benutzt dafür den streamPacketIndex als Anzahl wie viele Datenpakete verschickt wurden.
Throws:
IOException - Es ist ein Fehler beim serialisieren der Daten aufgetreten

killAllStreams

public void killAllStreams()
Die physische Verbindung zum Sender ist zusammengebrochen und alle Streams werden beendet. Allen Streams wird die Erlaubnis zum empfangen/senden entzogen. Diese Methode wird von der übergeordneten Applikation aufgerufen, diese bemerkt den Fehler.


printByteArrayScreen

private void printByteArrayScreen(byte[] data)

printDebugVariables

private void printDebugVariables()