Wednesday, April 10, 2013

Integrating WSO2 BAM & WSO2 CEP

This post demonstrate how we can integrate both WSO2 BAM and WSO2 CEP and run them as a single instance.

To integrate both WSO2 BAM and WSO2 CEP we need to download BAM and install the CEP features form the WSO2 p2repo. Here I have used WSO2 BAM 2.2.0 and 4.0.7 p2repo. Follow the wiki link to get to know how to install new features to existing products.

Testing the installation.
To test the instillation add broker-manager-config.xml to repository/conf/, and add axis2_client.xml to repository/conf/axis2/.
Here axis2_client.xml has the email sender information, and I'm using a demo email account for this.

Then create a directory called 'cepbuckets' in repository/deployment/server/ and add bam-cep-kpi-analyzer.xml there.
In bam-cep-kpi-analyzer.xml, you will be able to find a line
<cep:output brokerName="emailBroker" topic="wso2cep.demo@gmail.com/Phone Purchase Notification">
Change "wso2cep.demo@gmail.com" to the endpoint use email ID, who needs to receive the notification.

Now if you run the KIP-analyser sample of BAM all the events that have totalPrice>350000 and quantity>3 will be notified to the user.

Wednesday, March 13, 2013

Monitoring ActiveMQ Queues via JMX MBeans with WSO2 Complex Event Processor (WSO2 CEP)

WSO2 CEP and WSO2 Business Activity Monitor have support for JMX MBeans monitoring. You can find the docs for JMX MBeans monitoring for WSO2 BAM from here.

I'm writing this blog to illustrate how WSO2 CEP can be used to monitor the JMX MBeans, and I'm using ActiveMQ JMS broker as an example to demonstrate its capabilities.
Here I have used apache-activemq-5.4.3

First we need to enable ActiveMQ JMS broker to support JMX remotely.

To do so, you have to enable JMX in the broker, in the <activemq_home>/conf/activemq.xml:
<broker xmlns="http://activemq.org/config/1.0" brokerName="localhost" useJmx="true">
 and 
<managementContext>
    <managementContext connectorPort="1099" />
</managementContext> 
Now you have to enable JMX in the JVM parameters in <activemq_home>/bin/activemq: 
Locate the line 
ACTIVEMQ_OPTS="$ACTIVEMQ_OPTS $SUNJMX $SSL_OPTS" 
and change it to: 
ACTIVEMQ_OPTS="$ACTIVEMQ_OPTS $SUNJMX $SSL_OPTS -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.ssl=false -Dcom.sun.management.jmxremote.authenticate=false" 

To test whether its correctly working run "jconsole"

Now we need to Enable to CEP to receive JMX Events

To do so first we need to install JMX Agent to CEP 

Add 4.0.7 P2 repository form the URL  http://dist.wso2.org/p2/carbon/releases/4.0.7

Next install the "BAM JMX Agent Aggregate" 


Restart CEP.

Now we need to configure JMX.

To do so, go to Configure -> JMX Agent 
Click on "Add Profile"  and configure he profile as below


Here use the Data Receiver user name & password as "admin" & "admin" 
and JMX Server user name & password as "admin" & "activemq" 

Then  click on the "Load Means" button to load the available MBeans, 
Note : ActiveMQ need to be up an running in-order to load its MBeans

Next when you click on the appropriate MBean a dropdown will appear showing its attributes. 

You have to select your relevant attributes one by one from the drop down. 

You also need to change the Alias to a much readable strings, 
For this Example I have used:
BrokerName, TotalEnqueueCount, TotalDequeueCount, TotalMessageCount, MemoryPercentUsage

Now click "Save" to save & deploy the JMX Agent.

When we add the above configuration JMX agent will automatically create the following WSO2 Event Stream Definition

{
    "streamId": "org.wso2.bam.jmx.agent.activemq:1.0.0",
    "name": "org.wso2.bam.jmx.agent.activemq",
    "version": "1.0.0",
    "nickName": "JMX Dump",
    "description": "JMX monitoring data",
    "metaData": [{
        "name": "clientType",
        "type": "STRING"
    }, {
        "name": "host",
        "type": "STRING"
    }],
    "payloadData": [{
        "name": "BrokerName",
        "type": "STRING"
    }, {
        "name": "TotalEnqueueCount",
        "type": "LONG"
    }, {
        "name": "TotalDequeueCount",
        "type": "LONG"
    }, {
        "name": "TotalMessageCount",
        "type": "LONG"
    }, {
        "name": "MemoryPercentUsage",
        "type": "INT"
    }]
}

