Technology moves fast! ⚡ Don't get left behind.🚶 Subscribe to our mailing list to keep up with latest and greatest in open source projects! 🏆


Subscribe to our mailing list

RxKotlin

RxJava bindings for Kotlin

Subscribe to updates I use RxKotlin


Statistics on RxKotlin

Number of watchers on Github 3858
Number of open issues 10
Average time to close an issue about 1 month
Main language Kotlin
Average time to merge a PR 8 days
Open pull requests 21+
Closed pull requests 17+
Last commit about 1 year ago
Repo Created over 4 years ago
Repo Last Updated 12 months ago
Size 495 KB
Organization / Authorreactivex
Latest Release2.2.0
Contributors11
Page Updated
Do you use RxKotlin? Leave a review!
View open issues (10)
View RxKotlin activity
View on github
Fresh, new opensource launches 🚀🚀🚀
Trendy new open source projects in your inbox! View examples

Subscribe to our mailing list

Evaluating RxKotlin for your project? Score Explanation
Commits Score (?)
Issues & PR Score (?)

RxKotlin

Kotlin Extensions for RxJava

RxKotlin is a lightweight library that adds convenient extension functions to RxJava. You can use RxJava with Kotlin out-of-the-box, but Kotlin has language features (such as extension functions) that can streamline usage of RxJava even more. RxKotlin aims to conservatively collect these conveniences in one centralized library, and standardize conventions for using RxJava with Kotlin.

import io.reactivex.rxkotlin.subscribeBy
import io.reactivex.rxkotlin.toObservable

fun main(args: Array<String>) {

    val list = listOf("Alpha", "Beta", "Gamma", "Delta", "Epsilon")

    list.toObservable() // extension function for Iterables
            .filter { it.length >= 5 }
            .subscribeBy(  // named arguments for lambda Subscribers
                    onNext = { println(it) },
                    onError =  { it.printStackTrace() },
                    onComplete = { println("Done!") }
            )

}

Resources

Learning RxJava Packt Book

Chapter 12 of Learning RxJava covers RxKotlin and Kotlin idioms with RxJava.

Reactive Programming in Kotlin Packt Book

The book Reactive Programming in Kotlin mainly focuses on RxKotlin, as well as learning reactive programming with Kotlin.

Kotlin Slack Channel

Join us on the #rx channel in Kotlin Slack!

https://kotlinlang.slack.com/messages/rx

Support for RxJava 1.x and RxJava 2.x

Use RxKotlin 1.x versions to target RxJava 1.x.

Use RxKotlin 2.x versions to target RxJava 2.x.

The maintainers do not update the RxJava dependency version for every RxJava release, so you should explicitly add the desired RxJava dependency version to your pom.xml or build.gradle.

Build

Build Status

Binaries

Binaries and dependency information for Maven, Ivy, Gradle and others can be found at http://search.maven.org.

RxKotlin 1.x

Example for Maven:

<dependency>
    <groupId>io.reactivex</groupId>
    <artifactId>rxkotlin</artifactId>
    <version>1.x.y</version>
</dependency>

and for Gradle:

compile 'io.reactivex:rxkotlin:x.y.z'

RxKotlin 2.x

Example for Maven:

<dependency>
    <groupId>io.reactivex.rxjava2</groupId>
    <artifactId>rxkotlin</artifactId>
    <version>2.x.y</version>
</dependency>

and for Gradle:

compile 'io.reactivex.rxjava2:rxkotlin:x.y.z'

Building with JitPack

You can also use Gradle or Maven with JitPack to build directly off a snapshot, branch, or commit of this repository.

For example, to build off the 1.x branch, use this setup for Gradle:

repositories {
    maven { url 'https://jitpack.io' }
}

dependencies {
    compile 'com.github.ReactiveX:RxKotlin:1.x-SNAPSHOT'
}

Use this setup for Maven:

    <repositories>
        <repository>
            <id>jitpack.io</id>
            <url>https://jitpack.io</url>
        </repository>
    </repositories>

        <dependency>
        <groupId>com.github.ReactiveX</groupId>
        <artifactId>RxKotlin</artifactId>
        <version>1.x-SNAPSHOT</version>
    </dependency>

Learn more about building this project with JitPack here.

Extensions

