1818
1919import static ch .qos .logback .classic .Level .DEBUG ;
2020import static java .util .Arrays .asList ;
21- import static java .util .stream .Collectors .toList ;
22- import static java .util .concurrent .TimeUnit .MILLISECONDS ;
23- import static java .util .concurrent .TimeUnit .SECONDS ;
24- import static java .util .Optional .of ;
2521import static java .util .Optional .empty ;
22+ import static java .util .Optional .of ;
2623import static java .util .Optional .ofNullable ;
24+ import static java .util .concurrent .TimeUnit .MILLISECONDS ;
25+ import static java .util .concurrent .TimeUnit .SECONDS ;
26+ import static java .util .stream .Collectors .toSet ;
2727import static org .slf4j .Logger .ROOT_LOGGER_NAME ;
2828
2929import java .io .Closeable ;
5454import io .appulse .epmd .java .core .model .request .Request ;
5555import io .appulse .epmd .java .core .model .request .Stop ;
5656import io .appulse .epmd .java .core .model .response .EpmdDump ;
57+ import io .appulse .epmd .java .core .model .response .EpmdDump .NodeDump ;
5758import io .appulse .epmd .java .core .model .response .EpmdInfo ;
59+ import io .appulse .epmd .java .core .model .response .EpmdInfo .NodeDescription ;
5860import io .appulse .epmd .java .core .model .response .KillResult ;
5961import io .appulse .epmd .java .core .model .response .NodeInfo ;
6062import io .appulse .epmd .java .core .model .response .RegistrationResult ;
6163import io .appulse .epmd .java .core .model .response .Response ;
6264import io .appulse .epmd .java .core .model .response .StopResult ;
63- import io .appulse .epmd .java .core .model .response .EpmdDump .NodeDump ;
64- import io .appulse .epmd .java .core .model .response .EpmdInfo .NodeDescription ;
6565import io .appulse .utils .BytesUtils ;
6666import io .appulse .utils .SocketUtils ;
6767import io .appulse .utils .threads .AppulseExecutors ;
6868import io .appulse .utils .threads .AppulseThreadFactory ;
6969
7070import ch .qos .logback .classic .Logger ;
71+ import edu .umd .cs .findbugs .annotations .SuppressFBWarnings ;
7172import lombok .Builder ;
72- import lombok .extern .slf4j .Slf4j ;
7373import lombok .NoArgsConstructor ;
7474import lombok .NonNull ;
7575import lombok .RequiredArgsConstructor ;
7676import lombok .Singular ;
7777import lombok .SneakyThrows ;
78- import lombok .val ;
7978import lombok .Value ;
79+ import lombok .extern .slf4j .Slf4j ;
80+ import lombok .val ;
8081import org .slf4j .LoggerFactory ;
8182import picocli .CommandLine .Command ;
8283import picocli .CommandLine .Option ;
@@ -96,7 +97,7 @@ class SubcommandServer implements Runnable {
9697 ANY_ADDRESS = InetAddress .getByName ("0.0.0.0" );
9798 LOOPBACK_ADDRESS = InetAddress .getLoopbackAddress ();
9899 } catch (UnknownHostException ex ) {
99- throw new RuntimeException (ex );
100+ throw new IllegalStateException (ex );
100101 }
101102 }
102103
@@ -128,27 +129,27 @@ class SubcommandServer implements Runnable {
128129 }
129130
130131 private void setupEnvironmentVariables () {
131- ips .add (LOOPBACK_ADDRESS );
132- if (ips .size () == 1 ) {
132+ if (new SubcommandServer ().ips .equals (ips )) {
133133 ips = ofNullable (System .getProperty ("ERL_EPMD_ADDRESS" ))
134- .map (it -> it .split (",\\ s* " ))
134+ .map (it -> it .split ("," ))
135135 .map (it -> Stream .of (it )
136+ .map (String ::trim )
137+ .filter (ip -> !ip .isEmpty ())
136138 .map (ip -> {
137139 try {
138140 return InetAddress .getByName (ip );
139141 } catch (UnknownHostException ex ) {
140- return null ;
142+ throw new IllegalArgumentException ( "Invalid host " + ip , ex ) ;
141143 }
142144 })
143- .collect (toList ()))
144- .map (HashSet ::new )
145- .map (it -> (Set <InetAddress >) it )
145+ .collect (toSet ()))
146146 .map (it -> {
147147 it .add (LOOPBACK_ADDRESS );
148148 return it ;
149149 })
150150 .orElse (ips );
151151 }
152+ ips .add (LOOPBACK_ADDRESS );
152153
153154 if (!unsafe ) {
154155 val string = ofNullable (System .getProperty ("ERL_EPMD_RELAXED_COMMAND_CHECK" ))
@@ -160,6 +161,7 @@ private void setupEnvironmentVariables () {
160161
161162 @ Override
162163 @ SneakyThrows
164+ @ SuppressWarnings ("PMD.AvoidInstantiatingObjectsInLoops" )
163165 public void run () {
164166 setupEnvironmentVariables ();
165167
@@ -205,7 +207,7 @@ public void run () {
205207 log .debug ("{} - a new incoming connection" , remoteSocketAddress );
206208
207209 val handler = new ServerHandler (clientSocket );
208- executor .submit (handler );
210+ executor .execute (handler );
209211 }
210212 } finally {
211213 executor .shutdown ();
@@ -250,7 +252,7 @@ public void run () {
250252 return it ;
251253 })
252254 .ifPresent (RequestProcessor ::process );
253- } catch (Throwable ex ) {
255+ } catch (Exception ex ) {
254256 if (options .debug ) {
255257 log .error ("handling {} connection error - '{}'" , ex .getMessage (), ex );
256258 } else {
@@ -259,20 +261,20 @@ public void run () {
259261 }
260262 }
261263
262- private Optional <RequestProcessor <?>> findProcessor (Request request , Socket socket ) {
264+ private Optional <RequestProcessor <?>> findProcessor (Request request , Socket clientSocket ) {
263265 switch (request .getTag ()) {
264266 case ALIVE2_REQUEST :
265- return of (new RegistrationRequestProcessor (request , socket ));
267+ return of (new RegistrationRequestProcessor (request , clientSocket ));
266268 case DUMP_REQUEST :
267- return of (new DumpRequestProcessor (request , socket ));
269+ return of (new DumpRequestProcessor (request , clientSocket ));
268270 case KILL_REQUEST :
269- return of (new KillRequestProcessor (request , socket ));
271+ return of (new KillRequestProcessor (request , clientSocket ));
270272 case PORT_PLEASE2_REQUEST :
271- return of (new GetNodeInfoRequestProcessor (request , socket ));
273+ return of (new GetNodeInfoRequestProcessor (request , clientSocket ));
272274 case NAMES_REQUEST :
273- return of (new GetEpmdInfoRequestProcessor (request , socket ));
275+ return of (new GetEpmdInfoRequestProcessor (request , clientSocket ));
274276 case STOP_REQUEST :
275- return of (new StopRequestProcessor (request , socket ));
277+ return of (new StopRequestProcessor (request , clientSocket ));
276278 default :
277279 log .warn ("unsupported request's tag - {}" , request .getTag ());
278280 return empty ();
@@ -395,13 +397,7 @@ protected Response respond () {
395397 if (!unsafe ) {
396398 return KillResult .NOK ;
397399 }
398- nodes .values ().forEach (it -> {
399- try {
400- it .getSocket ().close ();
401- } catch (IOException ex ) {
402- // noop
403- }
404- });
400+ nodes .values ().forEach (Node ::close );
405401 return KillResult .OK ;
406402 }
407403
@@ -513,6 +509,7 @@ private static class Node implements Closeable {
513509 Socket socket ;
514510
515511 @ Override
512+ @ SuppressWarnings ("PMD.EmptyCatchBlock" )
516513 public void close () {
517514 try {
518515 socket .close ();
@@ -522,12 +519,13 @@ public void close () {
522519 }
523520
524521 @ SneakyThrows
522+ @ SuppressFBWarnings ("REC_CATCH_EXCEPTION" )
525523 boolean isAlive () {
526524 val nodeSocketAddress = new InetSocketAddress (socket .getInetAddress (), port );
527525 try (val nodeSocket = new Socket ()) {
528526 nodeSocket .connect (nodeSocketAddress , 2_000 );
529527 nodeSocket .close ();
530- } catch (Throwable ex ) {
528+ } catch (Exception ex ) {
531529 return false ;
532530 }
533531 return true ;
0 commit comments