de.bsvrz.pua.prot.processing.archivebuffer
Class ArchiveBuffer

java.lang.Object
  extended by java.lang.Thread
      extended by de.bsvrz.pua.prot.processing.ProcessingBuffer
          extended by de.bsvrz.pua.prot.processing.archivebuffer.ArchiveBuffer
All Implemented Interfaces:
java.lang.Runnable

public class ArchiveBuffer
extends ProcessingBuffer

Puffer für die Daten vom Archivsystem, die von der Datenaufbereitung angefordert werden. Der folgende Algorithmus wird für jeden Zeitbereich wiederholt:
Für jedes reale Element, d.h. für jedes reale Attribut und für jede Attributgruppe wird eine Archivanfrage über den Zeitbereich und Datenart gestellt. Der jeweils erste pro Element erhaltene Datensatz wird markiert. Anschließend werden die Datensätze verschränkt. Dabei wird ein "Gewinner" festgestellt. Der Gewinner ist das Element mit dem kleinsten Datenzeitstempel. Danach werden die Werte der übrigen Attribute aufgefüllt und als Ausgangsdatensatz in einem Puffer abgelegt. Im nächsten Schritt wird ein weiterer Datensatz des Gewinners abgerufen, markiert mit den Datensätzen der übrigen Elementen verschränkt und dabei ein neuer "Gewinner" bestimmt(*). Nun wird die Datenaufbereitung benachrichtigt, dass ein Ausgangsdatensatz vorliegt. Grund für diese Verzögerung ist, dass die Bearbeitung eines Ausgangsdatensatzes erst vollständig abgeschlossen ist, wenn der nächste Ausgangsdatensatz erzeugt wurde. Der in (*) gewonnene Ausgangsdatensatz wird im Puffer abgelegt, und der Algorithmus wiederholt sich bis alle Daten vom Archivsystem abgerufen wurden. Ausgangsdatensatz wird ebenfalls in einem Puffer abgelegt. Wird in einem eigenen Thread gestartet, da ggf. auf die Antwort des Archivsystems gewartet werden muss. Da Archivdaten streambasiert abgefragt werden können, stellt der ArchivBuffer sicher, dass sich im Puffer für die Ausgangsdatensätze nur eine bestimmte Anzahl von Einträgen ansammelt. Wird dieses Limit (MAX_THRESHOLD) erreicht, so stellt der Archivbuffer die arbeit ein, bis sich der Puffer wieder fast vollständig (MIN_THRESHOLD) geleert hat.

Version:
$Revision: 1.2 $ / $Date: 2008/01/22 16:55:49 $ / ($Author: yvonnes $)
Author:
beck et al. projects GmbH, Martin Hilgers

Nested Class Summary
 
Nested classes/interfaces inherited from class java.lang.Thread
java.lang.Thread.State, java.lang.Thread.UncaughtExceptionHandler
 
Field Summary
static int MAX_THRESHOLD
          Anzahl Ausgangsdatensätze, die im Ausgangspuffer liegen.
static int MIN_THRESHOLD
          Anzahl Ausgangsdatensätze, die mindestens im Ausgangspuffer liegen sollten.
static ArchiveQueryPriority PRIORITY
          Anfragepriorität an das Archivsystem.
 
Fields inherited from class de.bsvrz.pua.prot.processing.ProcessingBuffer
buffer, bufferResult, dav, debug, done, imdsBuilder, INITIAL_RINGBUFFER_SIZE, periods, pi, realElements, tempElements
 
Fields inherited from class java.lang.Thread
MAX_PRIORITY, MIN_PRIORITY, NORM_PRIORITY
 
Constructor Summary
ArchiveBuffer(ClientDavInterface dav, ProcessingInterface processor, ConfigurationObject configAuth, ProcessingInformation pi, java.util.List<Tuple<java.lang.Long,java.lang.Long>> periods)
          Startet den Online-ProcessingBuffer.
 
Method Summary
 boolean applyAggregations(BaseDataSet[] baseData)
          Führt die Aggregierungen durch.
 int[] getLinkedAttributes()
          Zeigt an welche Attribute durch die Aggregation spalte zusammengefasst werden.
 boolean hasData()
          Zeigt an ob Daten abgeholt werden können.
 void init()
          Sendet erste Anfragen an das Archivsystem.
 boolean isDone()
          Zeigt ob der Buffer noch weitere Daten liefern wird.
protected  boolean isDoneCollecting()
          Überprüft ob die Datensammlung abgeschlossen ist.
protected  boolean isListAggregation()
          Werden nicht aggregierte Daten versendet?
