>

스레드를 빌드하고 싶을 때 대기열을 수신하고 대기열에 항목을 추가 할 때마다 작업을 수행합니다.

그러나 나는 그것을 개발하는 방법에 대해별로 생각하지 않습니다. RxJava2에서 Flowable 예제를 이미 시도했지만 그 방법을 인식하지 못했습니다.

Android 및 Java의 모든 예제에서 열려 있습니다. 메시지 처리기 또는 실행 프로그램이 쉬운 솔루션 일 수 있습니다. 슬프게도 노하우가 없습니다. 분명히 RxJava2는 좋을 것입니다.

업데이트

즉, 긴 로그가 분리되어 표시되고 타이밍이 두 개가 가까운 시간에 호출 될 때마다 혼합되기 때문에 큐 메커니즘을 만들고 싶습니다.

public final class Logcat {
   private static final String TAG = "HOWDY";
   public static void v(String message) {
       Log.v(TAG, message);
   }
   public static void d(String message) {
       Log.d(TAG, message); 
       //TODO I will add a for-loop later for long messages to make sure to show all of them for each method.
   }
   public static void e(Throwable throwable) {
       Log.e(TAG, throwable.getMessage());
   }
   public static void e(String message) {
       Log.e(TAG, message);
   }
   public static void e(ApiError error) {
       Log.e(TAG, error.message);
   }
}


  • 답변 # 1

    여기서 어떻게해야합니까.

    import java.util.Queue;
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ConcurrentLinkedQueue;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.atomic.AtomicBoolean;
    import io.reactivex.Observable;
    import io.reactivex.schedulers.Schedulers;
    import io.reactivex.subjects.PublishSubject;
    public class DemoRxJava2 {
        public static void testWithQueue() {
            CompletableFuture<String> allDone = new CompletableFuture<>();
            AtomicBoolean submitDone = new AtomicBoolean(Boolean.FALSE);
            final Queue<Long> queue = new ConcurrentLinkedQueue<>();
            Observable.interval(2, TimeUnit.SECONDS)
            .takeWhile(tick -> !queue.isEmpty() || !submitDone.get())
            .flatMap(tick -> {
                return Observable.create(sub -> {
                    while (!queue.isEmpty()) {
                        sub.onNext(queue.poll());
                    }
                    sub.onComplete();
                });
            })
            .subscribeOn(Schedulers.single())
            .doOnSubscribe(dis -> System.out.println("Queue processing active"))
            .doOnComplete(() -> {
                System.out.println("Queue processing done");
                allDone.complete("DONE");
            })
            .subscribe(nextTs -> System.out.printf("[%s] : Processing tx : %d\n", Thread.currentThread().getName(), nextTs));
            Observable.interval(1,TimeUnit.SECONDS)
            .take(10)
            .doOnSubscribe(dis -> System.out.println("Job submitter start"))
            .doOnNext(tick -> {
                long ms = System.currentTimeMillis() / 1000;
                System.out.printf("[%s] : Submitting tx : %d\n", Thread.currentThread().getName(), ms);
                queue.add(ms);
            })
            .doOnComplete(() -> submitDone.set(Boolean.TRUE))
            .blockingSubscribe();
            try {
                allDone.get();
            } catch (InterruptedException | ExecutionException e) {
                e.printStackTrace();
            }
        }
        public static void testWithSubject() {
            CompletableFuture<String> allDone = new CompletableFuture<>();
            PublishSubject<Long> queue = PublishSubject.create();
            queue.observeOn(Schedulers.single())
            .flatMap(tx -> Observable.just(tx).delay(2, TimeUnit.SECONDS))
            .doOnSubscribe(dis -> System.out.println("Queue processing active"))
            .doOnComplete(() -> allDone.complete("DONE"))
            .subscribe(nextTs -> System.out.printf("[%s] : Processing tx : %d\n", Thread.currentThread().getName(), nextTs));
            Observable.interval(1, TimeUnit.SECONDS)
            .take(10)
            .doOnSubscribe(dis -> System.out.println("Job submitter start"))
            .doOnNext(tick -> {
                long ms = System.currentTimeMillis() / 1000;
                System.out.printf("[%s] : Submitting tx : %d\n", Thread.currentThread().getName(), ms);
                queue.onNext(ms);
            })
            .doOnComplete(() -> queue.onComplete())
            .blockingSubscribe();
            //wait until all done
            try {
                allDone.get();
            } catch (InterruptedException | ExecutionException e) {
                e.printStackTrace();
            }
        }
        public static void main(String[] args) {
            testWithQueue();
            testWithSubject();
        }
    }
    
    

    이것은 RxJava를 사용하여 별도의 스레드에서 객체 대기열을 처리하고 필요에 맞게 조정하는 방법을 보여줍니다.

  • 답변 # 2

    이것이 내가하는 방식입니다.

    interface ILog {
       String TAG = "HOWDY";
       void display();
    }
    
    

    내 추상 수업;

    abstract class AbstractLog implements ILog {
       String mLog;
       AbstractLog(@NonNull String log) {
           mLog = log;
       }
    }
    
    

    다음은 구체적인 수업입니다. Verbose 등과 같은 다른 클래스도 있습니다.

    public class ErrorLog extends AbstractLog {
       ErrorLog(@NonNull String log) {
           super(log);
       }
       ErrorLog(@NonNull Throwable throwable) {
           super(throwable.getMessage());
       }
       ErrorLog(@NonNull ApiError error) {
           super(error.message);
       }
       @Override
       public void display() {
          Log.e(TAG, mLog);
       }
    }
    
    

    이것은 상호 작용할 클래스 개발자입니다.

    public final class Logcat {
       private static LogQueue sQueue = new LogQueue();
       public static void v(String log) {
           Message message = new Message();
           message.obj = new VerboseLog(log);
           sQueue.sendMessage(message);
       }
       public static void d(String log) {
           Message message = new Message();
           message.obj = new DebugLog(log);
           sQueue.sendMessage(message);
       }
       public static void e(Throwable throwable) {
           Message message = new Message();
           message.obj = new ErrorLog(throwable);
           sQueue.sendMessage(message);
       }
       public static void e(String log) {
           Message message = new Message();
           message.obj = new ErrorLog(log);
           sQueue.sendMessage(message);
       }
       public static void e(ApiError error) {
           Message message = new Message();
           message.obj = new ErrorLog(error);
           sQueue.sendMessage(message);
       }
       private static class LogQueue extends Handler {
           @Override
           public void handleMessage(Message msg) {
               super.handleMessage(msg);
               ILog log = (ILog) msg.obj;
               log.display();
           }
       }
    }
    
    

    다른 사람들에게 도움이되기를 바랍니다.

  • 이전 google bigquery - 쿼리 당 1000 개의 테이블 제한이 분할 된 테이블에 적용됩니까?
  • 다음 linux - bash 스크립트에서 다른 ssh 세션으로 시작한 프로세스를 종료 할 수 없습니다