/*** init the SessionPool* this function is moved into SessionPool's constructor, no need to call it manually.*/@Deprecatedpublicbooleaninit(){if(hasInit.get()){returntrue;}while(sessionList.size()< minSessionSize){try{createSessionObject(SessionState.IDLE);idleSessionSize.incrementAndGet();}catch(Exception e){log.error("SessionPool init failed. ");thrownewRuntimeException("create session failed.", e);}}healthCheckSchedule.scheduleAtFixedRate(this::checkSession,0, healthCheckTime,TimeUnit.SECONDS);sessionQueueMaintainSchedule.scheduleAtFixedRate(this::updateSessionQueue,0, cleanTime,TimeUnit.SECONDS);hasInit.compareAndSet(false,true);returntrue;}/*** create a {@link NebulaSession} with specified state** @param state {@link SessionState}* @return NebulaSession*/privateNebulaSessioncreateSessionObject(SessionState state)throwsClientServerIncompatibleException,AuthFailedException,IOErrorException,BindSpaceFailedException{SyncConnection connection =newSyncConnection();int tryConnect = sessionPoolConfig.getGraphAddressList().size();// reconnect with all available addresswhile(tryConnect-->0){try{if(sessionPoolConfig.isEnableSsl()){connection.open(getAddress(), sessionPoolConfig.getTimeout(),sessionPoolConfig.getSslParam(),sessionPoolConfig.isUseHttp2(),sessionPoolConfig.getCustomHeaders());}else{connection.open(getAddress(), sessionPoolConfig.getTimeout(),sessionPoolConfig.isUseHttp2(),sessionPoolConfig.getCustomHeaders());}break;}catch(Exception e){if(tryConnect ==0||!reconnect){throw e;}else{log.warn("connect failed, "+ e.getMessage());}}}AuthResult authResult;try{authResult = connection.authenticate(sessionPoolConfig.getUsername(),sessionPoolConfig.getPassword());}catch(AuthFailedException e){log.error(e.getMessage());if(e.getMessage().toLowerCase().contains("user not exist")|| e.getMessage().toLowerCase().contains("invalid password")){// close the session poolclose();}else{// just close the connectionconnection.close();}throw e;}NebulaSession nebulaSession 
=newNebulaSession(connection, authResult.getSessionId(),authResult.getTimezoneOffset(), state);ResultSet result =null;try{result = nebulaSession.execute(useSpace);}catch(IOErrorException e){log.error("binding space failed,", e);nebulaSession.release();thrownewBindSpaceFailedException("binding space failed:"+ e.getMessage());}if(!result.isSucceeded()){nebulaSession.release();thrownewBindSpaceFailedException(result.getErrorMessage());}sessionList.add(nebulaSession);return nebulaSession;}publicHostAddressgetAddress(){List<HostAddress> addresses = sessionPoolConfig.getGraphAddressList();int newPos =(pos.getAndIncrement())% addresses.size();HostAddress hostAddress = addresses.get(newPos);log.info("ng address {} {} {} {}", pos.get(), newPos, hostAddress.getHost(), hostAddress.getPort());return hostAddress;}
/*** return an idle session*/privatesynchronizedNebulaSessiongetSession()throwsClientServerIncompatibleException,AuthFailedException,IOErrorException,BindSpaceFailedException{int retry = sessionPoolConfig.getRetryConnectTimes();while(retry-->=0){// if there are idle sessions, get session from queueif(idleSessionSize.get()>0){for(NebulaSession nebulaSession : sessionList){if(nebulaSession.isIdleAndSetUsed()){int currentIdleSessionSize = idleSessionSize.decrementAndGet();log.info("ng session {} {}", currentIdleSessionSize, nebulaSession.getSessionID());return nebulaSession;}}}// if session size is less than max size, get session from poolif(sessionList.size()< maxSessionSize){returncreateSessionObject(SessionState.USED);}// there's no available session, wait for SessionPoolConfig.getWaitTime and re-gettry{Thread.sleep(sessionPoolConfig.getWaitTime());}catch(InterruptedException e){log.error("getSession error when wait for idle sessions, ", e);thrownewRuntimeException(e);}}// if session size is equal to max size and no idle session here, throw exceptionthrownewRuntimeException("no extra session available");}
可以调整为从 sessionList 中随机取出 nebulaSession（而不是每次都从头顺序遍历），修改后的代码如下：
/*** return an idle session*/privatesynchronizedNebulaSessiongetSession()throwsClientServerIncompatibleException,AuthFailedException,IOErrorException,BindSpaceFailedException{int retry = sessionPoolConfig.getRetryConnectTimes();while(retry-->=0){// if there are idle sessions, get session from queueif(idleSessionSize.get()>0){int[] randomInts =RandomUtil.randomInts(sessionList.size());for(int randomInt : randomInts){NebulaSession nebulaSession = sessionList.get(randomInt);if(nebulaSession.isIdleAndSetUsed()){int currentIdleSessionSize = idleSessionSize.decrementAndGet();log.debug("ng session {} {}", currentIdleSessionSize, nebulaSession.getSessionID());return nebulaSession;}}}// if session size is less than max size, get session from poolif(sessionList.size()< maxSessionSize){returncreateSessionObject(SessionState.USED);}// there's no available session, wait for SessionPoolConfig.getWaitTime and re-gettry{Thread.sleep(sessionPoolConfig.getWaitTime());}catch(InterruptedException e){log.error("getSession error when wait for idle sessions, ", e);thrownewRuntimeException(e);}}// if session size is equal to max size and no idle session here, throw exceptionthrownewRuntimeException("no extra session available");}