響應(yīng)式API的設(shè)計(jì)、實(shí)現(xiàn)和應(yīng)用
。
再看一遍前面的例子;保證第一個(gè)getPage()調(diào)用在針對(duì)每個(gè)附加頁(yè)面的后續(xù)調(diào)用之前發(fā)生。此外,由于后續(xù)對(duì)getPage()的調(diào)用是在.Flatmapmany()中的,所以由框架負(fù)責(zé)優(yōu)化多線(xiàn)程執(zhí)行,并將結(jié)果匯到一起返回,傳播可能發(fā)生的任何錯(cuò)誤。
條件邏輯
與命令式編程不同,在響應(yīng)式編程中錯(cuò)誤是作為一種值來(lái)考慮的。這意味著它們是通過(guò)流操作來(lái)傳遞的。這些錯(cuò)誤可以通過(guò)所有方式傳遞給消費(fèi)者,或者流可以基于它們改變行為。這種行為變化可以表現(xiàn)為錯(cuò)誤的轉(zhuǎn)換或基于錯(cuò)誤產(chǎn)生新的結(jié)果。
public Mono<AppStatsResponse> getApplication(GetAppRequest request) { return client.applications() .statistics(AppStatsRequest.builder() .applicationId(request.id()) .build()) .onErrorResume(ExceptionUtils.statusCode(APP_STOPPED_ERROR), t -> Mono.just(AppStatsResponse.builder().build())); }
在本例中,我們要求為正在運(yùn)行的應(yīng)用程序獲取統(tǒng)計(jì)信息。如果一切正常,響應(yīng)就會(huì)傳回給消費(fèi)者。但是,如果接收到一個(gè)錯(cuò)誤(帶有特定的狀態(tài)代碼),則返回一個(gè)空響應(yīng)。使用者永遠(yuǎn)不會(huì)看到錯(cuò)誤和執(zhí)行過(guò)程中的默認(rèn)值,就好像從來(lái)沒(méi)有發(fā)出過(guò)錯(cuò)誤信號(hào)一樣。
如前所述,一個(gè)流完成時(shí)未發(fā)送任何條目也是有效的。通常,這就相當(dāng)于返回null(其中void返回類(lèi)型是一種特殊情況)。像以上這種出錯(cuò)的情況一樣,沒(méi)有任何條目的完成結(jié)果可以一直傳遞給消費(fèi)者,或者流可以基于它們改變行為。
public Flux<GetDomainsResponse> getDomains(GetDomainsRequest request) { return requestPrivateDomains(request.getId()) .switchIfEmpty(requestSharedDomains(request.getId())); }
在本例中,getDomains()返回一個(gè)域,該域可以位于兩個(gè)不同的桶中。首先搜索私有域,如果成功完成,即使沒(méi)有結(jié)果,也會(huì)搜索共享域。
public Mono<String> getDomainId(GetDomainIdRequest request) { return getPrivateDomainId(request.getName()) .switchIfEmpty(getSharedDomainId(request.getName())) .switchIfEmpty(ExceptionUtils.illegalState( "Domain %s not found", request.getName())); }
也可以用無(wú)條目表示一種錯(cuò)誤條件。在這個(gè)示例中,如果沒(méi)有找到私有或共享域,就會(huì)生成一個(gè)新的IllegalStateException并傳遞給使用者。
然而有時(shí),你希望根據(jù)無(wú)錯(cuò)誤或空來(lái)做決策,但不是根據(jù)值本身。雖然可以使用操作符來(lái)實(shí)現(xiàn)這個(gè)邏輯,但人們常常發(fā)現(xiàn),其復(fù)雜度要遠(yuǎn)遠(yuǎn)高于其價(jià)值。在本例中,你應(yīng)該只使用命令式條件語(yǔ)句。
public Mono<String> getDomainId(String domain, String organizationId) { return Mono.just(domain) .filter(d -> d == null) .then(getSharedDomainIds() .switchIfEmpty(getPrivateDomainIds(organizationId)) .next() // select first returned .switchIfEmpty(ExceptionUtils.illegalState("Domain not found"))) .switchIfEmpty(getPrivateDomainId(domain, organizationId) .switchIfEmpty(getSharedDomainId(domain)) .switchIfEmpty( ExceptionUtils.illegalState("Domain %s not found", domain))); }
這個(gè)示例返回給定的組織(一個(gè)分級(jí)容器)中給定域名的id。這里有兩個(gè)分支:如果域?yàn)榭?,則返回組織范圍內(nèi)第一個(gè)共享域或私有域的id。如果域不為空,則搜索顯式的域名,并返回它的id。如果你覺(jué)得這段代碼令人迷惑難懂,不要絕望,我們也一樣!
public Mono<String> getDomainId(String domain, String organizationId) { if (domain == null) { return getSharedDomainIds() .switchIfEmpty(getPrivateDomainIds(organizationId)) .next() .switchIfEmpty(ExceptionUtils.illegalState("Domain not found")); } else { return getPrivateDomainId(domain, organizationId) .switchIfEmpty(getSharedDomainId(domain)) .switchIfEmpty( ExceptionUtils.illegalState("Domain %s not found", domain)); } }
這個(gè)示例效果一樣,但使用的是命令式條件語(yǔ)句。但更容易理解得多了,你覺(jué)得呢?
測(cè)試
實(shí)際上,大多數(shù)有用的流都是異步的。這在測(cè)試中是有問(wèn)題的,因?yàn)闇y(cè)試框架往往都是同步的,注冊(cè)是通過(guò)了還是失敗了,在異步結(jié)果返回之前就應(yīng)該有結(jié)果了。為了彌補(bǔ)這一點(diǎn),你必須阻塞主線(xiàn)程,直到返回結(jié)果,然后將這些結(jié)果發(fā)至斷言的主線(xiàn)程中。
@Test public void noLatch() { Mono.just("alpha") .subscribeOn(Schedulers.single()) .subscribe(s -> assertEquals("bravo", s)); }
這個(gè)示例在非主線(xiàn)程上發(fā)出一個(gè)字符串,出人意料地是,通過(guò)了測(cè)試。這個(gè)測(cè)試通過(guò)的根本原因,就是當(dāng)它顯然不應(yīng)該通過(guò)的時(shí)候,noLatch方法將會(huì)完成執(zhí)行,而沒(méi)有拋出一個(gè)AssertionError。
@Test public void latch() throws InterruptedException { CountDownLatch latch = new CountDownLatch(1); AtomicReference<String> actual = new AtomicReference<>(); Mono.just("alpha") .subscribeOn(Schedulers.single()) .subscribe(actual::set, t -> latch.countDown(), latch::countDown); latch.await(); assertEquals("bravo", actual.get()); }
這個(gè)例子,它使用一個(gè)CountDownLatch來(lái)確保latch()方法在流完成之后才返回,雖然不可否認(rèn)它很笨拙。一旦latch 釋放,主線(xiàn)程中的斷言就會(huì)拋出一個(gè)AssertionError,導(dǎo)致測(cè)試失敗。
如果你看了這些代碼,拒絕以這種方式實(shí)現(xiàn)你所有的測(cè)試,大家一定會(huì)體諒你的,我們保證。幸運(yùn)的是,Reactor 提供了一個(gè)StepVerifier類(lèi)來(lái)輔助測(cè)試。
對(duì)響應(yīng)式設(shè)計(jì)的測(cè)試需要的不僅僅是阻塞。你通常需要對(duì)多個(gè)值和預(yù)期錯(cuò)誤進(jìn)行斷言,同時(shí)確保意外錯(cuò)誤會(huì)導(dǎo)致測(cè)試失敗。StepVerifier對(duì)每一項(xiàng)都有所考慮。
@Test public void testMultipleValues() { Flux.just("alpha", "bravo") .as(StepVerifier::create) .expectNext("alpha") .expectNext("bravo") .expectComplete() .verify(Duration.ofSeconds(5)); }
在這個(gè)示例中,使用StepVerifier來(lái)預(yù)期精確發(fā)出了alpha和bravo,然后流完成。如果其中一個(gè)沒(méi)有發(fā)出,發(fā)出了一個(gè)額外的元素,或者產(chǎn)生一個(gè)錯(cuò)誤,測(cè)試就會(huì)失敗。
@Test public void shareFails() { this.domains .share(ShareDomainRequest.builder() .domain("test-domain") .organization("test-organization") .build()) .as(StepVerifier::create) .consumeErrorWith(t -> assertThat(t) .isInstanceOf(IllegalArgumentException.class) .hasMessage("Private domain test-domain does not exist")) .verify(Duration.ofSeconds(5)); }
這個(gè)例子使用了一些更高級(jí)的StepVerifier特性,并不僅斷言已經(jīng)發(fā)出了一個(gè)錯(cuò)誤信號(hào),而且它還是一個(gè)IllegalArgumentException,并且消息匹配預(yù)期結(jié)果。
CountDownLatches
關(guān)于響應(yīng)式框架的一個(gè)關(guān)鍵問(wèn)題是,它們只能協(xié)調(diào)自己的操作和線(xiàn)程模型。許多響應(yīng)式編程的執(zhí)行環(huán)境將不僅僅只有一個(gè)線(xiàn)程(例如Servlet容器)。在這些環(huán)境中,響應(yīng)式編程天然的異步屬性并不是問(wèn)題。但是,有一些環(huán)境,比如上面的測(cè)試示例,那里的進(jìn)程將在任何單獨(dú)的線(xiàn)程之前結(jié)束。
public static void main(String[] args) { Mono.just("alpha") .delaySubscription(Duration.ofSeconds(1)) .subscribeOn(Schedulers.single()) .subscribe(System.out::println); }
就像該測(cè)試方法一樣,這個(gè)main()方法將在alpha發(fā)出之前終止。
public static void main(String[] args) throws InterruptedException { CountDownLatch latch = new CountDownLatch(1); Mono.just("alpha") .delaySubscription(Duration.ofSeconds(1)) .subscribeOn(Schedulers.single()) .subscribe(System.out::println, t -> latch.countDown(), latch::countDown); latch.await(); }
就像在該測(cè)試示例中一樣,一個(gè)CountDownLatch可以確保主線(xiàn)程在流終止之前不會(huì)終止,不管它是在什么線(xiàn)程上執(zhí)行的。
阻塞流
在可預(yù)見(jiàn)的將來(lái),在響應(yīng)式編程中與阻塞api交互會(huì)成為一種常見(jiàn)現(xiàn)象。為了在兩者之間架起橋梁,在等待結(jié)果的時(shí)候會(huì)適當(dāng)?shù)剡M(jìn)行阻塞。但是,當(dāng)以這種方式連接到阻塞API時(shí),會(huì)丟失響應(yīng)式編程的一些好處,比如有效的資源使用。因此,你將希望盡可能長(zhǎng)地保持代碼的響應(yīng)性,直到最后一刻才阻塞。同樣值得注意的是,這個(gè)想法的邏輯總結(jié)一下就是,一個(gè)響應(yīng)式的API可以被阻塞,但是一個(gè)阻塞的API永遠(yuǎn)不能成為響應(yīng)式。
Mono<User> requestUser(String name) {...} User getUser(String name) { return requestUser(name) .block(); }
在這個(gè)例子中,.block()用于橋接Mono的結(jié)果到必須的返回類(lèi)型。
Flux<User> requestUsers() {...} List<User> listUsers() { return requestUsers() .collectList() .block(); }
和前面的例子一樣,.block()用于將結(jié)果橋接到必須的返回類(lèi)型,但在此之前,流必須被收集到一個(gè)列表中。
錯(cuò)誤處理
如前所述,錯(cuò)誤是流經(jīng)系統(tǒng)的值。這意味著一直都沒(méi)有一個(gè)合適的點(diǎn)來(lái)捕獲異常。但是,你應(yīng)該將它們作為流的一部分處理,或者作為訂閱者。.Subscribe()方法有0到3個(gè)參數(shù),這些參數(shù)允許你處理每個(gè)條目,如果錯(cuò)誤成了就對(duì)它進(jìn)行處理,并對(duì)流的完成情況進(jìn)行處理。
public static void main(String[] args) throws InterruptedException { CountDownLatch latch = new CountDownLatch(1); Flux.concat(Mono.just("alpha"), Mono.error(new

責(zé)任編輯:售電衡衡
-
權(quán)威發(fā)布 | 新能源汽車(chē)產(chǎn)業(yè)頂層設(shè)計(jì)落地:鼓勵(lì)“光儲(chǔ)充放”,有序推進(jìn)氫燃料供給體系建設(shè)
2020-11-03新能源,汽車(chē),產(chǎn)業(yè),設(shè)計(jì) -
中國(guó)自主研制的“人造太陽(yáng)”重力支撐設(shè)備正式啟運(yùn)
2020-09-14核聚變,ITER,核電 -
探索 | 既耗能又可供能的數(shù)據(jù)中心 打造融合型綜合能源系統(tǒng)
2020-06-16綜合能源服務(wù),新能源消納,能源互聯(lián)網(wǎng)
-
新基建助推 數(shù)據(jù)中心建設(shè)將迎爆發(fā)期
2020-06-16數(shù)據(jù)中心,能源互聯(lián)網(wǎng),電力新基建 -
泛在電力物聯(lián)網(wǎng)建設(shè)下看電網(wǎng)企業(yè)數(shù)據(jù)變現(xiàn)之路
2019-11-12泛在電力物聯(lián)網(wǎng) -
泛在電力物聯(lián)網(wǎng)建設(shè)典型實(shí)踐案例
2019-10-15泛在電力物聯(lián)網(wǎng)案例
-
新基建之充電樁“火”了 想進(jìn)這個(gè)行業(yè)要“心里有底”
2020-06-16充電樁,充電基礎(chǔ)設(shè)施,電力新基建 -
燃料電池汽車(chē)駛?cè)雽こ0傩占疫€要多久?
-
備戰(zhàn)全面電動(dòng)化 多部委及央企“定調(diào)”充電樁配套節(jié)奏
-
權(quán)威發(fā)布 | 新能源汽車(chē)產(chǎn)業(yè)頂層設(shè)計(jì)落地:鼓勵(lì)“光儲(chǔ)充放”,有序推進(jìn)氫燃料供給體系建設(shè)
2020-11-03新能源,汽車(chē),產(chǎn)業(yè),設(shè)計(jì) -
中國(guó)自主研制的“人造太陽(yáng)”重力支撐設(shè)備正式啟運(yùn)
2020-09-14核聚變,ITER,核電 -
能源革命和電改政策紅利將長(zhǎng)期助力儲(chǔ)能行業(yè)發(fā)展
-
探索 | 既耗能又可供能的數(shù)據(jù)中心 打造融合型綜合能源系統(tǒng)
2020-06-16綜合能源服務(wù),新能源消納,能源互聯(lián)網(wǎng) -
5G新基建助力智能電網(wǎng)發(fā)展
2020-06-125G,智能電網(wǎng),配電網(wǎng) -
從智能電網(wǎng)到智能城市
-
山西省首座電力與通信共享電力鐵塔試點(diǎn)成功
-
中國(guó)電建公司公共資源交易服務(wù)平臺(tái)摘得電力創(chuàng)新大獎(jiǎng)
-
電力系統(tǒng)對(duì)UPS的技術(shù)要求