Monday, August 27, 2012

Distributed Processing Sample for WSO2 CEP


Today we released WSO2 Complex Event Processor 2.0.0 Milestone 2.
This is available at  https://svn.wso2.org/repos/wso2/people/suho/packs/cep/wso2cep-2.0.0-M2.zip

One of the key features of this CEP release is its support for distributed processing via the Siddhi CEP engine. To demonstrate this capability, I came up with a sample on distributed processing. The sample uses the Siddhi CEP engine for event processing and the ActiveMQ JMS broker to publish and subscribe to events.

To run the Distributed Processing Sample, follow the steps given below:
 
Step 01: Configure and run ActiveMQ on your local machine
Download ActiveMQ from "http://activemq.apache.org/activemq-543-release.html".
Unzip the distribution and run the ActiveMQ server using the command "./activemq console" from the apache-activemq-xxx/bin directory (on Linux).
Note: WSO2 CEP has been tested with ActiveMQ 5.4.3.
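
On Linux this whole step comes down to a few commands. The archive name below assumes the ActiveMQ 5.4.3 tar.gz distribution, so substitute whatever file you actually downloaded:

tar -xzf apache-activemq-5.4.3-bin.tar.gz   # unpack the broker distribution
cd apache-activemq-5.4.3/bin
./activemq console                          # run the broker in the foreground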

For each CEP node in the cluster, follow steps 02 to 07.

Step 02: Deploy the CEP server
Unzip the CEP server distribution (do not start the server yet).
      
Step 03: Change the CEP server port offset
If you are running multiple servers on the same machine, change the offset in each server's repository/conf/carbon.xml file
(e.g. wso2cep-2.0.0-2/repository/conf/carbon.xml for the second copy) to a different number.
E.g.
<offset>1</offset>

This avoids port conflicts between the servers.
E.g. if three WSO2 servers are deployed on the same machine, they can use offsets 1, 2, and 3.

Step 04: Copy the ActiveMQ jars
Copy activemq-all-xxx.jar from the ActiveMQ home directory to the
wso2cep-2.0.0/samples/lib directory.
Copy activemq-core-xxx.jar and geronimo-j2ee-management_1.1_spec-1.0.1.jar
from apache-activemq-xxx/lib to the wso2cep-2.0.0/repository/components/lib directory.
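
For example, assuming ActiveMQ 5.4.3 and that both distributions are unpacked side by side (adjust the version numbers and paths to match your setup):

cp apache-activemq-5.4.3/activemq-all-5.4.3.jar wso2cep-2.0.0/samples/lib/
cp apache-activemq-5.4.3/lib/activemq-core-5.4.3.jar wso2cep-2.0.0/repository/components/lib/
cp apache-activemq-5.4.3/lib/geronimo-j2ee-management_1.1_spec-1.0.1.jar wso2cep-2.0.0/repository/components/lib/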
 
Step 05: Deploy the required broker manager configuration
This allows the CEP to receive and send messages via the JMS broker.
To deploy it, run "ant deploy-broker-manager" from the wso2cep-2.0.0/samples/cep-samples directory.
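
That is, assuming Apache Ant is installed and on the PATH:

cd wso2cep-2.0.0/samples/cep-samples
ant deploy-broker-manager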

Step 06: Configure the input, output and queries
For this sample, the input, output and query configurations are in wso2cep-2.0.0-1/samples/cep-samples/conf/buckets/purchase-analyser-bucket.xml.
When running multiple server nodes, each server needs to receive its input events through a different topic. To enable this behaviour, open the above file on each node and change the input topic to a different name. The default input topic name is "PurchaseTopic".

E.g. changing the name to "PurchaseTopic1":
<input brokername="activemqJmsBroker" topic="PurchaseTopic1" />
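
If you prefer to script the rename, a rough sketch with sed is shown below; it assumes the default file contains the attribute exactly as topic="PurchaseTopic", so verify the result afterwards (or simply edit the file by hand), and adjust the path and topic name for each node:

# node 1: point the input at PurchaseTopic1 instead of the default PurchaseTopic
sed -i 's/topic="PurchaseTopic"/topic="PurchaseTopic1"/' wso2cep-2.0.0/samples/cep-samples/conf/buckets/purchase-analyser-bucket.xml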

Note: To enable distributed processing (already enabled for this sample), set the "siddhi.enable.distributed.processing" property to "true".

<engineproviderconfiguration engineprovider="SiddhiCEPRuntime">
     <property name="siddhi.persistence.snapshot.time.interval.minutes">0</property>
     <property name="siddhi.enable.distributed.processing">true</property>
</engineproviderconfiguration>

Step 07: Deploy the bucket configuration
Deploy the bucket by copying purchase-analyser-bucket.xml to the wso2cep-2.0.0/repository/deployment/server/cepbuckets/ directory.
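
As a command (repeat on each node, using that node's own directory):

cp wso2cep-2.0.0/samples/cep-samples/conf/buckets/purchase-analyser-bucket.xml wso2cep-2.0.0/repository/deployment/server/cepbuckets/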

Step 08: Start all the servers
Start the CEP servers by running "./wso2server.sh" from the wso2cep-2.0.0/bin directory of each node.
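
For example, with two nodes unpacked side by side (the -1 and -2 directory names are only an illustration; use whatever directories you created in Step 02, and give each node its own terminal):

# node 1
cd wso2cep-2.0.0-1/bin && ./wso2server.sh
# node 2, in a second terminal
cd wso2cep-2.0.0-2/bin && ./wso2server.sh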

Step 09: Subscribe to the output topic
Start the subscriber for the output topic "PurchaseResults" by running "ant purchaseResultsSubscriber" in a separate terminal,
from the wso2cep-2.0.0/samples/cep-samples directory.
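
That is, in a new terminal:

cd wso2cep-2.0.0/samples/cep-samples
ant purchaseResultsSubscriber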

Step 10: Publish events
To publish events to all the servers (to the input topics defined in Step 06), run the publisher from the wso2cep-2.0.0/samples/cep-samples directory
with the command "ant purchasePublisher -Dtopics=xxx,xxx,...".
The client will publish events to all the given topics in a round-robin manner.

E.g. if the buckets are configured to receive their inputs via the topics PurchaseTopic1, PurchaseTopic2 and PurchaseTopic3:
"ant purchasePublisher -Dtopics=PurchaseTopic1,PurchaseTopic2,PurchaseTopic3"


Step 11: Observation
You will be able to observe how the counts in the results steadily increase when messages are sent to any of the servers, and how the results remain consistent even when some servers go down and come back.