package org.realtimecomm.client;
import java.io.*;
import java.net.*;
import java.util.concurrent.*;
import org.realtimecomm.xmlmessaging.*;
import org.realtimecomm.*;
import java.util.Vector;
import java.util.*;
/**
 * Client-side endpoint that exchanges XML messages (as strings) with a remote
 * host over a TCP socket.
 *
 * <p>{@link #connect()} opens the socket and creates three workers: an
 * {@code XMLMessageSender} that is handed the outgoing queue and the socket's
 * output stream, an {@code XMLMessageReceiver} that is handed the incoming
 * queue and the socket's input stream, and an {@link IncomingMessageProcesor}
 * thread that takes messages from the incoming queue and forwards them to the
 * registered {@link XMLCommunicatorListener}s.
 *
 * <p>Listeners are kept in a copy-on-write list, so they may be added or
 * removed while a notification is in progress (including from inside a
 * listener callback) without breaking the iteration.
 */
public class XMLCommunicator implements SocketCommunicationObserver
{
    private final String host;
    private final int port;

    /*
     * Note carried over from the original source: the receiver object reads
     * incoming XML messages from the socket. Incoming messages are terminated
     * by a zero byte, so the receiver uses the zero byte as a message
     * separator. After a message is read from the socket it is placed into the
     * incoming message queue, where it waits to be processed.
     */
    private XMLMessageReceiver receiver;
    private XMLMessageSender sender;
    IncomingMessageProcesor incomingMessageProcesor;
    private Socket socket;

    /** Messages handed over by the receiver, waiting to be dispatched to listeners. */
    private final LinkedBlockingQueue<String> incomingMessages = new LinkedBlockingQueue<String>();

    /** Messages queued by {@link #sendMessage(String)} for the sender. */
    private final LinkedBlockingQueue<String> outgoingMessages = new LinkedBlockingQueue<String>();

    /** Copy-on-write so listener changes never invalidate an ongoing notification loop. */
    private final List<XMLCommunicatorListener> listeners =
            new CopyOnWriteArrayList<XMLCommunicatorListener>();

    /**
     * Stores the connection target. No network activity happens until
     * {@link #connect()} is called.
     *
     * @param host name or address of the remote host
     * @param port TCP port of the remote host
     * @throws Exception never thrown by this implementation; kept in the
     *         signature for compatibility with existing callers
     */
    public XMLCommunicator(String host, int port) throws Exception
    {
        this.host = host;
        this.port = port;
    }

    /**
     * Stops the receiver, the sender and the incoming-message processor, then
     * closes the socket if it is still open. Safe to call before
     * {@link #connect()} or more than once: parts that were never created are
     * skipped.
     */
    public void disconnect()
    {
        // Work on local copies so each null check and the following call see the same object.
        XMLMessageReceiver activeReceiver = receiver;
        if (activeReceiver != null)
        {
            activeReceiver.stopRunning();
        }

        XMLMessageSender activeSender = sender;
        if (activeSender != null)
        {
            activeSender.stopRunning();
        }

        IncomingMessageProcesor activeProcessor = incomingMessageProcesor;
        if (activeProcessor != null)
        {
            activeProcessor.stopRunning();
        }

        // close socket if it is not closed already
        Socket activeSocket = socket;
        if (activeSocket != null && !activeSocket.isClosed())
        {
            try
            {
                activeSocket.close();
            }
            catch (IOException ex)
            {
                ex.printStackTrace();
            }
        }
    }

    /**
     * Historical (misspelled) name of {@link #disconnect()}, kept so existing
     * callers continue to compile. Behaves exactly like {@link #disconnect()}.
     */
    public void dicsonnect()
    {
        disconnect();
    }

    /**
     * Callback invoked when socket communication fails: tears the connection
     * down and then tells every listener that the communicator is
     * disconnected.
     */
    public void socketCommunicationFailed()
    {
        dicsonnect();
        try
        {
            notifyListenersDisconnected();
        }
        catch (Exception ex)
        {
            ex.printStackTrace();
        }
    }

