<h1>Asynchronous http requests using Netty and Scala actors</h1>
<p>Hey, I hope someone can give me a hand with this.</p>
<p>I am trying to use the Scala Actors and Netty.io libraries to make asynchronous HTTP requests. (Yes, I know Scala actors are being deprecated, but this is a learning exercise for me.)</p>
<p>I have written an actor, <code>HttpRequestActor</code>, that accepts a message in the form of a case class <code>RequestPage(uri:URI)</code>.</p>
<p>When it receives the message, it creates the Netty objects needed to make an HTTP request. I have based most of the code on the <a href="http://static.netty.io/3.5/xref/org/jboss/netty/example/http/snoop/HttpSnoopClient.html"><code>HttpSnoopClient</code></a> example.</p>
<p>I create a client and pass the current actor instance to my implementation of <code>ChannelPipelineFactory</code>, which in turn passes the actor to my implementation of <code>SimpleChannelUpstreamHandler</code>, where I have overridden the <code>messageReceived</code> function.</p>
<p>The actor instance is passed as a listener. I create a request using the <code>DefaultHttpRequest</code> class and write it to the channel to make the request.</p>
<p>There is a blocking call to an actor object using the <code>ChannelFuture</code> object returned from writing to the channel.
When the <code>messageReceived</code> function of my handler class is called, I parse the response of the Netty HTTP request as a string, send a message back to the actor with the content of the response, and close the channel.</p>
<p>After the future is completed, my code attempts to send a reply to the calling actor with the HTTP response content received.</p>
<p>The code works: I am able to get a reply, send it to my actor instance, print out the content, and send a message to the actor instance to release the resources being used.</p>
<p>The problem is that when I test it, the original call to the actor does not get a reply and the thread just stays open.</p>
<h2>Code Sample - HttpRequestActor</h2>
<p>My code for the <code>HttpRequestActor</code> class:</p>
<pre><code>import scala.actors.Actor
import java.net.{InetSocketAddress,URI}
import org.jboss.netty.handler.codec.http._
import org.jboss.netty.bootstrap.ClientBootstrap
import org.jboss.netty.channel.Channel
import org.jboss.netty.channel._
import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory
import org.jboss.netty.channel.group.DefaultChannelGroup
import java.util.concurrent.{Executors,CancellationException}
import org.jboss.netty.util.CharsetUtil
import scala.concurrent.{ Promise, Future }
import scala.concurrent.ExecutionContext.Implicits.global

/**
 * @author mebinum
 *
 */
class HttpRequestActor extends Actor {
    //initialize response with default uninitialized value
    private var resp:Response = _
    private val executor = Executors.newCachedThreadPool
    private val executor2 = Executors.newCachedThreadPool
    private val factory = new NioClientSocketChannelFactory(
                          executor,
                          executor2);

    private val allChannels = new DefaultChannelGroup("httpRequester")

    def act = loop {
        react {
            case RequestPage(uri) =&gt; requestUri(uri)
            case Reply(msg) =&gt; setResponse(Reply(msg))
            case NoReply =&gt; println("didnt get a reply");setResponse(NoReply)
            case NotReadable =&gt; println("got a reply but its not readable");setResponse(NotReadable)
            case ShutDown =&gt; shutDown()
        }
    }

    private def requestUri(uri:URI) = {

      makeChannel(uri) map {
          channel =&gt; {
              allChannels.add(channel)
              val request = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, uri.toString)
              request.setHeader(HttpHeaders.Names.HOST, uri.getHost())
              request.setHeader(HttpHeaders.Names.CONNECTION, HttpHeaders.Values.CLOSE)
              request.setHeader(HttpHeaders.Names.ACCEPT_ENCODING, HttpHeaders.Values.GZIP)
              val writeFuture = channel.write(request).awaitUninterruptibly()

              FutureReactor !? writeFuture match {
                  case future : ChannelFuture =&gt; {
                      future.addListener(new ChannelFutureListener() {
                          def operationComplete(future:ChannelFuture) {
                              // Perform post-closure operation
                              println("current response is " + resp)
                              sendResponse("look ma I finished")
                          }
                      })
                      future.getChannel().close()
                  }
              }

              this ! ShutDown
          }
      }
      //thread ends only if you send a reply from here
      //println("this is final sender " + sender)
      //reply("I am the true end")
    }

    private def makeChannel(uri:URI) = {
      val scheme = Some(uri.getScheme()).getOrElse("http")
      val host = Some(uri.getHost()).getOrElse("localhost")
      val port = Utils.getPort(uri.getPort, uri.getScheme)

      // Set up the event pipeline factory.
      val client = new ClientBootstrap(factory)
      client.setPipelineFactory(new PipelineFactory(this))

      //get the promised channel
      val channel = NettyFutureBridge(client.connect(new InetSocketAddress(host, port)))
      channel
    }

    private def setResponse(aResponse:Response) = resp = aResponse

    private def sendResponse(msg:String) = {
      println("Sending the response " + msg)
      reply(resp)
    }

    private def shutDown() = {
        println("got a shutdown message")
        val groupFuture = allChannels.close().awaitUninterruptibly()
        factory.releaseExternalResources()
    }

    override def exceptionHandler = {
      case e : CancellationException =&gt; println("The request was cancelled"); throw e
      case tr: Throwable =&gt; println("An unknown exception happened " + tr.getCause()); throw tr
    }
}

trait Response
case class RequestPage(url:URI)
case class Reply(content:String) extends Response
case object NoReply extends Response
case object NotReadable extends Response
case object ShutDown

object FutureReactor extends Actor{

  def act = //loop {
      react {
        case future: ChannelFuture =&gt; {
            if (future.isCancelled) {
                throw new CancellationException()
            }
            if (!future.isSuccess()) {
                future.getCause().printStackTrace()
                throw future.getCause()
            }
            if(future.isSuccess() &amp;&amp; future.isDone()){
                future.getChannel().getCloseFuture().awaitUninterruptibly()
                reply(future)
            }
        }
      }
    //}
  this.start
}

class ClientHandler(listener:Actor) extends SimpleChannelUpstreamHandler {

  override def exceptionCaught( ctx:ChannelHandlerContext, e:ExceptionEvent){
    e.getCause().printStackTrace()
    e.getChannel().close();
    throw e.getCause()
  }

  override def messageReceived(ctx:ChannelHandlerContext, e:MessageEvent) = {
    var contentString = ""
    var httpResponse:Response = null.asInstanceOf[Response]

    e.getMessage match {
      case (response: HttpResponse) if !response.isChunked =&gt; {
        println("STATUS: " + response.getStatus);
        println("VERSION: " + response.getProtocolVersion);
        println

        val content = response.getContent();
        if (content.readable()) {
          contentString = content.toString(CharsetUtil.UTF_8)
          httpResponse = Reply(contentString)
          //notify actor
        }else{
          httpResponse = NotReadable
        }
      }
      case chunk: HttpChunk if !chunk.isLast =&gt; {
        //get chunked content
        contentString = chunk.getContent().toString(CharsetUtil.UTF_8)
        httpResponse = Reply(contentString)
      }
      case _ =&gt; httpResponse = NoReply
    }
    println("sending actor my response")
    listener ! httpResponse
    println("closing the channel")
    e.getChannel().close()
    //send the close event
  }
}

class PipelineFactory(listener:Actor) extends ChannelPipelineFactory {

  def getPipeline(): ChannelPipeline = {
    // Create a default pipeline implementation.
    val pipeline = org.jboss.netty.channel.Channels.pipeline()

    pipeline.addLast("codec", new HttpClientCodec())

    // Remove the following line if you don't want automatic content decompression.
    pipeline.addLast("inflater", new HttpContentDecompressor())

    // Uncomment the following line if you don't want to handle HttpChunks.
    //pipeline.addLast("aggregator", new HttpChunkAggregator(1048576))

    pipeline.addLast("decoder", new HttpRequestDecoder())
    //assign the handler
    pipeline.addLast("handler", new ClientHandler(listener))

    pipeline;
  }
}

object NettyFutureBridge {
  import scala.concurrent.{ Promise, Future }
  import scala.util.Try
  import java.util.concurrent.CancellationException
  import org.jboss.netty.channel.{ Channel, ChannelFuture, ChannelFutureListener }

  def apply(nettyFuture: ChannelFuture): Future[Channel] = {
    val p = Promise[Channel]()
    nettyFuture.addListener(new ChannelFutureListener {
      def operationComplete(future: ChannelFuture): Unit = p complete Try(
        if (future.isSuccess) {
          println("Success")
          future.getChannel
        } else if (future.isCancelled) {
          println("Was cancelled")
          throw new CancellationException
        } else {
          future.getCause.printStackTrace()
          throw future.getCause
        })
    })
    p.future
  }
}
</code></pre>
<p>Code to test it:</p>
<pre><code>val url = "http://hiverides.com"

test("Http Request Actor can recieve and react to message"){
    val actor = new HttpRequestActor()
    actor.start

    val response = actor !? new RequestPage(new URI(url)) match {
      case Reply(msg) =&gt; {
          println("this is the reply response in test")
          assert(msg != "")
          println(msg)
      }
      case NoReply =&gt; println("Got No Reply")
      case NotReadable =&gt; println("Got a not Reachable")
      case None =&gt; println("Got a timeout")
      case s:Response =&gt; println("response string \n" + s)
      case x =&gt; {println("Got a value not sure what it is"); println(x);}
    }
}
</code></pre>
<p>Libraries used:</p>
<ul>
<li>Scala 2.9.2</li>
<li>Netty.io 3.6.1.Final</li>
<li>JUnit 4.7</li>
<li>ScalaTest 1.8</li>
<li>I am also using <a href="https://twitter.com/viktorklang">@viktorklang</a>'s NettyFutureBridge object (<a href="https://gist.github.com/viktorklang%5d">gist</a>) to create a Scala future for the Channel object returned</li>
</ul>
<p>How can I send a reply back to the actor object with the content of the response from Netty and end the thread?</p>
<p>Any help will be much appreciated.</p>
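<p>For illustration only, here is a minimal sketch of one way a response delivered through a callback on another thread can be handed back to a waiting caller: the callback completes a <code>scala.concurrent.Promise</code>, which is the same pattern the <code>NettyFutureBridge</code> uses for the channel. <code>FakeClient</code>, <code>fetch</code> and <code>PromiseReply</code> are hypothetical names and not part of Netty or of the code in this question. The worker thread only stands in for a Netty I/O thread, and the sketch assumes a Scala version that ships <code>scala.concurrent</code> (2.10 or later).</p>

```scala
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.duration._

// Hypothetical stand-in for a callback-style client such as a Netty handler:
// it invokes `onContent` from a different thread once the "response" is ready.
object FakeClient {
  def fetch(uri: String)(onContent: String => Unit): Unit = {
    val worker = new Thread(new Runnable {
      def run(): Unit = onContent("body of " + uri)
    })
    worker.setDaemon(true)
    worker.start()
  }
}

object PromiseReply {
  // Bridge the callback into a Future: the callback completes the promise,
  // and the caller decides how long it is willing to wait for the result.
  def request(uri: String): Future[String] = {
    val promise = Promise[String]()
    // trySuccess is a no-op if the promise was already completed.
    FakeClient.fetch(uri) { content => promise.trySuccess(content); () }
    promise.future
  }

  def main(args: Array[String]): Unit = {
    // Block for at most five seconds instead of waiting indefinitely.
    val body = Await.result(request("http://example.com"), 5.seconds)
    println(body)
  }
}
```

<p>The design choice in this sketch is that the result travels with the future itself, so nothing depends on which thread the callback runs on or on who the current sender is at that moment.</p>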