Target Type Method Return Type Description
BooleanArray toObservable() Observable Turns a Boolean array into an Observable
ByteArray toObservable() Observable Turns a Byte array into an Observable
ShortArray toObservable() Observable Turns a Short array into an Observable
IntArray toObservable() Observable Turns an Int array into an Observable
LongArray toObservable() Observable Turns a Long array into an Observable
FloatArray toObservable() Observable Turns a Float array into an Observable
DoubleArray toObservable() Observable Turns a Double array into an Observable
Array toObservable() Observable Turns a T array into an Observable
IntProgression toObservable() Observable Turns an IntProgression into an Observable
Iterable toObservable() Observable Turns an Iterable<T> into an Observable
Iterator toObservable() Observable Turns an Iterator<T> into an Observable
Observable flatMapSequence() Observable Flat maps each T emission to a Sequence<R>
Observable> toMap() Collects Pair<A,B> emissions into a Map<A,B>
Observable> toMultimap() Collects Pair<A,B> emissions into a Map<A,List<B>>
Observable> mergeAll() Observable Merges all Observables emitted from an Observable
Observable> concatAll() Observable Cocnatenates all Observables emitted from an Observable
Observable> switchLatest() Observable Emits from the last emitted Observable
Observable<*> cast() Observable Casts all emissions to the reified type
Observable<*> ofType() Observable Filters all emissions to only the reified type
Iterable> merge() Merges an Iterable of Observables into a single Observable
Iterable> mergeDelayError() Merges an Iterable of Observables into a single Observable, but delays any error
BooleanArray toFlowable() Flowable Turns a Boolean array into an Flowable
ByteArray toFlowable() Flowable Turns a Byte array into an Flowable
ShortArray toFlowable() Flowable Turns a Short array into an Flowable
IntArray toFlowable() Flowable Turns an Int array into an Flowable
LongArray toFlowable() Flowable Turns a Long array into an Flowable
FloatArray toFlowable() Flowable Turns a Float array into an Flowable
DoubleArray toFlowable() Flowable Turns a Double array into an Flowable
Array toFlowable() Flowable Turns a T array into an Flowable
IntProgression toFlowable() Flowable Turns an IntProgression into an Flowable
Iterable toFlowable() Flowable Turns an Iterable<T> into an Flowable
Iterator toFlowable() Flowable Turns an Iterator<T> into an Flowable
Flowable flatMapSequence() Flowable Flat maps each T emission to a Sequence<R>
Flowable> toMap() Collects Pair<A,B> emissions into a Map<A,B>
Flowable> toMultimap() Collects Pair<A,B> emissions into a Map<A,List<B>>
Flowable> mergeAll() Flowable Merges all Flowables emitted from an Flowable
Flowable> concatAll() Flowable Concatenates all Flowables emitted from an Flowable
Flowable> switchLatest() Flowable Emits from the last emitted Flowable
Flowable cast() Flowable Casts all emissions to the reified type
Flowable ofType() Flowable Filters all emissions to only the reified type
Iterable> merge() Merges an Iterable of Flowables into a single Flowable
Iterable> mergeDelayError() Merges an Iterable of Flowables into a single Flowable, but delays any error
Single cast() Single Casts all emissions to the reified type
Observable> mergeAllSingles() Observable Merges all Singles emitted from an Observable
Flowable> mergeAllSingles() Flowable Merges all Singles emitted from a Flowable
Maybe cast() Maybe Casts any emissions to the reified type
Maybe ofType() Maybe Filters any emission that is the reified type
Observable> mergeAllMaybes() Observable Merges all emitted Maybes
Flowable> mergeAllMaybes() Flowable Merges all emitted Maybes
Action toCompletable() Completable Turns an Action into a Completable
Callable toCompletable() Completable Turns a Callable into a Completable
Future toCompletable() Completable Turns a Future into a Completable
(() -> Any) toCompletable() Completable Turns a (() -> Any) into a Completable
Observable mergeAllCompletables() Completable> Merges all emitted Completables
Flowable mergeAllCompletables() Completable Merges all emitted Completables
Observable subscribeBy() Disposable Allows named arguments to construct an Observer
Flowable subscribeBy() Disposable Allows named arguments to construct a Subscriber
Single subscribeBy() Disposable Allows named arguments to construct a SingleObserver
Maybe subscribeBy() Disposable Allows named arguments to construct a MaybeObserver
Completable subscribeBy() Disposable Allows named arguments to construct a CompletableObserver
Observable blockingSubscribeBy() Unit Allows named arguments to construct a blocking Observer
Flowable blockingSubscribeBy() Unit Allows named arguments to construct a blocking Subscriber
Disposable addTo() Disposable Adds a Disposable to the specified CompositeDisposable
CompositeDisposable plusAssign() Disposable Operator function to add a Disposable to thisCompositeDisposable

SAM Helpers

To help cope with the SAM ambiguity issue when using RxJava 2.x with Kotlin, there are a number of helper factories and extension functions to workaround the affected operators.

