package com.medium.android.common.stream;

import com.medium.android.common.api.MediumApi;
import com.medium.android.common.api.Pagings;
import com.medium.android.common.api.Response2;
import com.medium.android.common.core.JsonCodec;
import com.medium.android.common.generated.FeedProtos;
import com.medium.android.common.generated.MediumServiceProtos;
import com.medium.android.common.generated.PagingProtos;
import com.medium.android.common.generated.response.StreamItemListProtos;
import com.nytimes.android.external.cache.Cache;
import com.nytimes.android.external.cache.CacheBuilder;
import io.reactivex.Observable;
import io.reactivex.android.schedulers.AndroidSchedulers;
import io.reactivex.functions.Action;
import io.reactivex.functions.Function;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import timber.log.Timber;

/* loaded from: classes2.dex */
/**
 * Builds the observables used to load the home stream and follow-up stream pages.
 *
 * <p>Each observable is stored in a cache while it is pending: the home stream under a single
 * fixed key, paged requests under their {@link PagingProtos.Paging} descriptor. The entry is
 * invalidated again from a {@code doOnTerminate} callback, so callers that ask for the same
 * request while it is still cached are expected to receive the same observable instance.
 */
public class RxStreamFetcher {
    /** Single cache key under which the pending home stream observable is stored. */
    private static final String HOME_STREAM_CACHE_KEY = "PENDING_HOMESTREAM_REQUEST";
    private final MediumApi api;
    private final JsonCodec jsonCodec;
    private final MediumServiceProtos.ObservableMediumService observableApi;
    private final Cache<String, Observable<StreamItemListProtos.StreamItemListResponse>> pendingHomeStreamObservableRequests;
    private final Cache<PagingProtos.Paging, Observable<StreamItemListProtos.StreamItemListResponse>> pendingMoreStreamRequestByPaging;

    /**
     * Creates a fetcher backed by the given API clients.
     *
     * @param jsonCodec codec handed to {@link Pagings#nextParameters} when building paged requests
     * @param mediumApi client used for "fetch more" (paged) stream requests
     * @param observableMediumService client used for the home stream request
     * @param cacheBuilder builder from which both pending-request caches are created
     */
    public RxStreamFetcher(JsonCodec jsonCodec, MediumApi mediumApi, MediumServiceProtos.ObservableMediumService observableMediumService, CacheBuilder<Object, Object> cacheBuilder) {
        this.jsonCodec = jsonCodec;
        this.api = mediumApi;
        this.observableApi = observableMediumService;
        this.pendingMoreStreamRequestByPaging = cacheBuilder.build();
        this.pendingHomeStreamObservableRequests = cacheBuilder.build();
    }

    /**
     * Returns the observable for the home stream, creating it only if none is currently cached.
     *
     * @param i forwarded unchanged as the second argument of {@code fetchHomeStream};
     *     NOTE(review): presumably a page size / item limit - confirm against the service definition
     * @return the cached or newly created home stream observable
     * @throws RuntimeException wrapping the {@link ExecutionException} if the cache fails to
     *     produce the observable
     */
    public Observable<StreamItemListProtos.StreamItemListResponse> fetchHomeStreamObservable(final int i) {
        try {
            return this.pendingHomeStreamObservableRequests.get(HOME_STREAM_CACHE_KEY, new Callable<Observable<StreamItemListProtos.StreamItemListResponse>>() {
                @Override
                public Observable<StreamItemListProtos.StreamItemListResponse> call() {
                    return RxStreamFetcher.this.lambda$fetchHomeStreamObservable$5$RxStreamFetcher(i);
                }
            });
        } catch (ExecutionException e) {
            Timber.TREE_OF_SOULS.e(e, "error observing home stream", new Object[0]);
            // Keep the message and the original failure attached so the crash is diagnosable.
            throw new RuntimeException("error observing home stream", e);
        }
    }