    /**
     * Delivers one received message to every registered listener. An
     * exception thrown by one listener is printed and does not prevent
     * delivery to the remaining listeners.
     *
     * @param xmlMessage the message taken from the incoming queue
     */
    private void notifyListenersMessageReceived(String xmlMessage)
    {
        for (XMLCommunicatorListener listener : listeners)
        {
            try
            {
                listener.messageReceived(xmlMessage);
            }
            catch (Exception ex)
            {
                ex.printStackTrace();
            }
        }
    }

    /**
     * Tells every registered listener that the connection is gone. An
     * exception thrown by one listener is printed and does not prevent the
     * remaining listeners from being notified.
     */
    private void notifyListenersDisconnected()
    {
        for (XMLCommunicatorListener listener : listeners)
        {
            try
            {
                listener.disconnected();
            }
            catch (Exception ex)
            {
                ex.printStackTrace();
            }
        }
    }

    /**
     * Registers a listener for received messages and disconnect events.
     *
     * @param listener the listener to add
     */
    public void addListener(XMLCommunicatorListener listener)
    {
        listeners.add(listener);
    }

    /**
     * Removes the first registration of the given listener, if any.
     *
     * @param listener the listener to remove
     */
    public void removeListener(XMLCommunicatorListener listener)
    {
        listeners.remove(listener);
    }

    /**
     * Places a message on the outgoing queue that was handed to the sender in
     * {@link #connect()}.
     *
     * @param xmlMessage the message to queue; must not be {@code null}
     * @throws InterruptedException if the calling thread is interrupted while
     *         waiting to enqueue
     */
    public final void sendMessage(String xmlMessage) throws InterruptedException
    {
        outgoingMessages.put(xmlMessage);
    }

    /**
     * Worker thread that takes messages from the incoming queue and forwards
     * each one to the registered listeners. The thread starts itself on
     * construction and runs until {@link #stopRunning()} is called or it is
     * interrupted.
     */
    class IncomingMessageProcesor extends Thread
    {
        /** Written by {@link #stopRunning()} from other threads, hence volatile. */
        volatile boolean running = true;

        /** Creates the processor and immediately starts its thread. */
        public IncomingMessageProcesor()
        {
            start();
        }

        /**
         * Dispatch loop: blocks on the incoming queue and notifies listeners
         * for every message until the processor is stopped.
         */
        public void run()
        {
            try
            {
                while (running)
                {
                    String message = incomingMessages.take();
                    notifyListenersMessageReceived(message);
                }
            }
            catch (InterruptedException ex)
            {
                // An interrupt issued by stopRunning() is the normal shutdown
                // path; only report interrupts that arrive while still running.
                if (running)
                {
                    ex.printStackTrace();
                }
            }
            catch (Exception ex)
            {
                ex.printStackTrace();
            }
        }

        /**
         * stops this processor
         */
        public void stopRunning()
        {
            running = false;
            // Wake the thread if it is blocked in take(). When called from the
            // processor thread itself (for example from a listener callback)
            // the loop condition alone ends the loop, so no interrupt is needed.
            if (Thread.currentThread() != this)
            {
                interrupt();
            }
        }
    }

    /**
     * Opens the socket to the configured host and port and creates the
     * incoming-message processor, the sender and the receiver. If any step
     * after opening the socket fails, everything created so far is shut down
     * again before the exception is rethrown.
     *
     * @throws Exception if the socket cannot be opened or a worker cannot be
     *         created
     */
    public void connect() throws Exception
    {
        socket = new Socket(host, port);
        try
        {
            incomingMessageProcesor = new IncomingMessageProcesor();
            sender = new XMLMessageSender(outgoingMessages, socket.getOutputStream(), this);
            receiver = new XMLMessageReceiver(incomingMessages, socket.getInputStream(), this);
        }
        catch (Exception ex)
        {
            // Do not leave an open socket or a running processor thread behind.
            dicsonnect();
            throw ex;
        }
    }
}