안드로이드/Rx

[Android] RxJava2 함수에 대해 알아보자

코딩하는후운 2022. 10. 24. 08:59
반응형

Android RxJava2 함수에 대해 알아보자


-공통적으로 사용하는 구독(수신)클래스
static class CustomSubscriber<T> extends DisposableSubscriber<T>{
@Override
public void onNext(T t){
}
@Override
public void onError(Throwable t){
}
@Override
public void onComplete(){
}
}

1.Just

파라미터를 통해 받은 데이터로 Flowable을 생성하는 연산자입니다.
최대 10까지 전달할 수 있고, 모든데이터가 수신되면 onComplete()수신.
기본적인 Flowable 생성자 함수로 볼 수 있으며 단순 작업에서 많이 사용.

//파라미터 값을 순차적으로 송신하는 Flowable 생성
Flowable<String> flowable = Flowable.just("A", "B", "C")
//구독을 시작
flowable.subscribe(new CustomSubscriber<>())

2.fromArray/fromIterable

파라미터로 배열 또는 Iterable(리스트 등)에 담긴 데이터를 순서대로 Flowable을 생성하는 연산자입니다.
모든데이터를 순차적으로 송신 후 완료.
반복적인 데이터 변환 작업 같은경우 for문 대신 대체할 수 있습니다.
결과를 보면 main Thread에서 작업 결과가 나오지만, flatMap을 사용한다면 별도의 Thread로 main Thread의 부하를 막을 수 있습니다.

(1) fromArray

Flowable<String> flowable = Flowable.fromArray("A","B","C")
flowable.subscribe(new CustomSubscriber<>())

(2) fromIterable

List<String> list = Arrays.asList("A", "B", "C")
Flowable<String> flowable = Flowable.fromIterable(list)
flowable.subscribe(new CustomSubscriber<>())

3.range/rangLong

range함수는 지정한 숫자부터 지정한 개수만큼 증가하는 Integer값 데이터를 송신하는 Flowable를 생성합니다.
rangLong함수는 range와 동일하게 데이터 타입은 Long을 사용합니다.
두 함수 데이터 송신을 마치면 onComplete를 송신.

(1) range

//range(int start, int count) :시작값, 발생하는 횟수
Flowable<Integer> flowable = Flowable.range(10, 5)
flowable.subscribe(new CustomSubscriber<>())

main onNext(10)
main onNext(11)
main onNext(12)
main onNext(13)
main onNext(14)
main onComplete()

4.interval

지정한 간격마다 0부터 시작해 Long타입 숫자의 데이터를 송신하는 Flowable을 생성.
Android에서는 반복적인 작업인 TimerTask를 대신해서 interval로 간단하게 처리할 수 있습니다.
UI변경이 필요한 부분에서는 interval scheduler를 AndroidSchedulers.mainThread()를 변경해 적용할 수 있습니다.
//(long time, TimeUnit unit, Scheduler scheduler)
//발생 간격 시간, 간격 시간 단위, 발생 scheduler를 변경하여 사용할 수 있다(AnroidSchedulers.mainThrea()

//1초 간격으로 데이터 요청을 송신

Flowable<Long> flowable = Flowable.interval(1000L, TimeUnit.MILLISECONDS).take(10)
flowable.subscribe(new CustomSubscriber<>())

RxComputationThreadPool-1 onNext( 0 )
RxComputationThreadPool-1 onNext( 1 )
RxComputationThreadPool-1 onNext( 2 )
..

5.timer

timer함수는 호출된 시간부터 일정한 시간동안 대기하고 Long타입 0을 송신 및 종료하는 flowable을 생성합니다.
interval이 조건까지 반복적으로 송신한다면, timer는 한번만 송신하고 종료됩니다.

SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy.MM.dd hh:mm ss")
//(long time, TimeUnit unit, Scheduler scheduler)
//발생 간격 시간, 간격 시간 단위, 발생 scheduler

Flowable<Long> flowable = Flowable.timer(1000L, TimeUnit.MILLISECONDS)
//구독을 시작한다.
flowable.subscribe(value ->{

}, throwable ->{
}, ()->{
//complete
})
결과
현재시간 : 2019.04.29 09:09 56
timer : 2019.04.29 09:09 57
complete

6.map

Flowable에서 송신하는 데이터를 변환하고, 변환된 데이터를 송신하는 연산자입니다.
하나의 데이터만 송신할 수 있으며, 반드시 데이터를 송신해야 합니다.
혹여 송신되는 데이터가 null을 포함하면 map대신 아래의 flatMap을 사용하는것이 좋습니다.

Flowable<String> flowable = Flowable.just("A", "B", "C")
//map(Function mapper)
//mapper : 받은 데이터를 가공하는 함수형 인터페이스
//알파벳 값을 소문자로 변경하여 return한다.
.map(value -> value.toLowerCase())