Next step is creating a CEP Bucket to use JMX event. 
To do this:
  • Shutdown CEP 
  • Run "ant deploy-broker-manager" from  <cep_home>/samples/cep-samples
  • Copy paste activemq-all-xxx.jar from the <activemq_home> directory to <cep_home>/samples/lib directory. 
  • Copy paste activemq-core-xxx.jar and geronimo-j2ee-management_1.1_spec-1.0.1.jar from <activemq_home>/lib to <cep_home>/repository/components/lib directory.
  • Copy the queue-monitor.xml to <cep_home>/repository/deployment/server/cepbuckets
  • In a new terminal run "ant jmsSubscriber -Dtopic=QueueInfo" from <cep_home>/samples/cep-samples to receive the outputs of the queue-monitor bucket.
  • Start CEP 
Now try running the Queue example which is in the <amq_home>/example.
In a new terminal run "ant consumer" to run the sample consumer 
In a new terminal run "ant producer" to send some sample messages to the ActiveMQ Queue. 

you will be able to view some appropriate outputs in CEP QueueInfo jmsSubscriber

Monday, February 25, 2013

No connector available to access repository - FIXED! - Maven Wagon

When you try to deploy artifacts to Sonatype using maven 3, If you encounter the following error, you probably have not configured Maven Wagon properly  

[INFO] ------------------------------------------------------------------------
[INFO] BUILD FAILURE
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 24.444s
[INFO] Finished at: Tue Feb 26 06:37:48 IST 2013
[INFO] Final Memory: 10M/981M
[INFO] ------------------------------------------------------------------------
[ERROR] Failed to execute goal org.apache.maven.plugins:maven-deploy-plugin:2.7
:deploy (default-deploy) on project test-core: Failed to deploy artifacts/metad
ata: No connector available to access repository xxx-maven2-repository (scp://x
xxx/maven2/) of type default using the available factories WagonRepositoryConne
ctorFactory -> [Help 1]
[ERROR]
[ERROR] To see the full stack trace of the errors, re-run Maven with the -e swi
tch.
[ERROR] Re-run Maven using the -X switch to enable full debug logging.
[ERROR]
[ERROR] For more information about the errors and possible solutions, please re
ad the following articles:
[ERROR] [Help 1] http://cwiki.apache.org/confluence/display/MAVEN/MojoExecution
Exception

To fix this issue 
  • Add wagon extensions to your pom.
    <build>
        <extensions>
            <extension>
                <groupId>org.apache.maven.wagon</groupId>
                <artifactId>wagon-ssh</artifactId>
                <version>2.4</version>
            </extension>
        </extensions>
    </build>
  • Add distribution management segment to your pom.
    <distributionManagement>
        <repository>
            <id>xxx-maven2-repository</id>
            <name>Xxx Maven2 Repository</name>
            <url>scp://xxxx/maven2/</url>
        </repository>
        <snapshotRepository>
            <id>xxx-maven2-snapshot-repository</id>
            <name>Xxx Maven2 Snapshot Repository</name>
            <url>scp://xxxx/snapshots/maven2/</url>
        </snapshotRepository>
    </distributionManagement>
Not it works !
For more info on using extensions refer: http://maven.apache.org/guides/mini/guide-using-extensions.html

Monday, February 18, 2013

Using Apache Qpid with WSO2 CEP

Though WSO2 CEP 2.x.x can work with Qpid, it does not have samples that demonstrates this, where  CEP only have samples for ActiveMQ.

Here I'm providing steps to configure, and run some samples using Apache Qpid.

First step is downloading and running Apache Qpid.
You can download Qpid from the project web site. Note, I have used Qpid java 0.18 for these samples, you can download that from here.

Next, unzip and start the broker.

Now to configure WSO2 CEP with Qpid JMS broker, follow the steps provided at Configuring JMS-Qpid Broker.

Test publishing and subscribing to JMS Map messages


