>

아파치 빔 애플리케이션은 JSON 배열로 메시지를 수신하지만 각 행을 BigQuery 테이블에 삽입합니다. ApacheBeam에서이 사용 사례를 어떻게 지원할 수 있습니까? 각 행을 분할하여 하나씩 테이블에 삽입 할 수 있습니까?

JSON 메시지 예 :

[
  {"id": 1, "name": "post1", "price": 10},
  {"id": 2, "name": "post2", "price": 20},
  {"id": 3, "name": "post3", "price": 30}
]

BigQuery 테이블 스키마 :

[
    {
      "mode": "REQUIRED",
      "name": "id",
      "type": "INT64"
    },
    {
      "mode": "REQUIRED",
      "name": "name",
      "type": "STRING"
    },
    {
      "mode": "REQUIRED",
      "name": "price",
      "type": "INT64"
    }
]


  • 답변 # 1

    여기 내 해결책이 있습니다. JSON 문자열을 List로 한 번 변환 한 다음 c.output을 하나씩 출력합니다. 내 코드는 스칼라에 있지만 Java에서도 같은 일을 할 수 있습니다.

       case class MyTranscationRecord(id: String, name: String, price: Int)
        case class MyTranscation(recordList: List[MyTranscationRecord])
        class ConvertJSONTextToMyRecord extends DoFn[KafkaRecord[java.lang.Long, String], MyTranscation]() {
          private val logger: Logger = LoggerFactory.getLogger(classOf[ConvertJSONTextToMyRecord])
          @ProcessElement
          def processElement(c: ProcessContext): Unit = {
            try {
              val mapper: ObjectMapper = new ObjectMapper()
                .registerModule(DefaultScalaModule)
              val messageText = c.element.getKV.getValue
              val transaction: MyRecord = mapper.readValue(messageText, classOf[MyTranscation])
              logger.info(s"successfully converted to an EPC transaction = $transaction")
              for (record <- transaction.recordList) {
                  c.output(record)
              }
            } catch {
              case e: Exception =>
                val message = e.getLocalizedMessage + e.getStackTrace
                logger.error(message)
            }
          }
        }
    
    

관련 자료

  • 이전 asp.net - 연속 실행 30 분 후에 다시 시작에서 Hangfire 반복 작업을 방지하는 방법
  • 다음 algorithm - 예외가있는 경우 for_each를 사용합니까? std - : exception_list