I'm trying to figure out how to handle errors when mapping elements inside a Flux.
For instance, I'm parsing a CSV string into one of my business POJOs:
myflux.map(stock -> converter.convertHistoricalCSVToStockQuotation(stock));
Some of the lines might contain errors, so what I get in the log is:
reactor.core.publisher.FluxLog: onNext([some_bogus_quote]@38.09 (Fri Apr 08 00:00:00 CEST 2016) h(38.419998)/l(37.849998)/o(37.970001))
reactor.core.publisher.FluxLog: onNext([some_bogus_quote]@38.130001 (Thu Apr 07 00:00:00 CEST 2016) h(38.189999)/l(37.610001)/o(37.799999))
reactor.core.publisher.FluxLog: onError(java.lang.IllegalArgumentException: invalid csv stock quotation: some_bogus_quote,trololo)
reactor.core.publisher.FluxLog: java.lang.IllegalArgumentException: invalid csv stock quotation: some_bogus_quote,trololo
I read about the error handling methods in the API, but most of them refer to returning an "error value" or using a fallback Flux, like this one:
Flux.onErrorResumeWith(myflux, x -> Mono.fromCallable(() -> ... stuff));
However, using this with myflux means that the whole flux is processed again.
So, is there a way to handle errors while processing particular elements (i.e. ignoring them or logging them) and keep processing the rest of the flux?
UPDATE: with @akarnokd's workaround
public Flux<StockQuotation> getQuotes(List<String> tickers) {
    Flux<StockQuotation> processingFlux = Flux.fromIterable(tickers)
        // Get each set of quotes in a separate thread
        .flatMap(s -> Mono.fromCallable(() -> feeder.getCSVQuotes(s)))
        // Convert each list of raw quote strings into a new Flux<String>
        .flatMap(list -> Flux.fromIterable(list))
        // Convert the strings to POJOs, dropping the ones that fail to parse
        .flatMap(x -> {
            try {
                return Flux.just(converter.convertHistoricalCSVToStockQuotation(x));
            } catch (IllegalArgumentException ex) {
                System.out.println("Error decoding stock quotation: " + x);
                return Flux.empty();
            }
        });
    return processingFlux;
}
This works like a charm. However, as you can see, the code is less elegant than before. Doesn't the Flux API have a method that does what this code does?
retry(...), retryWhen(...), onErrorResumeWith(...), onErrorReturn(...)
You need flatMap instead, which lets you return an empty sequence if the processing failed:
myflux.flatMap(stock -> {
    try {
        return Flux.just(converter.convertHistoricalCSVToStockQuotation(stock));
    } catch (IllegalArgumentException ex) {
        return Flux.empty();
    }
});
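If the inline try/catch feels clunky, one option is to hide it in a small reusable wrapper. The sketch below shows the idea with plain java.util.stream so it runs without any extra dependency; it is an analogy, not Reactor code. The names skipOnError, parse and parseAll are made up for illustration, and parse is only a stand-in for the converter. With Reactor, the same wrapper would presumably return Flux.just(...) and Flux.empty() in place of Stream.of(...) and Stream.empty().

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class SkipOnErrorDemo {

    // Hypothetical helper: wraps a mapper so that an element that fails
    // with IllegalArgumentException becomes an empty stream (and is
    // logged) instead of aborting the whole pipeline.
    static <T, R> Function<T, Stream<R>> skipOnError(Function<T, R> mapper) {
        return value -> {
            try {
                return Stream.of(mapper.apply(value));
            } catch (IllegalArgumentException ex) {
                System.err.println("Skipping invalid element: " + value);
                return Stream.empty();
            }
        };
    }

    // Stand-in for the converter: throws NumberFormatException
    // (a subclass of IllegalArgumentException) on bad input.
    static Integer parse(String raw) {
        return Integer.valueOf(raw.trim());
    }

    // Parses every row, silently dropping the ones that cannot be parsed.
    static List<Integer> parseAll(List<String> rows) {
        Function<String, Stream<Integer>> safeParse = skipOnError(SkipOnErrorDemo::parse);
        return rows.stream()
                .flatMap(safeParse)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // prints [1, 3]
        System.out.println(parseAll(Arrays.asList("1", "trololo", "3")));
    }
}
```

The call site shrinks to a single flatMap(safeParse), and the decision about which exceptions to swallow and how to log them lives in one place.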