protected  void processNewData(java.util.ArrayList<ValueProvider> winners, ValueProvider[] elements)
          Bereitet aus den Ergebnisdatensätzen die Ergebnisdaten auf.
protected  void requestData()
          Stellt Archivanfragen für alle realen Elemente mit den momentan engetragenen Werten von archiveUser
 void storeAggregatedData(byte status)
          Falls Aggregationsdatensätze vorhanden sind, werden sie in den Ausgangspuffer gelegt.
 IntermediateDataSet take()
          Liefert einen Ausgangsdatensatz zurück.
 
Methods inherited from class de.bsvrz.pua.prot.processing.ProcessingBuffer
abort, applyPostFilter, getInsertEmpty, getResult, getTimeStampOrigin, getWinners, isAbort, isAggregate, isDeltaProtocol, nextInterval, notifyProcessor, run, setAggregate, setDone, setTempAttributes, size, storeDataSet
 
Methods inherited from class java.lang.Thread
activeCount, checkAccess, countStackFrames, currentThread, destroy, dumpStack, enumerate, getAllStackTraces, getContextClassLoader, getDefaultUncaughtExceptionHandler, getId, getName, getPriority, getStackTrace, getState, getThreadGroup, getUncaughtExceptionHandler, holdsLock, interrupt, interrupted, isAlive, isDaemon, isInterrupted, join, join, join, resume, setContextClassLoader, setDaemon, setDefaultUncaughtExceptionHandler, setName, setPriority, setUncaughtExceptionHandler, sleep, sleep, start, stop, stop, suspend, toString, yield
 
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, wait, wait, wait
 

Field Detail

PRIORITY

public static final ArchiveQueryPriority PRIORITY
Anfragepriorität an das Archivsystem.

See Also:
ArchiveRequestManager#request(de.bsvrz.dav.daf.main.archive.ArchiveQueryPriority, de.bsvrz.dav.daf.main.archive.ArchiveDataSpecification)

MAX_THRESHOLD

public static int MAX_THRESHOLD
Anzahl Ausgangsdatensätze, die im Ausgangspuffer liegen.


MIN_THRESHOLD

public static int MIN_THRESHOLD
Anzahl Ausgangsdatensätze, die mindestens im Ausgangspuffer liegen sollten.

Constructor Detail

ArchiveBuffer

public ArchiveBuffer(ClientDavInterface dav,
                     ProcessingInterface processor,
                     ConfigurationObject configAuth,
                     ProcessingInformation pi,
                     java.util.List<Tuple<java.lang.Long,java.lang.Long>> periods)
              throws FailureException
Startet den Online-ProcessingBuffer. Meldet sich für jede benötigten Datenidentifikation beim Datenverteiler an. init() sollte unverzüglich nach Erzeugen des Objekts aufgerufen werden.

Parameters:
dav - Verbindung zum Datenverteiler
processor - Objekt das die Datenaufbereitung durchführt. Wird jedes Mal benachrichtigt, wenn ein Ausgangsdatensatz vorliegt.
configAuth - Konfigurationsverantwortlicher, dessen Archivsystem verwendet wird.
pi - Informationen zur Datenaufbereitung. Die Zeitbereiche müssen bereits sortiert und zusammengefasst sein!
periods - Zeitbereiche, in denen der Archivbuffer Daten sammeln soll. Inhalt wird geändert!
Throws:
FailureException - Fehler bei der Kommunikation mit der Konfiguration
Method Detail

init

public void init()
          throws FailureException,
                 java.lang.InterruptedException
Sendet erste Anfragen an das Archivsystem. Startet Anschließend einen Thread, der die Daten vom Archivsystem entgegennimmt. Falls es zu einem Fehler bei der Archivanfrage kommt, wird der Thread nicht gestartet, und ProcessingBuffer.abort() aufgerufen.

Specified by:
init in class ProcessingBuffer
Throws:
FailureException - Fehler bei der Archivanfrage.
java.lang.InterruptedException - Warten auf Archivantwort wurde unterbrochen.
See Also:
ProcessingBuffer.init()

applyAggregations

public boolean applyAggregations(BaseDataSet[] baseData)
Description copied from class: ProcessingBuffer
Führt die Aggregierungen durch. Die Aggregierungen werden jedoch nur durchgeführt, falls ProcessingBuffer.isAggregate() true liefert.

Specified by:
applyAggregations in class ProcessingBuffer
Parameters:
baseData - Werte des Ausgangsdatensatz. Einträge können von der Methode geändert werden.
Returns:
true: Der Ausgangsdatensatz soll nachgefiltert und ausgegeben werden.
See Also:
ProcessingBuffer.applyAggregations(BaseDataSet[])

