Pipeline con netty

De ChuWiki
Revisión del 15:23 4 ago 2018 de Chudiang (Discusión | contribuciones) (StringDecoder)

Saltar a: navegación, buscar

Al final de Ejemplo Sencillo de TCP/IP con netty vimos que Netty permitía que trataramos los bytes recibidos por el socket en partes, de forma que cada uno de nuestros trozos de código haga algo con los bytes recibidos y pase los resultados al siguiente trozo. Vamos a ver aquí todo esto con detalle.

Tienes todo el código de este ejemplo en Ejemplo con netty

Tratamiento de la entrada : ChannelInboundHandler

Cuando llega un array de bytes por el socket, lo más frecuente es tener los siguientes tres trozos de código :

  • Si el que nos envía mensajes lo hace muy seguido, posiblmente recibamos todos los bytes de todos los mensajes seguidos. El primer bloque de código en recepción suele encargarse de separar los bytes que componen cada mensaje para pasarle al siguiente bloque sólo los bytes de un mensaje. Por ejemplo, si los mensajes son líneas de texto separadas por retornos de carro, el primer bloque separa las línesa, buscando los retornos de carro y enviando al siguiente bloque sólo una línea cada vez. Este bloque suele llamarse "extractor de tramas" o "Frame Extractor".
  • El segundo bloque, con la garantía de que el array de bytes que recibe es un mensaje completo en sí mismo, suele traducir estos bytes a una clase java o algo más legible por el código que un array de bytes. En el caso anterior, cogería los bytes de una línea y la convertiría en un String. Este bloque de código se suele llamar "Decodificador" o "Decoder".
  • El tercer bloque es el que hace realmente algo útil con los datos recibidos y ya convertidos en algo legible. Muestra el String por pantalla, o lo interpreta como un comando a ejecutar, o lo que sea que toque hacer con ese String en nuestra aplicación. Este bloque es el de lógica de la aplicación.

En Netty implementamos cada uno de estos bloques haciendo una clase que implemente ChannelInboundHandler, o si no queremos sobreescribir todos sus métodos, heredamos de ChannelInboundHandlerAdapter.

Aunque el extraer mensajes buscando retornos de carro y el convertir bytes a String ya los tiene netty implementados, vamos a hacerlos aquí a mano para mostrar el funcionamiento de esto. Vamos con el primero, el FrameExtractor

Frame Extractor

El código puede ser como el siguiente

package com.chuidiang.examples4;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.ReferenceCountUtil;

public class FrameExtractor extends ChannelInboundHandlerAdapter{

    private ByteBuf buf;  // (1)

    @Override
    public void channelRegistered(ChannelHandlerContext ctx) throws Exception {  // (2)
        buf= ctx.alloc().buffer();
    }

    @Override
    public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {  // (3)
        if(null!=buf) {
            buf.release();
            buf=null;
        }

    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {  // (4)
        try {
            buf.writeBytes((ByteBuf) msg);  // (5)
            int indexOf = buf.indexOf(0, buf.readableBytes(), (byte) '\n');  // (6)
            while (-1!=indexOf) {
                    ByteBuf line = ctx.alloc().buffer();  // (7)
                    buf.readBytes(line, indexOf);  // (8)
                    buf.readByte(); // Leemos el retorno de carro para eliminarlo.
                    ctx.fireChannelRead(line);  // (9)
                    buf.discardReadBytes();
                    indexOf = buf.indexOf(0, buf.readableBytes(), (byte) '\n');
            }
        } finally {
            ReferenceCountUtil.release(msg); // (10)
        }
    }
}

Vamos con los detalles

  • (1) Cuando nos lleguen bytes leídos, no tenemos garantía de que llegue una línea completa con su retorno de carro, deberemos esperar a las siguientes lecturas. Necesitamos por tanto un buffer donde guardar esos bytes hasta la siguiente lectura, donde quizás sí, quizás no, llegue el retorno de carro. De igual manera, puede que nos llegue un grupo de bytes con un retorno de carro en medio, por lo que necesitamos un buffer donde guardar los bytes que van después del retorno y que no llevan su propio retorno al final, simplemente porque todavía no ha llegado.
  • (2) En el método channelActive() que es al que nos llama netty cuando el canal de comunicación está activo, aprovechamos para crear el buffer que mencionamos en el punto anterior. Usamos los métodos que netty nos proporciona para crear estos buffer.
  • (3) En el método channelInactive() que es al que nos llama netty cuando el canal de comunicación deja de estar activo, aprovechamos para liberar el buffer que creamos en el punto anterior.
  • (4) En el método channelRead() es donde está la madre de la ciencia. Resumimos lo que se pretende : buscar una línea buscando su retorno de carro, crear un ByteBuf para esa línea y pasarla al siguiente trozo de código (handler) para que lo convierta a String. Para pasar esta línea se usa el método ctx.fireChannelRead(line), que avisará al siguiente handler en la cadena pasándole el parámetro que pasamos a este método. Vamos con los detalles.
  • (5) El msg que recibimos como parámetro es un ByteBuf de netty. Añadimos todo su contenido al final de nuestro ByteBuf de acumular bytes, definido el paso (1)
  • (6) En nuestro buff buscamos un retorno de carro '\n'. y nos metemos en un bucle tratando cada uno de los que encontremos (es posible que en una sola lectura de bytes del socket nos lleguen varias líneas.
  • (7) Si hemos encontrado un retorno de carro, creamos un nuevo ByteBuf para meter ahí la línea completa y pasársela al siguiente handler.
  • (8) Metemos en ese buffer todos los bytes hasta el retorno de carro. Leemos el retorno de carro por separado para eliminarlo del buffer de entrada y no enviarlo al siguiente handler.
  • (9) Avisamos al siguiente handler, pasándole nuestro buffer line. No liberamos el buffer line porque es responsabilidad del que lo recibe.
  • (10) Liberamos el buffer que nos ha llegado por parámetro. Como ya no lo necesitamos y no se lo hemos pasado a nadie, es nuestra responsabilidad liberarlo.

StringDecoder

Nuestro siguiente ChannelInboundHander recibirá los bytes que nos envía el FrameExtractor, ya sin retorno de carro. Lo único que tiene que hacer es traducirlo a String para pasárselo al siguiente. El código puede ser como este

public class StringDecoder extends ChannelInboundHandlerAdapter{
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf buf =(ByteBuf)msg;
        String text = buf.toString(Charset.defaultCharset());
        ctx.fireChannelRead(text);
        buf.release();
    }
}

Nada especial. El msg que recibimos como parámetro es el ByteBuf (line) que nos entregó el handler anterior, hacemos el cast, lo convertimos a String y se lo pasamos al siguiente handler con ctx.fireChannelRead(text). El buffer que recibimos del handler anterior lo liberamos nosotros, ya que no lo necesitamos más y no se lo hemos pasado a nadie.

Lógica de negocio

Nuestro tercer handler recibirá directamente el String, así que hacemos con él ya lo que nos interese.

public class ServerHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        String text = (String)msg;
        System.out.println(text);
    }
}

El msg recibido es el String que nos envía el handler anterior y solo tenemos que hacer el cast y sacarlo por pantalla, o lo que queramos.

Todo junto