>

세 개의 값을 원하는데 aggValueInLastHour 입니다.  와이즈 비즈  와이즈 비즈 .

아래처럼 시도했습니다.

기다리고 싶지 않다는 것은 슬라이딩 창을 사용하여 집계를 수행하는 것을 선호하지 않는다는 것을 의미합니다.

첫 번째 이벤트가 올 때 마지막 3 일 집계 값을 얻으려면 어떻게해야합니까?

사전에 조언 해 주셔서 감사합니다!

aggValueInLastDay

  • 답변 # 1

    더 자주 업데이트하려면 QueryableState 를 사용하십시오 , 사용 사례에 적합한 속도로 상태를 폴링합니다.

  • 답변 # 2

    ContinuousEventTimeTrigger를 사용하면 전체 창보다 짧은 시간에 창이 실행되어 중간 상태를 볼 수 있습니다. 싱크의 다운 스트림 소비자가 각 출력이 전체 전류 상태가 아닌 부분 집계가 될 것으로 예상하는 경우 PurgingTrigger에서 선택적으로 래핑 할 수 있습니다.

  • 답변 # 3

    CEP를 시도했습니다.

    코드 :

    AfterMatchSkipStrategy strategy = AfterMatchSkipStrategy.skipShortOnes();
        Pattern<RiskEvent, ?> loginPattern = Pattern.<RiskEvent>begin("start", strategy)
                .where(eventTypeCondition)
                .timesOrMore(1)
                .greedy()
                .within(Time.hours(1));
    
        KeyedStream<RiskEvent, String> keyedStream = dataStream.keyBy(new KeySelector<RiskEvent, String>() {
            @Override
            public String getKey(RiskEvent riskEvent) throws Exception {
                // key by user for aggregation
                return riskEvent.getEventType() + riskEvent.getDeviceFp();
            }
        });
        PatternStream<RiskEvent> eventPatternStream = CEP.pattern(keyedStream, loginPattern);
        eventPatternStream.select(new PatternSelectFunction<RiskEvent, RiskResult>() {
            @Override
            public RiskResult select(Map<String, List<RiskEvent>> map) throws Exception {
                List<RiskEvent> list = map.get("start");
                ArrayList<Long> times = new ArrayList<>();
                for (RiskEvent riskEvent : list) {
                    times.add(riskEvent.getEventTime());
                }
                Long min = Collections.min(times);
                Long max = Collections.max(times);
                Set<String> accountList = list.stream().map(RiskEvent::getUserName).collect(Collectors.toSet());
                logger.info("时间范围:" + new Date(min) + " --- " + new Date(max) + " 事件:" + list.get(0).getEventType() + ", 设备指纹:" + list.get(0).getDeviceFp() + ", 关联账户:" + accountList.toString());
                return null;
            }
        });
    
    

    아마도, 스킵 전략 skipShortOnes  맞춤 전략입니다.

    CEP 라이브러리의 수정 사항을 보여주십시오.

    <올>

    Enum에서 전략 추가

    공개 열거 형 SkipStrategy {     NO_SKIP,     SKIP_PAST_LAST_EVENT,     SKIP_TO_FIRST,     SKIP_TO_LAST,     SKIP_SHORT_ONES }

    AfterMatchSkipStrategy.java 의 액세스 방법 추가

    공개 정적 AfterMatchSkipStrategy skipShortOnes () {     새 AfterMatchSkipStrategy (SkipStrategy.SKIP_SHORT_ONES)를 반환합니다. }

    discardComputationStatesAccordingToStrategy 에서 전략 액션 추가   NFA.java 의 방법 .

    케이스 SKIP_SHORT_ONES :             int i = 0;             List >>tempResult = new ArrayList<>(일치 결과);             for (Map>resultMap : tempResult) {                 if (i ++ == 0) {                     계속하다;                 }                 matchedResult.remove (resultMap);             }             휴식;

  • 이전 R로 저장된 데이터 인코딩 (Windows-Linux 이식성)
  • 다음 html - 다른 div 위에 라인 테이블에 드롭 다운 표시