flowable.subscribe(new CustomSubscriber<>())

main onNext(a)
main onNext(b)
main onNext(c)
main onComplete()

7.flatMap

flatMap은 map과 동일한 함수이지만, map과는 달리 여러 데이터가 담긴 Flowable을 반환할 수 있습니다.
또한 빈 Flowable를 리턴해 특정 데이터를 건너뛰거나 에러 Flowable를 송신할 수 있습니다.

파라미터 mapper에서 새로운 Flowable의 데이터 전달이 아닌 다른 타임라인 Flowable로 작업하면 들어온
데이터 순서대로 출력을 지원하지 않습니다. 타임라인 Flowable(timer, delay, interval 등)에서는
가급적 사용을 피하거나, 순서에 지장이 없을 때 사용하는 것이 좋습니다.

Flowable<String> flowable = Flowable.range(10, 2)
//flatMap(Function mapper, BIFunction combiner)
//mapper: 받은 데이터로 새로운 Flowable를 생성하는 함수형 인터페이스
//combiner : mapper가 새로 생성한 Flowable과 원본 데이터를 조합해 새로운 송신 데이터를 생성하는 함수형 인터페이스
.flatMap(value -> Flowable.interval(100L, TimeUnit.MILLISECONDS).take(3),
(value, newData) -> "value " + value + " newData " + newData)

flowable.subscribe(new CustomSubscriber<>())

결과
RxComputationThreadPool-1 onNext( value 10 newData 0 )
RxComputationThreadPool-2 onNext( value 11 newData 0 )
RxComputationThreadPool-1 onNext( value 10 newData 1 )
RxComputationThreadPool-2 onNext( value 11 newData 1 )
RxComputationThreadPool-1 onNext( value 10 newData 2 )
RxComputationThreadPool-2 onNext( value 11 newData 2 )
RxComputationThreadPool-2 onComplete()

결과를 보면 각기 생성된 Flowable이 비동기식으로 송신 되기 때문에
서로 다른 스레드에서 실행돼 데이터를 받는 순서대로 송신하지 않는다는 점을 주목.

8.concatMap

받은 데이터를 Flowable로 변환하고 변환된 Flowable을 하나씩 순서대로 실행해서 수신자에게 송신.
여러 데이터를 계속 받더라도 첫번째 데이터로 생성한 Flowable의 처리가 끝나야 다음 데이터로 생성한 Flowable을 실행.

생성된 Flowable의 스레드에서 실행되더라도 데이터를 받은 순서대로 처리하는것을 보장하지만,
처리 성능에 영향을 줄 수있다.

