홈>
수신 된 메시지를 두 가지 다른 흐름으로 브로드 캐스트하는 방법
데이터 웹 소켓 서버를 요청하고 수신하기 위해 akka 스트림 웹 소켓 클라이언트를 사용하고 있습니다. 웹 소켓에서 수신 된 데이터를 사용하여 두 가지 다른 흐름으로 브로드 캐스트하고 싶습니다. 아래 이미지는 시나리오를 명확히해야합니다.
이미지에서 볼 수 있듯이 분리 된 싱크를 위해 두 개의 다른 흐름으로 브로드 캐스트되어야합니다.
웹 소켓 클라이언트는 다음과 같이 만들 수 있습니다 :
import akka.actor.ActorSystem
import akka.Done
import akka.http.scaladsl.Http
import akka.stream.ActorMaterializer
import akka.stream.scaladsl._
import akka.http.scaladsl.model._
import akka.http.scaladsl.model.ws._
import scala.concurrent.Future
object WebSocketClientFlow {
def main(args: Array[String]) = {
implicit val system = ActorSystem()
implicit val materializer = ActorMaterializer()
import system.dispatcher
// Future[Done] is the materialized value of Sink.foreach,
// emitted when the stream completes
val incoming: Sink[Message, Future[Done]] =
Sink.foreach[Message] {
case message: TextMessage.Strict =>
println(message.text)
}
// send this as a message over the WebSocket
val outgoing = Source.single(TextMessage("hello world!"))
// flow to use (note: not re-usable!)
val webSocketFlow = Http().webSocketClientFlow(WebSocketRequest("ws://echo.websocket.org"))
// the materialized value is a tuple with
// upgradeResponse is a Future[WebSocketUpgradeResponse] that
// completes or fails when the connection succeeds or fails
// and closed is a Future[Done] with the stream completion from the incoming sink
val (upgradeResponse, closed) =
outgoing
.viaMat(webSocketFlow)(Keep.right) // keep the materialized Future[WebSocketUpgradeResponse]
.toMat(incoming)(Keep.both) // also keep the Future[Done]
.run()
// just like a regular http request we can access response status which is available via upgrade.response.status
// status code 101 (Switching Protocols) indicates that server support WebSockets
val connected = upgradeResponse.flatMap { upgrade =>
if (upgrade.response.status == StatusCodes.SwitchingProtocols) {
Future.successful(Done)
} else {
throw new RuntimeException(s"Connection failed: ${upgrade.response.status}")
}
}
// in a real application you would not side effect here
connected.onComplete(println)
closed.foreach(_ => println("closed"))
}
}
-
답변 # 1
관련 자료
- python - Celery Beat가 하나의 마감 작업을 보낼 때 MainProcess가 세 가지 다른 작업을받은 이유는 무엇입니까?
- elasticsearch - Logstash는 다양한 유형의 메시지에서 데이터를 추출합니다
- python - asyncioQueue에서받은 메시지가 보낸 순서를 유지하지 않는 이유는 무엇입니까?
- swift - 숫자 변수, 즉 온도에 따라 다른 print () 메시지를 표시하려고합니다
- Slack bot - 슬랙 봇 - 여러 작업 공간에서 다른 사용자에게 메시지 보내기
- c - 수신 된 메시지에 대해 select ()가 반응하지 않는 이유는 무엇입니까?
- python - 다른 로그 파일을 다른 로그 파일로 필터링
- c# - Client-Unet에서 방송을받지 못했습니다
- ruby - 다른 사용자 입력에 대한 다른 메시지
- python - pip install, pip list ect 사용시 다른 오류 메시지
- android - 브로드 캐스트 수신기가 USB 권한을받지 못함
관련 질문
- scala - Akka 스트림의 groupBy가 병렬로 실행되는 하위 스트림을 생성합니까?
- scala - Futures, Akka Streams 및 Akka Actuator를 사용하는 동시 환경에서 역 압력을 유지하면서 비 스레드 안전 서비스와 통합
- scala - Akka Streams의 GraphStage 내부에서 집계를 어떻게 계산합니까?
- scala - MongoDB는 패턴 일치로 올바른 참조를 얻습니다
- scala - akkastreams에서 서브 스트림 키를 추출 할 수 있습니까?
- scala - Akka 스트림을 정상적으로 중지
- scala - akka-stream의 소스를 설명하는 데 사용되는 알 수없는 구문 - "#repr"
- scala - Akka Stream actor-conflation-ratelimit-actor는 처음 몇 개의 메시지를 삭제합니다 (때로는)
- scala - Behaviorsreceive 내에서 재귀 호출을 만드는 방법은 무엇입니까?
- scala - 미래의 선물에서 성공한 가치만을 읽는 방법
SinkShape를 사용하여 필요한 흐름을 얻을 수 있습니다
전체 코드는