java中异步socket类的实现和源代码

java中异步socket类的实现和源代码

我们知道,javasocket类一般操作都是同步进行,常常在read的时候socket就会阻塞直到有数据可读或socket连接断开的时候才返回,虽然可以设置超时返回,但是这样比较低效,需要做一个循环来不停扫描数据是否可读。看来,在同一个线程中,要是想实现异步读写不太容易。

下面介绍的这个类实现了伪异步socket通讯。基本思想就是在现有socket类的基础上进行封装,当socket连接建立成功后,立即创建一个socket数据接收线程,专门负责阻塞式的socket读取(read),而当前线程负责数据的发送(send)。另外定义了一个接口,包括了socket的各种事件的回调。我们要实现这个接口,在接口实现类中创建异步socket对象,并且传递接口类到异步socket对象中,目的是有socket事件的时候回调我们的方法。

下面是接口:

SocketExHandler.java

package com.ly.net;

import java.net.*;  

/**

 * Title:       

 * Description:

 * Copyright:    Copyright (c) 2001

 * Company:      http://dozb.blogchina.com

 * @author          dozb

 * @version 1.0

 */

/**

 * 异步Socket Client Interface

 * 使用方法:

 * 1.定义类 MySocketClientEx 实现SocketExHandler接口,实现 OnReceive OnClose OnConnect 事件

 * 2.在类中实现start方法 MySocketEx = new SocketEx(this,ip,port)

 * 3.在类中实现stop方法 delete MySocketEx

 * 4.当有数据到达时会触发OnReceive事件

 * 5.当对方SOCKET关闭时会触发OnClose事件

 * 6.当SOCKET建立时会触发OnConnect事件

 */

/**

 * 异步Socket Server Interface

 * 使用方法:

 * 1.定义类 MySocketServerEx 实现SocketExHandler接口,实现 OnReceive OnListen  OnClose OnAccept  事件

 * 2.在类中实现start方法 MySocketEx = new ServerSocketEx(this,ip,port)

 * 3.在类中实现stop方法 delete MySocketEx

 * 4.当开始监听时会触发OnListen事件

 * 5.当SOCKET关闭时会触发OnClose事件

 * 6.当有客户端SOCKET要建立连接时会触发OnAccept事件

 */

public interface SocketExHandler

{

        //当客户端sock数据到达时触发

        public void OnReceive(Object socket,byte buf[],int nLen);

        //当客户端sock连接建立成功时触发

        public void OnConnect(Object socket);

        //当服务端sock监听开始时触发

        public void OnListen(Object socket);

        //当服务端sock接受一个新的sock连接时触发

        public void OnAccept(Object socket,SocketEx ClientSocket) ;

        //当sock关闭时触发

        public void OnClose(Object socket);

       

}

 

下面是异步客户端socket类:

SocketEx.java

package com.ly.net;

import java.io.*;

import java.net.*;

import java.util.*;

import com.ly.net.SocketExHandler;

/**

 * Title:       

 * Description:

 * Copyright:    Copyright (c) 2001

 * Company:      http://dozb.blogchina.com

 * @author          dozb

 * @version 1.0

 */

public class SocketEx implements Runnable

{

        public static final boolean isdebug = true;//调试

        /**

         *构造函数.

         */

        public SocketEx(SocketExHandler seh,Socket ClientSocket){this.seh = seh;thisSocket = ClientSocket;     InitNotify();}

        public SocketEx(SocketExHandler seh,String host,int port) throws IOException {this.seh = seh;thisSocket = new Socket(host,port);InitNotify();}

        public SocketEx(SocketExHandler seh, InetAddress address, int port ) throws IOException {this.seh = seh;thisSocket = new Socket(address, port);InitNotify();}

        public SocketEx(SocketExHandler seh, String host, int port, InetAddress localAddr, int localPort ) throws IOException {this.seh = seh;thisSocket = new Socket(host,port,localAddr,localPort );InitNotify();}

