|
||||||||||
PREV CLASS NEXT CLASS | FRAMES NO FRAMES | |||||||||
SUMMARY: NESTED | FIELD | CONSTR | METHOD | DETAIL: FIELD | CONSTR | METHOD |
java.lang.Objectjava.lang.Thread
de.bsvrz.pua.prot.processing.ProcessingBuffer
de.bsvrz.pua.prot.processing.archivebuffer.ArchiveBuffer
public class ArchiveBuffer
Puffer für die Daten vom Archivsystem, die von der Datenaufbereitung angefordert werden. Der folgende Algorithmus
wird für jeden Zeitbereich wiederholt:
Für jedes reale Element, d.h. für jedes reale Attribut und für jede Attributgruppe wird eine Archivanfrage über den
Zeitbereich und Datenart gestellt. Der jeweils erste pro Element erhaltene Datensatz wird markiert. Anschließend
werden die Datensätze verschränkt. Dabei wird ein "Gewinner" festgestellt. Der Gewinner ist das Element mit dem
kleinsten Datenzeitstempel. Danach werden die Werte der übrigen Attribute aufgefüllt und als Ausgangsdatensatz in
einem Puffer abgelegt. Im nächsten Schritt wird ein weiterer Datensatz des Gewinners abgerufen, markiert mit den
Datensätzen der übrigen Elementen verschränkt und dabei ein neuer "Gewinner" bestimmt(*). Nun wird die
Datenaufbereitung benachrichtigt, dass ein Ausgangsdatensatz vorliegt. Grund für diese Verzögerung ist, dass die
Bearbeitung eines Ausgangsdatensatzes erst vollständig abgeschlossen ist, wenn der nächste Ausgangsdatensatz erzeugt
wurde. Der in (*) gewonnene Ausgangsdatensatz wird im Puffer abgelegt, und der Algorithmus wiederholt sich bis alle
Daten vom Archivsystem abgerufen wurden. Ausgangsdatensatz wird ebenfalls in einem Puffer abgelegt. Wird in einem
eigenen Thread gestartet, da ggf. auf die Antwort des Archivsystems gewartet werden muss. Da Archivdaten
streambasiert abgefragt werden können, stellt der ArchivBuffer sicher, dass sich im Puffer für die Ausgangsdatensätze
nur eine bestimmte Anzahl von Einträgen ansammelt. Wird dieses Limit (MAX_THRESHOLD
) erreicht, so stellt
der Archivbuffer die arbeit ein, bis sich der Puffer wieder fast vollständig (MIN_THRESHOLD
) geleert hat.
Nested Class Summary |
---|
Nested classes/interfaces inherited from class java.lang.Thread |
---|
java.lang.Thread.State, java.lang.Thread.UncaughtExceptionHandler |
Field Summary | |
---|---|
static int |
MAX_THRESHOLD
Anzahl Ausgangsdatensätze, die im Ausgangspuffer liegen. |
static int |
MIN_THRESHOLD
Anzahl Ausgangsdatensätze, die mindestens im Ausgangspuffer liegen sollten. |
static ArchiveQueryPriority |
PRIORITY
Anfragepriorität an das Archivsystem. |
Fields inherited from class de.bsvrz.pua.prot.processing.ProcessingBuffer |
---|
buffer, bufferResult, dav, debug, done, imdsBuilder, INITIAL_RINGBUFFER_SIZE, periods, pi, realElements, tempElements |
Fields inherited from class java.lang.Thread |
---|
MAX_PRIORITY, MIN_PRIORITY, NORM_PRIORITY |
Constructor Summary | |
---|---|
ArchiveBuffer(ClientDavInterface dav,
ProcessingInterface processor,
ConfigurationObject configAuth,
ProcessingInformation pi,
java.util.List<Tuple<java.lang.Long,java.lang.Long>> periods)
Startet den Online-ProcessingBuffer. |
Method Summary | |
---|---|
boolean |
applyAggregations(BaseDataSet[] baseData)
Führt die Aggregierungen durch. |
int[] |
getLinkedAttributes()
Zeigt an welche Attribute durch die Aggregation spalte zusammengefasst werden. |
boolean |
hasData()
Zeigt an ob Daten abgeholt werden können. |
void |
init()
Sendet erste Anfragen an das Archivsystem. |
boolean |
isDone()
Zeigt ob der Buffer noch weitere Daten liefern wird. |
protected boolean |
isDoneCollecting()
Überprüft ob die Datensammlung abgeschlossen ist. |
protected boolean |
isListAggregation()
Werden nicht aggregierte Daten versendet? |
protected void |
processNewData(java.util.ArrayList<ValueProvider> winners,
ValueProvider[] elements)
Bereitet aus den Ergebnisdatensätzen die Ergebnisdaten auf. |
protected void |
requestData()
Stellt Archivanfragen für alle realen Elemente mit den momentan engetragenen Werten von archiveUser |
void |
storeAggregatedData(byte status)
Falls Aggregationsdatensätze vorhanden sind, werden sie in den Ausgangspuffer gelegt. |
IntermediateDataSet |
take()
Liefert einen Ausgangsdatensatz zurück. |
Methods inherited from class de.bsvrz.pua.prot.processing.ProcessingBuffer |
---|
abort, applyPostFilter, getInsertEmpty, getResult, getTimeStampOrigin, getWinners, isAbort, isAggregate, isDeltaProtocol, nextInterval, notifyProcessor, run, setAggregate, setDone, setTempAttributes, size, storeDataSet |
Methods inherited from class java.lang.Thread |
---|
activeCount, checkAccess, countStackFrames, currentThread, destroy, dumpStack, enumerate, getAllStackTraces, getContextClassLoader, getDefaultUncaughtExceptionHandler, getId, getName, getPriority, getStackTrace, getState, getThreadGroup, getUncaughtExceptionHandler, holdsLock, interrupt, interrupted, isAlive, isDaemon, isInterrupted, join, join, join, resume, setContextClassLoader, setDaemon, setDefaultUncaughtExceptionHandler, setName, setPriority, setUncaughtExceptionHandler, sleep, sleep, start, stop, stop, suspend, toString, yield |
Methods inherited from class java.lang.Object |
---|
clone, equals, finalize, getClass, hashCode, notify, notifyAll, wait, wait, wait |
Field Detail |
---|
public static final ArchiveQueryPriority PRIORITY
ArchiveRequestManager#request(de.bsvrz.dav.daf.main.archive.ArchiveQueryPriority, de.bsvrz.dav.daf.main.archive.ArchiveDataSpecification)
public static int MAX_THRESHOLD
public static int MIN_THRESHOLD
Constructor Detail |
---|
public ArchiveBuffer(ClientDavInterface dav, ProcessingInterface processor, ConfigurationObject configAuth, ProcessingInformation pi, java.util.List<Tuple<java.lang.Long,java.lang.Long>> periods) throws FailureException
init()
sollte unverzüglich nach Erzeugen des Objekts aufgerufen werden.
dav
- Verbindung zum Datenverteilerprocessor
- Objekt das die Datenaufbereitung durchführt. Wird jedes Mal benachrichtigt, wenn ein
Ausgangsdatensatz vorliegt.configAuth
- Konfigurationsverantwortlicher, dessen Archivsystem verwendet wird.pi
- Informationen zur Datenaufbereitung. Die Zeitbereiche müssen bereits sortiert und zusammengefasst sein!periods
- Zeitbereiche, in denen der Archivbuffer Daten sammeln soll. Inhalt wird geändert!
FailureException
- Fehler bei der Kommunikation mit der KonfigurationMethod Detail |
---|
public void init() throws FailureException, java.lang.InterruptedException
ProcessingBuffer.abort()
aufgerufen.
init
in class ProcessingBuffer
FailureException
- Fehler bei der Archivanfrage.
java.lang.InterruptedException
- Warten auf Archivantwort wurde unterbrochen.ProcessingBuffer.init()
public boolean applyAggregations(BaseDataSet[] baseData)
ProcessingBuffer
ProcessingBuffer.isAggregate()
true
liefert.
applyAggregations
in class ProcessingBuffer
baseData
- Werte des Ausgangsdatensatz. Einträge können von der Methode geändert werden.
true
: Der Ausgangsdatensatz soll nachgefiltert und ausgegeben werden.ProcessingBuffer.applyAggregations(BaseDataSet[])
protected void requestData() throws FailureException, java.lang.InterruptedException
archiveUser
FailureException
- Fehler bei der Archivanfrage.
java.lang.InterruptedException
- Warten auf Antwortdatensatz wurde unterbrochen.protected boolean isDoneCollecting() throws FailureException, java.lang.InterruptedException
isDoneCollecting
in class ProcessingBuffer
True
falls die Datensammlung abgeschlossen ist. Überprüft zudem, wieviele Elemente sich in
der Warteschlange befinden. Ist die Warteschlange bereits über MAX_THRESHOLD
gefüllt, so wird
der ArchivBuffer angehalten, bis die Queue wieder fast vollständig (MIN_THRESHOLD
) geelert
ist.
FailureException
- Fehler bei der Archivanfrage. Es wird eine Archivanfrage gestellt, wenn Daten für ein
weiteres Intervall angefragt werden.
java.lang.InterruptedException
- Warten auf Antwortdatensatz wurde unterbrochen.ProcessingBuffer.isDoneCollecting()
public boolean isDone()
ProcessingBuffer
isDone
in class ProcessingBuffer
true
falls noch weitere Daten zu erwarten sind.ProcessingBuffer.isDone()
public boolean hasData()
hasData
in class ProcessingBuffer
true
falls Daten mittels take()
abgeholt werden können.public IntermediateDataSet take() throws java.lang.InterruptedException
ProcessingBuffer
take
in class ProcessingBuffer
java.lang.InterruptedException
- Warten auf Ausgangsdatensatz wurde unterbrochen.ProcessingBuffer.take()
protected boolean isListAggregation()
ProcessingBuffer
isListAggregation
in class ProcessingBuffer
true
falls Liste
eine der ausgewählten die Aggregationsanwendungen ist.ProcessingBuffer.isListAggregation()
public int[] getLinkedAttributes()
spalte
zusammengefasst werden. Zusammengehörige
Spalten werden duch die gleichen Nummern gekennzeichnet.
null
falls die spalten
Aggregation nicht
verwendet wird.public void storeAggregatedData(byte status)
ProcessingBuffer
storeAggregatedData
in class ProcessingBuffer
status
- Status, den der Aggregationsdatensatz erhalten soll.ProcessingBuffer.storeAggregatedData(byte)
protected void processNewData(java.util.ArrayList<ValueProvider> winners, ValueProvider[] elements) throws FailureException, java.lang.InterruptedException
ProcessingBuffer
processNewData
in class ProcessingBuffer
winners
- Liste der Datensätze mit minimalem, nicht aufgefüllten Datenzeitstempel.elements
- Liste aller Ergebnislieferanten.
FailureException
- Fehler bei der Aufbereitung.
java.lang.InterruptedException
- Aufbereitung wurde unterbrochen.ProcessingBuffer.processNewData(ArrayList, ValueProvider[])
|
||||||||||
PREV CLASS NEXT CLASS | FRAMES NO FRAMES | |||||||||
SUMMARY: NESTED | FIELD | CONSTR | METHOD | DETAIL: FIELD | CONSTR | METHOD |