Observables.zip()
Observables.combineLatest()
Observable#zipWith()
Observable#withLatestFrom()
Flowables.zip()
Flowables.combineLatest()
Flowable#zipWith()
Flowable#withLatestFrom()
Singles.zip()
Single#zipWith()
Maybes.zip()

Usage with Other Rx Libraries

RxKotlin can be used in conjunction with other Rx and Kotlin libraries, such as RxAndroid, RxBinding, and TornadoFX/RxKotlinFX. These libraries and RxKotlin are modular, and RxKotlin is merely a set of extension functions to RxJava that can be used with these other libraries. There should be no overlap or dependency issues.

Contributing

We welcome contributions and discussion for new features. It is recommended to file an issue first to prevent unnecessary efforts, but feel free to put in pull requests. The vision is to keep this library lightweight, with a tight and focused scope applicable to all platforms (including Android, server, and desktop). Anything specific to a particular domain (for example, JavaFX or JDBC), might be better suited as a separate project. Feel free to open discussion and we will help figure out where your functionality may belong.

RxKotlin open issues Ask a question     (View All Issues)
  • over 2 years Call for a new leadership
  • over 2 years Creating observable using kotlin range
  • over 2 years RxJava 2.0 Planning?
  • over 2 years Add completable method wrapping Completable.create
  • almost 3 years DSL for Multiple Lambda Operators?
  • almost 3 years concat() extension method
  • almost 3 years Do these methods really belong in RxKotlin?
  • almost 3 years RxJavaMath extensions
RxKotlin open pull requests (View All Pulls)
  • Add extension for more idiomatic subscription addition
  • cast operator
  • Add subscribeWith method for Single.
  • Add generic toSingle() extension function
  • Compile RxKotlin and examples with Kobalt.
  • Add subscriptions directly to a CompositeSubscription
  • Start implementing DSL operators for multiple lambda arguments
  • Remove unneeded indentation in README
  • Add completable method for Completable.
  • Update library dependencies
  • added support for `ofType`
  • Add flatZipWith for zip functions that return an Observable
  • Add asyncToObservable(); nicer Observable.fromAsync().
  • From async
  • Dependencies updated
  • update RxJava version
  • rxKotlin for rxJava2
  • Zip Pair/Triple
  • Update Kotlin Gradle Plugin and RxJava Gradle Repository
  • Added Observables.combineLatest for Iterable
  • Make composite disposable extensions work on DisposableContainer instead
RxKotlin questions on Stackoverflow (View All Questions)
  • Kotlin library 'rxkotlin-0.21.0.jar' has an unsupported format. Please update the library or the plugin
  • RxKotlin: trying to add custom errors catching
  • Use RxAndroid or RxKotlin when programming in Kotlin for Android?
RxKotlin list of languages used
RxKotlin latest release notes
2.2.0 RxKotlin 2.2.0

This is a breaking release removing some annoyances, specifically the toSingle() and toMaybe() extension functions targeting futures, callables, and even single T items.

These caused far more harm than good, as they often clashed with other libraries and people's own extensions. For instance, RxKotlinFX has a Dialog<T>.toMaybe() extension which emits a dialog's response. This clashed directly with the toMaybe() in RxKotlin. Rather than deprecating these extensions, they were just removed to prevent further hindrance.

Deprecated Observable#combineLatest() and its Flowable counterpart have been removed as well, and you can use the Observables and Flowables factories instead.

RxJava and Kotlin dependencies have been updated as well.

2.1.0 RxKotlin 2.1.0

There are a number of changes in this release, some of which are possibly breaking. Many additions seek to expand the SAM helpers used to workaround the SAM issue.

  • zip(), combineLatest(), zipWith(), and withLatestFrom() now support emitting Pairs and Triples when no combining function is provided for 2-3 source arguments. This applies to Observable, Flowable, Single, and Maybe types.

  • Singles and Maybes have been greatly expanded to include more SAM helper operators for zip() and zipWith()

  • toMap() and toMultimap() extension operators have been added to Observable<Pair<X,Y>> and Flowable<Pair<X,Y>>, which will automatically use each Pair emission to map the key and value without any arguments.

  • Order of parameters for subscribeBy() have been moved so that onNext is the default if only one unnamed parameter is provided.

  • Kotlin 1.2 and RxJava 2.1.0 are now dependencies

Please file any issues if you have any questions or concerns. Thanks to everyone who contributed to this release.

2.0.3 RxKotlin 2.0.3

This is a quick release to remove Java 8 configuration, so this works on Java 6+.

Also a blockingSubscribeBy() has been added as discussed here.

Other projects in Kotlin