Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

重构ApiConfig,使之容易扩展支持多节点分布式应用 #33

Open
wants to merge 1 commit into
base: master
Choose a base branch
from

Conversation

rwsbillyang
Copy link

为了较方便地实现多节点应用支持,此处commit重构了ApiConfig, 将原ApiConfig类被拆成多个类:一个接口ApiConfigInterface、一个继承了原来大部分逻辑的抽象类AbstractApiConfig,一个具体的刷新类Refresher和用于webapp中真正需要的可序列化的TokenInfo实体类。
其中ApiConfigInterface定义了提供token和jsTicket的接口,而AbstractApiConfig封装了原ApiConfig的绝大部分处理逻辑,Refresher负责具体的请求刷新,TokenInfo设计为可序列化的POJO类,封装了token和jsTicket以及对应的刷新时间;

单机版仍可继续使用ApiConfig,功能和接口不变。因新的ApiConfig仍和原ApiConfig功能相同,因其实现了接口ApiConfigInterface,具体功能借助Refresher来实现,因此可以说新ApiConfig是Refresher的wrapper,或者Refresher是ApiConfig的代理实现类。

注意:创建子类化实例时,未进行token刷新,创建完实例后须明确调用tryRefresh进行刷新获取,但创建新ApiConfig实例时,则已经调用了tryRefresh进行了刷新。

一个ApiConfigRedis参考实现如下:

public class ApiConfigRedis extends Refresher{
	private static Logger log = LoggerFactory.getLogger(ApiConfigRedis.class);
	
	public final static String TOKENINFO_KEY_PREFIX = "api/tokenInfo/";
	public final static String TOKENINFO_HASHKEY_TOKEN = "accessToken";
	public final static String TOKENINFO_HASHKEY_JS = "jsApiTicket";
	public final static String TOKENINFO_HASHKEY_TOKEN_TIME = "weixinTokenStartTime";
	public final static String TOKENINFO_HASHKEY_JS_TIME = "jsTokenStartTime";
	
	public final static String TOKENINFO_CREATE_LOCK = "lock/tokenInfo/create";
	public final static String TOKENINFO_REFRESH_TOKEN_LOCK = "lock/token/refresh";
	public final static String TOKENINFO_REFRESH_TICKET_LOCK = "lock/jsticket/refresh";
	
	public final static int LOCK_TIMEOUT_MS_SECONDS = 100*1000;//100s
	public final static String TOKEN_CHANGE_NOTIFY_CHANNEL = "TokenChannel";

	public final static int TYPE_ACCESS_TOKEN = 0; // 与ChangeType中的枚举次序对应
	public final static int TYPE_JS_TICKET = 1; // 与ChangeType中的枚举次序对应
	
	private JedisConnectionFactory jedisConnectionFactory;
	private RedisTemplate redisTemplate;
	
	private BoundHashOperations<String,String,Object>  boundHashOperations;
	