    /**
     * Builds the home stream observable; the cache entry is invalidated when it terminates.
     *
     * <p>Compiler-generated lambda body; name and visibility are retained for compatibility.
     *
     * @param i forwarded unchanged as the second argument of {@code fetchHomeStream}
     * @return the prepared home stream observable
     */
    public /* synthetic */ Observable<StreamItemListProtos.StreamItemListResponse> lambda$fetchHomeStreamObservable$5$RxStreamFetcher(int i) {
        return shareOnMainThread(this.observableApi.fetchHomeStream(FeedProtos.PostFeedSource.FEED, i), new Action() {
            @Override
            public void run() {
                RxStreamFetcher.this.lambda$null$4$RxStreamFetcher();
            }
        });
    }

    /**
     * Drops the pending home stream entry from its cache.
     *
     * <p>Compiler-generated lambda body; name and visibility are retained for compatibility.
     */
    public /* synthetic */ void lambda$null$4$RxStreamFetcher() {
        this.pendingHomeStreamObservableRequests.invalidate(HOME_STREAM_CACHE_KEY);
    }

    /**
     * Drops the pending "fetch more" entry stored under {@code paging} from its cache.
     *
     * <p>Compiler-generated lambda body; name and visibility are retained for compatibility.
     *
     * @param paging key of the entry to invalidate
     */
    public /* synthetic */ void lambda$observeFetchMoreStream$1$RxStreamFetcher(PagingProtos.Paging paging) {
        this.pendingMoreStreamRequestByPaging.invalidate(paging);
    }

    /**
     * Returns the observable that loads the next page described by {@code paging}.
     *
     * <p>The request is issued through the POST variant of the API when {@code paging.method}
     * equals "POST" (ignoring case) and through the default variant otherwise.
     *
     * @param paging descriptor of the page to load; also used as the cache key
     * @return the cached observable for {@code paging}, or the newly built one
     * @throws RuntimeException wrapping the {@link ExecutionException} if the cache fails to
     *     produce the observable
     */
    public Observable<StreamItemListProtos.StreamItemListResponse> observeFetchMoreStream(final PagingProtos.Paging paging) {
        Map<String, Object> nextParameters = Pagings.nextParameters(this.jsonCodec, paging);
        String cleanPath = Pagings.cleanPath(paging);
        final Observable<?> request;
        if (paging.method.equalsIgnoreCase("POST")) {
            request = this.api.fetchMoreStreamPostObservable(cleanPath, nextParameters);
        } else {
            request = this.api.fetchMoreStreamObservable(cleanPath, nextParameters);
        }
        // Assembled before the cache lookup (as before); if the cache already holds an entry
        // for this paging, this instance is simply never handed out.
        final Observable<StreamItemListProtos.StreamItemListResponse> candidate = shareOnMainThread(request, new Action() {
            @Override
            public void run() {
                RxStreamFetcher.this.lambda$observeFetchMoreStream$1$RxStreamFetcher(paging);
            }
        });
        try {
            return this.pendingMoreStreamRequestByPaging.get(paging, new Callable<Observable<StreamItemListProtos.StreamItemListResponse>>() {
                @Override
                public Observable<StreamItemListProtos.StreamItemListResponse> call() {
                    return candidate;
                }
            });
        } catch (ExecutionException e) {
            Timber.TREE_OF_SOULS.e(e, "could not fetch more stream", new Object[0]);
            // Keep the message and the original failure attached so the crash is diagnosable.
            throw new RuntimeException("could not fetch more stream", e);
        }
    }

    /**
     * Applies the operator chain common to both request kinds: unwrap the payload of each
     * {@link Response2}, then {@code cache()}, {@code share()}, observe on the Android main
     * thread, and finally run {@code onTerminate} via {@code doOnTerminate}.
     *
     * @param request raw API observable; every emitted item is cast to {@link Response2}
     * @param onTerminate action registered with {@code doOnTerminate}
     * @return the prepared observable of stream responses
     */
    private static Observable<StreamItemListProtos.StreamItemListResponse> shareOnMainThread(Observable<?> request, Action onTerminate) {
        return request.map(new Function<Object, StreamItemListProtos.StreamItemListResponse>() {
            @Override
            public StreamItemListProtos.StreamItemListResponse apply(Object response) {
                return (StreamItemListProtos.StreamItemListResponse) ((Response2) response).getPayload().get();
            }
        }).cache().share().observeOn(AndroidSchedulers.mainThread()).doOnTerminate(onTerminate);
    }
}
