@benjchristensen
Created August 21, 2014 05:06
RetryWhenTestsConditional
java.lang.RuntimeException: always fails => retry after 1 second
don't retry on IllegalArgumentException... allow failure
Exception in thread "main" java.lang.IllegalArgumentException: user error
at RetryWhenTestsConditional.lambda$0(RetryWhenTestsConditional.java:16)
at RetryWhenTestsConditional$$Lambda$1/1670782018.call(Unknown Source)
at rx.Observable.unsafeSubscribe(Observable.java:8581)
at rx.internal.operators.OnSubscribeRedo$2.call(OnSubscribeRedo.java:228)
at rx.schedulers.TrampolineScheduler$InnerCurrentThreadScheduler.enqueue(TrampolineScheduler.java:85)
at rx.schedulers.TrampolineScheduler$InnerCurrentThreadScheduler.schedule(TrampolineScheduler.java:65)
at rx.internal.operators.OnSubscribeRedo$4$1.onNext(OnSubscribeRedo.java:289)
at rx.internal.operators.OperatorMerge$InnerSubscriber.emit(OperatorMerge.java:602)
at rx.internal.operators.OperatorMerge$InnerSubscriber.onNext(OperatorMerge.java:528)
at rx.internal.operators.OnSubscribeTimerOnce$1.call(OnSubscribeTimerOnce.java:48)
at rx.internal.schedulers.ScheduledAction.run(ScheduledAction.java:43)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import rx.Observable;
import rx.Subscriber;

public class RetryWhenTestsConditional {

    public static void main(String[] args) {

        AtomicInteger count = new AtomicInteger();
        Observable.create((Subscriber<? super String> s) -> {
            if (count.getAndIncrement() == 0) {
                s.onError(new RuntimeException("always fails"));
            } else {
                s.onError(new IllegalArgumentException("user error"));
            }
        }).retryWhen(attempts -> {
            return attempts.flatMap(n -> {
                if (n.getThrowable() instanceof IllegalArgumentException) {
                    System.out.println("don't retry on IllegalArgumentException... allow failure");
                    return Observable.error(n.getThrowable());
                } else {
                    System.out.println(n.getThrowable() + " => retry after 1 second");
                    return Observable.timer(1, TimeUnit.SECONDS);
                }
            });
        })
        .toBlocking().forEach(System.out::println);
    }
}
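For readers less familiar with `retryWhen`, the decision made inside the `flatMap` above can be sketched as a plain blocking loop with no RxJava involved. This is only an illustration of the policy (retry after a delay unless the error is of a type that should be allowed through), not how RxJava implements it. The class name `ConditionalRetrySketch`, the method `callWithRetry`, and its parameters are made up for this example.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Predicate;

// Hypothetical helper, not part of RxJava: a blocking analogue of the retryWhen policy above.
final class ConditionalRetrySketch {

    // Calls the task until it succeeds or fails with an error the predicate marks as fatal.
    static <T> T callWithRetry(Callable<T> task, Predicate<Throwable> isFatal, long delayMillis)
            throws Exception {
        while (true) {
            try {
                return task.call();
            } catch (Exception e) {
                if (isFatal.test(e)) {
                    throw e; // plays the role of Observable.error(n.getThrowable())
                }
                Thread.sleep(delayMillis); // plays the role of Observable.timer(...): wait, then retry
            }
        }
    }

    public static void main(String[] args) throws Exception {
        AtomicInteger count = new AtomicInteger();
        // Same failure sequence as the gist: RuntimeException first, IllegalArgumentException after.
        Callable<String> task = () -> {
            if (count.getAndIncrement() == 0) {
                throw new RuntimeException("always fails");
            }
            throw new IllegalArgumentException("user error");
        };
        try {
            callWithRetry(task, e -> e instanceof IllegalArgumentException, 10);
        } catch (IllegalArgumentException e) {
            System.out.println("gave up after " + count.get() + " attempts: " + e.getMessage());
        }
    }
}
```

The main difference from the Rx version is that this loop blocks the calling thread during the delay, whereas `Observable.timer` schedules the resubscription on another thread (which is why the stack trace above runs through a `ScheduledThreadPoolExecutor`).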

daschl commented Aug 21, 2014

Thanks!

Here is the resulting retry for a CAS mismatch during a get -> modify -> replace cycle against the database:

Observable
    .defer(() -> bucket.get("id"))
    .map(document -> {
        document.content().put("modified", new Date().getTime());
        return document;
    })
    .flatMap(bucket::replace)
    .retryWhen(attempts ->
        attempts.flatMap(n -> {
            if (!(n.getThrowable() instanceof CASMismatchException)) {
                return Observable.error(n.getThrowable());
            }
            return Observable.timer(1, TimeUnit.SECONDS);
        })
    )
    .subscribe();
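A note on why the `defer` seems to matter in the snippet above: `retryWhen` resubscribes to the source, and `defer` makes each resubscription call `bucket.get("id")` again, so every attempt works on the document with its current CAS value. The sketch below shows the same get -> modify -> replace loop against a tiny in-memory store, using only the JDK. Everything in it (`Store`, `Doc`, `CasMismatch`, `update`, and the `maxAttempts` cap) is hypothetical and stands in for the real client. The 1 second delay is left out to keep it short.

```java
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.UnaryOperator;

// Hypothetical in-memory model of an optimistic-locking update; not the real database client.
final class CasRetrySketch {

    // A stored value plus the CAS token it was read with.
    static final class Doc {
        final String content;
        final long cas;
        Doc(String content, long cas) { this.content = content; this.cas = cas; }
    }

    // Stand-in for the database's CAS failure.
    static final class CasMismatch extends RuntimeException {
        CasMismatch() { super("CAS mismatch"); }
    }

    // Single-document store; replace succeeds only if the caller read the current version.
    static final class Store {
        private final AtomicReference<Doc> current = new AtomicReference<>(new Doc("v0", 0));

        Doc get() { return current.get(); }

        Doc replace(Doc read, String newContent) {
            Doc next = new Doc(newContent, read.cas + 1);
            // Fails if another writer replaced the document after `read` was fetched.
            if (!current.compareAndSet(read, next)) {
                throw new CasMismatch();
            }
            return next;
        }
    }

    // get -> modify -> replace; every attempt starts with a fresh get.
    // maxAttempts is an addition for this sketch; the snippet above retries without a limit.
    static Doc update(Store store, UnaryOperator<String> modify, int maxAttempts) {
        for (int attempt = 1; ; attempt++) {
            Doc read = store.get();
            try {
                return store.replace(read, modify.apply(read.content));
            } catch (CasMismatch e) {
                if (attempt == maxAttempts) {
                    throw e;
                }
            }
        }
    }

    public static void main(String[] args) {
        Store store = new Store();
        boolean[] interfered = {false};
        Doc result = update(store, content -> {
            if (!interfered[0]) {
                interfered[0] = true;
                // Simulate another writer sneaking in between our get and our replace.
                store.replace(store.get(), "other writer");
            }
            return content + "+modified";
        }, 3);
        System.out.println(result.content + " (cas " + result.cas + ")");
        // prints "other writer+modified (cas 2)"
    }
}
```

The first attempt fails because of the simulated concurrent write. The second attempt re-reads the document and applies the modification to the other writer's version, which is the behaviour the `defer` gives the Rx chain.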