	private String[] senderMsgIds = {null,null};
	private long[] senderCounts = {0l,0l};
	
	
	public ApiConfigRedis(String appid, String secret,JedisConnectionFactory jedisConnectionFactory,RedisTemplate redisTemplate) {
		super(appid, secret);
		init(jedisConnectionFactory,redisTemplate);
		setupTokenInfo();
	}
	/**
	 * 保守起见,refreshIntervalTime,单位millisecond毫秒建议设置为6600*1000~7000*1000
	 * */
	public ApiConfigRedis(String appid, String secret, boolean enableJsApi,long refreshIntervalTime,
			JedisConnectionFactory jedisConnectionFactory,RedisTemplate redisTemplate) {
		super(appid, secret, enableJsApi,refreshIntervalTime);
		init(jedisConnectionFactory,redisTemplate);
		setupTokenInfo();
	}
	private void init(JedisConnectionFactory jedisConnectionFactory,RedisTemplate redisTemplate)
	{
		this.jedisConnectionFactory = jedisConnectionFactory;
		this.redisTemplate = redisTemplate;
		boundHashOperations = redisTemplate.boundHashOps(TOKENINFO_KEY_PREFIX+getAppid());
		addHandle(new MyApiConfigChangeHandle());
	}
	private void setupTokenInfo()
	{
		TokenInfo tokenInfo = getTokenInfoFromRedis();
		if(tokenInfo==null)//redis中还没有tokenInfo
		{
			String identifier = RedisTool.tryGetDistributedLock(jedisConnectionFactory, TOKENINFO_CREATE_LOCK, LOCK_TIMEOUT_MS_SECONDS);
			if(identifier==null)//没抢到锁
			{
				log.warn("setupTokenInfo: not get redis create lock, just wait for notifying...");	
			}else
			{
				log.info("setupTokenInfo: get redis create lock identifier="+identifier+",to initToken...");
				tryRefresh();
				RedisTool.releaseDistributedLock(jedisConnectionFactory, TOKENINFO_CREATE_LOCK, identifier);
				log.info("setupTokenInfo Done, release create lock, identifier="+identifier);
			}	
		}else
		{
			log.info("has tokenInfo in redis, use it directly");
			setTokenInfo(tokenInfo);
		}
	}
	public boolean isTokenValid(int type)
	{
		 long time = 0L;
		long now = System.currentTimeMillis();
		if(type==TYPE_ACCESS_TOKEN)
		{
			time = now - this.tokenInfo.weixinTokenStartTime;
		}else if(type== TYPE_JS_TICKET)
		{
			time = now - this.tokenInfo.jsTokenStartTime;
		}
        if (time > refreshIntervalTime) {
           return false;
        }else
        		return true;
	}

	
	@Override
	public void initToken(final long refreshTime) {
		//若无pub/sub机制、或不可靠、或避免正在启动时错过了通知消息,出错率高等,打开下列注释
//		TokenInfo tokenInfo = getTokenInfoFromRedis();
//		if(tokenInfo!=null)
//		{
//			setTokenInfo(tokenInfo);
//			if(isTokenValid(TYPE_ACCESS_TOKEN))
//			{
//				log.info("not need to refresh token,because it is valid in redis");
//				return;
//			}
//		}
		
		String identifier = RedisTool.tryGetDistributedLock(jedisConnectionFactory, TOKENINFO_REFRESH_TOKEN_LOCK, LOCK_TIMEOUT_MS_SECONDS);
		if(identifier==null)
		{
			log.warn("not get redis refresh-token lock, just wait for notifying...");
		}else
		{
			log.info("get redis refresh-token lock identifier="+identifier+",to initToken...");
			super.initToken(refreshTime);
			log.info("initToken Done, release refresh-token lock, identifier="+identifier);
			RedisTool.releaseDistributedLock(jedisConnectionFactory, TOKENINFO_REFRESH_TOKEN_LOCK, identifier);
		}
		
	}
	
	@Override
	public void initJSToken(final long refreshTime) {
		//若无pub/sub机制、或不可靠、或避免正在启动时错过了通知消息,出错率高等,打开下列注释
//		TokenInfo tokenInfo = getTokenInfoFromRedis();
//		if (tokenInfo != null) {
//			setTokenInfo(tokenInfo);
//			if (isTokenValid(TYPE_JS_TICKET)) {
//				log.info("not need to refresh jsticket,because it is valid in redis");
//				return;
//			}
//		}
		
		String identifier = RedisTool.tryGetDistributedLock(jedisConnectionFactory, TOKENINFO_REFRESH_TICKET_LOCK, LOCK_TIMEOUT_MS_SECONDS);
		if(identifier==null)//没抢到锁
		{
			log.warn("not get redis refresh-jsticket lock, just wait for notifying...");//
		}else
		{
			log.info("get redis refresh-jsticket lock identifier="+identifier+",to initJSToken...");
			super.initJSToken(refreshTime);
			log.info("initJSToken Done, release refresh-jsticket lock, identifier="+identifier);
			RedisTool.releaseDistributedLock(jedisConnectionFactory, TOKENINFO_REFRESH_TICKET_LOCK, identifier);
		}
	}
	