Flowable<String> flowable = Flowable.range(10,5)
//map(Function mapper)
//mapper: 받은 데이터를 가공하는 함수형 인터페이스
.concatMap(value ->
Flowable.interval(100L, TimeUnit.MILLISECONDS).take(2)
.map(data ->("value: " + value + " data : " + data))

flowable.subscribe(new CustomSubscriber<>())

결과
RxComputationThreadPool-1 onNext( value : 10 data : 0 )
RxComputationThreadPool-1 onNext( value : 10 data : 1 )
RxComputationThreadPool-2 onNext( value : 11 data : 0 )
RxComputationThreadPool-2 onNext( value : 11 data : 1 )
RxComputationThreadPool-3 onNext( value : 12 data : 0 )
RxComputationThreadPool-3 onNext( value : 12 data : 1 )
RxComputationThreadPool-4 onNext( value : 13 data : 0 )
RxComputationThreadPool-4 onNext( value : 13 data : 1 )
RxComputationThreadPool-5 onNext( value : 14 data : 0 )
RxComputationThreadPool-5 onNext( value : 14 data : 1 )
RxComputationThreadPool-5 onComplete()

결과를 보면 생성된 Flowable 스레드와 데이터 순서대로 출력이 보장된다.

9.toList

toList는 송신할 데이터를 모두 리스트에 담아 전달.
한꺼번에 데이터를 List로 가공해서 받기에 좋습니다.
하지만 많은 양의 데이터를 처리할 경우 버퍼가 생길 수 있고, 메모리가 부족해질 수도 있습니다.
또한 수신되는 데이터는 하나이므로 Flowable이 아닌 Single 반환값을 사용.

Single<List<String>> single = Flowable.just("A", "B", "C").toList()

single.subscribe(new SingleObserver<List<String>>(){
..
onSuccess(List<String> strings){
//최종 완료된 리스트를 순서대로 출력한다.
for(String text: strings){
}
}
}


결과
main onNext()
main onSuccess( A )
main onSuccess( B )
main onSuccess( C )

10.toMap

toMap은 송신할 데이터를 모두 키와 값의 쌍으로 Map에 담아 전달합니다.
나머지는 toList의 특징과 같습니다.
송신되는 데이터 타입은 Map에 담아서 송신하는데 동일한 key에서 value는 마지막 데이터가 덮어씁니다.

Single<Map<Long, String>> single = Flowable.just("1A", "2B", "3C", "1D", "2E")
//toMap(Function keySelector, Fuction valueSelector, Callable mapSupplier)
//keySelector: 받은 데이터로 Map에서 사용할 키를 생성하는 함수형 인터페이스
//valueSelector: 받은 데이터로 Map에 넣을 값을 생성하는 함수형 인터페이스
.toMap(value -> Long.valueOf(value.substring(0,1)), data.substring(1))

single.subscribe(new SingleObserver<Map<Long, String>>(){
@Override
public void onSuccess(Map<Long, String> longStringMap){
//최종 완료된 map을 순서대로 출력
for( long id : longStringMap.keySet()){
}
}
}

결과
main onNext()
main onSuccess( id : 1, value D )
main onSuccess( id : 2, value E )
main onSuccess( id : 3, value C )

11.toMultiMap

키와 컬렉션 값으로 이루어진 Map을 데이터로 변환하여 송신하는 함수.
toMap에서 중복되는 value를 관리하는건 없었지만, value를 collection으로 관리하여 전달된느 데이터를 모두 수신할 수 있습니다.

Single<Map<String, Collection<Long>>> single =
Flowable.interval(100L, TimeUnit.MILLISECONDS)
.take(5)
.toMultimap(value ->{
//value가 홀수있지 짝수인지 판단해서 key값을 리턴
if(value % 2 == 0){
return "짝수"
}else{
return "홀수"
}
});

single.subscribe(new SingleObserver<Map<String, Collection<Long>>>(){
..
@Override
public void onSuccess(Map<String, Collection<Long>> stringCollectionMap){
for(String key : stringCollectionMap.keySet()){
StringBuffer stringBuffer = new StringBu()

for(long value: stringCollectionMap.get(key)){
stringBuffer.
}
}
}
}


결과
main onNext()
RxComputationThreadPool-1 onSuccess( id : 짝수, value  0 2 4 )
RxComputationThreadPool-1 onSuccess( id : 홀수, value  1 3 )

12.filter

filter는 받은 데이터가 조건에 맞는지 판단해 결과가 true인 값만 송신합니다.
위의 just, fromArray, interval이 반복적인 케이스였다면, filter는 if문처럼
조건문의 역할을 할 수 있습니다.

Flowable<Long> flowable = Flowable.interval(300L, TimeUnit.MILLISECONDS)
//짝수만 통과한다.
.filter(value -> value % 2 == 0).take(3)

flowable.subscribe(new CustomSubscriber<>())


결과
RxComputationThreadPool-1 onNext( 0 )
RxComputationThreadPool-1 onNext( 2 )
RxComputationThreadPool-1 onNext( 4 )
RxComputationThreadPool-1 onComplete()

13.distinct

이미 처리된 데이터를 다시 볼 필요가 없을 때 사용하는 함수.
이미 송신된 데이터와 같다면 해당데이터는 무시합니다.
이 함수는 내부에서 HashSet으로 데이터가 같은지 확인합니다.

Flowable<String> flowable = Flowable.just("A", "a", "B", "b", "A", "a")
//distinct(Function keySelector)
//keySelector: 받은 데이터와 비교할 데이터를 확인하는 함수
//모두 소문자로 변환하여 알파벳 기준으로 데이터를 판단
.distinct(value -> value.toLowerCase())

flowable.subscribe(new CustomSubscriber<>())

결과
main onNext( A )
main onNext( B )
main onComplete()

14.take

(1)take
take함수는 지정된 횟수만큼 받은 데이터를 송신합니다.
지정된 횟수에 도달하면 완료를 송신해 처리 종료.

(2)takeUntil
지정된 조건까지 데이터를 송신하는 연산자입니다.

(3)takeWhile
지정된 조건이 해당할 때만 데이터를 송신하는 연산자.

(4)takeLast
데이터 끝에서부터 지정한 조건까지 데이터를 송신하는 연산자.

Floawble<Long> flowable = Flowable.interval(100L, TimeUnit.MILLISECONDS)
.take(5)

Flowable<Long> flowable = Flowable.interval(100L, TimeUnit.MILLISECONDS)
.takeUntil(value -> vlue == 5)

Flowable<Long> flowable = Flowable.interval(100L, TimeUnit.MILLISECONDS)
.takeWhile(value -> vale != 5)

//5개의 출력중 뒤에 2개만 송신한다.
Flowable<Long> flowable = Flowable.interval(100L, TimeUnit.MILLISECONDS)
.take(5)
.takeLast(2)

RxComputationThreadPool-1 onNext( 3 )
RxComputationThreadPool-1 onNext( 4 )
RxComputationThreadPool-1 onComplete()

15.skip

(1)skip
함수로 지정된 횟수만큼 받은 데이터 송신을 제외합니다.
지정된 횟수가 초과되면 나머지 데이터를 송신합니다.

(2)skipUntil
(3)skipWhile
(4)skipLast

take와 반대의 기능을 갖고있습니다. 보통 페이저나 리스트에서 paging을 처리할 떄는 take와 skip을 혼용합니다.

Flowable<Long> flowable = Flowable.interval(100L, TimeUnit.MILLISECONDS)
.take(5)
.skip(2)

//5개를 발행하고, 1000밀리세컨드 제외 후 송신
Flowable<Long> flowable = Flowable.interval(300L, TimeUnit.MILLISECONDS)
.skipUntil(Flowable.timer(1000L, TimeUnit.MILLISECONDS))
.take(5)
 
결과
RxComputationThreadPool-2 onNext( 3 )
RxComputationThreadPool-2 onNext( 4 )
RxComputationThreadPool-2 onNext( 5 )
RxComputationThreadPool-2 onNext( 6 )
RxComputationThreadPool-2 onNext( 7 )
RxComputationThreadPool-2 onComplete()


//5개를 발행하고, 데이터 3이 올때까지 데이터를 제외합니다.
Flowable<Long> flowable = Flowable.interval(300L, TimeUnit.MILLISECONDS)
.skipWhile(value -> value != 3)
.take(5)

결과
RxComputationThreadPool-1 onNext( 3 )
RxComputationThreadPool-1 onNext( 4 )
RxComputationThreadPool-1 onNext( 5 )
RxComputationThreadPool-1 onNext( 6 )
RxComputationThreadPool-1 onNext( 7 )
RxComputationThreadPool-1 onComplete()

//5개를 발행하고 마지막 2개는 제외
Flowable<Long> flowable = Flowable.interval(1000L, TimeUnit.MILLISECONDS)
.take(5)
.skipLast(2)

16.throttleFirst

데이터를 송신하고 지정된 시간동안 들어오는 요청을 무시.
이 함수는 View의 Event처리에서 많이 사용됩니다.
중복되는 처리를 막기 위해 최초 실행 후 일정 시간동안 View의 클릭 이벤트나 API이벤트를 막을수 있기 때문에
비동기 처리와 화면에 직접적인 피드백이 발생 했을 때 throttleFirst를 자주 사용하고 있습니다.

//데이터 요청이 30밀리초마다 5번 발생합니다.
//데이터 요청 발생시 100밀리세컨드 동안 들어오는 데이터 요청을 무시합니다.
Flowable<Long> flowable = Flowable.interval(30L, TimeUnit.MILLISECONDS)
.take(5).throttleFirst(100L, TimeUnit.MILLISECONDS)
flowable.subscribe(new CustomSubscriber<>())


결과
RxComputationThreadPool-1 onNext( 0 )
RxComputationThreadPool-1 onNext( 4 )
RxComputationThreadPool-1 onComplete()

17.throttleLast

데이터를 송신하고 지정된 시간동안 들어오는 마지막 요청을 송신합니다.
이 함수도 throttleFirst처럼 반복적인 선택 이벤트 처리에 유용하게 사용.
(장바구니 카운트 변경을 요청할 때 마지막 변경 이벤트 데이터만 처리하면 되므로 값이 선택되고
일정시간이 지났을 때 API를 요청해 리소스 낭비를 줄일 수 있습니다.

//데이터 요청이 1초마다 6번 발생.
//데이터 요청 발생시 2초 동안 들어오는 마지막 요청을 송신
//- 0 - 1 - 2 - 3 - 4
//    -   -   -*  -
Flowable<Long> flowable = Flowable.interval(1, TimeUnit.SECONDS)
.take(5)
.throttleLast(2, TimeUnit.SECONDS)
flowable.subscribe(new CustomSubscriber<>())

결과
RxComputationThreadPool-1 onNext( 2 )
RxComputationThreadPool-1 onNext( 4 )
RxComputationThreadPool-1 onComplete()

 

 

참조 :
http://labs.brandi.co.kr/2019/05/30/gojs.html#ju1

 

RxJava2 함수 파헤치기!

Overview지난 글 Rxjava를 이용한 안드로이드 개발에서는 RxJava의 Android 연결 방법과 기본적인 사용법을 다뤘습니다. 이번 글에서는 RxJava의 강력하고 다양한 함수들을 살펴보고자 합니다. Android에서

labs.brandi.co.kr

 

반응형

'안드로이드 > Rx' 카테고리의 다른 글

RxJava에 대해 알아보자  (0) 2022.10.24
리액티브 연산자 - 기초 (map, filter, reduce)  (0) 2021.03.28