RxJava

RxJava – Reactive Extensions for the JVM – a library for composing asynchronous and event-based programs using observable sequences for the Java VM.


Statistics on RxJava

Number of watchers on Github 31416
Number of open issues 31
Average time to close an issue 1 day
Main language Java
Average time to merge a PR about 23 hours
Open pull requests 71+
Closed pull requests 72+
Last commit over 1 year ago
Repo Created over 6 years ago
Repo Last Updated over 1 year ago
Size 49.2 MB
Organization / Author reactivex
Latest Release v2.1.10
Contributors 118
RxJava: Reactive Extensions for the JVM

RxJava is a Java VM implementation of Reactive Extensions: a library for composing asynchronous and event-based programs by using observable sequences.

It extends the observer pattern to support sequences of data/events and adds operators that allow you to compose sequences together declaratively while abstracting away concerns about things like low-level threading, synchronization, thread-safety and concurrent data structures.

Version 1.x (Javadoc)

Looking for version 1.x? Jump to the 1.x branch.

Timeline plans for the 1.x line:

  • June 1, 2017 - feature freeze (no new operators), only bugfixes
  • March 31, 2018 - end of life, no further development

Version 2.x (Javadoc)

  • single dependency: Reactive-Streams
  • continued support for Java 6+ & Android 2.3+
  • performance gains through design changes learned through the 1.x cycle and through the Reactive-Streams-Commons research project
  • Java 8 lambda-friendly API
  • non-opinionated about source of concurrency (threads, pools, event loops, fibers, actors, etc)
  • async or synchronous execution
  • virtual time and schedulers for parameterized concurrency

Version 2.x and 1.x will live side-by-side for several years. They will have different group ids (io.reactivex.rxjava2 vs io.reactivex) and namespaces (io.reactivex vs rx).

See the differences between version 1.x and 2.x in the wiki article What's different in 2.0. Learn more about RxJava in general on the Wiki Home.

Getting started

The first step is to include RxJava 2 in your project, for example, as a Gradle compile dependency:

compile "io.reactivex.rxjava2:rxjava:2.x.y"

The second is to write the Hello World program:

package rxjava.examples;

import io.reactivex.*;

public class HelloWorld {
    public static void main(String[] args) {
        Flowable.just("Hello world").subscribe(System.out::println);
    }
}

If your platform doesn't support Java 8 lambdas (yet), you have to create an anonymous inner class implementing Consumer manually:

import io.reactivex.functions.Consumer;

Flowable.just("Hello world")
  .subscribe(new Consumer<String>() {
      @Override public void accept(String s) {
          System.out.println(s);
      }
  });

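RxJava 2 features several base classes you can discover operators on:

  • io.reactivex.Flowable: 0..N flows, supporting Reactive-Streams and backpressure
  • io.reactivex.Observable: 0..N flows, no backpressure
  • io.reactivex.Single: a flow of exactly 1 item or an error
  • io.reactivex.Completable: a flow without items but only a completion or error signal
  • io.reactivex.Maybe: a flow with no items, exactly one item or an error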

One of the common use cases for RxJava is to run some computation or network request on a background thread and show the results (or error) on the UI thread:

import io.reactivex.schedulers.Schedulers;

Flowable.fromCallable(() -> {
    Thread.sleep(1000); //  imitate expensive computation
    return "Done";
})
  .subscribeOn(Schedulers.io())
  .observeOn(Schedulers.single())
  .subscribe(System.out::println, Throwable::printStackTrace);

Thread.sleep(2000); // <--- wait for the flow to finish

This style of chaining methods is called a fluent API, which resembles the builder pattern. However, RxJava's reactive types are immutable; each of the method calls returns a new Flowable with added behavior. To illustrate, the example can be rewritten as follows:

Flowable<String> source = Flowable.fromCallable(() -> {
    Thread.sleep(1000); //  imitate expensive computation
    return "Done";
});

Flowable<String> runBackground = source.subscribeOn(Schedulers.io());

Flowable<String> showForeground = runBackground.observeOn(Schedulers.single());

showForeground.subscribe(System.out::println, Throwable::printStackTrace);

Thread.sleep(2000);

Typically, you can move computations or blocking IO to some other thread via subscribeOn. Once the data is ready, you can make sure it gets processed on the foreground or GUI thread via observeOn.
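The hand-off between a background thread and a designated consumer thread can also be sketched without RxJava. The following is a JDK-only analogy, not RxJava's implementation: it uses CompletableFuture with two single-thread executors, and the names ThreadHopDemo, produceThenConsume, "background" and "foreground" are hypothetical and chosen only for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class ThreadHopDemo {

    // Returns the names of the thread that produced the value and the thread that consumed it.
    static String[] produceThenConsume() {
        // Hypothetical stand-ins for a background worker and a foreground (GUI-like) thread.
        ExecutorService background = Executors.newSingleThreadExecutor(r -> new Thread(r, "background"));
        ExecutorService foreground = Executors.newSingleThreadExecutor(r -> new Thread(r, "foreground"));
        String[] seen = new String[2];
        try {
            CompletableFuture
                // Roughly what subscribeOn does: the work itself runs on the background executor.
                .supplyAsync(() -> {
                    seen[0] = Thread.currentThread().getName();
                    return "Done";
                }, background)
                // Roughly what observeOn does: the result is delivered on the foreground executor.
                .thenAcceptAsync(value -> seen[1] = Thread.currentThread().getName(), foreground)
                .join();
        } finally {
            background.shutdown();
            foreground.shutdown();
        }
        return seen;
    }

    public static void main(String[] args) {
        String[] seen = produceThenConsume();
        System.out.println("produced on " + seen[0] + ", consumed on " + seen[1]);
    }
}
```

In RxJava the two executors would be replaced by Schedulers, and the chain would keep working for many items rather than a single value.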

RxJava operators don't work with Threads or ExecutorServices directly but with so-called Schedulers that abstract away sources of concurrency behind a uniform API. RxJava 2 features several standard schedulers accessible via the Schedulers utility class. These are available on all JVM platforms, but some specific platforms, such as Android, have their own typical Schedulers defined: AndroidSchedulers.mainThread(), SwingScheduler.instance() or JavaFXSchedulers.gui().

The Thread.sleep(2000); at the end is no accident. In RxJava the default Schedulers run on daemon threads, which means once the Java main thread exits, they all get stopped and background computations may never happen. Sleeping for some time in example situations like this lets you see the output of the flow on the console with time to spare.
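The daemon-thread behavior is a property of the JVM and can be reproduced with plain threads. The sketch below uses only JDK classes; DaemonThreadDemo and runOnDaemonThread are hypothetical names. It waits on a CountDownLatch instead of a fixed sleep, which is one possible way to avoid guessing how long the background work takes.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

final class DaemonThreadDemo {

    // Runs a slow task on a daemon thread and waits (up to 5 seconds) for its result.
    static String runOnDaemonThread() {
        AtomicReference<String> result = new AtomicReference<>("not finished");
        CountDownLatch done = new CountDownLatch(1);

        Thread worker = new Thread(() -> {
            try {
                Thread.sleep(200); // imitate expensive computation
                result.set("Done");
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                done.countDown();
            }
        });
        // A daemon thread does not keep the JVM alive once main() returns.
        worker.setDaemon(true);
        worker.start();

        try {
            // Without this wait, the caller could return before the worker finishes.
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return result.get();
    }

    public static void main(String[] args) {
        System.out.println(runOnDaemonThread());
    }
}
```

If the wait on the latch is removed and main() returns immediately, the JVM may exit before the worker has produced anything, which is the situation the paragraph above describes.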

Flows in RxJava are sequential in nature, split into processing stages that may run concurrently with each other:

Flowable.range(1, 10)
  .observeOn(Schedulers.computation())
  .map(v -> v * v)
  .blockingSubscribe(System.out::println);

This example flow squares the numbers from 1 to 10 on the computation Scheduler and consumes the results on the main thread (more precisely, the caller thread of blockingSubscribe). However, the lambda v -> v * v doesn't run in parallel for this flow; it receives the values 1 to 10 on the same computation thread one after the other.

Processing the numbers 1 to 10 in parallel is a bit more involved:

Flowable.range(1, 10)
  .flatMap(v ->
      Flowable.just(v)
        .subscribeOn(Schedulers.computation())
        .map(w -> w * w)
  )
  .blockingSubscribe(System.out::println);

Practically, parallelism in RxJava means running independent flows and merging their results back into a single flow. The operator flatMap does this by first mapping each number from 1 to 10 into its own individual Flowable, then running them and merging the computed squares.
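The "run independently, then merge" idea can be illustrated with JDK classes alone. This is an analogy under stated assumptions rather than how flatMap is implemented: each number gets its own CompletableFuture on a small thread pool, and the results are merged into one queue in whatever order they complete. ParallelSquares, squaresInCompletionOrder and the pool size of 4 are hypothetical choices.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class ParallelSquares {

    // Squares 1..count on a thread pool and merges the results in completion order.
    static List<Integer> squaresInCompletionOrder(int count) {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        ConcurrentLinkedQueue<Integer> merged = new ConcurrentLinkedQueue<>();
        try {
            List<CompletableFuture<Void>> tasks = new ArrayList<>();
            for (int i = 1; i <= count; i++) {
                final int v = i;
                // Each value becomes its own independent asynchronous task.
                tasks.add(CompletableFuture
                    .supplyAsync(() -> v * v, pool)
                    // Results are merged as they arrive, so the order is not guaranteed.
                    .thenAccept(merged::add));
            }
            // Wait until every task has delivered its result.
            CompletableFuture.allOf(tasks.toArray(new CompletableFuture[0])).join();
        } finally {
            pool.shutdown();
        }
        return new ArrayList<>(merged);
    }

    public static void main(String[] args) {
        System.out.println(squaresInCompletionOrder(10));
    }
}
```

The returned list always contains the ten squares, but their order may differ from run to run, which mirrors the interleaving that merged flows can show.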

Starting from 2.0.5, there is an experimental operator parallel() and type ParallelFlowable that helps achieve the same parallel processing pattern:

Flowable.range(1, 10)
  .parallel()
  .runOn(Schedulers.computation())
  .map(v -> v * v)
  .sequential()
  .blockingSubscribe(System.out::println);

flatMap is a powerful operator and helps in a lot of situations. For example, given a service that returns a Flowable, we'd like to call another service with values emitted by the first service:

Flowable<Inventory> inventorySource = warehouse.getInventoryAsync();

inventorySource.flatMap(inventoryItem ->
    erp.getDemandAsync(inventoryItem.getId())
    // map must return a value, so build the message here and print it in subscribe
    .map(demand ->
        "Item " + inventoryItem.getName() + " has demand " + demand)
  )
  .subscribe(System.out::println);

Note, however, that flatMap doesn't guarantee any order and the end result from the inner flows may end up interleaved. There are alternative operators:

  • concatMap that maps and runs one inner flow at a time and
  • concatMapEager which runs all inner flows at once but the output flow will be in the order those inner flows were created.
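The ordering behavior described for concatMapEager (start everything at once, emit in creation order) can be sketched with JDK classes. This is only an analogy, assuming one CompletableFuture per inner flow; EagerOrderedSquares, squaresInCreationOrder and the pool size of 4 are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class EagerOrderedSquares {

    // Squares 1..count concurrently but returns the results in creation order.
    static List<Integer> squaresInCreationOrder(int count) {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        try {
            // Start every task right away, so they can all run concurrently.
            List<CompletableFuture<Integer>> inFlight = new ArrayList<>();
            for (int i = 1; i <= count; i++) {
                final int v = i;
                inFlight.add(CompletableFuture.supplyAsync(() -> v * v, pool));
            }
            // Read the results in the order the tasks were created,
            // regardless of which task happened to finish first.
            List<Integer> ordered = new ArrayList<>();
            for (CompletableFuture<Integer> task : inFlight) {
                ordered.add(task.join());
            }
            return ordered;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(squaresInCreationOrder(5));
    }
}
```

Starting each task only after the previous one has been joined would correspond to the one-at-a-time behavior described for concatMap.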

For further details, consult the wiki.

Communication

Versioning

Version 2.x is now considered stable and final. Version 1.x will be supported for several years along with 2.x. Enhancements and bugfixes will be synchronized between the two in a timely manner.

Minor 2.x increments (such as 2.1, 2.2, etc) will occur when non-trivial new functionality is added or significant enhancements or bug fixes occur that may have behavioral changes that may affect some edge cases (such as dependence on behavior resulting from a bug). An example of an enhancement that would classify as this is adding reactive pull backpressure support to an operator that previously did not support it. This should be backwards compatible but does behave differently.

Patch 2.x.y increments (such as 2.0.0 -> 2.0.1, 2.3.1 -> 2.3.2, etc) will occur for bug fixes and trivial functionality (like adding a method overload). New functionality marked with an @Beta or @Experimental annotation can also be added in patch releases to allow rapid exploration and iteration of unstable new functionality.
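To make the increment rules concrete, here is a small sketch that classifies the step between two version strings. It is not part of RxJava; VersionIncrement, Kind and classify are hypothetical names, and the code assumes plain "major.minor.patch" strings without suffixes such as -SNAPSHOT.

```java
final class VersionIncrement {

    enum Kind { MAJOR, MINOR, PATCH, NONE }

    // Classifies the step between two "major.minor.patch" version strings.
    static Kind classify(String from, String to) {
        int[] a = parse(from);
        int[] b = parse(to);
        if (a[0] != b[0]) return Kind.MAJOR;
        if (a[1] != b[1]) return Kind.MINOR;
        if (a[2] != b[2]) return Kind.PATCH;
        return Kind.NONE;
    }

    // Splits a version such as "2.0.1" into its three numeric components.
    private static int[] parse(String version) {
        String[] parts = version.split("\\.");
        if (parts.length != 3) {
            throw new IllegalArgumentException("expected major.minor.patch but got: " + version);
        }
        return new int[] {
            Integer.parseInt(parts[0]),
            Integer.parseInt(parts[1]),
            Integer.parseInt(parts[2])
        };
    }

    public static void main(String[] args) {
        System.out.println(classify("2.0.0", "2.0.1"));
        System.out.println(classify("2.0.9", "2.1.0"));
    }
}
```

Under the policy above, a PATCH step should carry only bug fixes, trivial functionality, or new @Beta and @Experimental APIs, while a MINOR step may include behavioral changes that affect edge cases.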

@Beta

APIs marked with the @Beta annotation at the class or method level are subject to change. They can be modified in any way, or even removed, at any time. If your code is a library itself (i.e. it is used on the CLASSPATH of users outside your own control), you should not use beta APIs, unless you repackage them (e.g. using ProGuard, shading, etc).

@Experimental

APIs marked with the @Experimental annotation at the class or method level will almost certainly change. They can be modified in any way, or even removed, at any time. You should not use or rely on them in any production code. They are purely to allow broad testing and feedback.

@Deprecated

APIs marked with the @Deprecated annotation at the class or method level will remain supported until the next major release but it is recommended to stop using them.

io.reactivex.internal.*

All code inside the io.reactivex.internal.* packages is considered private API and should not be relied upon at all. It can change at any time.

Full Documentation

Binaries

Binaries and dependency information for Maven, Ivy, Gradle and others can be found at http://search.maven.org.

Example for Gradle:

compile 'io.reactivex.rxjava2:rxjava:x.y.z'

and for Maven:

<dependency>
    <groupId>io.reactivex.rxjava2</groupId>
    <artifactId>rxjava</artifactId>
    <version>x.y.z</version>
</dependency>

and for Ivy:

<dependency org="io.reactivex.rxjava2" name="rxjava" rev="x.y.z" />

Snapshots are available via https://oss.jfrog.org/libs-snapshot/io/reactivex/rxjava2/rxjava/

repositories {
    maven { url 'https://oss.jfrog.org/libs-snapshot' }
}

dependencies {
    compile 'io.reactivex.rxjava2:rxjava:2.2.0-SNAPSHOT'
}

Build

To build:

$ git clone git@github.com:ReactiveX/RxJava.git
$ cd RxJava/
$ ./gradlew build

Further details on building can be found on the Getting Started page of the wiki.

Bugs and Feedback

For bugs, questions and discussions, please use GitHub Issues.

LICENSE

Copyright (c) 2016-present, RxJava Contributors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
RxJava open issues
  • over 2 years RxJava switchifempty() on generics
  • over 2 years The stack trace gets deep very quickly and causes StackOverflowException below Android Lollipop
  • over 2 years UnsubscribeOn the same scheduler as SubscribeOn (Without specifying the schedulers explicitly)
  • over 2 years 2.0.1 Release Preparations
  • over 2 years How to stop sending data from Flowable.fromIterable.
  • over 2 years java.io.InterruptedIOException: thread interrupted
  • over 2 years reactive-streams should be scoped as compile dependency
  • over 2 years e.setDisposable() vs e.setCancellable()
  • over 2 years doOnDispose() vs doOnUnsubscribe()
  • over 2 years FATAL EXCEPTION: RxCachedThreadScheduler-4 or -1 or -2 or -3 ...
  • over 2 years IncompatibleClassChangeError while using RxJava+ Retrofit 2 occurs sometimes
  • over 2 years 2.x: compose between types
  • over 2 years Make 2.x the default branch
  • over 2 years How to add proguard?
  • over 2 years java.lang.NoClassDefFoundError: rx.internal.util.SubscriptionList
  • over 2 years Error handling when operate observable
  • over 2 years Rename operator doOnSubscribe to doBeforeSubscribe
  • over 2 years 1.2.2 release preparations
  • over 2 years doOnUnsubscribe() not called for Single
  • over 2 years ConnectableObservable's subscribers don't get emissions after second connection to source
  • over 2 years how to use the buffer(func0) ?
  • over 2 years 1.x and 2.x Proposal: Send a subclass of CancellationException in Single.takeUntil()
  • over 2 years Subscriber receive items with incorrect order.
  • over 2 years fromEmitter same-pool deadlock
  • over 2 years how can I use rxjava with grpc?
  • over 2 years 2.x: PublishSubject and serialize()
  • over 2 years 2.x: Idea for improving groupBy performance
  • over 2 years PlatformDependent.resolveAndroidApiVersion() fails when running under Robolectric
  • over 2 years Merging two observables that subcribeOn different threads does not call doOnCompleted() for merged observable
  • over 2 years 2.x Proposal: disposeOn
RxJava open pull requests
  • Add maxConcurrent parameter to flatMapIterable
  • Add vararg of Subscriptions to CompositeSubscription.
  • new method concatMapIterable #3713
  • Add maxConcurrent parameter to concatMapEager
  • Add takeUntil support in Single
  • 1.x: Added Single execution hooks
  • 1.x: fix counted buffer and window backpressure
  • 2.x: Add support for concurrently inserting actions while advancing time
  • 1.x: implement OperatorDoOnEmpty, with Observable.doOnEmpty() operator
  • 1.x: fix time windows in throttleFirst
  • 2.x: Reduse verbosity with varargs [WIP]
  • 1.x: Run tests on Android!
  • 1.x: OnBackpressureBuffer: DROP_LATEST and DROP_OLDEST
  • 1.x: replay request coordination reduce overhead
  • 1.x: observeOn - fix in-sequence termination/unsubscription
  • 1.x: Update Gradle wrapper to 2.12
  • 1.x: Add Single.onErrorResumeNext(Func)
  • 1.x: switchOnNextDelayError and switchMapDelayError
  • Added java6 project using retrolambda
  • Fix messagePrefix in CompositeException
  • 1.x: optimize merge/flatMap for empty sources
  • 1.x: fix ExecutorScheduler and GenericScheduledExecutorService reorder bug
  • 1.x: concatMap full rewrite + delayError + performance
  • 1.x: Operator sample emits last sampled value before termination.
  • 1.x: new operators buffer(While/Until) with predicate-based boundary
  • Fix an unsubscribe race in EventLoopWorker
  • 1.x: improve ExecutorScheduler worker unsubscription some more
  • 1.x: optimize concatMapIterable/flatMapIterable
  • 1.x: fix from(Iterable) error handling of Iterable/Iterator
  • 1.x: Add TestSingleSubscriber
  • 1.x: Add assertSubscribed() to TestSubscriber analog to assertUnsubscribed()
  • Making RxPlugins reset() public
  • Upgrading AsyncOnSubscribe from Experimental to Beta
  • 2.x: add ConsumableX to the base types, update method signatures
  • Add an operator to throttle data via controlling the requests going upstream
  • Clean-up of the 2.x Design.md document
  • 1.x: TestSubscriber extra info on assertion failures
  • add groupBy overload with evictingMapFactory
  • 1.x: ReplaySubject now supports backpressure
  • AssertionError should be treated as fatal
  • added distinctUntilChanged(comparator)
  • 2.x: Design.md +extension +fusion
  • 1.x: lift into Subject
  • 1.x: add multi-other withLatestFrom operators
  • 1.x: scan with an initial factory callback
  • Change the workers to capture the stack trace
  • 2.x update branch and year in contributing and readme md
  • Refactor private constructor checker into a utility class
  • 2.x: +fromAsync, distinctUC, skip, take overloads, fix TestSubscriber API
  • Adds Observable.sorted method
  • concatDelayError multiple arguments
  • Wrap InterruptedException with an unchecked exception in TestSubscriber#awaitValueCount().
  • 2.x: allow subscribeOn to work with blocking create
  • 1.x: Add note to SingleSubscriber doc about unsubscribe invocation
  • 1.x: Completable.doAfterTerminate to run after onError as well
  • Porting the Scheduler.when operator from 1.x to 2.x
  • 2.x: sys-property to disable calling uncaught exception handler
  • Added assertForEachValue for TestSubscriber usage
  • 2.x: assertNever(T value) / assertNever(Predicate<T> valuePredicate)
  • 2.x: A.flatMapB to eagerly check for cancellations before subscribing
  • 2.x: Update marble diagrams for sample overloads
  • Merging an observable of singles.
  • 2.x: add Flowable.parallel() and parallel operators
  • Add the cast operator to Single.
  • 2.x: make sure interval+trampoline can be stopped
  • 2.x: Improve the wording of the Maybe.fromCallable JavaDoc
  • 2.x: Add efficient mergeWith(Single|Maybe|Completable) overloads.
  • 2.x: Add efficient concatWith(Single|Maybe|Completable) overloads
  • 2.x: Explain the properties of the XEmitter interfaces in detail
  • 2.x: Expand the JavaDocs of the Scheduler API
  • 2.x: Expand the documentation of the Flowable.lift() operator
RxJava questions on Stack Overflow
  • Is there a better way to get the intersection of two Observables using RxJava
  • Android RxJava Interval with IntentService
  • How to make RxJava interval to perform action instantly
  • Rxjava and Volley Requests
  • Using of "skipWhile" combined with "repeatWhen" in RxJava to implement server polling
  • Send event from custom class that implements observable outside call method using rxjava
  • Making an RxJava Operator Chain Concurrent
  • RxJava network requests and caching
  • RxJava and Cached Data
  • InterruptedException in RxJava cache thread when debugging on Android
  • RxJava - how to invoke subscriber on 4 click events in Android
  • RXJava - onBackpressureBuffer + multiple observers not working
  • RxJava/RxScala backpressure using request
  • At what point is the thread created in RxJava
  • Is safe to use RxJava in Android in production?
  • Realm, network operations, subscribing and observing on different threads with RxJava
  • RxJava, Proguard and sun.misc.Unsafe
  • correct way to join an rxjava subscription with in flight observable
  • In RxJava, I can't emit a onComplete from a flatMap
  • Robolectric + OkHttp + retrofit + rxJava Unit Test
  • How to manage results in RxJava with Retrofit
  • RxJava Code Execution Flow - map vs. flatMap
  • RxJava for Android: InvocationTargetException
  • RxJava - ConnectableObservable can't notify its observers more than 128 times when using observeOn and subscribeOn simultaneously
  • Correct way to nest RxJava Observables?
  • Vertx - Is it possible to combine RxJava and Hibernate
  • Apply retries in a RXjava
  • How to run 2 queries sequentially in a Android RxJava Observable?
  • Register Google GCM by using RxJava
  • Combining Multiple API Request Simultaneously using Retrofit and RxJava
RxJava latest release notes
v2.1.10

Maven

API changes

  • Pull 5845: Add efficient concatWith(Single|Maybe|Completable) overloads to Flowable and Observable.
  • Pull 5847: Add efficient mergeWith(Single|Maybe|Completable) overloads to Flowable and Observable.
  • Pull 5860: Add Flowable.groupBy overload with evicting map factory.

Documentation changes

  • Pull 5824: Improve the wording of the share() JavaDocs.
  • Pull 5826: Fix Observable.blockingIterable(int) and add Observable.blockingLatest marbles.
  • Pull 5828: Document size-bounded replay emission's item retention property.
  • Pull 5830: Reword the just() operator and reference other typical alternatives.
  • Pull 5834: Fix copy-paste errors in SingleSubject JavaDoc.
  • Pull 5837: Detail distinct() and distinctUntilChanged() in JavaDoc.
  • Pull 5841: Improve JavaDoc of Observer, SingleObserver, MaybeObserver and CompletableObserver.
  • Pull 5843: Expand the JavaDocs of the Scheduler API.
  • Pull 5844: Explain the properties of the {Flowable|Observable|Single|Maybe|Completable}Emitter interfaces in detail.
  • Pull 5848: Improve the wording of the Maybe.fromCallable JavaDoc.
  • Pull 5856: Add finite requirement to various collector operators' JavaDoc.

Bugfixes

  • Pull 5833: Fix Observable.switchMap main onError not disposing the current inner source.

Other changes

  • Pull 5838: Added nullability annotation for completable assembly.
  • Pull 5858: Remove unnecessary comment from Observable.timeInterval(TimeUnit).
v1.3.6

Maven

Bugfixes

  • Pull 5850: Fix a race condition that may make OperatorMaterialize emit the wrong signals.
  • Pull 5851: Fix a race condition in OperatorMerge.InnerSubscriber#onError causing incorrect terminal event.
v1.3.5

Maven

Other

  • Pull 5820: RxJavaPlugins lookup workaround for System.getProperties() access restrictions.