세 개의 값을 원하는데
aggValueInLastHour
입니다.
와이즈 비즈
와이즈 비즈
.
아래처럼 시도했습니다.
기다리고 싶지 않다는 것은 슬라이딩 창을 사용하여 집계를 수행하는 것을 선호하지 않는다는 것을 의미합니다.
첫 번째 이벤트가 올 때 마지막 3 일 집계 값을 얻으려면 어떻게해야합니까?
사전에 조언 해 주셔서 감사합니다!
aggValueInLastDay
-
답변 # 1
-
답변 # 2
ContinuousEventTimeTrigger를 사용하면 전체 창보다 짧은 시간에 창이 실행되어 중간 상태를 볼 수 있습니다. 싱크의 다운 스트림 소비자가 각 출력이 전체 전류 상태가 아닌 부분 집계가 될 것으로 예상하는 경우 PurgingTrigger에서 선택적으로 래핑 할 수 있습니다.
-
답변 # 3
CEP를 시도했습니다.
코드 :
AfterMatchSkipStrategy strategy = AfterMatchSkipStrategy.skipShortOnes(); Pattern<RiskEvent, ?> loginPattern = Pattern.<RiskEvent>begin("start", strategy) .where(eventTypeCondition) .timesOrMore(1) .greedy() .within(Time.hours(1)); KeyedStream<RiskEvent, String> keyedStream = dataStream.keyBy(new KeySelector<RiskEvent, String>() { @Override public String getKey(RiskEvent riskEvent) throws Exception { // key by user for aggregation return riskEvent.getEventType() + riskEvent.getDeviceFp(); } }); PatternStream<RiskEvent> eventPatternStream = CEP.pattern(keyedStream, loginPattern); eventPatternStream.select(new PatternSelectFunction<RiskEvent, RiskResult>() { @Override public RiskResult select(Map<String, List<RiskEvent>> map) throws Exception { List<RiskEvent> list = map.get("start"); ArrayList<Long> times = new ArrayList<>(); for (RiskEvent riskEvent : list) { times.add(riskEvent.getEventTime()); } Long min = Collections.min(times); Long max = Collections.max(times); Set<String> accountList = list.stream().map(RiskEvent::getUserName).collect(Collectors.toSet()); logger.info("时间范围:" + new Date(min) + " --- " + new Date(max) + " 事件:" + list.get(0).getEventType() + ", 设备指纹:" + list.get(0).getDeviceFp() + ", 关联账户:" + accountList.toString()); return null; } });
아마도, 스킵 전략
skipShortOnes
맞춤 전략입니다.CEP 라이브러리의 수정 사항을 보여주십시오.
<올>Enum에서 전략 추가
공개 열거 형 SkipStrategy { NO_SKIP, SKIP_PAST_LAST_EVENT, SKIP_TO_FIRST, SKIP_TO_LAST, SKIP_SHORT_ONES }
AfterMatchSkipStrategy.java
의 액세스 방법 추가공개 정적 AfterMatchSkipStrategy skipShortOnes () { 새 AfterMatchSkipStrategy (SkipStrategy.SKIP_SHORT_ONES)를 반환합니다. }
discardComputationStatesAccordingToStrategy
에서 전략 액션 추가NFA.java
의 방법 .케이스 SKIP_SHORT_ONES : int i = 0; List >>tempResult = new ArrayList<>(일치 결과); for (Map>resultMap : tempResult) { if (i ++ == 0) { 계속하다; } matchedResult.remove (resultMap); } 휴식;
관련 자료
- c++ - 포인터가 어떻게 r 값이 될 수 있습니까?
- sql - 자바 - 열거 형 값을 문자열로 가져 오는 방법?
- angular - HTML 속성에 값을 어떻게 설정할 수 있습니까?
- 시간을 얻고 파이썬에서 UTC로 변환하는 방법
- angularjs - CRM javascript 값을 null로 설정하는 방법은 무엇입니까?
- flutter - 문자열 날짜를 날짜 시간으로 변환하는 방법?
- function - C에서 해당 시간대의 시간을 얻는 방법은 무엇입니까?
- java - BST에서 단어를 검색 할 때 단어가 나타나는 시간을 어떻게 알 수 있습니까?
- dataframe - R에서 시간을 빼는 방법?
- R에서 시간 데이터를 간격으로 어떻게 분할합니까?
- kubuntu - 시스템 트레이에 시간을 다시 추가하려면 어떻게해야합니까?
- c# - linq - 객체의 가치를 얻는 방법
- sql server - 각 부서에 대해 한 번만 댓글을 표시하는 방법은 무엇입니까?
- c# - 콘솔 게임에서 Hp 값을 만드는 방법은 무엇입니까?
- python - 목록에서 가장 가까운 값을 얻는 방법은 무엇입니까?
- android - 조각/활동에 소요되는 시간을 얻는 방법
- python - ttk의 스핀 박스에서 값을 가져 오는 방법은 무엇입니까?
- python - exec의 반환 값을 얻는 방법?
- bash - nohup과 시간을 함께 사용하려면 어떻게하나요?
- vb.net - VB Len (x)가 C # SystemRuntimeInteropServicesMarshalSizeOf (x)와 다른 값을보고하는 이유는 무엇입니까?
- Apache Flink 자원 계획 우수 사례
- apache flink - sourcefunction을 사용하여 지정된 간격으로 작업을 실행하는 방법은 무엇입니까?
- Flink에서 스트림 간 속도를 조정하는 방법은 무엇입니까?
- Flink CEP로 패턴 a + b +를 감지하는 방법
- 운영자의 상태를 저장하지 않는 방법을 깜박임?
- Flink 작업을 실행하는 데 필요한 전체 작업 병렬 처리 또는 슬롯 수를 판별하는 방법이 있습니까 (실행하기 전에)
- java - FlinkCEP의 실행 시간
- 경고 코드에서 Arralist에 문제가 발생합니까?
- Flink CEP 이벤트가 트리거되지 않음
더 자주 업데이트하려면
QueryableState
를 사용하십시오 , 사용 사례에 적합한 속도로 상태를 폴링합니다.