        public SocketEx(SocketExHandler seh, InetAddress address, int port, InetAddress localAddr, int localPort ) throws IOException {this.seh = seh;thisSocket = new Socket(address, port, localAddr,localPort );InitNotify();}

        /**

         * 实现Socket的可见方法.

         */

        public synchronized void close() throws IOException   {IsRunning = false;thisSocket.close();}

        public InetAddress getInetAddress() {return thisSocket.getInetAddress();}

        public InputStream getInputStream() throws IOException{      return thisSocket.getInputStream(); }

        public InetAddress getLocalAddress() { return thisSocket.getLocalAddress() ; }

        public int getLocalPort() {    return thisSocket.getLocalPort() ; }

        public OutputStream getOutputStream() throws IOException{return thisSocket.getOutputStream(); }

        public int getPort() { return thisSocket.getPort() ; }

        public int getSoLinger() throws SocketException{      return thisSocket.getSoLinger(); }

        public synchronized int getSoTimeout() throws SocketException {      return thisSocket.getSoTimeout(); }

        public boolean getTcpNoDelay() throws SocketException {      return thisSocket.getTcpNoDelay(); }

        public void setSoLinger( boolean on, int val ) throws SocketException {     thisSocket.setSoLinger(on,val); }

        public synchronized void setSoTimeout( int timeout ) throws SocketException {thisSocket.setSoTimeout( timeout ) ; }

        public void setTcpNoDelay( boolean on ) throws SocketException {thisSocket.setTcpNoDelay(on); }

        public String toString() {     return thisSocket.toString() ; }

        /**

         * 获取Socket

         */

        public Socket GetSocket(){return thisSocket;}

        /**

         * 初始化异步Socket

         */

        private void ShowMsg(String Msg)

        {

                if(isdebug)

                        System.out.println(Msg);

        }

        private void InitNotify()

        {

                if(NotifyThread != null) return ;

                try{

                        biStream = new BufferedInputStream(getInputStream());

                        thisSocket.setSoTimeout(0);

                }catch(IOException e){

                        ShowMsg("InitNotify() IOException.");

                }

                IsRunning = true;

                NotifyThread = new Thread(this,"SocketEx_NoitfyThread");

                NotifyThread.setDaemon(true);

                NotifyThread.start();

                if(seh !=null)

                        seh.OnConnect(this);

        }

        /**

         * 关闭Socket

         */

        private void Close()

        {

                try{

                        close();

                }catch(Exception eclose){

                        ShowMsg("Close() Exception.");

                }

        }

        protected void finalize() throws Throwable

        {

                Close();

                super.finalize();

        }

        /**

         * Thread 运行

         */

        public void run()

        {

                while(IsRunning){

                        try{

                                if(getInputStream().read(buf,0,1) <= 0)//试读一个字节

                                {

                                        DoClose();

                                        return ;

                                }

                                if(!DoReceive(getInputStream().available()))

                                        return ;

                        }catch(Exception e){

                                ShowMsg("run() Exception.");

                                DoClose();

                                return ;

                        }

                        try{

                                Thread.sleep(0); //

                        }catch(InterruptedException e){

                                ShowMsg("run() InterruptedException.");

                                DoClose();

                                return ;

                        }

                }

        }

        /**

         * 当有数据到达时的回调方法.

         */

        private boolean DoReceive(int nCanReadCount)

        {

                try{

                        int len = 0,nCurrReadCount=0,nStart=1;

                        do{

                                for(int i=nStart;i< BUFLEN;i++)

                                        buf[i]=0;

                                if(nCanReadCount == 0)

                                {

                                        if(seh !=null)

                                                seh.OnReceive(this,buf,nStart);

                                        return true;

                                }

                                if(nCanReadCount >(BUFLEN-2))

                                {

                                        nCurrReadCount = BUFLEN-2;

                                }

                                else

                                {

                                        nCurrReadCount = nCanReadCount;

                                }

                                len = biStream.read(buf,nStart,nCurrReadCount);

                                if(len == 0)

                                {

                                        DoClose();

                                        return false;

                                }

                                nCanReadCount -= len;

                                buf[len+nStart] = 0;

                                if(seh !=null)

                                seh.OnReceive(this,buf,len+nStart);

                                nStart = 0;

                        }while(nCanReadCount >0);

            }catch(Exception excpt){

                        ShowMsg("DoReceive() Exception.");

                        DoClose();

                        return false;

        }

                return true;

        }

        /**

         * 当Socket建立连接时的回调方法.

         */

        private void DoConnect()

        {

                if(seh !=null)

                        seh.OnConnect(this);

        }

        /**

         * 当Socket关闭时的回调方法.

         */

        private void DoClose()

        {

                try{

      if(IsRunning)

      {

        Close();

        if(seh !=null)

          seh.OnClose(this);

        IsRunning = false;

      }

                }catch(Exception e){

                        ShowMsg("DoClose() Exception.");

                }

        }

        /**

         * 以下实现不要改动!!!!

         */

        private Thread NotifyThread=null;

        private boolean IsRunning = false;

        private Socket thisSocket = null;

        private static final int BUFLEN = 4097;

        private byte buf[] = new byte[BUFLEN];

        private BufferedInputStream biStream = null;

        private SocketExHandler seh=null;

}

 

下面是异步socketserver类:

ServerSocketEx .java

package com.ly.net;

import java.io.*;

import java.net.*;  

import java.util.*; 

/**

 * Title:       

 * Description:

 * Copyright:    Copyright (c) 2001

 * Company:      http://dozb.blogchina.com

 * @author          dozb

 * @version 1.0

 */

public class ServerSocketEx extends ServerSocket implements Runnable

{

        /**

         * 以下实现不要改动!!!!

         */

        public ServerSocketEx(SocketExHandler seh, int port ) throws IOException {super(port);this.seh = seh; Listen();}

        public ServerSocketEx(SocketExHandler seh, int port, int backlog ) throws IOException {super(port,backlog);this.seh = seh;    Listen();}

        public ServerSocketEx(SocketExHandler seh, int port, int backlog, InetAddress bindAddr ) throws IOException {super(port,backlog, bindAddr);this.seh = seh;Listen();}

       

        public void setTimeout(int timeout) {

            this.timeout = timeout;

    }

        public static Vector GetClientPool(){return ClientPool;};

        public static void CloseAllClients(){

                for(int i=0;i< ClientPool.size();i++)

                {

                        try{

                                SocketEx s=(SocketEx)ClientPool.elementAt(i);

                                if(s != null) s.close();

                        }catch(Exception e){}

                }

                ClientPool.removeAllElements();

        }

        public static void PutClient(SocketEx s){ClientPool.addElement(s);};

        /**

         * 关闭Socket

         */

        public void Close()

        {

                IsRunning = false;

                try{

                        close();

                }catch(Exception eclose){

                }

        }

        protected void finalize() throws Throwable

        {

                Close();

                super.finalize();

        }

        /**

         * Thread 运行

         */    

        public void run()

        {

                while(IsRunning){

                        acceptConnections();

                }

                DoClose();

        }

       

        // -------------------- Private methods

        private void Listen() throws IOException

        {

                InitNotify();

                if(seh !=null)

                        seh.OnListen(this);

        }

        /**

         * 初始化异步Socket

         */

        private void InitNotify()throws IOException

        {

                if(NotifyThread != null) return ;

                try{

                        setSoTimeout(this.timeout);

                } catch( IOException ex ) {

                    IsRunning=false;

                    throw ex;

                }

                IsRunning = true;

                NotifyThread = new Thread(this,"ServerSocketEx_NoitfyThread");

                NotifyThread.setDaemon(true);

                NotifyThread.start();

        }

        /**

         * 当Socket关闭时的回调方法.

         */

        private void DoClose()

        {

                try{

                        if(seh !=null)

                                seh.OnClose(this);

                                //NotifyThread.stop();

                }catch(Exception e){

                }

        }

    private void processSocket(Socket s) throws IOException

    {

                SocketEx se = new SocketEx(seh,s);

                PutClient(se);

                if(seh !=null)

                        seh.OnAccept(this,se);

               

    }

    private void acceptConnections() {

        try {

            if(IsRunning == false)

                                return;

                        Socket socket = acceptSocket();

                        if(IsRunning != false && socket != null) {

                            processSocket(socket);

                        }

        } catch(Throwable e) {

            IsRunning = false;

        }

    }

  private Socket acceptSocket() {

        Socket accepted = null;

        try {

            if(IsRunning == true) {

                                accepted = accept();

                                if(IsRunning == false) {

                                if(null != accepted) {

                                    accepted.close();  // rude, but unlikely!

                                    accepted = null;

                                }

                                }

            }

        } catch(InterruptedIOException iioe) {

            // normal part -- should happen regularly so

            // that the endpoint can release if the server

            // is shutdown.

            // you know, i really wish that there was a

            // way for the socket to timeout without

            // tripping an exception. Exceptions are so

            // 'spensive.

        } catch (SocketException e) {

            if (IsRunning != false) {

                                IsRunning = false;

            }

        } catch(Throwable e) {

            IsRunning = false;

        }

        return accepted;

    }

 

    private static final int TIMEOUT = 5000;

    private int timeout = TIMEOUT;

        private Thread NotifyThread=null;

        private boolean IsRunning=false;

        private SocketExHandler seh=null;

        private static Vector ClientPool=new Vector();

}

 

下面是几个工具类:

TimerClient.java

package com.ly.util;

/**

 * TimerClient Interface

 *

 * @version 1.0, 8 October 1995

 *

 */

public interface TimerClient

{

  void timerEvent(int id);

}

TimerCtl.java

package com.ly.util;

import java.util.Vector;

import java.util.Enumeration;

//import com.borland.jb.util.Diagnostic;

/**

 * Timer Component

 *

 * Note:

 *  - The successful operation of this timer requires clients to execute simple, short

 *    code snippets when called back by the engine.  Otherwise the queue's delivery

 *    mechanism will be held up

 *

 * Further work:

 *  - When Thread.Interrupt is implemented we can switch from the busy wait model to

 *    the calculated wait model.  Without the interrupt the thread waits for the

 *    calculated interval before waking up.  This is a problem if another shorter

 *    request arrives.  For now we'll assume the minimum resolution of the timer is

 *    100ms.

 *

 * @version 1.0, 2 October 1995

 *

 */

public class TimerCtl

{

  static TimerTasks timerTasks;

  public TimerCtl() {

  }

  /*

  * Start a timer running

  */

  public static void startTimer(TimerClient client, int eventId, long delay, boolean repeat) {

    // create the timer if necessary

    if (timerTasks == null) {

      timerTasks = new TimerTasks();

      timerTasks.start();

    }

    //Diagnostic.out.println("TIMER: startTimer"+eventId);

    // add the new task to the queue

    timerTasks.add(client, eventId, delay, repeat);

  }

  /*

  * Stop a timer

  */

  public static void stopTimer(TimerClient client, int eventId) {

    //Diagnostic.out.println("TIMER: stopTimer"+eventId);

    if(timerTasks != null)

        timerTasks.end(client, eventId);

  }

}

class TimerTasks extends Thread

{

  Vector tasks = new Vector();

  boolean suspended = false;

  boolean sleeping = false;

  /**

   * Thread task runner

   */

  public void run() {

    // Loop forever

    while (true) {

      long sleepTime = 0;

      // Ensure that the tasks class is protected

      synchronized (tasks) {

        //Diagnostic.out.println("TIMER: Tick");

        // Scan the job list for any jobs which may fire.

        // Mark one-shot jobs for deletion

        // Calculate the maximum time we can sleep for

        sleepTime = scan();

        // Delete DeletePending jobs.  DeletePending jobs result from one-shots which have

        // been sent, and repeat jobs which have been cancelled.  Jobs may have been

        // cancelled during the Scan process.

        purge();

      }

      // Suspend timer if necessary

      if (tasks.size() == 0) {

        //Diagnostic.out.println("TIMER: Suspend");

        try {

          synchronized(this) {

            suspended = true;

            wait();

          }

        }

        catch (InterruptedException e) {

        }

      }

      else {

        //Diagnostic.out.println("TIMER: Suggested Sleeping for "+sleepTime);

        if (sleepTime >= 0) {

          try {

            sleeping = true;

            sleep(sleepTime);

            sleeping = false;

          }

          catch (InterruptedException i) {

            //Diagnostic.out.println("TIMER: Caught me napping");

          }

        }

      }

    }

  }

  /**

   * Add a new task

   */

  public void add(TimerClient client, int eventId, long delay, boolean repeat) {

    TimerTask t = new TimerTask(client, eventId, delay, repeat);

    synchronized (tasks) {

      tasks.addElement((Object)t);

    }

    // Want instant response - wake the thread if it's napping

    // unfortunately the interrupt() method is not working

//    if (sleeping)

//      interrupt();

    if (suspended) {

      synchronized(this) {

        notify();

        //Diagnostic.out.println("TIMER: Resume");

        suspended = false;

      }

    }

  }

  /**

   * Find the job and mark it for deletion

   */

  public void end(TimerClient client, int eventId) {

    synchronized (tasks) {

      for (int i = 0; i < tasks.size(); i++) {

        TimerTask t = (TimerTask)tasks.elementAt(i);

        //if (!t.deletePending && t.client == client && t.eventId == eventId)

        if (t.deletePending == false && t.client == client && t.eventId == eventId) {

          // JPBS - if we don't reset 'repeat', deletePending will be set again

          t.repeat = false;

          t.deletePending = true;

          break;

        }

      }

    }

  }

  /**

   * Clear out all the dead wood

   */

  void purge() {

    for (int i = 0; i < tasks.size(); i++) {

      TimerTask t = (TimerTask)tasks.elementAt(i);

      if (t.deletePending) {

        //Diagnostic.out.println("TIMER: purged");

        tasks.removeElementAt(i);

        i--;

      }

    }

  }

  long scan() {

    // The value added to the current time determines the MAX time until

    // the next scan

    // This is 100 now since thread.interrupt() is not implemented

    long nextTime = System.currentTimeMillis() + 100;

    for (int i = 0; i < tasks.size(); i++) {

      TimerTask t = (TimerTask)tasks.elementAt(i);

      // if not already deletePending, test (and possibly send the event)

      // as a result, the job may be flagged for deletion.

      // May also be a non-repeating job and so require self deletion

      if (!t.deletePending)

        t.test();

      // if the task didn't get deleted - see what it contributes to the time

      if (!t.deletePending)

        nextTime = Math.min(nextTime, t.timeNext);

      //Diagnostic.out.println("TIMER: Scanning "+t.eventId+" "+(t.deletePending == true ? "DEL" : ""));

    }

    return nextTime - System.currentTimeMillis();

  }

}

class TimerTask

{

  TimerClient client;

  int         eventId;

  long        timePrev;

  long        timeDelay;

  long        timeNext;

  boolean repeat;

  boolean deletePending;

  public TimerTask(TimerClient client, int eventId, long timeDelay, boolean repeat) {

    this.client = client;

    this.eventId = eventId;

    this.timeDelay = timeDelay;

    this.repeat = repeat;

    // schedule the next click - now + delay

    timeNext = System.currentTimeMillis() + timeDelay;

    deletePending = false;

    //Diagnostic.out.println("TIMER: Adding New Task");

  }

  public void test() {

    if (System.currentTimeMillis() >= timeNext) {

      //Diagnostic.out.println("TIMER: fire");

      // Fire the event

      client.timerEvent(eventId);

      // Update the next time

      timeNext = System.currentTimeMillis() + timeDelay;

      deletePending = !repeat;

    }

  }

}

 

下面是使用上面的异步socket类,做的demo开发

DemoAppSocketHandler.java

package com.ly.net;

import java.net.*;  

import java.util.*;

/**

 * Title:        

 * Description:

 * Copyright:    Copyright (c) 2001

 * Company:      http://dozb.blogchina.com

 * @author          dozb

 * @version 1.0

 */

public interface DemoAppSocketHandler

{

        //当客户端sock有数据到达时触发

        public void OnProcessCmd(Object socket,String FromArea,String ToArea,String ChannNo,String MainFun,String SubFun,Vector ParamList);

        //当客户端sock连接建立成功时触发

        public void OnConnect(Object socket);

        //当服务端sock监听开始时触发

        public void OnListen(Object socket);

        //当服务端sock接受一个新的sock连接时触发

        public void OnAccept(Object socket,SocketEx ClientSocket) ;

        //当sock关闭时触发

        public void OnClose(Object socket);

       

}

DemoAppSocket.java

package com.ly.net;

import java.io.*;

import java.util.*;

import com.ly.util.*;

/**

 * Title:       

 * Description:

 * Copyright:    Copyright (c) 2001

 * Company:      http://dozb.blogchina.com

 * @author          dozb

 * @version 1.0

 */

//这个类是异步socket 客户端和服务端的使用演示

//对于客户端,实现了断开后自动连接

//这个类是SocketExHandler 和 TimerClient 接口的实现

public class DemoAppSocket implements SocketExHandler,TimerClient{

        //Sock构造函数

        public DemoAppSocket(DemoAppSocketHandler issh,boolean isServer,String ip,int port) {

               

                this.issh = issh;

                this.isServer = isServer;

                this.ip = ip;

                if(this.ip == null) this.ip = "";

                this.port = port;

                if(this.port <0) this.port = 0;

   }

   //调用完构造后调用这个函数创建sock对象

        public void create()

        {

        if(!start())

            TimerCtl.startTimer(this,eventId,30*1000,true);

        }

   //这个方法一般不需要直接调用

   public boolean start()

   {

                if(this.isServer) return startServer(); else return startClient();

                       

   }

   //停止socket      

   public void stop()

   {

        TimerCtl.stopTimer(this,eventId);

                if(this.isServer) stopServer(); else stopClient();

   }

   //发送socket消息  

   public boolean SendCmd(Object socket,String strCmd)

   {

                SocketEx currSocketEx = (SocketEx)socket;

                if(!isServer && currSocketEx==null) currSocketEx = socketEx;

        if(currSocketEx == null) return false;

       try{

            PrintWriter Sender = new PrintWriter(currSocketEx.getOutputStream(),true);

            Sender.print(strCmd);

            Sender.flush();

            return true;

        }catch(Exception e)

        {

            return false;

        }

                               

   }

               

   //发送socket消息  

   public boolean SendCmd(Object socket,String FromArea,String ToArea,String ChannNo,String MainFun,String SubFun,String Param)

   {

        String strCmd = FromArea + ToArea + ChannNo + MainFun+SubFun+"&"+Param+"&";

                return SendCmd(socket,strCmd);

   }

        //获得IP地址

   public String getIp()

   {

                return ip;

               

   }

   //获得端口号

   public int getPort()

   {

                return port;

   }

       

        protected boolean startClient()

        {

        if(socketEx != null) {

        try{

            socketEx.close();

            }catch(IOException e){};

            socketEx = null;

        }

        try{

            socketEx = new SocketEx(this,ip,port);

            TimerCtl.stopTimer(this,eventId);

            return true;

        }catch(IOException e)

        {

            socketEx = null;

        }

        return false;

       

        }

        protected boolean startServer()

        {

       if(serverSocketEx != null) {

            try{

            serverSocketEx.close();

            }catch(IOException e){};

            serverSocketEx = null;

        }

        try{

            serverSocketEx = new ServerSocketEx(this,port);

            TimerCtl.stopTimer(this,eventId);

            return true;

        }catch(IOException e)

        {

            serverSocketEx = null;

        }

        return false;

               

        }

       

       

        protected void stopServer()

        {

        if(serverSocketEx != null) {

            try{

            serverSocketEx.close();

            }catch(IOException e){};

            serverSocketEx = null;

        }

        }

       

        protected void stopClient()

        {

        if(socketEx != null) {

            try{

            socketEx.close();

            }catch(IOException e){};

            socketEx = null;

        }

        }

       

       

    public void timerEvent(int id)

    {

        start();

    }

       

        static public String[] DealStr(String strS)

        {

           int CMDHEAD_LEN = 22;

           String[] ret = new String[2];//0留下的字符串1完整的CMD

                ret[0]=strS;//

                //假如只有一个&或没有&则不完整

           int FirstPos=strS.indexOf("&");

           if(FirstPos==-1)

                   return ret;

           if(FirstPos != CMDHEAD_LEN-1)

           {

                   strS = strS.substring(FirstPos+1);

                   return DealStr(strS);

           }

           int nSecondPos = strS.indexOf("&",FirstPos+1);    

           if(nSecondPos<0)

                   return ret;

           //可能格式不正确了

           if(strS.length()< CMDHEAD_LEN)

                   return ret;

               

           ret[1] = strS.substring(0,nSecondPos+1);

           ret[0]=strS.substring(nSecondPos+1);

           return ret;

       

        }

       

        public void OnReceive(Object socket,byte buf[],int nLen){

                       

        String ReceiveBuff = sReceiveBuf + (new String(buf,0,nLen));

                do//分解成单个的命令串

                {

                        String strCmd[] = DealStr(ReceiveBuff);

                        ReceiveBuff = strCmd[0];

                        if(strCmd[1]==null || strCmd[1].equals("")) break;

                        System.out.println(strCmd[1]);

                        String FromArea=strCmd[1].substring(0,6);

                        String ToArea=strCmd[1].substring(6,12);

                        String ChannNo=strCmd[1].substring(12,15);

                        String MainFun=strCmd[1].substring(15,18);

                        String SubFun=strCmd[1].substring(18,21);

                        String Param =strCmd[1].substring(22,strCmd[1].length()-1);

                        Vector ParamList=new Vector();

                        int nLastPos=0;

                        while(true)

                        {

                                int nPos = Param.indexOf(",",nLastPos);

                                if(nPos <0)

                                {

                                        ParamList.add(Param.substring(nLastPos));

        //                              System.out.println(Param.substring(nLastPos));

                                        break;

                                }

                                String sParam = Param.substring(nLastPos,nPos);

                                ParamList.add(sParam);

//                              System.out.println(sParam);

                                nLastPos = nPos+1;    

                        }

                       

                        DoProcessCmd(socket,FromArea,ToArea,ChannNo,MainFun,SubFun,ParamList);

                }while(true);

                sReceiveBuf = ReceiveBuff;

                if(sReceiveBuf == null) sReceiveBuf=null;

    }

    protected void DoProcessCmd(Object socket,String FromArea,String ToArea,String ChannNo,String MainFun,String SubFun,Vector ParamList)

    {

                if(issh !=null)

                        issh.OnProcessCmd(socket,FromArea,ToArea,ChannNo,MainFun, SubFun, ParamList);

    }

        public void OnConnect(Object socket)

        {

           if(issh !=null) issh.OnConnect(socket);

        }

        public void OnListen(Object socket){

           if(issh !=null) issh.OnListen(socket);

        }

        public void OnAccept(Object socket,SocketEx ClientSocket) {

           if(issh !=null) issh.OnAccept(socket,ClientSocket);

        }

        public void OnClose(Object socket){

                notifyAll();

        TimerCtl.startTimer(this,eventId,30*1000,true);

                if(issh !=null) issh.OnClose(socket);

    }

    //Socket

    SocketEx socketEx = null;

        ServerSocketEx serverSocketEx=null;

    final int eventId = 1;

    String sReceiveBuf="";

        DemoAppSocketHandler issh=null;

        boolean isServer=false;

        String ip="127.0.0.1";

        int port=20000;

}

通过这种方式,可以高效地使用socket通讯,在异步socket版本没有发布以前,不失是一种解决问题的方法。:)