requestData

protected void requestData()
                    throws FailureException,
                           java.lang.InterruptedException
Stellt Archivanfragen für alle realen Elemente mit den momentan engetragenen Werten von archiveUser

Throws:
FailureException - Fehler bei der Archivanfrage.
java.lang.InterruptedException - Warten auf Antwortdatensatz wurde unterbrochen.

isDoneCollecting

protected boolean isDoneCollecting()
                            throws FailureException,
                                   java.lang.InterruptedException
Überprüft ob die Datensammlung abgeschlossen ist.

Specified by:
isDoneCollecting in class ProcessingBuffer
Returns:
True falls die Datensammlung abgeschlossen ist. Überprüft zudem, wieviele Elemente sich in der Warteschlange befinden. Ist die Warteschlange bereits über MAX_THRESHOLD gefüllt, so wird der ArchivBuffer angehalten, bis die Queue wieder fast vollständig (MIN_THRESHOLD) geelert ist.
Throws:
FailureException - Fehler bei der Archivanfrage. Es wird eine Archivanfrage gestellt, wenn Daten für ein weiteres Intervall angefragt werden.
java.lang.InterruptedException - Warten auf Antwortdatensatz wurde unterbrochen.
See Also:
ProcessingBuffer.isDoneCollecting()

isDone

public boolean isDone()
Description copied from class: ProcessingBuffer
Zeigt ob der Buffer noch weitere Daten liefern wird.

Specified by:
isDone in class ProcessingBuffer
Returns:
true falls noch weitere Daten zu erwarten sind.
See Also:
ProcessingBuffer.isDone()

hasData

public boolean hasData()
Zeigt an ob Daten abgeholt werden können.

Specified by:
hasData in class ProcessingBuffer
Returns:
true falls Daten mittels take() abgeholt werden können.

take

public IntermediateDataSet take()
                         throws java.lang.InterruptedException
Description copied from class: ProcessingBuffer
Liefert einen Ausgangsdatensatz zurück. Setzt dabei das 'zeitdauer' Attribut. Liegt keiner vor, kehrt die Methode sofort mit dem Rückgabewert null zurück.

Overrides:
take in class ProcessingBuffer
Returns:
Ausgangsdatensatz und Status oder null, falls keiner vorliegt.
Throws:
java.lang.InterruptedException - Warten auf Ausgangsdatensatz wurde unterbrochen.
See Also:
ProcessingBuffer.take()

isListAggregation

protected boolean isListAggregation()
Description copied from class: ProcessingBuffer
Werden nicht aggregierte Daten versendet?

Specified by:
isListAggregation in class ProcessingBuffer
Returns:
true falls Liste eine der ausgewählten die Aggregationsanwendungen ist.
See Also:
ProcessingBuffer.isListAggregation()

getLinkedAttributes

public int[] getLinkedAttributes()
Zeigt an welche Attribute durch die Aggregation spalte zusammengefasst werden. Zusammengehörige Spalten werden duch die gleichen Nummern gekennzeichnet.

Returns:
Zusammen gehörende Spalten. null falls die spalten Aggregation nicht verwendet wird.

storeAggregatedData

public void storeAggregatedData(byte status)
Description copied from class: ProcessingBuffer
Falls Aggregationsdatensätze vorhanden sind, werden sie in den Ausgangspuffer gelegt.

Specified by:
storeAggregatedData in class ProcessingBuffer
Parameters:
status - Status, den der Aggregationsdatensatz erhalten soll.
See Also:
ProcessingBuffer.storeAggregatedData(byte)

processNewData

protected void processNewData(java.util.ArrayList<ValueProvider> winners,
                              ValueProvider[] elements)
                       throws FailureException,
                              java.lang.InterruptedException
Description copied from class: ProcessingBuffer
Bereitet aus den Ergebnisdatensätzen die Ergebnisdaten auf.

Specified by:
processNewData in class ProcessingBuffer
Parameters:
winners - Liste der Datensätze mit minimalem, nicht aufgefüllten Datenzeitstempel.
elements - Liste aller Ergebnislieferanten.
Throws:
FailureException - Fehler bei der Aufbereitung.
java.lang.InterruptedException - Aufbereitung wurde unterbrochen.
See Also:
ProcessingBuffer.processNewData(ArrayList, ValueProvider[])


Copyright © 2005-2008 beck et al. projects GmbH All Rights Reserved.