Edit the jms-twitter-stockquote-analyser.xml provided at /samples/cep-samples/conf/buckets directory by changing the input and output broker names as, 
<bucket name="TwitterAndStockQuoteAnalyzer" xmlns="http://wso2.org/carbon/cep">
     ...
     <input topic="AllStockQuotes" brokerName="qpidJmsBroker">
     ...
     <input topic="TwitterFeed" brokerName="qpidJmsBroker">
     ...
     ...
     <output topic="PredictedStockQuotes" brokerName="qpidJmsBroker">
     ...
</bucket>

Here we are using the brokerName as "qpidJmsBroker", because we have define it so, when Configuring JMS-Qpid Broker.

Next copy past this file to /repository/deployment/server/cepbuckets directory. 

To test the broker download the client code from here, and unzip cep-210-jms-client-qpid.zip 

To subscribe to the PredictedStockQuotes topic, in the terminal go to cep-210-jms-client-qpid directory and run:
ant mapPredictedStockQuotesSubscriber
To publish to the AllStockQuotes topic, in another terminal from the same directory, run:
ant mapAllStockQuotesPublisher
To publish to the AllStockQuotes topic, in another terminal from the same directory, run:
ant mapTwitterFeedPublisher
Then you will be able to observe the outputs. Here the output will be similar to the output of Twitter and StockQuote Analyzer sample.

Test publishing and subscribing to JMS XML messages


Edit the xml-stockquote-anlyzer.xml provided at /samples/cep-samples/conf/buckets directory by changing the input and output broker names as, 

<cep:bucket name="XMLStockQuoteAnalyzer" xmlns="http://wso2.org/carbon/cep">
     ...
     <cep:input topic="AllStockQuotes" brokerName="qpidJmsBroker">
     ...
     ...
     <cep:output topic="FastMovingStockQuotes" brokerName="qpidJmsBroker">
     ...
</cep:bucket>

Here we are using the brokerName as "qpidJmsBroker", because we have define it so, when Configuring JMS-Qpid Broker.

Next copy past this file to /repository/deployment/server/cepbuckets directory, and make sure to delete other bucket configurations in order to avoid conflicts.

If you don't have the client code (used for the previous sample), download the client code from here, and unzip cep-210-jms-client-qpid.zip, 

To subscribe to the FastMovingStockQuotes topic, in the terminal go to cep-210-jms-client-qpid directory and run:
ant xmlFastMovingStockQuotesSubscriber
To publish to the AllStockQuotes topic, in another terminal from the same directory, run:
ant xmlAllStockQuotesPublisher
You will be able to observe the outputs. Here the output will be similar to the output of Stock Quote Analyzer sample.

Saturday, February 16, 2013

Writing Custom Broker for WSO2 CEP

In this post I'll explain how to create a simple Custom Broker in WSO2 CEP 2.1.x.

You can find a sample template Custom Broker project from here.

The first step is to create a Java project (here I have used Apache Maven),
then create an appropriate Broker Type by extending "org.wso2.carbon.broker.core.BrokerType" and an appropriate Broker Type Factory by extending the  "org.wso2.carbon.broker.core.BrokerTypeFactory" from the jar org.wso2.carbon.broker.core-4.0.7.jar

org.wso2.carbon.broker.core-4.0.7.jar will be available in the WSO2 public Maven Repository, which you can add to your project pom.xml as:

<repositories>
    <repository>
        <id>wso2-maven2-repository</id>
        <name>WSO2 Maven2 Repository</name>
        <url>http://dist.wso2.org/maven2</url>
    </repository>
</repositories>

else you can also find that at CEP_HOME/repository/components/plugins/ as org.wso2.carbon.broker.core_4.0.7.jar

When extending the BrokerType, the BrokerTypeImpl has to implement the following methods.

package org.wso2.carbon.broker.core;

import org.apache.axis2.engine.AxisConfiguration;
import org.wso2.carbon.broker.core.exception.BrokerEventProcessingException;

/**
 * This is a broker type. these interface let users to publish subscribe messages according to
 * some type. this type can either be local, jms or ws
 */
public interface BrokerType {

    /**
     * object which describes this type. it contains the name and available properties.
     *
     * @return - type dto
     */
    BrokerTypeDto getBrokerTypeDto();

