JBoss.org Community Documentation
This mode does not impose any kind of additional requirements on facts. So for instance:
KieBaseConfiguration config = KieServices.Factory.get().newKieBaseConfiguration();
config.setOption( EventProcessingOption.CLOUD );
drools.eventProcessingMode = cloud
The main requirements to use STREAM mode are:
KieBaseConfiguration config = KieServices.Factory.get().newKieBaseConfiguration();
config.setOption( EventProcessingOption.STREAM );
drools.eventProcessingMode = stream
All these features are explained in the following sections.
The above rule has no temporal constraints that would require delaying it, so it activates immediately. The following rule, on the other hand, must wait 10 seconds before activating, since the sprinklers may take up to 10 seconds to activate:
Example 9.2. a rule that automatically delays activation due to temporal constraints
rule "Sound the alarm"
when
$f : FireDetected( )
not( SprinklerActivated( this after[0s,10s] $f ) )
then
// sound the alarm
end
This behaviour allows the engine to stay consistent when dealing with negative patterns and temporal constraints at the same time. The rule above is equivalent to the one below, but spares the user from calculating and explicitly writing the appropriate duration parameter:
Example 9.3. same rule with explicit duration parameter
rule "Sound the alarm"
duration( 10s )
when
$f : FireDetected( )
not( SprinklerActivated( this after[0s,10s] $f ) )
then
// sound the alarm
end
The following rule expects at least one “Heartbeat” event every 10 seconds and fires if none arrives. What is special about this rule is that the first pattern and the negative pattern use the same object type. The negative pattern waits between 0 and 10 seconds before firing and excludes the Heartbeat bound to $h. Excluding the bound Heartbeat is important because the temporal constraint [0s, ...] does not by itself exclude the bound event $h from being matched again, which would prevent the rule from firing.
Example 9.4. excluding bound events in negative patterns
rule "Sound the alarm"
when
$h: Heartbeat( ) from entry-point "MonitoringStream"
not( Heartbeat( this != $h, this after[0s,10s] $h ) from entry-point "MonitoringStream" )
then
// Sound the alarm
end
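To see why the `this != $h` exclusion matters, the matching condition can be modelled in plain Java. This is only a sketch and not Drools code: `HeartbeatCheck`, `isSilent` and the millisecond timestamps are hypothetical names chosen for illustration. It assumes point-in-time events, so `after[0s,10s]` reduces to `0 <= other - h <= 10s`, and it assumes all heartbeats for the interval have already been observed.

```java
import java.util.List;

// Hypothetical plain-Java model of the negative pattern above; not Drools API.
final class HeartbeatCheck {
    static final long WINDOW_MS = 10_000; // upper bound of after[0s,10s]

    // Returns true when no heartbeat follows the one at index i within [0s,10s].
    // With excludeSelf == false the heartbeat at index i is also compared with itself.
    static boolean isSilent(List<Long> timestamps, int i, boolean excludeSelf) {
        long h = timestamps.get(i);
        for (int j = 0; j < timestamps.size(); j++) {
            if (excludeSelf && j == i) {
                continue; // models "this != $h"
            }
            long distance = timestamps.get(j) - h;
            if (distance >= 0 && distance <= WINDOW_MS) {
                return false; // a heartbeat falls inside the interval, so "not" fails
            }
        }
        return true;
    }
}
```

Without the exclusion, every heartbeat is at distance 0 from itself, which lies inside [0s,10s], so the negative pattern could never be satisfied in this model.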
To configure the pseudo session clock, do:
KieSessionConfiguration config = KieServices.Factory.get().newKieSessionConfiguration();
config.setOption( ClockTypeOption.get("pseudo") );
As an example of how to control the pseudo session clock:
KieSessionConfiguration conf = KieServices.Factory.get().newKieSessionConfiguration();
conf.setOption( ClockTypeOption.get( "pseudo" ) );
KieSession session = kbase.newKieSession( conf, null );
SessionPseudoClock clock = session.getSessionClock();
// then, while inserting facts, advance the clock as necessary:
FactHandle handle1 = session.insert( tick1 );
clock.advanceTime( 10, TimeUnit.SECONDS );
FactHandle handle2 = session.insert( tick2 );
clock.advanceTime( 30, TimeUnit.SECONDS );
FactHandle handle3 = session.insert( tick3 );
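The timeline produced by the calls above can be traced with a small stand-in clock. The class below is a hypothetical plain-Java model, not the Drools `SessionPseudoClock`; it assumes the clock starts at 0 and only moves when advanced explicitly.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for a manually advanced clock; not the Drools implementation.
final class ManualClock {
    private long currentMs; // starts at 0 in this sketch

    long getCurrentTime() {
        return currentMs;
    }

    // Moves the clock forward and returns the new time in milliseconds.
    long advanceTime(long amount, TimeUnit unit) {
        currentMs += unit.toMillis(amount);
        return currentMs;
    }
}
```

Under these assumptions the three inserts happen at 0 s, 10 s and 40 s respectively, because time passes only when the test code advances the clock.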
The next sections will detail each of them.
StockTick() over window:time( 2m )
Drools uses the "over" keyword to associate windows to patterns.
The engine will automatically disregard any SensorReading older than 10 minutes and keep the calculated average consistent.
Please note that time based windows are considered when calculating the interval an event remains in the working memory before being expired, but an event falling off a sliding window does not mean by itself that the event will be discarded from the working memory, as there might be other rules that depend on that event. The engine will discard events only when no other rules depend on that event and the expiration policy for that event type is fulfilled.
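The effect of a sliding time window on an average can be sketched in plain Java. This is an illustrative model only, not how the engine is implemented: `TimeWindowAverage` and its members are hypothetical names, timestamps are assumed to arrive in non-decreasing order, and a reading exactly as old as the window is assumed to still be inside it.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical plain-Java model of a sliding time window; not Drools API.
final class TimeWindowAverage {
    private static final class Reading {
        final long timestampMs;
        final double value;

        Reading(long timestampMs, double value) {
            this.timestampMs = timestampMs;
            this.value = value;
        }
    }

    private final long windowMs;
    private final Deque<Reading> window = new ArrayDeque<>();
    private double sum;

    TimeWindowAverage(long windowMs) {
        this.windowMs = windowMs;
    }

    // Adds a reading and returns the average over the window ending at that reading.
    double add(long timestampMs, double value) {
        window.addLast(new Reading(timestampMs, value));
        sum += value;
        // Drop readings that are older than the window length.
        while (timestampMs - window.peekFirst().timestampMs > windowMs) {
            sum -= window.removeFirst().value;
        }
        return sum / window.size();
    }
}
```

The model only tracks what the window contains. As noted above, a reading that leaves the window is not necessarily removed from the working memory.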
StockTick( company == "RHT" ) over window:length( 10 )
The engine will only consider the last 100 readings to calculate the average temperature.
Please note that falling off a length based window is not criteria for event expiration in the session. The engine disregards events that fall off a window when calculating that window, but does not remove the event from the session based on that condition alone as there might be other rules that depend on that event.
Please note that length based windows do not define temporal constraints for event expiration from the session, and the engine will not consider them. If events have no other rules defining temporal constraints and no explicit expiration policy, the engine will keep them in the session indefinitely.
When using a sliding window, alpha constraints are evaluated before the window is considered, but beta (join) constraints are evaluated afterwards. This usually doesn't make a difference when time windows are concerned, but it's important when using a length window. For example this pattern:
StockTick( company == "RHT" ) over window:length( 10 )
defines a window of (at most) 10 StockTicks all having company equal to "RHT", while the following one:
$s : String()
StockTick( company == $s ) over window:length( 10 )
first creates a window of (at most) 10 StockTicks regardless of the value of their company attribute and then filters among them only the ones having the company equal to the String selected from the working memory.
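The difference between the two evaluation orders can be illustrated with a plain-Java sketch. It is not Drools code: `LengthWindowOrder` and its methods are hypothetical, and each tick is reduced to its company name for brevity.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical plain-Java model of constraint ordering around a length window.
final class LengthWindowOrder {
    // Returns the last n elements of the list, or all of them if it is shorter.
    static List<String> lastN(List<String> items, int n) {
        return new ArrayList<>(items.subList(Math.max(0, items.size() - n), items.size()));
    }

    // Alpha-style: filter by company first, then keep the last n matches.
    static List<String> filterThenWindow(List<String> companies, String company, int n) {
        List<String> matching = new ArrayList<>();
        for (String c : companies) {
            if (c.equals(company)) {
                matching.add(c);
            }
        }
        return lastN(matching, n);
    }

    // Beta-style: keep the last n ticks of any company, then filter them.
    static List<String> windowThenFilter(List<String> companies, String company, int n) {
        List<String> result = new ArrayList<>();
        for (String c : lastN(companies, n)) {
            if (c.equals(company)) {
                result.add(c);
            }
        }
        return result;
    }
}
```

For the tick sequence RHT, RHT, IBM, IBM, RHT, IBM and a window of length 3, filtering first yields three RHT ticks, while windowing first yields only one, because two of the last three ticks belong to IBM.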
The simplified EBNF to declare a window is:
windowDeclaration := DECLARE WINDOW ID annotation* lhsPatternBind END
declare window Ticks
StockTick( source == "NYSE" )
over window:length( 10 )
from entry-point STStream
end
Rules can then use this declared window by using it as a source for a FROM as in:
rule "RHT ticks in the window"
when
accumulate( StockTick( company == "RHT" ) from window Ticks,
$cnt : count(1) )
then
// there have been $cnt RHT ticks over the last 10 ticks
end
In the previous example, the engine compiler identifies that the pattern is tied to the entry point "ATM Stream". It creates all the structures the rulebase needs to support the "ATM Stream" and matches only WithdrawRequests coming from the "ATM Stream". The rule also joins the event from the stream with a fact from the main working memory (CheckingAccount).
Now, let's imagine a second rule that states that a fee of $2 must be applied to any account for which a withdrawal request is placed at a bank branch:
Example 9.8. Using a different Stream
rule "apply fee on withdraws on branches"
when
WithdrawRequest( $ai : accountId, processed == true ) from entry-point "Branch Stream"
CheckingAccount( accountId == $ai )
then
// apply a $2 fee on the account
end
The previous rule matches events of exactly the same type as the first rule (WithdrawRequest), but from a different stream. An event inserted into "ATM Stream" will never be evaluated against the pattern in the second rule, because that rule states that it is only interested in events coming from the "Branch Stream".
So entry points, besides being a proper abstraction for streams, are also a way to scope facts in the working memory and a valuable tool for reducing cross-product explosions. But that is a subject for another time.
Inserting events into an entry point is equally simple. Instead of inserting events directly into the working memory, insert them into the entry point as shown in the example below:
Example 9.9. Inserting facts into an entry point
// create your rulebase and your session as usual
KieSession session = ...
// get a reference to the entry point
EntryPoint atmStream = session.getEntryPoint( "ATM Stream" );
// and start inserting your facts into the entry point
atmStream.insert( aWithdrawRequest );
The previous example shows how to manually insert facts into a given entry point. Usually, however, the application will use one of the many adapters to plug a stream end point, such as a JMS queue, directly into the engine entry point, without coding the inserts manually. The Drools pipeline API has several adapters and helpers to do that, as well as examples of how to do it.
There are basically 2 ways for the engine to calculate the matching window for a given event:
$eventA : EventA( this after[ 3m30s, 4m ] $eventB )
3m30s <= $eventA.startTimestamp - $eventB.endTimestamp <= 4m
The temporal distance interval for the after operator is optional:
It is possible to define negative distances for this operator. Example:
$eventA : EventA( this after[ -3m30s, -2m ] $eventB )
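The distance check behind the after operator can be written as a small Java predicate. This is a sketch for illustration only: `AfterOperator` and its nested `Event` class are hypothetical and all times are in milliseconds.

```java
// Hypothetical plain-Java model of the "after" operator; not Drools API.
final class AfterOperator {
    // Interval event with start and end timestamps in milliseconds.
    static final class Event {
        final long start;
        final long end;

        Event(long start, long end) {
            this.start = start;
            this.end = end;
        }
    }

    // Models "$a after[min, max] $b": min <= a.start - b.end <= max.
    static boolean after(Event a, Event b, long minMs, long maxMs) {
        long distance = a.start - b.end;
        return distance >= minMs && distance <= maxMs;
    }
}
```

With negative bounds the same comparison applies, which means that $eventA is allowed to start before $eventB ends.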
$eventA : EventA( this before[ 3m30s, 4m ] $eventB )
3m30s <= $eventB.startTimestamp - $eventA.endTimestamp <= 4m
The temporal distance interval for the before operator is optional:
It is possible to define negative distances for this operator. Example:
$eventA : EventA( this before[ -3m30s, -2m ] $eventB )
$eventA : EventA( this coincides $eventB )
$eventA : EventA( this coincides[15s, 10s] $eventB )
The above pattern will match if and only if:
abs( $eventA.startTimestamp - $eventB.startTimestamp ) <= 15s &&
abs( $eventA.endTimestamp - $eventB.endTimestamp ) <= 10s
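The two-threshold form of coincides can be expressed as a Java predicate. This is a hypothetical sketch rather than engine code; `CoincidesOperator` is an illustrative name and all values are in milliseconds.

```java
// Hypothetical plain-Java model of the "coincides" operator; not Drools API.
final class CoincidesOperator {
    // Models "$a coincides[startTol, endTol] $b":
    // both the start distance and the end distance must be within their tolerance.
    static boolean coincides(long aStart, long aEnd, long bStart, long bEnd,
                             long startTolMs, long endTolMs) {
        return Math.abs(aStart - bStart) <= startTolMs
                && Math.abs(aEnd - bEnd) <= endTolMs;
    }
}
```

Passing 0 for both tolerances gives the strict form, in which the start and end timestamps must be identical.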
$eventA : EventA( this during $eventB )
$eventB.startTimestamp < $eventA.startTimestamp <= $eventA.endTimestamp < $eventB.endTimestamp
The during operator accepts 1, 2 or 4 optional parameters as follows:
$eventA : EventA( this finishes $eventB )
$eventB.startTimestamp < $eventA.startTimestamp &&
$eventA.endTimestamp == $eventB.endTimestamp
$eventA : EventA( this finishes[ 5s ] $eventB )
$eventB.startTimestamp < $eventA.startTimestamp &&
abs( $eventA.endTimestamp - $eventB.endTimestamp ) <= 5s
$eventA : EventA( this finishedby $eventB )
$eventA.startTimestamp < $eventB.startTimestamp &&
$eventA.endTimestamp == $eventB.endTimestamp
$eventA : EventA( this finishedby[ 5s ] $eventB )
$eventA.startTimestamp < $eventB.startTimestamp &&
abs( $eventA.endTimestamp - $eventB.endTimestamp ) <= 5s
$eventA : EventA( this includes $eventB )
$eventA.startTimestamp < $eventB.startTimestamp <= $eventB.endTimestamp < $eventA.endTimestamp
The includes operator accepts 1, 2 or 4 optional parameters as follows:
$eventA : EventA( this meets $eventB )
The previous pattern will match if and only if the $eventA finishes at the same time $eventB starts.
abs( $eventB.startTimestamp - $eventA.endTimestamp ) == 0
$eventA : EventA( this meets[ 5s ] $eventB )
abs( $eventB.startTimestamp - $eventA.endTimestamp) <= 5s
$eventA : EventA( this metby $eventB )
The previous pattern will match if and only if the $eventA starts at the same time $eventB finishes.
abs( $eventA.startTimestamp - $eventB.endTimestamp ) == 0
$eventA : EventA( this metby[ 5s ] $eventB )
abs( $eventA.startTimestamp - $eventB.endTimestamp) <= 5s
$eventA : EventA( this overlaps $eventB )
The previous pattern will match if and only if:
$eventA.startTimestamp < $eventB.startTimestamp < $eventA.endTimestamp < $eventB.endTimestamp
The overlaps operator accepts 1 or 2 optional parameters as follows:
$eventA : EventA( this overlappedby $eventB )
The previous pattern will match if and only if:
$eventB.startTimestamp < $eventA.startTimestamp < $eventB.endTimestamp < $eventA.endTimestamp
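The chained inequalities for overlaps and overlappedby are not valid Java as written, so the sketch below spells them out as conjunctions. The class and method names are hypothetical and the parameterless form of both operators is assumed.

```java
// Hypothetical plain-Java model of "overlaps" and "overlappedby"; not Drools API.
final class OverlapsOperator {
    // Models "$a overlaps $b": a.start < b.start < a.end < b.end.
    static boolean overlaps(long aStart, long aEnd, long bStart, long bEnd) {
        return aStart < bStart && bStart < aEnd && aEnd < bEnd;
    }

    // Models "$a overlappedby $b": the same relation with the roles swapped.
    static boolean overlappedBy(long aStart, long aEnd, long bStart, long bEnd) {
        return overlaps(bStart, bEnd, aStart, aEnd);
    }
}
```

Because every comparison is strict, an event that ends exactly when the other starts does not overlap it in this model.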
The overlappedby operator accepts 1 or 2 optional parameters as follows:
$eventA : EventA( this starts $eventB )
$eventA.startTimestamp == $eventB.startTimestamp &&
$eventA.endTimestamp < $eventB.endTimestamp
$eventA : EventA( this starts[ 5s ] $eventB )
abs( $eventA.startTimestamp - $eventB.startTimestamp ) <= 5s &&
$eventA.endTimestamp < $eventB.endTimestamp