>

수신 된 메시지를 두 가지 다른 흐름으로 브로드 캐스트하는 방법

데이터 웹 소켓 서버를 요청하고 수신하기 위해 akka 스트림 웹 소켓 클라이언트를 사용하고 있습니다. 웹 소켓에서 수신 된 데이터를 사용하여 두 가지 다른 흐름으로 브로드 캐스트하고 싶습니다. 아래 이미지는 시나리오를 명확히해야합니다.

이미지에서 볼 수 있듯이 분리 된 싱크를 위해 두 개의 다른 흐름으로 브로드 캐스트되어야합니다.

웹 소켓 클라이언트는 다음과 같이 만들 수 있습니다 :

import akka.actor.ActorSystem
import akka.Done
import akka.http.scaladsl.Http
import akka.stream.ActorMaterializer
import akka.stream.scaladsl._
import akka.http.scaladsl.model._
import akka.http.scaladsl.model.ws._
import scala.concurrent.Future
object WebSocketClientFlow {
  def main(args: Array[String]) = {
    implicit val system = ActorSystem()
    implicit val materializer = ActorMaterializer()
    import system.dispatcher
    // Future[Done] is the materialized value of Sink.foreach,
    // emitted when the stream completes
    val incoming: Sink[Message, Future[Done]] =
      Sink.foreach[Message] {
        case message: TextMessage.Strict =>
          println(message.text)
      }
    // send this as a message over the WebSocket
    val outgoing = Source.single(TextMessage("hello world!"))
    // flow to use (note: not re-usable!)
    val webSocketFlow = Http().webSocketClientFlow(WebSocketRequest("ws://echo.websocket.org"))
    // the materialized value is a tuple with
    // upgradeResponse is a Future[WebSocketUpgradeResponse] that
    // completes or fails when the connection succeeds or fails
    // and closed is a Future[Done] with the stream completion from the incoming sink
    val (upgradeResponse, closed) =
      outgoing
        .viaMat(webSocketFlow)(Keep.right) // keep the materialized Future[WebSocketUpgradeResponse]
        .toMat(incoming)(Keep.both) // also keep the Future[Done]
        .run()
    // just like a regular http request we can access response status which is available via upgrade.response.status
    // status code 101 (Switching Protocols) indicates that server support WebSockets
    val connected = upgradeResponse.flatMap { upgrade =>
      if (upgrade.response.status == StatusCodes.SwitchingProtocols) {
        Future.successful(Done)
      } else {
        throw new RuntimeException(s"Connection failed: ${upgrade.response.status}")
      }
    }
    // in a real application you would not side effect here
    connected.onComplete(println)
    closed.foreach(_ => println("closed"))
  }
}


  • 답변 # 1

    SinkShape를 사용하여 필요한 흐름을 얻을 수 있습니다

    Sink.fromGraph(GraphDSL.create(){
      implicit b =>
        val bcast = b.add(Broadcast[Message](2))
        val flow1 = b.add(Flow[Message].map(m => m))
        val flow2 = b.add(Flow[Message].map(m => m ))
        val sink1 = b.add(Sink.foreach(println))
        val sink2 = b.add(Sink.foreach(println))
        bcast ~> flow1 ~> sink1
        bcast ~> flow2 ~> sink2
        SinkShape(bcast.in)
    })
    
    

    전체 코드는

     implicit val system = ActorSystem()
      implicit val materializer = ActorMaterializer()
      import system.dispatcher
      // Future[Done] is the materialized value of Sink.foreach,
      // emitted when the stream completes
      val incomingSink = Sink.fromGraph(GraphDSL.create() {
        implicit b =>
          import GraphDSL.Implicits._
          val bcast = b.add(Broadcast[Message](2))
          val flow1 = b.add(Flow[Message].map(m => m))
          val flow2 = b.add(Flow[Message].map(m => m ))
          val sink1 = b.add(Sink.head[Message])
          val sink2 = b.add(Sink.head[Message])
          bcast ~> flow1 ~> sink1
          bcast ~> flow2 ~> sink2
          SinkShape(bcast.in)
      }).mapMaterializedValue(_ => Future(Done))
      // send this as a message over the WebSocket
      val outgoing = Source.single(TextMessage("hello world!"))
      // flow to use (note: not re-usable!)
      val webSocketFlow = Http().webSocketClientFlow(WebSocketRequest("ws://echo.websocket.org"))
      // the materialized value is a tuple with
      // upgradeResponse is a Future[WebSocketUpgradeResponse] that
      // completes or fails when the connection succeeds or fails
      // and closed is a Future[Done] with the stream completion from the incoming sink
      val (upgradeResponse, closed) =
      outgoing
        .viaMat(webSocketFlow)(Keep.right) // keep the materialized Future[WebSocketUpgradeResponse]
        .toMat(incomingSink)(Keep.both) // also keep the Future[Done]
        .run()
      // just like a regular http request we can access response status which is available via upgrade.response.status
      // status code 101 (Switching Protocols) indicates that server support WebSockets
      val connected = upgradeResponse.flatMap { upgrade =>
        if (upgrade.response.status == StatusCodes.SwitchingProtocols) {
          Future.successful(Done)
        } else {
          throw new RuntimeException(s"Connection failed: ${upgrade.response.status}")
        }
      }
      // in a real application you would not side effect here
      connected.onComplete(println)
      closed.foreach(_ => println("closed"))
    
    

  • 이전 .net - C #에서 추상 정적 메서드를 사용할 수없는 이유는 무엇입니까?
  • 다음 python - 좋은 wxpython GUI 빌더?