	//收到刷新变化
	public class MyApiConfigChangeHandle implements ApiConfigChangeHandle{
		@Override
		public void update(Observable o, Object arg) {
			log.info("after refresh, get update notice...");
			
			//ApiConfigRedis ac = (ApiConfigRedis) o;
		
			ConfigChangeNotice notice = (ConfigChangeNotice) arg;
		
			long recieveTime = System.currentTimeMillis();
			long refreshTime = notice.getNoticeTime().getTime(); 
			log.info("Should be positive: recieveTime-refreshTime="+(recieveTime-refreshTime));
			
			saveTokenInfoIntoRedis(notice);
			
			int type = notice.getType().ordinal();
			StringBuilder sb = new StringBuilder();
			sb.append(notice.getType()).append("_").append(senderCounts[type]++).append("_");
			try {
				sb.append(InetAddress.getLocalHost().getHostAddress());
			} catch (UnknownHostException e) {
				e.printStackTrace();
			}
			Message msg = new Message(sb.toString(),type,notice.getValue(),refreshTime);
			
			//to publish
			redisTemplate.convertAndSend(TOKEN_CHANGE_NOTIFY_CHANNEL, msg);
			senderMsgIds[type] = msg.getMsgId();
			log.info("publishedMsgId="+senderMsgIds[type]);
		}
	}

    /**
     * 收到订阅消息,进行更新
     * */
	public void handleMessage(String text)
	{
		log.info("get msg,text="+text);
		handleMessage((Message)JSON.parseObject(text,Message.class));
	}
	public void handleMessage(byte[] bytes)
    {
    		log.info("0x%02X", bytes);
    		handleMessage((Message)JSON.parseObject(bytes,Message.class));
    }
	public void handleMessage(Message msg)
	{
		if(msg==null)
		{
			log.warn("msg is null when handleMessage");
			return;
		}
		
		if(msg.getMsgId().equals(senderMsgIds[msg.getType()]))
		{
			log.info("got msg from myself,ignore");
		}else
		{
			updateTokenInfo(msg);
		}
	}
	private void updateTokenInfo(Message msg)
	{
		log.info("to updateTokenInfo,according to msg,msgId="+msg.getMsgId());
		switch(msg.getType())
		{
		case TYPE_ACCESS_TOKEN:
			updateAccessTokenInfo(msg.getValue(), msg.getTime());
			break;
		case TYPE_JS_TICKET:
			updateJsApiTicket(msg.getValue(), msg.getTime());
			break;
		default:
			log.warn("not support msg type="+msg.getType());
		}
	}
	
	public void saveTokenInfoIntoRedis(ConfigChangeNotice notice) 
	{
		log.info("to saveTokenInfoIntoRedis,notice.getType="+notice.getType());
		Map<String,Object> map = new HashMap<String,Object>();
		switch(notice.getType())
		{
			case ACCESS_TOKEN:
				map.put(TOKENINFO_HASHKEY_TOKEN, notice.getValue());
				map.put(TOKENINFO_HASHKEY_TOKEN_TIME, notice.getNoticeTime().getTime());
				break;
			case JS_TOKEN:
				map.put(TOKENINFO_HASHKEY_JS, notice.getValue());
				map.put(TOKENINFO_HASHKEY_JS_TIME, notice.getNoticeTime().getTime());
				break;
		}

		boundHashOperations.putAll(map);
		redisTemplate.expire(TOKENINFO_KEY_PREFIX+getAppid(), getRefreshIntervalTime(), TimeUnit.MILLISECONDS);
	}
	
	private TokenInfo getTokenInfoFromRedis()
	{
		Map<String,Object> map = boundHashOperations.entries();
		if(map==null||map.isEmpty())
			return null;
		return new TokenInfo((String) map.get(TOKENINFO_HASHKEY_TOKEN),(long) map.get(TOKENINFO_HASHKEY_TOKEN_TIME),
				(String) map.get(TOKENINFO_HASHKEY_JS),(long)map.get(TOKENINFO_HASHKEY_JS_TIME));
	}
}

可在容器启动时准备好RedisConnectionFactory和RedisTemplate,注入ApiConfigRedis中,同时ApiConfigRedis也可作为bean注入,供webapp系统使用,此commit及该Redis参考实现已在生产环境中应用。

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

1 participant