    /**
     * subscribe to the connection specified in the broker configuration.
     *
     * @param topicName           - topic name to subscribe
     * @param brokerListener      - broker type will invoke this when it receive events
     * @param brokerConfiguration - broker configuration details
     * @throws BrokerEventProcessingException - if can not subscribe to the broker
     */
    String subscribe(String topicName,
                     BrokerListener brokerListener,
                     BrokerConfiguration brokerConfiguration,
                     AxisConfiguration axisConfiguration) throws BrokerEventProcessingException;

    /**
     * publish a message to a given connection with the broker configuration.
     *
     * @param topicName           - topic name to publish messages
     * @param object              - message to send
     * @param brokerConfiguration - broker configuration to be used
     * @throws BrokerEventProcessingException - if the message can not publish
     */
    void publish(String topicName,
                 Object object,
                 BrokerConfiguration brokerConfiguration) throws BrokerEventProcessingException;


    /**
     * publish test message to check the connection with the broker configuration.
     *
     * @param brokerConfiguration - broker configuration to be used
     * @throws BrokerEventProcessingException - if the message can not publish
     */
    void testConnection(BrokerConfiguration brokerConfiguration) throws BrokerEventProcessingException;

    /**
     * this method unsubscribes the subscription from the broker.
     *
     * @param topicName
     * @param brokerConfiguration
     * @throws BrokerEventProcessingException
     */
    void unsubscribe(String topicName,
                     BrokerConfiguration brokerConfiguration,
                     AxisConfiguration axisConfiguration, String subscriptionId)
            throws BrokerEventProcessingException;

}

Here you have to write the appropriate client code for both sending and receiving events.

The publish() method in the BrokerType will be called when CEP is sending an output, your implementaion to publish the message needs to go under the publish() method. The input parameter message Object, of the publish() method can be either of the following,
  • java.lang.String
  • java.util.Map
  • org.apache.axiom.om.OMElement
  • org.wso2.carbon.databridge.commons.Event
In your implementation you don't need to support all the message types, and supporting the appropriate type will be adequate.  This is because at the bucket output mapping, user can select the appropriate message type mapping when sending the events.

At the same time when a subscription was made for the subscribe() method the appropriate BrokerListener will be also provided, which is associated with the topic. BrokerListener also have a method onEvent(Object message) to which your client code receiving  the incoming events has to pass the received messages. Here the events passed to the onEvent() must have one of the following format,
  • java.util.Map
  • org.apache.axiom.om.OMElement
  • org.wso2.carbon.databridge.commons.Event
When extending the BrokerTypeFactory, the BrokerTypeFactoryImpl has to implement the getBrokerType() method.

package org.wso2.carbon.broker.core;

public interface BrokerTypeFactory {
    BrokerType getBrokerType();
}

Then you have to build the jar and deploy that to the CEP server.

If its a plain jar you have to add that to  CEP_HOME/repository/components/lib or
If its a OSGi bundle then you can add that to CEP_HOME/repository/components/dropings

Next, you have to create a file called "broker.xml" at wso2cep-2.0.1/repository/conf directory, and add the implantation class as in the following XML:

<brokerTypes xmlns="http://wso2.org/carbon/broker">
    <brokerType class="org.test.cep.broker.BrokerTypeFactoryImpl"/>
</brokerTypes>

Note: In the above code snippet change the "org.test.cep.broker.BrokerTypeFactoryImpl" with your implementation class name.

Finally, restart & login to the CEP server,  go to Configure -> Broker -> Add, and there you will be able configure your custom broker.



You will be able to download a sample template project from : https://dl.dropbox.com/u/17922825/blog/cep-ext-broker.zip

Friday, January 11, 2013

Using Siddhi CEP as a Java library


WSO2 Complex Event processor uses Siddhi as the Backend Runtime Engine, which was initially developed at http://siddhi.sourceforge.net/, and its development is now being continued as a commons project of WSO2 Inc.
Siddhi under Apache License V2, and its a product designed for real time processing both in standalone mode and in distributed mode.

Siddhi is basically a Java library that's embedded in WSO2 CEP server to provide enterprise integration for Complex Event Processing.

Since Siddhi is just a java library here are some samples demonstrating siddhi’s capabilities as a library.

You can find the WSO2 CEP product from: 

You can find the Documentation for WSO2 CEP from: 

You can find the Documentation for Siddhi Language Specification from: