Skip to main content
aboutsummaryrefslogblamecommitdiffstats
blob: e48fef9f50fa68c80c73add87fd6f10dd8937047 (plain) (tree)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18

















                                                                            
                                        
                                                      
                                                     
 
                                               





                                                              
                                      









                                                                                             
                                                                                                                   
 



                                      

                                    












                                             
                          

























                                                          






                                                                              
                        
                                                  

   








                                                       















                                                                        
                          
                                                       



                                  
                          





                                                                           









                                                                      


                                                                                                    





                                                           

                                 
                                                                                 

           











                                     
                          





                                                                            



























                                                                
                            
                                                       

       



                                       





                                     
                          

















                                                                
                                                                                                         
   

       
                                                                    







                                                                                           
                        
     
                                       







                                                   
                                                                  





                                 
                                                     
   
                              

                         
                                               









                                                  
                                                








                                                
                                  
     
                        







                            
                            



















                            














                            







                                                              
 
/***************************************************************************
 * Copyright (c) 2004, 2005, 2006 Eike Stepper, Germany.
 * All rights reserved. This program and the accompanying materials
 * are made available under the terms of the Eclipse Public License v1.0
 * which accompanies this distribution, and is available at
 * http://www.eclipse.org/legal/epl-v10.html
 * 
 * Contributors:
 *    Eike Stepper - initial API and implementation
 **************************************************************************/
package org.eclipse.internal.net4j.transport.tcp;

import org.eclipse.net4j.transport.Buffer;
import org.eclipse.net4j.transport.Channel;
import org.eclipse.net4j.transport.ConnectorException;
import org.eclipse.net4j.transport.tcp.TCPConnector;
import org.eclipse.net4j.transport.tcp.TCPSelector;
import org.eclipse.net4j.transport.tcp.TCPSelectorListener;
import org.eclipse.net4j.util.Net4jUtil;
import org.eclipse.net4j.util.lifecycle.LifecycleUtil;
import org.eclipse.net4j.util.om.trace.ContextTracer;

import org.eclipse.internal.net4j.bundle.Net4j;
import org.eclipse.internal.net4j.transport.AbstractConnector;
import org.eclipse.internal.net4j.transport.ChannelImpl;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
import java.util.List;
import java.util.Queue;

/**
 * @author Eike Stepper
 */
public abstract class AbstractTCPConnector extends AbstractConnector implements TCPConnector,
    TCPSelectorListener.Active
{
  private static final ContextTracer TRACER = new ContextTracer(Net4j.DEBUG_CONNECTOR, AbstractTCPConnector.class);

  private SocketChannel socketChannel;

  private TCPSelector selector;

  private SelectionKey selectionKey;

  private Buffer inputBuffer;

  private ControlChannelImpl controlChannel;

  public AbstractTCPConnector()
  {
    try
    {
      socketChannel = SocketChannel.open();
      socketChannel.configureBlocking(false);
    }
    catch (IOException ex)
    {
      Net4j.LOG.error(ex);
    }
  }

  /**
   * SocketChannel must already be non-blocking!
   */
  public AbstractTCPConnector(SocketChannel socketChannel)
  {
    this.socketChannel = socketChannel;
  }

  public TCPSelector getSelector()
  {
    return selector;
  }

  public void setSelector(TCPSelector selector)
  {
    this.selector = selector;
  }

  public SocketChannel getSocketChannel()
  {
    return socketChannel;
  }

  /**
   * Called by {@link ChannelImpl} each time a new buffer is available for
   * multiplexing. This or another buffer can be dequeued from the outputQueue
   * of the {@link ChannelImpl}.
   */
  public void multiplexBuffer(Channel channel)
  {
    checkSelectionKey();
    selector.setWriteInterest(selectionKey, true);
  }

  public void registered(SelectionKey selectionKey)
  {
    this.selectionKey = selectionKey;
    if (getType() == Type.SERVER)
    {
      selector.setConnectInterest(selectionKey, false);
    }
  }

  public void handleConnect(TCPSelector selector, SocketChannel channel)
  {
    try
    {
      if (!channel.finishConnect())
      {
        return;
      }
    }
    catch (Exception ex)
    {
      return;
    }

    try
    {
      checkSelectionKey();
      selector.setConnectInterest(selectionKey, false);
      setState(State.NEGOTIATING);
    }
    catch (Exception ex)
    {
      Net4j.LOG.error(ex);
      deactivate();
    }
  }

  public void handleRead(TCPSelector selector, SocketChannel socketChannel)
  {
    try
    {
      if (inputBuffer == null)
      {
        inputBuffer = getBufferProvider().provideBuffer();
      }

      ByteBuffer byteBuffer = inputBuffer.startGetting(socketChannel);
      if (byteBuffer != null)
      {
        short channelIndex = inputBuffer.getChannelIndex();
        ChannelImpl channel = channelIndex == ControlChannelImpl.CONTROL_CHANNEL_ID ? controlChannel
            : getChannel(channelIndex);
        if (channel != null)
        {
          channel.handleBufferFromMultiplexer(inputBuffer);
        }
        else
        {
          if (TRACER.isEnabled())
          {
            TRACER.trace("Discarding buffer from unknown channel"); //$NON-NLS-1$
          }

          inputBuffer.release();
        }

        inputBuffer = null;
      }
    }
    catch (ClosedChannelException ex)
    {
      deactivate();
    }
    catch (Exception ex)
    {
      Net4j.LOG.error(ex);
      deactivate();
    }
  }

  public void handleWrite(TCPSelector selector, SocketChannel socketChannel)
  {
    try
    {
      boolean moreToWrite = false;
      for (Queue<Buffer> bufferQueue : getChannelBufferQueues())
      {
        Buffer buffer = bufferQueue.peek();
        if (buffer != null)
        {
          if (buffer.write(socketChannel))
          {
            bufferQueue.poll();
            buffer.release();

            if (!moreToWrite)
            {
              moreToWrite = !bufferQueue.isEmpty();
            }
          }
          else
          {
            moreToWrite = true;
            break;
          }
        }
      }

      if (!moreToWrite)
      {
        checkSelectionKey();
        selector.setWriteInterest(selectionKey, false);
      }
    }
    catch (NullPointerException ignore)
    {
      ;
    }
    catch (ClosedChannelException ex)
    {
      deactivate();
    }
    catch (Exception ex)
    {
      Net4j.LOG.error(ex);
      deactivate();
    }
  }

  @Override
  protected List<Queue<Buffer>> getChannelBufferQueues()
  {
    List<Queue<Buffer>> queues = super.getChannelBufferQueues();
    Queue<Buffer> controlQueue = controlChannel.getSendQueue();
    if (!controlQueue.isEmpty())
    {
      queues.add(controlQueue);
    }

    return queues;
  }

  @Override
  protected void registerChannelWithPeer(short channelIndex, String protocolID) throws ConnectorException
  {
    try
    {
      if (!controlChannel.registerChannel(channelIndex, protocolID))
      {
        throw new ConnectorException("Failed to register channel with peer"); //$NON-NLS-1$
      }
    }
    catch (ConnectorException ex)
    {
      throw ex;
    }
    catch (Exception ex)
    {
      throw new ConnectorException(ex);
    }
  }

  @Override
  protected void removeChannel(ChannelImpl channel)
  {
    if (isConnected())
    {
      controlChannel.deregisterChannel(channel.getChannelIndex());
    }

    super.removeChannel(channel);
  }

  @Override
  protected void onAboutToActivate() throws Exception
  {
    super.onAboutToActivate();
    if (selector == null)
    {
      selector = Net4jUtil.createTCPSelector();
      LifecycleUtil.activate(selector);
    }
  }

  @Override
  protected void onActivate() throws Exception
  {
    super.onActivate();
    controlChannel = new ControlChannelImpl(this);
    controlChannel.activate();
    selector.registerAsync(socketChannel, this);
  }

  @Override
  protected void onDeactivate() throws Exception
  {
    Exception exception = null;

    try
    {
      controlChannel.deactivate();
    }
    catch (Exception ex)
    {
      if (exception == null)
      {
        exception = ex;
      }
    }
    finally
    {
      controlChannel = null;
    }

    try
    {
      socketChannel.close();
    }
    catch (Exception ex)
    {
      if (exception == null)
      {
        exception = ex;
      }
    }
    finally
    {
      socketChannel = null;
    }

    try
    {
      super.onDeactivate();
    }
    catch (Exception ex)
    {
      if (exception == null)
      {
        exception = ex;
      }
    }

    if (exception != null)
    {
      throw exception;
    }
  }

  private void checkSelectionKey()
  {
    if (selectionKey == null)
    {
      throw new IllegalStateException("selectionKey == null");
    }
  }
}

Back to the top