久久亚洲精品国产精品_羞羞漫画在线版免费阅读网页漫画_国产精品久久久久久久久久久久_午夜dj免费观看在线视频_希崎杰西卡番号

protectedstorage(protectedstorage)

前沿拓展:


本文主要研究一下rocketmq-streams的ILeaseService

ILeaseService/**
* 通過db實現租約和鎖,可以更輕量級,減少其他中間件的依賴 使用主備場景,只有一個實例運行,當當前實例掛掉,在一定時間內,會被其他實例接手 也可以用于全局鎖
*/
public interface ILeaseService {

/**
* 默認鎖定時間
*/
static final int DE**T_LOCK_TIME = 60 * 5;

/**
* 檢查某用戶當前時間是否具有租約。這個方法是純內存**作,無性能開銷
*
* @return true,租約有效;false,租約無效
*/
boolean hasLease(String name);

/**
* 申請租約,會啟動一個線程,不停申請租約,直到申請成功。 申請成功后,每 租期/2 續(xù)約。 如果目前被其他租戶獲取租約,只有在對方租約失效,后才允許新的租戶獲取租約
*
* @param name 租約名稱,無特殊要求,相同名稱會競爭租約
*/
void startLeaseTask(String name);

/**
* 申請租約,會啟動一個線程,不停申請租約,直到申請成功。 申請成功后,每 租期/2 續(xù)約。 如果目前被其他租戶獲取租約,只有在對方租約失效,后才允許新的租戶獲取租約
*
* @param name 租約名稱,無特殊要求,相同名稱會競爭租約
* @param callback 當第一獲取租約時,回調此函數
*/
void startLeaseTask(final String name, ILeaseGetCallback callback);

/**
* 申請租約,會啟動一個線程,不停申請租約,直到申請成功。 申請成功后,每 租期/2 續(xù)約。 如果目前被其他租戶獲取租約,只有在對方租約失效,后才允許新的租戶獲取租約
*
* @param name 租約名稱,無特殊要求,相同名稱會競爭租約
* @param leaseTermSecond 租期,在租期內可以做業(yè)務處理,單位是秒
* @param callback 當第一獲取租約時,回調此函數
*/
void startLeaseTask(final String name, int leaseTermSecond, ILeaseGetCallback callback);

/**
* 申請鎖,無論成功與否,立刻返回。如果不釋放,最大鎖定時間是5分鐘
*
* @param name 業(yè)務名稱
* @param lockerName 鎖名稱
* @return 是否枷鎖成功
*/
boolean lock(String name, String lockerName);

/**
* 申請鎖,無論成功與否,立刻返回。默認鎖定時間是5分鐘
*
* @param name 業(yè)務名稱
* @param lockerName 鎖名稱
* @param lockTimeSecond 如果不釋放,鎖定的最大時間,單位是秒
* @return 是否枷鎖成功
* @return
*/
boolean lock(String name, String lockerName, int lockTimeSecond);

/**
* 申請鎖,如果沒有則等待,等待時間可以指定,如果是-1 則無限等待。如果不釋放,最大鎖定時間是5分鐘
*
* @param name 業(yè)務名稱
* @param lockerName 鎖名稱
* @param waitTime 沒獲取鎖時,最大等待多長時間,如果是-1 則無限等待
* @return 是否枷鎖成功
*/
boolean tryLocker(String name, String lockerName, long waitTime);

/**
* 申請鎖,如果沒有則等待,等待時間可以指定,如果是-1 則無限等待。如果不釋放,最大鎖定時間是lockTimeSecond
*
* @param name 業(yè)務名稱
* @param lockerName 鎖名稱
* @param waitTime 沒獲取鎖時,最大等待多長時間,如果是-1 則無限等待
* @param lockTimeSecond 如果不釋放,鎖定的最大時間,單位是秒
* @return 是否枷鎖成功
*/
boolean tryLocker(String name, String lockerName, long waitTime, int lockTimeSecond);

/**
* 釋放鎖
*
* @param name
* @param lockerName
* @return
*/
boolean unlock(String name, String lockerName);

/**
* 對于已經獲取鎖的,可以通過這個方法,一直持有鎖。 和租約的區(qū)別是,當釋放鎖后,無其他實例搶占。無法實現主備模式
*
* @param name 業(yè)務名稱
* @param lockerName 鎖名稱
* @param lockTimeSecond 租期,這個方**自動續(xù)約,如果不主動釋放,會一直持有鎖
* @return 是否成功獲取鎖
*/
boolean holdLock(String name, String lockerName, int lockTimeSecond);

/**
* 是否持有鎖,不會申請鎖。如果以前申請過,且未過期,返回true,否則返回false
*
* @param name 業(yè)務名稱
* @param lockerName 鎖名稱
* @return
*/
boolean hasHoldLock(String name, String lockerName);

List<LeaseInfo> queryLockedInstanceByNamePrefix(String name, String lockerNamePrefix);

}
ILeaseService接口定義了hasLease、startLeaseTask、lock、tryLocker、unlock、holdLock、hasHoldLock、queryLockedInstanceByNamePrefix方法BasedLesaseImplpublic abstract class BasedLesaseImpl implements ILeaseService {
private static final Log LOG = LogFactory.getLog(BasedLesaseImpl.class);

private static final String CONSISTENT_HASH_PREFIX = "consistent_hash_";
private static final AtomicBoolean syncStart = new AtomicBoolean(false);
private static final int synTime = 120; // 5分鐘的一致性hash同步時間太久了,改為2分鐘
protected ScheduledExecutorService taskExecutor = null;
protected int leaseTerm = 300 * 2; // 租約時間

// protected transient JDBCDriver jdbcDataSource = null;
protected ILeaseStorage leaseStorage;
protected volatile Map<String, Date> leaseName2Date = new ConcurrentHashMap<>(); // 每個lease name對應的租約到期時間

public BasedLesaseImpl() {

taskExecutor = new ScheduledThreadPoolExecutor(10);
}

/**
* lease_name: consistent_hash_ip, lease_user_ip: ip,定時刷新lease_info表,檢查一致性hash環(huán)的節(jié)點情況
*
* @param name
* @return
*/
@Override
public boolean hasLease(String name) {
// 內存中沒有租約信息則表示 沒有租約
Date leaseEndTime = leaseName2Date.get(name);
if (leaseEndTime == null) {
// LOG.info("內存中根據 " + name + "沒有查詢到租約信息,表示沒有租約");
return false;
}
// LOG.info("查詢是否有租約 name:" + name + " ,當前時間:" + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date())
// + " 租約到期時間 " + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(leaseEndTime));
// 有租約時間,并且租約時間大于當前時間,表示有租約信息
if (new Date().before(leaseEndTime)) {
return true;
}

return false;
}

private final Map<String, AtomicBoolean> startLeaseMap = new HashMap<>();

@Override
public void startLeaseTask(final String name) {
startLeaseTask(name, this.leaseTerm, null);
}

@Override
public void startLeaseTask(final String name, ILeaseGetCallback callback) {
startLeaseTask(name, this.leaseTerm, callback);
}

@Override
public void startLeaseTask(final String name, int leaseTerm, ILeaseGetCallback callback) {
ApplyTask applyTask = new ApplyTask(leaseTerm, name, callback);
startLeaseTask(name, applyTask, leaseTerm / 2, true);
}

/**
* 啟動定時器,定時執(zhí)行任務,確保任務可重入
*
* @param name
* @param runnable 具體任務
* @param scheduleTime 調度時間
* @param startNow 是否立刻啟動一次
*/
protected void startLeaseTask(final String name, Runnable runnable, int scheduleTime, boolean startNow) {
AtomicBoolean isStartLease = startLeaseMap.get(name);//多次調用,只啟動一次定時任務
if (isStartLease == null) {
synchronized (this) {
isStartLease = startLeaseMap.get(name);
if (isStartLease == null) {
isStartLease = new AtomicBoolean(false);
startLeaseMap.put(name, isStartLease);
}
}
}
if (isStartLease.compareAndSet(false, true)) {
if (startNow) {
runnable.run();
}
taskExecutor.scheduleWithFixedDelay(runnable, 0, scheduleTime, TimeUnit.SECONDS);
}
}

//……
}
BasedLesaseImpl聲明實現了ILeaseService,它依賴ILeaseStorage,startLeaseTask方**創(chuàng)建ApplyTask,第二以固定間隔調度執(zhí)行ApplyTask /**
* 續(xù)約任務
*/
protected class ApplyTask implements Runnable {

protected String name;
protected int leaseTerm;
protected ILeaseGetCallback callback;

public ApplyTask(int leaseTerm, String name) {
this(leaseTerm, name, null);
}

public ApplyTask(int leaseTerm, String name, ILeaseGetCallback callback) {
this.name = name;
this.leaseTerm = leaseTerm;
this.callback = callback;
}

@Override
public void run() {
try {
// LOG.info("LeaseServiceImpl name: " + name + "開始獲取租約…");
AtomicBoolean newApplyLease = new AtomicBoolean(false);
Date leaseDate = applyLeaseTask(leaseTerm, name, newApplyLease);
if (leaseDate != null) {
leaseName2Date.put(name, leaseDate);
LOG.info("LeaseServiceImpl, name: " + name + " " + getSelfUser() + " 獲取租約成功, 租約到期時間為 "
+ new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(leaseDate));
} else {
// fix.2020.08.13 這時name對應的租約可能還在有效期內,或者本機還持有租約,需要remove
// leaseName2Date.remove(name);
LOG.info("LeaseServiceImpl name: " + name + " " + getSelfUser() + " 獲取租約失敗 ");
}
if (newApplyLease.get() && callback != null) {
callback.callback(leaseDate);
}
} catch (Exception e) {
LOG.error(" LeaseServiceImpl name: " + name + " " + getSelfUser() + " 獲取租約出現異常 ", e);
}

}
}

/**
* 申請租約,如果當期租約有效,直接更新一個租約周期,如果當前租約無效,先查詢是否有有效的租約,如果有申請失敗,否則直接申請租約
*/
protected Date applyLeaseTask(int leaseTerm, String name, AtomicBoolean newApplyLease) {

// 計算下一次租約時間 = 當前時間 + 租約時長
Date nextLeaseDate = DateUtil.addSecond(new Date(), leaseTerm);

// 1 如果已經有租約,則更新租約時間(內存和數據庫)即可
if (hasLease(name)) {
// LOG.info("用戶已有租約,更新數據庫和內存中的租約信息");
// 更新數據庫
LeaseInfo leaseInfo = queryValidateLease(name);
if (leaseInfo == null) {
LOG.error("LeaseServiceImpl applyLeaseTask leaseInfo is null");
return null;
}
// fix.2020.08.13,與本機ip相等且滿足一致性hash分配策略,才續(xù)約,其他情況為null
String leaseUserIp = leaseInfo.getLeaseUserIp();
if (!leaseUserIp.equals(getSelfUser())) {
return null;
}
leaseInfo.setLeaseEndDate(nextLeaseDate);
updateLeaseInfo(leaseInfo);
return nextLeaseDate;
}

// 2 沒有租約情況 判斷是否可以獲取租約,只要租約沒有被其他人獲取,則說明有有效租約
boolean success = canGetLease(name);
if (!success) { // 表示被其他機器獲取到了有效的租約
// LOG.info("其他機器獲取到了有效的租約");
return null;
}

// 3 沒有租約而且可以獲取租約的情況,則嘗試使用數據庫原子更新的方式獲取租約,保證只有一臺機器成功獲取租約,而且可以運行
boolean flag = tryGetLease(name, nextLeaseDate);
if (flag) { // 獲取租約成功
newApplyLease.set(true);
return nextLeaseDate;
}
return null;

}
ApplyTask內部調用applyLeaseTask,如果已有租約則更新租約時間,沒有租約則判斷是否可以獲取租約,可以則執(zhí)行tryGetLeasetryGetLease /**
* 更新數據庫,占用租期并更新租期時間
*
* @param time
*/
protected boolean tryGetLease(String name, Date time) {
// LOG.info("嘗試獲取租約 lease name is : " + name + " 下次到期時間: "
// + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(time));
LeaseInfo validateLeaseInfo = queryValidateLease(name);

if (validateLeaseInfo == null) {// 這里有兩種情況 1 數據庫里面沒有租約信息 2 數據庫里面有租約信息但是已經過期
Integer count = countLeaseInfo(name);
if (count == null || count == 0) {// 表示現在數據庫里面沒有任何租約信息,插入租約成功則表示獲取成功,失敗表示在這一時刻其他機器獲取了租約
// LOG.info("數據庫中暫時沒有租約信息,嘗試原子插入租約:" + name);
// fix.2020.08.13,經過一致性hash計算,該名字的任務不應該在本機執(zhí)行,直接返回,無需插入。只有分配到hash執(zhí)行權限的機器才可以插入并獲取租約
if (!getSelfUser().equals(getConsistentHashHost(name))) {
return false;
}
validateLeaseInfo = new LeaseInfo();
validateLeaseInfo.setLeaseName(name);
validateLeaseInfo.setLeaseUserIp(getSelfUser());
validateLeaseInfo.setLeaseEndDate(time);
validateLeaseInfo.setStatus(1);
validateLeaseInfo.setVersion(1);
if (insert(validateLeaseInfo)) {
LOG.info("數據庫中暫時沒有租約信息,原子插入成功,獲取租約成功:" + name);
return true;
} else {
LOG.info("數據庫中暫時沒有租約信息,原子插入失敗,已經被其他機器獲取租約:" + name);
return false;
}
} else { // 表示數據庫里面有一條但是無效,這里需要兩臺機器按照version進行原子更新,更新成功的獲取租約
// LOG.info("數據庫中有一條無效的租約信息,嘗試根據版本號去原子更新租約信息:" + name);
LeaseInfo inValidateLeaseInfo = queryInValidateLease(name);
if (inValidateLeaseInfo == null) {// 說明這個時候另外一臺機器獲取成功了
LOG.info("另外一臺機器獲取成功了租約:" + name);
return false;
}
// fix.2020.08.13,機器重啟之后,該名字的任務已經不分配在此機器上執(zhí)行,直接返回,無需更新數據庫
if (!getSelfUser().equals(getConsistentHashHost(name))) {
return false;
}
inValidateLeaseInfo.setLeaseName(name);
inValidateLeaseInfo.setLeaseUserIp(getSelfUser());
inValidateLeaseInfo.setLeaseEndDate(time);
inValidateLeaseInfo.setStatus(1);
boolean success = updateDBLeaseInfo(inValidateLeaseInfo);
if (success) {
LOG.info("LeaseServiceImpl 原子更新租約成功,當前機器獲取到了租約信息:" + name);
} else {
LOG.info("LeaseServiceImpl 原子更新租約失敗,租約被其他機器獲取:" + name);
}
return success;
}

} else { // 判斷是否是自己獲取了租約,如果是自己獲取了租約則更新時間(內存和數據庫),
// 這里是為了解決機器重啟的情況,機器重啟,內存中沒有租約信息,但是實際上該用戶是有租約權限的
// fix.2020.08.13,租約的ip與本機ip相等,且滿足一致性hash策略,才會被本機執(zhí)行
String leaseUserIp = validateLeaseInfo.getLeaseUserIp();
if (leaseUserIp.equals(getSelfUser())) {
// 如果當期用戶有租約信息,則更新數據庫
validateLeaseInfo.setLeaseEndDate(time);
boolean hasUpdate = updateLeaseInfo(validateLeaseInfo);
if (hasUpdate) {
LOG.info(
"LeaseServiceImpl機器重啟情況,當前用戶有租約信息,并且更新數據庫成功,租約信息為 name :" + validateLeaseInfo.getLeaseName()
+ " ip : " + validateLeaseInfo.getLeaseUserIp() + " 到期時間 : " + new SimpleDateFormat(
"yyyy-MM-dd HH:mm:ss").format(validateLeaseInfo.getLeaseEndDate()));
return true;
} else {
LOG.info("LeaseServiceImpl 機器重啟情況,當前用戶有租約信息,并且更新數據庫失敗,表示失去租約:" + name);
return false;
}
}
// LOG.info("LeaseServiceImpl 租約被其他機器獲取,租約信息為 name :" + validateLeaseInfo.getLeaseName() + " ip : "
// + validateLeaseInfo.getLeaseUserIp() + " 到期時間 : "
// + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(validateLeaseInfo.getLeaseEndDate()));
return false;
}

}

protected LeaseInfo queryValidateLease(String name) {
//String sql = "SELECT * FROM lease_info WHERE lease_name ='" + name + "' and status=1 and lease_end_time>now()";
//// LOG.info("LeaseServiceImpl query validate lease sql:" + sql);
//return queryLease(name, sql);
return leaseStorage.queryValidateLease(name);
}
tryGetLease先通過queryValidateLease查詢租約信息,若沒有租約則插入,若過期則根據版本號更新,若已有租約則判斷是否是自己獲取了租約,是則更新租約信息LeaseServiceImplpublic class LeaseServiceImpl extends BasedLesaseImpl {

private static final Log LOG = LogFactory.getLog(LeaseServiceImpl.class);

private transient ConcurrentHashMap<String, HoldLockTask> holdLockTasks = new ConcurrentHashMap();

protected ConcurrentHashMap<String, HoldLockFunture> seizeLockingFuntures = new ConcurrentHashMap<>();
//如果是搶占鎖狀態(tài)中,則不允許申請鎖

public LeaseServiceImpl() {
super();
}

/**
* 嘗試獲取鎖,可以等待waitTime,如果到點未返回,則直接返回。如果是-1,則一直等待
*
* @param name 業(yè)務名稱
* @param lockerName 鎖名稱
* @param waitTime 等待時間,是微秒單位
* @return
*/
@Override
public boolean tryLocker(String name, String lockerName, long waitTime) {
return tryLocker(name, lockerName, waitTime, ILeaseService.DE**T_LOCK_TIME);
}

@Override
public boolean tryLocker(String name, String lockerName, long waitTime, int lockTimeSecond) {
long now = System.currentTimeMillis();
boolean success = lock(name, lockerName, lockTimeSecond);
while (!success) {
if (waitTime > -1 && (System.currentTimeMillis() – now > waitTime)) {
break;
}
success = lock(name, lockerName, lockTimeSecond);
if (success) {
return success;
}
try {
Thread.sleep(100);
} catch (InterruptedException e) {
LOG.error("LeaseServiceImpl try locker error", e);
}
}
return success;

}

@Override
public boolean lock(String name, String lockerName) {
return lock(name, lockerName, ILeaseService.DE**T_LOCK_TIME);
}

@Override
public boolean lock(String name, String lockerName, int leaseSecond) {
lockerName = createLockName(name, lockerName);
Future future = seizeLockingFuntures.get(lockerName);
if (future != null && ((HoldLockFunture)future).isDone == false) {
return false;
}
Date nextLeaseDate =
DateUtil.addSecond(new Date(), leaseSecond);// 默認鎖定5分鐘,用完需要立刻釋放.如果時間不同步,可能導致鎖失敗
return tryGetLease(lockerName, nextLeaseDate);
}

@Override
public boolean unlock(String name, String lockerName) {
// LOG.info("LeaseServiceImpl unlock,name:" + name);
lockerName = createLockName(name, lockerName);
LeaseInfo validateLeaseInfo = queryValidateLease(lockerName);
if (validateLeaseInfo == null) {
LOG.warn("LeaseServiceImpl unlock,validateLeaseInfo is null,lockerName:" + lockerName);
}
if (validateLeaseInfo != null && validateLeaseInfo.getLeaseUserIp().equals(getSelfUser())) {
validateLeaseInfo.setStatus(0);
updateDBLeaseInfo(validateLeaseInfo);
}
HoldLockTask holdLockTask = holdLockTasks.remove(lockerName);
if (holdLockTask != null) {
holdLockTask.close();
}
leaseName2Date.remove(lockerName);
return false;
}

/**
* 如果有鎖,則一直持有,如果不能獲取,則結束。和租約不同,租約是沒有也會嘗試重試,一備對方掛機,自己可以接手工作
*
* @param name
* @param secondeName
* @param lockTimeSecond 獲取鎖的時間
* @return
*/
@Override
public boolean holdLock(String name, String secondeName, int lockTimeSecond) {
if (hasHoldLock(name, secondeName)) {
return true;
}
synchronized (this) {
if (hasHoldLock(name, secondeName)) {
return true;
}
String lockerName = createLockName(name, secondeName);
Date nextLeaseDate =
DateUtil.addSecond(new Date(), lockTimeSecond);
boolean success = tryGetLease(lockerName, nextLeaseDate);// 申請鎖,鎖的時間是leaseTerm
if (!success) {
return false;
}
leaseName2Date.put(lockerName, nextLeaseDate);

if (!holdLockTasks.containsKey(lockerName)) {
HoldLockTask holdLockTask = new HoldLockTask(lockTimeSecond, lockerName, this);
holdLockTask.start();
holdLockTasks.putIfAbsent(lockerName, holdLockTask);
}
}

return true;
}

/**
* 是否持有鎖,不訪問數據庫,直接看本地
*
* @param name
* @param secondeName
* @return
*/
@Override
public boolean hasHoldLock(String name, String secondeName) {
String lockerName = createLockName(name, secondeName);
return hasLease(lockerName);
}

@Override
public List<LeaseInfo> queryLockedInstanceByNamePrefix(String name, String lockerNamePrefix) {
String leaseNamePrefix = MapKeyUtil.createKey(name, lockerNamePrefix);
return queryValidateLeaseByNamePrefix(leaseNamePrefix);
}

//……
}
LeaseServiceImpl繼承了BasedLesaseImpl,tryLocker方**根據等待時間循環(huán)執(zhí)行l(wèi)ock,lock方法則執(zhí)行tryGetLease,unlock方法則更新租約信息,同時移除內存記錄;holdLock則通過hasHoldLock判斷是否持有鎖,若有則返回,沒有則執(zhí)行tryGetLeaseILeaseStoragepublic interface ILeaseStorage {

/**
* 更新lease info,需要是原子**作,存儲保障多線程**作的原子性
*
* @param leaseInfo 租約表數據
* @return
*/
boolean updateLeaseInfo(LeaseInfo leaseInfo);

/**
* 統(tǒng)計這個租約名稱下,LeaseInfo對象個數
*
* @param leaseName 租約名稱,無特殊要求,相同名稱會競爭租約
* @return
*/
Integer countLeaseInfo(String leaseName);

/**
* 查詢無效的的租約
*
* @param leaseName 租約名稱,無特殊要求,相同名稱會競爭租約
* @return
*/
LeaseInfo queryInValidateLease(String leaseName);

/**
* 查詢有效的的租約
*
* @param leaseName 租約名稱,無特殊要求,相同名稱會競爭租約
* @return
*/
LeaseInfo queryValidateLease(String leaseName);

/**
* 按前綴查詢有效的租約信息
*
* @param namePrefix
* @return
*/
List<LeaseInfo> queryValidateLeaseByNamePrefix(String namePrefix);

/**
* 增加租約
*
* @param leaseInfo 租約名稱,無特殊要求,相同名稱會競爭租約
*/
void addLeaseInfo(LeaseInfo leaseInfo);

}
ILeaseStorage接口定義了updateLeaseInfo、countLeaseInfo、queryInValidateLease、queryValidateLease、queryValidateLeaseByNamePrefix、addLeaseInfo方法DBLeaseStoragepublic class DBLeaseStorage implements ILeaseStorage {
private static final Log LOG = LogFactory.getLog(DBLeaseStorage.class);
protected JDBCDriver jdbcDataSource;
private String url;
protected String userName;
protected String password;
protected String jdbc;

public DBLeaseStorage(String jdbc, String url, String userName, String password) {
this.jdbc = jdbc;
this.url = url;
this.userName = userName;
this.password = password;
jdbcDataSource = DriverBuilder.createDriver(jdbc, url, userName, password);
}

@Override
public boolean updateLeaseInfo(LeaseInfo leaseInfo) {
String sql = "UPDATE lease_info SET version=version+1,status=#{status},gmt_modified=now()";
String whereSQL = " WHERE id=#{id} and version=#{version}";

if (StringUtil.isNotEmpty(leaseInfo.getLeaseName())) {
sql += ",lease_name=#{leaseName}";
}
if (StringUtil.isNotEmpty(leaseInfo.getLeaseUserIp())) {
sql += ",lease_user_ip=#{leaseUserIp}";
}
if (leaseInfo.getLeaseEndDate() != null) {
sql += ",lease_end_time=#{leaseEndDate}";
}
sql += whereSQL;
sql = SQLUtil.parseIbatisSQL(leaseInfo, sql);
try {
int count = getOrCreateJDBCDataSource().update(sql);
boolean success = count > 0;
if (success) {
synchronized (this) {
leaseInfo.setVersion(leaseInfo.getVersion() + 1);
}
} else {
System.out.println(count);
}
return success;
} catch (Exception e) {
LOG.error("LeaseServiceImpl updateLeaseInfo excuteUpdate error", e);
throw new RuntimeException("execute sql error " + sql, e);
}
}

@Override
public Integer countLeaseInfo(String leaseName) {
String sql = "SELECT count(*) as c FROM lease_info WHERE lease_name = '" + leaseName + "' and status = 1";
try {

List<Map<String, Object>> rows = getOrCreateJDBCDataSource().queryForList(sql);
if (rows == null || rows.size() == 0) {
return null;
}
Long value = (Long) rows.get(0).get("c");
return value.intValue();
} catch (Exception e) {
throw new RuntimeException("execute sql error " + sql, e);
}
}

@Override
public LeaseInfo queryInValidateLease(String leaseName) {
String sql = "SELECT * FROM lease_info WHERE lease_name ='" + leaseName + "' and status=1 and lease_end_time<'" + DateUtil.getCurrentTimeString() + "'";
LOG.info("LeaseServiceImpl queryInValidateLease builder:" + sql);
return queryLease(leaseName, sql);
}

@Override
public LeaseInfo queryValidateLease(String leaseName) {
String sql = "SELECT * FROM lease_info WHERE lease_name ='" + leaseName + "' and status=1 and lease_end_time>now()";
return queryLease(leaseName, sql);
}

@Override
public List<LeaseInfo> queryValidateLeaseByNamePrefix(String namePrefix) {
String sql = "SELECT * FROM lease_info WHERE lease_name like '" + namePrefix + "%' and status=1 and lease_end_time>now()";
try {
List<LeaseInfo> leaseInfos = new ArrayList<>();
List<Map<String, Object>> rows = getOrCreateJDBCDataSource().queryForList(sql);
if (rows == null || rows.size() == 0) {
return null;
}
for (Map<String, Object> row : rows) {
LeaseInfo leaseInfo = convert(row);
leaseInfos.add(leaseInfo);
}

return leaseInfos;
} catch (Exception e) {
throw new RuntimeException("execute sql error " + sql, e);
}
}

@Override
public void addLeaseInfo(LeaseInfo leaseInfo) {
String sql =
" REPLACE INTO lease_info(lease_name,lease_user_ip,lease_end_time,status,version,gmt_create,gmt_modified)"
+ " VALUES (#{leaseName},#{leaseUserIp},#{leaseEndDate},#{status},#{version},now(),now())";
sql = SQLUtil.parseIbatisSQL(leaseInfo, sql);
try {

getOrCreateJDBCDataSource().execute(sql);
} catch (Exception e) {
LOG.error("LeaseServiceImpl execute sql error,sql:" + sql, e);
throw new RuntimeException("execute sql error " + sql, e);
}
}

protected JDBCDriver getOrCreateJDBCDataSource() {
if (this.jdbcDataSource == null || !this.jdbcDataSource.isValidate()) {
synchronized (this) {
if (this.jdbcDataSource == null || !this.jdbcDataSource.isValidate()) {
this.jdbcDataSource =
DriverBuilder.createDriver(this.jdbc, this.url, this.userName, this.password);
}
}
}
return jdbcDataSource;
}

protected LeaseInfo queryLease(String name, String sql) {
try {
List<Map<String, Object>> rows = getOrCreateJDBCDataSource().queryForList(sql);
if (rows == null || rows.size() == 0) {
return null;
}
return convert(rows.get(0));
} catch (Exception e) {
throw new RuntimeException("execute sql error " + sql, e);
}
}

protected LeaseInfo convert(Map<String, Object> map) {
LeaseInfo leaseInfo = new LeaseInfo();
leaseInfo.setId(getMapLongValue("id", map));
leaseInfo.setCreateTime(getMapDateValue("gmt_create", map));
leaseInfo.setLeaseEndDate(getMapDateValue("lease_end_time", map));
leaseInfo.setLeaseName(getMapValue("lease_name", map, String.class));
leaseInfo.setLeaseUserIp(getMapValue("lease_user_ip", map, String.class));
Integer status = getMapValue("status", map, Integer.class);
if (status != null) {
leaseInfo.setStatus(status);
}
leaseInfo.setUpdateTime(getMapDateValue("gmt_modified", map));
Long version = getMapLongValue("version", map);
if (version != null) {
leaseInfo.setVersion(version);
}
return leaseInfo;
}

@SuppressWarnings("unchecked")
private <T> T getMapValue(String fieldName, Map<String, Object> map, Class<T> integerClass) {
Object value = map.get(fieldName);
if (value == null) {
return null;
}
return (T) value;
}

private Long getMapLongValue(String fieldName, Map<String, Object> map) {
Object value = map.get(fieldName);
if (value == null) {
return null;
}
if (value instanceof Long) {
return (Long) value;
}
if (value instanceof BigInteger) {
return ((BigInteger) value).longValue();
}
return null;
}

private Date getMapDateValue(String fieldName, Map<String, Object> map) {
Object value = map.get(fieldName);
if (value == null) {
return null;
}
if (value instanceof Date) {
return (Date) value;
}
if (value instanceof String) {
return DateUtil.parseTime(((String) value));
}
return null;

}

}
DBLeaseStorage實現了ILeaseStorage接口,使用jdbc實現了其方法小結

rocketmq-streams的LeaseService基于db實現了租約和鎖,可用于主備場景切換。

拓展知識:

前沿拓展:


本文主要研究一下rocketmq-streams的ILeaseService

ILeaseService/**
* 通過db實現租約和鎖,可以更輕量級,減少其他中間件的依賴 使用主備場景,只有一個實例運行,當當前實例掛掉,在一定時間內,會被其他實例接手 也可以用于全局鎖
*/
public interface ILeaseService {

/**
* 默認鎖定時間
*/
static final int DE**T_LOCK_TIME = 60 * 5;

/**
* 檢查某用戶當前時間是否具有租約。這個方法是純內存**作,無性能開銷
*
* @return true,租約有效;false,租約無效
*/
boolean hasLease(String name);

/**
* 申請租約,會啟動一個線程,不停申請租約,直到申請成功。 申請成功后,每 租期/2 續(xù)約。 如果目前被其他租戶獲取租約,只有在對方租約失效,后才允許新的租戶獲取租約
*
* @param name 租約名稱,無特殊要求,相同名稱會競爭租約
*/
void startLeaseTask(String name);

/**
* 申請租約,會啟動一個線程,不停申請租約,直到申請成功。 申請成功后,每 租期/2 續(xù)約。 如果目前被其他租戶獲取租約,只有在對方租約失效,后才允許新的租戶獲取租約
*
* @param name 租約名稱,無特殊要求,相同名稱會競爭租約
* @param callback 當第一獲取租約時,回調此函數
*/
void startLeaseTask(final String name, ILeaseGetCallback callback);

/**
* 申請租約,會啟動一個線程,不停申請租約,直到申請成功。 申請成功后,每 租期/2 續(xù)約。 如果目前被其他租戶獲取租約,只有在對方租約失效,后才允許新的租戶獲取租約
*
* @param name 租約名稱,無特殊要求,相同名稱會競爭租約
* @param leaseTermSecond 租期,在租期內可以做業(yè)務處理,單位是秒
* @param callback 當第一獲取租約時,回調此函數
*/
void startLeaseTask(final String name, int leaseTermSecond, ILeaseGetCallback callback);

/**
* 申請鎖,無論成功與否,立刻返回。如果不釋放,最大鎖定時間是5分鐘
*
* @param name 業(yè)務名稱
* @param lockerName 鎖名稱
* @return 是否枷鎖成功
*/
boolean lock(String name, String lockerName);

/**
* 申請鎖,無論成功與否,立刻返回。默認鎖定時間是5分鐘
*
* @param name 業(yè)務名稱
* @param lockerName 鎖名稱
* @param lockTimeSecond 如果不釋放,鎖定的最大時間,單位是秒
* @return 是否枷鎖成功
* @return
*/
boolean lock(String name, String lockerName, int lockTimeSecond);

/**
* 申請鎖,如果沒有則等待,等待時間可以指定,如果是-1 則無限等待。如果不釋放,最大鎖定時間是5分鐘
*
* @param name 業(yè)務名稱
* @param lockerName 鎖名稱
* @param waitTime 沒獲取鎖時,最大等待多長時間,如果是-1 則無限等待
* @return 是否枷鎖成功
*/
boolean tryLocker(String name, String lockerName, long waitTime);

/**
* 申請鎖,如果沒有則等待,等待時間可以指定,如果是-1 則無限等待。如果不釋放,最大鎖定時間是lockTimeSecond
*
* @param name 業(yè)務名稱
* @param lockerName 鎖名稱
* @param waitTime 沒獲取鎖時,最大等待多長時間,如果是-1 則無限等待
* @param lockTimeSecond 如果不釋放,鎖定的最大時間,單位是秒
* @return 是否枷鎖成功
*/
boolean tryLocker(String name, String lockerName, long waitTime, int lockTimeSecond);

/**
* 釋放鎖
*
* @param name
* @param lockerName
* @return
*/
boolean unlock(String name, String lockerName);

/**
* 對于已經獲取鎖的,可以通過這個方法,一直持有鎖。 和租約的區(qū)別是,當釋放鎖后,無其他實例搶占。無法實現主備模式
*
* @param name 業(yè)務名稱
* @param lockerName 鎖名稱
* @param lockTimeSecond 租期,這個方**自動續(xù)約,如果不主動釋放,會一直持有鎖
* @return 是否成功獲取鎖
*/
boolean holdLock(String name, String lockerName, int lockTimeSecond);

/**
* 是否持有鎖,不會申請鎖。如果以前申請過,且未過期,返回true,否則返回false
*
* @param name 業(yè)務名稱
* @param lockerName 鎖名稱
* @return
*/
boolean hasHoldLock(String name, String lockerName);

List<LeaseInfo> queryLockedInstanceByNamePrefix(String name, String lockerNamePrefix);

}
ILeaseService接口定義了hasLease、startLeaseTask、lock、tryLocker、unlock、holdLock、hasHoldLock、queryLockedInstanceByNamePrefix方法BasedLesaseImplpublic abstract class BasedLesaseImpl implements ILeaseService {
private static final Log LOG = LogFactory.getLog(BasedLesaseImpl.class);

private static final String CONSISTENT_HASH_PREFIX = "consistent_hash_";
private static final AtomicBoolean syncStart = new AtomicBoolean(false);
private static final int synTime = 120; // 5分鐘的一致性hash同步時間太久了,改為2分鐘
protected ScheduledExecutorService taskExecutor = null;
protected int leaseTerm = 300 * 2; // 租約時間

// protected transient JDBCDriver jdbcDataSource = null;
protected ILeaseStorage leaseStorage;
protected volatile Map<String, Date> leaseName2Date = new ConcurrentHashMap<>(); // 每個lease name對應的租約到期時間

public BasedLesaseImpl() {

taskExecutor = new ScheduledThreadPoolExecutor(10);
}

/**
* lease_name: consistent_hash_ip, lease_user_ip: ip,定時刷新lease_info表,檢查一致性hash環(huán)的節(jié)點情況
*
* @param name
* @return
*/
@Override
public boolean hasLease(String name) {
// 內存中沒有租約信息則表示 沒有租約
Date leaseEndTime = leaseName2Date.get(name);
if (leaseEndTime == null) {
// LOG.info("內存中根據 " + name + "沒有查詢到租約信息,表示沒有租約");
return false;
}
// LOG.info("查詢是否有租約 name:" + name + " ,當前時間:" + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date())
// + " 租約到期時間 " + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(leaseEndTime));
// 有租約時間,并且租約時間大于當前時間,表示有租約信息
if (new Date().before(leaseEndTime)) {
return true;
}

return false;
}

private final Map<String, AtomicBoolean> startLeaseMap = new HashMap<>();

@Override
public void startLeaseTask(final String name) {
startLeaseTask(name, this.leaseTerm, null);
}

@Override
public void startLeaseTask(final String name, ILeaseGetCallback callback) {
startLeaseTask(name, this.leaseTerm, callback);
}

@Override
public void startLeaseTask(final String name, int leaseTerm, ILeaseGetCallback callback) {
ApplyTask applyTask = new ApplyTask(leaseTerm, name, callback);
startLeaseTask(name, applyTask, leaseTerm / 2, true);
}

/**
* 啟動定時器,定時執(zhí)行任務,確保任務可重入
*
* @param name
* @param runnable 具體任務
* @param scheduleTime 調度時間
* @param startNow 是否立刻啟動一次
*/
protected void startLeaseTask(final String name, Runnable runnable, int scheduleTime, boolean startNow) {
AtomicBoolean isStartLease = startLeaseMap.get(name);//多次調用,只啟動一次定時任務
if (isStartLease == null) {
synchronized (this) {
isStartLease = startLeaseMap.get(name);
if (isStartLease == null) {
isStartLease = new AtomicBoolean(false);
startLeaseMap.put(name, isStartLease);
}
}
}
if (isStartLease.compareAndSet(false, true)) {
if (startNow) {
runnable.run();
}
taskExecutor.scheduleWithFixedDelay(runnable, 0, scheduleTime, TimeUnit.SECONDS);
}
}

//……
}
BasedLesaseImpl聲明實現了ILeaseService,它依賴ILeaseStorage,startLeaseTask方**創(chuàng)建ApplyTask,第二以固定間隔調度執(zhí)行ApplyTask /**
* 續(xù)約任務
*/
protected class ApplyTask implements Runnable {

protected String name;
protected int leaseTerm;
protected ILeaseGetCallback callback;

public ApplyTask(int leaseTerm, String name) {
this(leaseTerm, name, null);
}

public ApplyTask(int leaseTerm, String name, ILeaseGetCallback callback) {
this.name = name;
this.leaseTerm = leaseTerm;
this.callback = callback;
}

@Override
public void run() {
try {
// LOG.info("LeaseServiceImpl name: " + name + "開始獲取租約…");
AtomicBoolean newApplyLease = new AtomicBoolean(false);
Date leaseDate = applyLeaseTask(leaseTerm, name, newApplyLease);
if (leaseDate != null) {
leaseName2Date.put(name, leaseDate);
LOG.info("LeaseServiceImpl, name: " + name + " " + getSelfUser() + " 獲取租約成功, 租約到期時間為 "
+ new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(leaseDate));
} else {
// fix.2020.08.13 這時name對應的租約可能還在有效期內,或者本機還持有租約,需要remove
// leaseName2Date.remove(name);
LOG.info("LeaseServiceImpl name: " + name + " " + getSelfUser() + " 獲取租約失敗 ");
}
if (newApplyLease.get() && callback != null) {
callback.callback(leaseDate);
}
} catch (Exception e) {
LOG.error(" LeaseServiceImpl name: " + name + " " + getSelfUser() + " 獲取租約出現異常 ", e);
}

}
}

/**
* 申請租約,如果當期租約有效,直接更新一個租約周期,如果當前租約無效,先查詢是否有有效的租約,如果有申請失敗,否則直接申請租約
*/
protected Date applyLeaseTask(int leaseTerm, String name, AtomicBoolean newApplyLease) {

// 計算下一次租約時間 = 當前時間 + 租約時長
Date nextLeaseDate = DateUtil.addSecond(new Date(), leaseTerm);

// 1 如果已經有租約,則更新租約時間(內存和數據庫)即可
if (hasLease(name)) {
// LOG.info("用戶已有租約,更新數據庫和內存中的租約信息");
// 更新數據庫
LeaseInfo leaseInfo = queryValidateLease(name);
if (leaseInfo == null) {
LOG.error("LeaseServiceImpl applyLeaseTask leaseInfo is null");
return null;
}
// fix.2020.08.13,與本機ip相等且滿足一致性hash分配策略,才續(xù)約,其他情況為null
String leaseUserIp = leaseInfo.getLeaseUserIp();
if (!leaseUserIp.equals(getSelfUser())) {
return null;
}
leaseInfo.setLeaseEndDate(nextLeaseDate);
updateLeaseInfo(leaseInfo);
return nextLeaseDate;
}

// 2 沒有租約情況 判斷是否可以獲取租約,只要租約沒有被其他人獲取,則說明有有效租約
boolean success = canGetLease(name);
if (!success) { // 表示被其他機器獲取到了有效的租約
// LOG.info("其他機器獲取到了有效的租約");
return null;
}

// 3 沒有租約而且可以獲取租約的情況,則嘗試使用數據庫原子更新的方式獲取租約,保證只有一臺機器成功獲取租約,而且可以運行
boolean flag = tryGetLease(name, nextLeaseDate);
if (flag) { // 獲取租約成功
newApplyLease.set(true);
return nextLeaseDate;
}
return null;

}
ApplyTask內部調用applyLeaseTask,如果已有租約則更新租約時間,沒有租約則判斷是否可以獲取租約,可以則執(zhí)行tryGetLeasetryGetLease /**
* 更新數據庫,占用租期并更新租期時間
*
* @param time
*/
protected boolean tryGetLease(String name, Date time) {
// LOG.info("嘗試獲取租約 lease name is : " + name + " 下次到期時間: "
// + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(time));
LeaseInfo validateLeaseInfo = queryValidateLease(name);

if (validateLeaseInfo == null) {// 這里有兩種情況 1 數據庫里面沒有租約信息 2 數據庫里面有租約信息但是已經過期
Integer count = countLeaseInfo(name);
if (count == null || count == 0) {// 表示現在數據庫里面沒有任何租約信息,插入租約成功則表示獲取成功,失敗表示在這一時刻其他機器獲取了租約
// LOG.info("數據庫中暫時沒有租約信息,嘗試原子插入租約:" + name);
// fix.2020.08.13,經過一致性hash計算,該名字的任務不應該在本機執(zhí)行,直接返回,無需插入。只有分配到hash執(zhí)行權限的機器才可以插入并獲取租約
if (!getSelfUser().equals(getConsistentHashHost(name))) {
return false;
}
validateLeaseInfo = new LeaseInfo();
validateLeaseInfo.setLeaseName(name);
validateLeaseInfo.setLeaseUserIp(getSelfUser());
validateLeaseInfo.setLeaseEndDate(time);
validateLeaseInfo.setStatus(1);
validateLeaseInfo.setVersion(1);
if (insert(validateLeaseInfo)) {
LOG.info("數據庫中暫時沒有租約信息,原子插入成功,獲取租約成功:" + name);
return true;
} else {
LOG.info("數據庫中暫時沒有租約信息,原子插入失敗,已經被其他機器獲取租約:" + name);
return false;
}
} else { // 表示數據庫里面有一條但是無效,這里需要兩臺機器按照version進行原子更新,更新成功的獲取租約
// LOG.info("數據庫中有一條無效的租約信息,嘗試根據版本號去原子更新租約信息:" + name);
LeaseInfo inValidateLeaseInfo = queryInValidateLease(name);
if (inValidateLeaseInfo == null) {// 說明這個時候另外一臺機器獲取成功了
LOG.info("另外一臺機器獲取成功了租約:" + name);
return false;
}
// fix.2020.08.13,機器重啟之后,該名字的任務已經不分配在此機器上執(zhí)行,直接返回,無需更新數據庫
if (!getSelfUser().equals(getConsistentHashHost(name))) {
return false;
}
inValidateLeaseInfo.setLeaseName(name);
inValidateLeaseInfo.setLeaseUserIp(getSelfUser());
inValidateLeaseInfo.setLeaseEndDate(time);
inValidateLeaseInfo.setStatus(1);
boolean success = updateDBLeaseInfo(inValidateLeaseInfo);
if (success) {
LOG.info("LeaseServiceImpl 原子更新租約成功,當前機器獲取到了租約信息:" + name);
} else {
LOG.info("LeaseServiceImpl 原子更新租約失敗,租約被其他機器獲取:" + name);
}
return success;
}

} else { // 判斷是否是自己獲取了租約,如果是自己獲取了租約則更新時間(內存和數據庫),
// 這里是為了解決機器重啟的情況,機器重啟,內存中沒有租約信息,但是實際上該用戶是有租約權限的
// fix.2020.08.13,租約的ip與本機ip相等,且滿足一致性hash策略,才會被本機執(zhí)行
String leaseUserIp = validateLeaseInfo.getLeaseUserIp();
if (leaseUserIp.equals(getSelfUser())) {
// 如果當期用戶有租約信息,則更新數據庫
validateLeaseInfo.setLeaseEndDate(time);
boolean hasUpdate = updateLeaseInfo(validateLeaseInfo);
if (hasUpdate) {
LOG.info(
"LeaseServiceImpl機器重啟情況,當前用戶有租約信息,并且更新數據庫成功,租約信息為 name :" + validateLeaseInfo.getLeaseName()
+ " ip : " + validateLeaseInfo.getLeaseUserIp() + " 到期時間 : " + new SimpleDateFormat(
"yyyy-MM-dd HH:mm:ss").format(validateLeaseInfo.getLeaseEndDate()));
return true;
} else {
LOG.info("LeaseServiceImpl 機器重啟情況,當前用戶有租約信息,并且更新數據庫失敗,表示失去租約:" + name);
return false;
}
}
// LOG.info("LeaseServiceImpl 租約被其他機器獲取,租約信息為 name :" + validateLeaseInfo.getLeaseName() + " ip : "
// + validateLeaseInfo.getLeaseUserIp() + " 到期時間 : "
// + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(validateLeaseInfo.getLeaseEndDate()));
return false;
}

}

protected LeaseInfo queryValidateLease(String name) {
//String sql = "SELECT * FROM lease_info WHERE lease_name ='" + name + "' and status=1 and lease_end_time>now()";
//// LOG.info("LeaseServiceImpl query validate lease sql:" + sql);
//return queryLease(name, sql);
return leaseStorage.queryValidateLease(name);
}
tryGetLease先通過queryValidateLease查詢租約信息,若沒有租約則插入,若過期則根據版本號更新,若已有租約則判斷是否是自己獲取了租約,是則更新租約信息LeaseServiceImplpublic class LeaseServiceImpl extends BasedLesaseImpl {

private static final Log LOG = LogFactory.getLog(LeaseServiceImpl.class);

private transient ConcurrentHashMap<String, HoldLockTask> holdLockTasks = new ConcurrentHashMap();

protected ConcurrentHashMap<String, HoldLockFunture> seizeLockingFuntures = new ConcurrentHashMap<>();
//如果是搶占鎖狀態(tài)中,則不允許申請鎖

public LeaseServiceImpl() {
super();
}

/**
* 嘗試獲取鎖,可以等待waitTime,如果到點未返回,則直接返回。如果是-1,則一直等待
*
* @param name 業(yè)務名稱
* @param lockerName 鎖名稱
* @param waitTime 等待時間,是微秒單位
* @return
*/
@Override
public boolean tryLocker(String name, String lockerName, long waitTime) {
return tryLocker(name, lockerName, waitTime, ILeaseService.DE**T_LOCK_TIME);
}

@Override
public boolean tryLocker(String name, String lockerName, long waitTime, int lockTimeSecond) {
long now = System.currentTimeMillis();
boolean success = lock(name, lockerName, lockTimeSecond);
while (!success) {
if (waitTime > -1 && (System.currentTimeMillis() – now > waitTime)) {
break;
}
success = lock(name, lockerName, lockTimeSecond);
if (success) {
return success;
}
try {
Thread.sleep(100);
} catch (InterruptedException e) {
LOG.error("LeaseServiceImpl try locker error", e);
}
}
return success;

}

@Override
public boolean lock(String name, String lockerName) {
return lock(name, lockerName, ILeaseService.DE**T_LOCK_TIME);
}

@Override
public boolean lock(String name, String lockerName, int leaseSecond) {
lockerName = createLockName(name, lockerName);
Future future = seizeLockingFuntures.get(lockerName);
if (future != null && ((HoldLockFunture)future).isDone == false) {
return false;
}
Date nextLeaseDate =
DateUtil.addSecond(new Date(), leaseSecond);// 默認鎖定5分鐘,用完需要立刻釋放.如果時間不同步,可能導致鎖失敗
return tryGetLease(lockerName, nextLeaseDate);
}

@Override
public boolean unlock(String name, String lockerName) {
// LOG.info("LeaseServiceImpl unlock,name:" + name);
lockerName = createLockName(name, lockerName);
LeaseInfo validateLeaseInfo = queryValidateLease(lockerName);
if (validateLeaseInfo == null) {
LOG.warn("LeaseServiceImpl unlock,validateLeaseInfo is null,lockerName:" + lockerName);
}
if (validateLeaseInfo != null && validateLeaseInfo.getLeaseUserIp().equals(getSelfUser())) {
validateLeaseInfo.setStatus(0);
updateDBLeaseInfo(validateLeaseInfo);
}
HoldLockTask holdLockTask = holdLockTasks.remove(lockerName);
if (holdLockTask != null) {
holdLockTask.close();
}
leaseName2Date.remove(lockerName);
return false;
}

/**
* 如果有鎖,則一直持有,如果不能獲取,則結束。和租約不同,租約是沒有也會嘗試重試,一備對方掛機,自己可以接手工作
*
* @param name
* @param secondeName
* @param lockTimeSecond 獲取鎖的時間
* @return
*/
@Override
public boolean holdLock(String name, String secondeName, int lockTimeSecond) {
if (hasHoldLock(name, secondeName)) {
return true;
}
synchronized (this) {
if (hasHoldLock(name, secondeName)) {
return true;
}
String lockerName = createLockName(name, secondeName);
Date nextLeaseDate =
DateUtil.addSecond(new Date(), lockTimeSecond);
boolean success = tryGetLease(lockerName, nextLeaseDate);// 申請鎖,鎖的時間是leaseTerm
if (!success) {
return false;
}
leaseName2Date.put(lockerName, nextLeaseDate);

if (!holdLockTasks.containsKey(lockerName)) {
HoldLockTask holdLockTask = new HoldLockTask(lockTimeSecond, lockerName, this);
holdLockTask.start();
holdLockTasks.putIfAbsent(lockerName, holdLockTask);
}
}

return true;
}

/**
* 是否持有鎖,不訪問數據庫,直接看本地
*
* @param name
* @param secondeName
* @return
*/
@Override
public boolean hasHoldLock(String name, String secondeName) {
String lockerName = createLockName(name, secondeName);
return hasLease(lockerName);
}

@Override
public List<LeaseInfo> queryLockedInstanceByNamePrefix(String name, String lockerNamePrefix) {
String leaseNamePrefix = MapKeyUtil.createKey(name, lockerNamePrefix);
return queryValidateLeaseByNamePrefix(leaseNamePrefix);
}

//……
}
LeaseServiceImpl繼承了BasedLesaseImpl,tryLocker方**根據等待時間循環(huán)執(zhí)行l(wèi)ock,lock方法則執(zhí)行tryGetLease,unlock方法則更新租約信息,同時移除內存記錄;holdLock則通過hasHoldLock判斷是否持有鎖,若有則返回,沒有則執(zhí)行tryGetLeaseILeaseStoragepublic interface ILeaseStorage {

/**
* 更新lease info,需要是原子**作,存儲保障多線程**作的原子性
*
* @param leaseInfo 租約表數據
* @return
*/
boolean updateLeaseInfo(LeaseInfo leaseInfo);

/**
* 統(tǒng)計這個租約名稱下,LeaseInfo對象個數
*
* @param leaseName 租約名稱,無特殊要求,相同名稱會競爭租約
* @return
*/
Integer countLeaseInfo(String leaseName);

/**
* 查詢無效的的租約
*
* @param leaseName 租約名稱,無特殊要求,相同名稱會競爭租約
* @return
*/
LeaseInfo queryInValidateLease(String leaseName);

/**
* 查詢有效的的租約
*
* @param leaseName 租約名稱,無特殊要求,相同名稱會競爭租約
* @return
*/
LeaseInfo queryValidateLease(String leaseName);

/**
* 按前綴查詢有效的租約信息
*
* @param namePrefix
* @return
*/
List<LeaseInfo> queryValidateLeaseByNamePrefix(String namePrefix);

/**
* 增加租約
*
* @param leaseInfo 租約名稱,無特殊要求,相同名稱會競爭租約
*/
void addLeaseInfo(LeaseInfo leaseInfo);

}
ILeaseStorage接口定義了updateLeaseInfo、countLeaseInfo、queryInValidateLease、queryValidateLease、queryValidateLeaseByNamePrefix、addLeaseInfo方法DBLeaseStoragepublic class DBLeaseStorage implements ILeaseStorage {
private static final Log LOG = LogFactory.getLog(DBLeaseStorage.class);
protected JDBCDriver jdbcDataSource;
private String url;
protected String userName;
protected String password;
protected String jdbc;

public DBLeaseStorage(String jdbc, String url, String userName, String password) {
this.jdbc = jdbc;
this.url = url;
this.userName = userName;
this.password = password;
jdbcDataSource = DriverBuilder.createDriver(jdbc, url, userName, password);
}

@Override
public boolean updateLeaseInfo(LeaseInfo leaseInfo) {
String sql = "UPDATE lease_info SET version=version+1,status=#{status},gmt_modified=now()";
String whereSQL = " WHERE id=#{id} and version=#{version}";

if (StringUtil.isNotEmpty(leaseInfo.getLeaseName())) {
sql += ",lease_name=#{leaseName}";
}
if (StringUtil.isNotEmpty(leaseInfo.getLeaseUserIp())) {
sql += ",lease_user_ip=#{leaseUserIp}";
}
if (leaseInfo.getLeaseEndDate() != null) {
sql += ",lease_end_time=#{leaseEndDate}";
}
sql += whereSQL;
sql = SQLUtil.parseIbatisSQL(leaseInfo, sql);
try {
int count = getOrCreateJDBCDataSource().update(sql);
boolean success = count > 0;
if (success) {
synchronized (this) {
leaseInfo.setVersion(leaseInfo.getVersion() + 1);
}
} else {
System.out.println(count);
}
return success;
} catch (Exception e) {
LOG.error("LeaseServiceImpl updateLeaseInfo excuteUpdate error", e);
throw new RuntimeException("execute sql error " + sql, e);
}
}

@Override
public Integer countLeaseInfo(String leaseName) {
String sql = "SELECT count(*) as c FROM lease_info WHERE lease_name = '" + leaseName + "' and status = 1";
try {

List<Map<String, Object>> rows = getOrCreateJDBCDataSource().queryForList(sql);
if (rows == null || rows.size() == 0) {
return null;
}
Long value = (Long) rows.get(0).get("c");
return value.intValue();
} catch (Exception e) {
throw new RuntimeException("execute sql error " + sql, e);
}
}

@Override
public LeaseInfo queryInValidateLease(String leaseName) {
String sql = "SELECT * FROM lease_info WHERE lease_name ='" + leaseName + "' and status=1 and lease_end_time<'" + DateUtil.getCurrentTimeString() + "'";
LOG.info("LeaseServiceImpl queryInValidateLease builder:" + sql);
return queryLease(leaseName, sql);
}

@Override
public LeaseInfo queryValidateLease(String leaseName) {
String sql = "SELECT * FROM lease_info WHERE lease_name ='" + leaseName + "' and status=1 and lease_end_time>now()";
return queryLease(leaseName, sql);
}

@Override
public List<LeaseInfo> queryValidateLeaseByNamePrefix(String namePrefix) {
String sql = "SELECT * FROM lease_info WHERE lease_name like '" + namePrefix + "%' and status=1 and lease_end_time>now()";
try {
List<LeaseInfo> leaseInfos = new ArrayList<>();
List<Map<String, Object>> rows = getOrCreateJDBCDataSource().queryForList(sql);
if (rows == null || rows.size() == 0) {
return null;
}
for (Map<String, Object> row : rows) {
LeaseInfo leaseInfo = convert(row);
leaseInfos.add(leaseInfo);
}

return leaseInfos;
} catch (Exception e) {
throw new RuntimeException("execute sql error " + sql, e);
}
}

@Override
public void addLeaseInfo(LeaseInfo leaseInfo) {
String sql =
" REPLACE INTO lease_info(lease_name,lease_user_ip,lease_end_time,status,version,gmt_create,gmt_modified)"
+ " VALUES (#{leaseName},#{leaseUserIp},#{leaseEndDate},#{status},#{version},now(),now())";
sql = SQLUtil.parseIbatisSQL(leaseInfo, sql);
try {

getOrCreateJDBCDataSource().execute(sql);
} catch (Exception e) {
LOG.error("LeaseServiceImpl execute sql error,sql:" + sql, e);
throw new RuntimeException("execute sql error " + sql, e);
}
}

protected JDBCDriver getOrCreateJDBCDataSource() {
if (this.jdbcDataSource == null || !this.jdbcDataSource.isValidate()) {
synchronized (this) {
if (this.jdbcDataSource == null || !this.jdbcDataSource.isValidate()) {
this.jdbcDataSource =
DriverBuilder.createDriver(this.jdbc, this.url, this.userName, this.password);
}
}
}
return jdbcDataSource;
}

protected LeaseInfo queryLease(String name, String sql) {
try {
List<Map<String, Object>> rows = getOrCreateJDBCDataSource().queryForList(sql);
if (rows == null || rows.size() == 0) {
return null;
}
return convert(rows.get(0));
} catch (Exception e) {
throw new RuntimeException("execute sql error " + sql, e);
}
}

protected LeaseInfo convert(Map<String, Object> map) {
LeaseInfo leaseInfo = new LeaseInfo();
leaseInfo.setId(getMapLongValue("id", map));
leaseInfo.setCreateTime(getMapDateValue("gmt_create", map));
leaseInfo.setLeaseEndDate(getMapDateValue("lease_end_time", map));
leaseInfo.setLeaseName(getMapValue("lease_name", map, String.class));
leaseInfo.setLeaseUserIp(getMapValue("lease_user_ip", map, String.class));
Integer status = getMapValue("status", map, Integer.class);
if (status != null) {
leaseInfo.setStatus(status);
}
leaseInfo.setUpdateTime(getMapDateValue("gmt_modified", map));
Long version = getMapLongValue("version", map);
if (version != null) {
leaseInfo.setVersion(version);
}
return leaseInfo;
}

@SuppressWarnings("unchecked")
private <T> T getMapValue(String fieldName, Map<String, Object> map, Class<T> integerClass) {
Object value = map.get(fieldName);
if (value == null) {
return null;
}
return (T) value;
}

private Long getMapLongValue(String fieldName, Map<String, Object> map) {
Object value = map.get(fieldName);
if (value == null) {
return null;
}
if (value instanceof Long) {
return (Long) value;
}
if (value instanceof BigInteger) {
return ((BigInteger) value).longValue();
}
return null;
}

private Date getMapDateValue(String fieldName, Map<String, Object> map) {
Object value = map.get(fieldName);
if (value == null) {
return null;
}
if (value instanceof Date) {
return (Date) value;
}
if (value instanceof String) {
return DateUtil.parseTime(((String) value));
}
return null;

}

}
DBLeaseStorage實現了ILeaseStorage接口,使用jdbc實現了其方法小結

rocketmq-streams的LeaseService基于db實現了租約和鎖,可用于主備場景切換。

拓展知識:

前沿拓展:


本文主要研究一下rocketmq-streams的ILeaseService

ILeaseService/**
* 通過db實現租約和鎖,可以更輕量級,減少其他中間件的依賴 使用主備場景,只有一個實例運行,當當前實例掛掉,在一定時間內,會被其他實例接手 也可以用于全局鎖
*/
public interface ILeaseService {

/**
* 默認鎖定時間
*/
static final int DE**T_LOCK_TIME = 60 * 5;

/**
* 檢查某用戶當前時間是否具有租約。這個方法是純內存**作,無性能開銷
*
* @return true,租約有效;false,租約無效
*/
boolean hasLease(String name);

/**
* 申請租約,會啟動一個線程,不停申請租約,直到申請成功。 申請成功后,每 租期/2 續(xù)約。 如果目前被其他租戶獲取租約,只有在對方租約失效,后才允許新的租戶獲取租約
*
* @param name 租約名稱,無特殊要求,相同名稱會競爭租約
*/
void startLeaseTask(String name);

/**
* 申請租約,會啟動一個線程,不停申請租約,直到申請成功。 申請成功后,每 租期/2 續(xù)約。 如果目前被其他租戶獲取租約,只有在對方租約失效,后才允許新的租戶獲取租約
*
* @param name 租約名稱,無特殊要求,相同名稱會競爭租約
* @param callback 當第一獲取租約時,回調此函數
*/
void startLeaseTask(final String name, ILeaseGetCallback callback);

/**
* 申請租約,會啟動一個線程,不停申請租約,直到申請成功。 申請成功后,每 租期/2 續(xù)約。 如果目前被其他租戶獲取租約,只有在對方租約失效,后才允許新的租戶獲取租約
*
* @param name 租約名稱,無特殊要求,相同名稱會競爭租約
* @param leaseTermSecond 租期,在租期內可以做業(yè)務處理,單位是秒
* @param callback 當第一獲取租約時,回調此函數
*/
void startLeaseTask(final String name, int leaseTermSecond, ILeaseGetCallback callback);

/**
* 申請鎖,無論成功與否,立刻返回。如果不釋放,最大鎖定時間是5分鐘
*
* @param name 業(yè)務名稱
* @param lockerName 鎖名稱
* @return 是否枷鎖成功
*/
boolean lock(String name, String lockerName);

/**
* 申請鎖,無論成功與否,立刻返回。默認鎖定時間是5分鐘
*
* @param name 業(yè)務名稱
* @param lockerName 鎖名稱
* @param lockTimeSecond 如果不釋放,鎖定的最大時間,單位是秒
* @return 是否枷鎖成功
* @return
*/
boolean lock(String name, String lockerName, int lockTimeSecond);

/**
* 申請鎖,如果沒有則等待,等待時間可以指定,如果是-1 則無限等待。如果不釋放,最大鎖定時間是5分鐘
*
* @param name 業(yè)務名稱
* @param lockerName 鎖名稱
* @param waitTime 沒獲取鎖時,最大等待多長時間,如果是-1 則無限等待
* @return 是否枷鎖成功
*/
boolean tryLocker(String name, String lockerName, long waitTime);

/**
* 申請鎖,如果沒有則等待,等待時間可以指定,如果是-1 則無限等待。如果不釋放,最大鎖定時間是lockTimeSecond
*
* @param name 業(yè)務名稱
* @param lockerName 鎖名稱
* @param waitTime 沒獲取鎖時,最大等待多長時間,如果是-1 則無限等待
* @param lockTimeSecond 如果不釋放,鎖定的最大時間,單位是秒
* @return 是否枷鎖成功
*/
boolean tryLocker(String name, String lockerName, long waitTime, int lockTimeSecond);

/**
* 釋放鎖
*
* @param name
* @param lockerName
* @return
*/
boolean unlock(String name, String lockerName);

/**
* 對于已經獲取鎖的,可以通過這個方法,一直持有鎖。 和租約的區(qū)別是,當釋放鎖后,無其他實例搶占。無法實現主備模式
*
* @param name 業(yè)務名稱
* @param lockerName 鎖名稱
* @param lockTimeSecond 租期,這個方**自動續(xù)約,如果不主動釋放,會一直持有鎖
* @return 是否成功獲取鎖
*/
boolean holdLock(String name, String lockerName, int lockTimeSecond);

/**
* 是否持有鎖,不會申請鎖。如果以前申請過,且未過期,返回true,否則返回false
*
* @param name 業(yè)務名稱
* @param lockerName 鎖名稱
* @return
*/
boolean hasHoldLock(String name, String lockerName);

List<LeaseInfo> queryLockedInstanceByNamePrefix(String name, String lockerNamePrefix);

}
ILeaseService接口定義了hasLease、startLeaseTask、lock、tryLocker、unlock、holdLock、hasHoldLock、queryLockedInstanceByNamePrefix方法BasedLesaseImplpublic abstract class BasedLesaseImpl implements ILeaseService {
private static final Log LOG = LogFactory.getLog(BasedLesaseImpl.class);

private static final String CONSISTENT_HASH_PREFIX = "consistent_hash_";
private static final AtomicBoolean syncStart = new AtomicBoolean(false);
private static final int synTime = 120; // 5分鐘的一致性hash同步時間太久了,改為2分鐘
protected ScheduledExecutorService taskExecutor = null;
protected int leaseTerm = 300 * 2; // 租約時間

// protected transient JDBCDriver jdbcDataSource = null;
protected ILeaseStorage leaseStorage;
protected volatile Map<String, Date> leaseName2Date = new ConcurrentHashMap<>(); // 每個lease name對應的租約到期時間

public BasedLesaseImpl() {

taskExecutor = new ScheduledThreadPoolExecutor(10);
}

/**
* lease_name: consistent_hash_ip, lease_user_ip: ip,定時刷新lease_info表,檢查一致性hash環(huán)的節(jié)點情況
*
* @param name
* @return
*/
@Override
public boolean hasLease(String name) {
// 內存中沒有租約信息則表示 沒有租約
Date leaseEndTime = leaseName2Date.get(name);
if (leaseEndTime == null) {
// LOG.info("內存中根據 " + name + "沒有查詢到租約信息,表示沒有租約");
return false;
}
// LOG.info("查詢是否有租約 name:" + name + " ,當前時間:" + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date())
// + " 租約到期時間 " + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(leaseEndTime));
// 有租約時間,并且租約時間大于當前時間,表示有租約信息
if (new Date().before(leaseEndTime)) {
return true;
}

return false;
}

private final Map<String, AtomicBoolean> startLeaseMap = new HashMap<>();

@Override
public void startLeaseTask(final String name) {
startLeaseTask(name, this.leaseTerm, null);
}

@Override
public void startLeaseTask(final String name, ILeaseGetCallback callback) {
startLeaseTask(name, this.leaseTerm, callback);
}

@Override
public void startLeaseTask(final String name, int leaseTerm, ILeaseGetCallback callback) {
ApplyTask applyTask = new ApplyTask(leaseTerm, name, callback);
startLeaseTask(name, applyTask, leaseTerm / 2, true);
}

/**
* 啟動定時器,定時執(zhí)行任務,確保任務可重入
*
* @param name
* @param runnable 具體任務
* @param scheduleTime 調度時間
* @param startNow 是否立刻啟動一次
*/
protected void startLeaseTask(final String name, Runnable runnable, int scheduleTime, boolean startNow) {
AtomicBoolean isStartLease = startLeaseMap.get(name);//多次調用,只啟動一次定時任務
if (isStartLease == null) {
synchronized (this) {
isStartLease = startLeaseMap.get(name);
if (isStartLease == null) {
isStartLease = new AtomicBoolean(false);
startLeaseMap.put(name, isStartLease);
}
}
}
if (isStartLease.compareAndSet(false, true)) {
if (startNow) {
runnable.run();
}
taskExecutor.scheduleWithFixedDelay(runnable, 0, scheduleTime, TimeUnit.SECONDS);
}
}

//……
}
BasedLesaseImpl聲明實現了ILeaseService,它依賴ILeaseStorage,startLeaseTask方**創(chuàng)建ApplyTask,第二以固定間隔調度執(zhí)行ApplyTask /**
* 續(xù)約任務
*/
protected class ApplyTask implements Runnable {

protected String name;
protected int leaseTerm;
protected ILeaseGetCallback callback;

public ApplyTask(int leaseTerm, String name) {
this(leaseTerm, name, null);
}

public ApplyTask(int leaseTerm, String name, ILeaseGetCallback callback) {
this.name = name;
this.leaseTerm = leaseTerm;
this.callback = callback;
}

@Override
public void run() {
try {
// LOG.info("LeaseServiceImpl name: " + name + "開始獲取租約…");
AtomicBoolean newApplyLease = new AtomicBoolean(false);
Date leaseDate = applyLeaseTask(leaseTerm, name, newApplyLease);
if (leaseDate != null) {
leaseName2Date.put(name, leaseDate);
LOG.info("LeaseServiceImpl, name: " + name + " " + getSelfUser() + " 獲取租約成功, 租約到期時間為 "
+ new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(leaseDate));
} else {
// fix.2020.08.13 這時name對應的租約可能還在有效期內,或者本機還持有租約,需要remove
// leaseName2Date.remove(name);
LOG.info("LeaseServiceImpl name: " + name + " " + getSelfUser() + " 獲取租約失敗 ");
}
if (newApplyLease.get() && callback != null) {
callback.callback(leaseDate);
}
} catch (Exception e) {
LOG.error(" LeaseServiceImpl name: " + name + " " + getSelfUser() + " 獲取租約出現異常 ", e);
}

}
}

/**
* 申請租約,如果當期租約有效,直接更新一個租約周期,如果當前租約無效,先查詢是否有有效的租約,如果有申請失敗,否則直接申請租約
*/
protected Date applyLeaseTask(int leaseTerm, String name, AtomicBoolean newApplyLease) {

// 計算下一次租約時間 = 當前時間 + 租約時長
Date nextLeaseDate = DateUtil.addSecond(new Date(), leaseTerm);

// 1 如果已經有租約,則更新租約時間(內存和數據庫)即可
if (hasLease(name)) {
// LOG.info("用戶已有租約,更新數據庫和內存中的租約信息");
// 更新數據庫
LeaseInfo leaseInfo = queryValidateLease(name);
if (leaseInfo == null) {
LOG.error("LeaseServiceImpl applyLeaseTask leaseInfo is null");
return null;
}
// fix.2020.08.13,與本機ip相等且滿足一致性hash分配策略,才續(xù)約,其他情況為null
String leaseUserIp = leaseInfo.getLeaseUserIp();
if (!leaseUserIp.equals(getSelfUser())) {
return null;
}
leaseInfo.setLeaseEndDate(nextLeaseDate);
updateLeaseInfo(leaseInfo);
return nextLeaseDate;
}

// 2 沒有租約情況 判斷是否可以獲取租約,只要租約沒有被其他人獲取,則說明有有效租約
boolean success = canGetLease(name);
if (!success) { // 表示被其他機器獲取到了有效的租約
// LOG.info("其他機器獲取到了有效的租約");
return null;
}

// 3 沒有租約而且可以獲取租約的情況,則嘗試使用數據庫原子更新的方式獲取租約,保證只有一臺機器成功獲取租約,而且可以運行
boolean flag = tryGetLease(name, nextLeaseDate);
if (flag) { // 獲取租約成功
newApplyLease.set(true);
return nextLeaseDate;
}
return null;

}
ApplyTask內部調用applyLeaseTask,如果已有租約則更新租約時間,沒有租約則判斷是否可以獲取租約,可以則執(zhí)行tryGetLeasetryGetLease /**
* 更新數據庫,占用租期并更新租期時間
*
* @param time
*/
protected boolean tryGetLease(String name, Date time) {
// LOG.info("嘗試獲取租約 lease name is : " + name + " 下次到期時間: "
// + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(time));
LeaseInfo validateLeaseInfo = queryValidateLease(name);

if (validateLeaseInfo == null) {// 這里有兩種情況 1 數據庫里面沒有租約信息 2 數據庫里面有租約信息但是已經過期
Integer count = countLeaseInfo(name);
if (count == null || count == 0) {// 表示現在數據庫里面沒有任何租約信息,插入租約成功則表示獲取成功,失敗表示在這一時刻其他機器獲取了租約
// LOG.info("數據庫中暫時沒有租約信息,嘗試原子插入租約:" + name);
// fix.2020.08.13,經過一致性hash計算,該名字的任務不應該在本機執(zhí)行,直接返回,無需插入。只有分配到hash執(zhí)行權限的機器才可以插入并獲取租約
if (!getSelfUser().equals(getConsistentHashHost(name))) {
return false;
}
validateLeaseInfo = new LeaseInfo();
validateLeaseInfo.setLeaseName(name);
validateLeaseInfo.setLeaseUserIp(getSelfUser());
validateLeaseInfo.setLeaseEndDate(time);
validateLeaseInfo.setStatus(1);
validateLeaseInfo.setVersion(1);
if (insert(validateLeaseInfo)) {
LOG.info("數據庫中暫時沒有租約信息,原子插入成功,獲取租約成功:" + name);
return true;
} else {
LOG.info("數據庫中暫時沒有租約信息,原子插入失敗,已經被其他機器獲取租約:" + name);
return false;
}
} else { // 表示數據庫里面有一條但是無效,這里需要兩臺機器按照version進行原子更新,更新成功的獲取租約
// LOG.info("數據庫中有一條無效的租約信息,嘗試根據版本號去原子更新租約信息:" + name);
LeaseInfo inValidateLeaseInfo = queryInValidateLease(name);
if (inValidateLeaseInfo == null) {// 說明這個時候另外一臺機器獲取成功了
LOG.info("另外一臺機器獲取成功了租約:" + name);
return false;
}
// fix.2020.08.13,機器重啟之后,該名字的任務已經不分配在此機器上執(zhí)行,直接返回,無需更新數據庫
if (!getSelfUser().equals(getConsistentHashHost(name))) {
return false;
}
inValidateLeaseInfo.setLeaseName(name);
inValidateLeaseInfo.setLeaseUserIp(getSelfUser());
inValidateLeaseInfo.setLeaseEndDate(time);
inValidateLeaseInfo.setStatus(1);
boolean success = updateDBLeaseInfo(inValidateLeaseInfo);
if (success) {
LOG.info("LeaseServiceImpl 原子更新租約成功,當前機器獲取到了租約信息:" + name);
} else {
LOG.info("LeaseServiceImpl 原子更新租約失敗,租約被其他機器獲取:" + name);
}
return success;
}

} else { // 判斷是否是自己獲取了租約,如果是自己獲取了租約則更新時間(內存和數據庫),
// 這里是為了解決機器重啟的情況,機器重啟,內存中沒有租約信息,但是實際上該用戶是有租約權限的
// fix.2020.08.13,租約的ip與本機ip相等,且滿足一致性hash策略,才會被本機執(zhí)行
String leaseUserIp = validateLeaseInfo.getLeaseUserIp();
if (leaseUserIp.equals(getSelfUser())) {
// 如果當期用戶有租約信息,則更新數據庫
validateLeaseInfo.setLeaseEndDate(time);
boolean hasUpdate = updateLeaseInfo(validateLeaseInfo);
if (hasUpdate) {
LOG.info(
"LeaseServiceImpl機器重啟情況,當前用戶有租約信息,并且更新數據庫成功,租約信息為 name :" + validateLeaseInfo.getLeaseName()
+ " ip : " + validateLeaseInfo.getLeaseUserIp() + " 到期時間 : " + new SimpleDateFormat(
"yyyy-MM-dd HH:mm:ss").format(validateLeaseInfo.getLeaseEndDate()));
return true;
} else {
LOG.info("LeaseServiceImpl 機器重啟情況,當前用戶有租約信息,并且更新數據庫失敗,表示失去租約:" + name);
return false;
}
}
// LOG.info("LeaseServiceImpl 租約被其他機器獲取,租約信息為 name :" + validateLeaseInfo.getLeaseName() + " ip : "
// + validateLeaseInfo.getLeaseUserIp() + " 到期時間 : "
// + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(validateLeaseInfo.getLeaseEndDate()));
return false;
}

}

protected LeaseInfo queryValidateLease(String name) {
//String sql = "SELECT * FROM lease_info WHERE lease_name ='" + name + "' and status=1 and lease_end_time>now()";
//// LOG.info("LeaseServiceImpl query validate lease sql:" + sql);
//return queryLease(name, sql);
return leaseStorage.queryValidateLease(name);
}
tryGetLease先通過queryValidateLease查詢租約信息,若沒有租約則插入,若過期則根據版本號更新,若已有租約則判斷是否是自己獲取了租約,是則更新租約信息LeaseServiceImplpublic class LeaseServiceImpl extends BasedLesaseImpl {

private static final Log LOG = LogFactory.getLog(LeaseServiceImpl.class);

private transient ConcurrentHashMap<String, HoldLockTask> holdLockTasks = new ConcurrentHashMap();

protected ConcurrentHashMap<String, HoldLockFunture> seizeLockingFuntures = new ConcurrentHashMap<>();
//如果是搶占鎖狀態(tài)中,則不允許申請鎖

public LeaseServiceImpl() {
super();
}

/**
* 嘗試獲取鎖,可以等待waitTime,如果到點未返回,則直接返回。如果是-1,則一直等待
*
* @param name 業(yè)務名稱
* @param lockerName 鎖名稱
* @param waitTime 等待時間,是微秒單位
* @return
*/
@Override
public boolean tryLocker(String name, String lockerName, long waitTime) {
return tryLocker(name, lockerName, waitTime, ILeaseService.DE**T_LOCK_TIME);
}

@Override
public boolean tryLocker(String name, String lockerName, long waitTime, int lockTimeSecond) {
long now = System.currentTimeMillis();
boolean success = lock(name, lockerName, lockTimeSecond);
while (!success) {
if (waitTime > -1 && (System.currentTimeMillis() – now > waitTime)) {
break;
}
success = lock(name, lockerName, lockTimeSecond);
if (success) {
return success;
}
try {
Thread.sleep(100);
} catch (InterruptedException e) {
LOG.error("LeaseServiceImpl try locker error", e);
}
}
return success;

}

@Override
public boolean lock(String name, String lockerName) {
return lock(name, lockerName, ILeaseService.DE**T_LOCK_TIME);
}

@Override
public boolean lock(String name, String lockerName, int leaseSecond) {
lockerName = createLockName(name, lockerName);
Future future = seizeLockingFuntures.get(lockerName);
if (future != null && ((HoldLockFunture)future).isDone == false) {
return false;
}
Date nextLeaseDate =
DateUtil.addSecond(new Date(), leaseSecond);// 默認鎖定5分鐘,用完需要立刻釋放.如果時間不同步,可能導致鎖失敗
return tryGetLease(lockerName, nextLeaseDate);
}

@Override
public boolean unlock(String name, String lockerName) {
// LOG.info("LeaseServiceImpl unlock,name:" + name);
lockerName = createLockName(name, lockerName);
LeaseInfo validateLeaseInfo = queryValidateLease(lockerName);
if (validateLeaseInfo == null) {
LOG.warn("LeaseServiceImpl unlock,validateLeaseInfo is null,lockerName:" + lockerName);
}
if (validateLeaseInfo != null && validateLeaseInfo.getLeaseUserIp().equals(getSelfUser())) {
validateLeaseInfo.setStatus(0);
updateDBLeaseInfo(validateLeaseInfo);
}
HoldLockTask holdLockTask = holdLockTasks.remove(lockerName);
if (holdLockTask != null) {
holdLockTask.close();
}
leaseName2Date.remove(lockerName);
return false;
}

/**
* 如果有鎖,則一直持有,如果不能獲取,則結束。和租約不同,租約是沒有也會嘗試重試,一備對方掛機,自己可以接手工作
*
* @param name
* @param secondeName
* @param lockTimeSecond 獲取鎖的時間
* @return
*/
@Override
public boolean holdLock(String name, String secondeName, int lockTimeSecond) {
if (hasHoldLock(name, secondeName)) {
return true;
}
synchronized (this) {
if (hasHoldLock(name, secondeName)) {
return true;
}
String lockerName = createLockName(name, secondeName);
Date nextLeaseDate =
DateUtil.addSecond(new Date(), lockTimeSecond);
boolean success = tryGetLease(lockerName, nextLeaseDate);// 申請鎖,鎖的時間是leaseTerm
if (!success) {
return false;
}
leaseName2Date.put(lockerName, nextLeaseDate);

if (!holdLockTasks.containsKey(lockerName)) {
HoldLockTask holdLockTask = new HoldLockTask(lockTimeSecond, lockerName, this);
holdLockTask.start();
holdLockTasks.putIfAbsent(lockerName, holdLockTask);
}
}

return true;
}

/**
* 是否持有鎖,不訪問數據庫,直接看本地
*
* @param name
* @param secondeName
* @return
*/
@Override
public boolean hasHoldLock(String name, String secondeName) {
String lockerName = createLockName(name, secondeName);
return hasLease(lockerName);
}

@Override
public List<LeaseInfo> queryLockedInstanceByNamePrefix(String name, String lockerNamePrefix) {
String leaseNamePrefix = MapKeyUtil.createKey(name, lockerNamePrefix);
return queryValidateLeaseByNamePrefix(leaseNamePrefix);
}

//……
}
LeaseServiceImpl繼承了BasedLesaseImpl,tryLocker方**根據等待時間循環(huán)執(zhí)行l(wèi)ock,lock方法則執(zhí)行tryGetLease,unlock方法則更新租約信息,同時移除內存記錄;holdLock則通過hasHoldLock判斷是否持有鎖,若有則返回,沒有則執(zhí)行tryGetLeaseILeaseStoragepublic interface ILeaseStorage {

/**
* 更新lease info,需要是原子**作,存儲保障多線程**作的原子性
*
* @param leaseInfo 租約表數據
* @return
*/
boolean updateLeaseInfo(LeaseInfo leaseInfo);

/**
* 統(tǒng)計這個租約名稱下,LeaseInfo對象個數
*
* @param leaseName 租約名稱,無特殊要求,相同名稱會競爭租約
* @return
*/
Integer countLeaseInfo(String leaseName);

/**
* 查詢無效的的租約
*
* @param leaseName 租約名稱,無特殊要求,相同名稱會競爭租約
* @return
*/
LeaseInfo queryInValidateLease(String leaseName);

/**
* 查詢有效的的租約
*
* @param leaseName 租約名稱,無特殊要求,相同名稱會競爭租約
* @return
*/
LeaseInfo queryValidateLease(String leaseName);

/**
* 按前綴查詢有效的租約信息
*
* @param namePrefix
* @return
*/
List<LeaseInfo> queryValidateLeaseByNamePrefix(String namePrefix);

/**
* 增加租約
*
* @param leaseInfo 租約名稱,無特殊要求,相同名稱會競爭租約
*/
void addLeaseInfo(LeaseInfo leaseInfo);

}
ILeaseStorage接口定義了updateLeaseInfo、countLeaseInfo、queryInValidateLease、queryValidateLease、queryValidateLeaseByNamePrefix、addLeaseInfo方法DBLeaseStoragepublic class DBLeaseStorage implements ILeaseStorage {
private static final Log LOG = LogFactory.getLog(DBLeaseStorage.class);
protected JDBCDriver jdbcDataSource;
private String url;
protected String userName;
protected String password;
protected String jdbc;

public DBLeaseStorage(String jdbc, String url, String userName, String password) {
this.jdbc = jdbc;
this.url = url;
this.userName = userName;
this.password = password;
jdbcDataSource = DriverBuilder.createDriver(jdbc, url, userName, password);
}

@Override
public boolean updateLeaseInfo(LeaseInfo leaseInfo) {
String sql = "UPDATE lease_info SET version=version+1,status=#{status},gmt_modified=now()";
String whereSQL = " WHERE id=#{id} and version=#{version}";

if (StringUtil.isNotEmpty(leaseInfo.getLeaseName())) {
sql += ",lease_name=#{leaseName}";
}
if (StringUtil.isNotEmpty(leaseInfo.getLeaseUserIp())) {
sql += ",lease_user_ip=#{leaseUserIp}";
}
if (leaseInfo.getLeaseEndDate() != null) {
sql += ",lease_end_time=#{leaseEndDate}";
}
sql += whereSQL;
sql = SQLUtil.parseIbatisSQL(leaseInfo, sql);
try {
int count = getOrCreateJDBCDataSource().update(sql);
boolean success = count > 0;
if (success) {
synchronized (this) {
leaseInfo.setVersion(leaseInfo.getVersion() + 1);
}
} else {
System.out.println(count);
}
return success;
} catch (Exception e) {
LOG.error("LeaseServiceImpl updateLeaseInfo excuteUpdate error", e);
throw new RuntimeException("execute sql error " + sql, e);
}
}

@Override
public Integer countLeaseInfo(String leaseName) {
String sql = "SELECT count(*) as c FROM lease_info WHERE lease_name = '" + leaseName + "' and status = 1";
try {

List<Map<String, Object>> rows = getOrCreateJDBCDataSource().queryForList(sql);
if (rows == null || rows.size() == 0) {
return null;
}
Long value = (Long) rows.get(0).get("c");
return value.intValue();
} catch (Exception e) {
throw new RuntimeException("execute sql error " + sql, e);
}
}

@Override
public LeaseInfo queryInValidateLease(String leaseName) {
String sql = "SELECT * FROM lease_info WHERE lease_name ='" + leaseName + "' and status=1 and lease_end_time<'" + DateUtil.getCurrentTimeString() + "'";
LOG.info("LeaseServiceImpl queryInValidateLease builder:" + sql);
return queryLease(leaseName, sql);
}

@Override
public LeaseInfo queryValidateLease(String leaseName) {
String sql = "SELECT * FROM lease_info WHERE lease_name ='" + leaseName + "' and status=1 and lease_end_time>now()";
return queryLease(leaseName, sql);
}

@Override
public List<LeaseInfo> queryValidateLeaseByNamePrefix(String namePrefix) {
String sql = "SELECT * FROM lease_info WHERE lease_name like '" + namePrefix + "%' and status=1 and lease_end_time>now()";
try {
List<LeaseInfo> leaseInfos = new ArrayList<>();
List<Map<String, Object>> rows = getOrCreateJDBCDataSource().queryForList(sql);
if (rows == null || rows.size() == 0) {
return null;
}
for (Map<String, Object> row : rows) {
LeaseInfo leaseInfo = convert(row);
leaseInfos.add(leaseInfo);
}

return leaseInfos;
} catch (Exception e) {
throw new RuntimeException("execute sql error " + sql, e);
}
}

@Override
public void addLeaseInfo(LeaseInfo leaseInfo) {
String sql =
" REPLACE INTO lease_info(lease_name,lease_user_ip,lease_end_time,status,version,gmt_create,gmt_modified)"
+ " VALUES (#{leaseName},#{leaseUserIp},#{leaseEndDate},#{status},#{version},now(),now())";
sql = SQLUtil.parseIbatisSQL(leaseInfo, sql);
try {

getOrCreateJDBCDataSource().execute(sql);
} catch (Exception e) {
LOG.error("LeaseServiceImpl execute sql error,sql:" + sql, e);
throw new RuntimeException("execute sql error " + sql, e);
}
}

protected JDBCDriver getOrCreateJDBCDataSource() {
if (this.jdbcDataSource == null || !this.jdbcDataSource.isValidate()) {
synchronized (this) {
if (this.jdbcDataSource == null || !this.jdbcDataSource.isValidate()) {
this.jdbcDataSource =
DriverBuilder.createDriver(this.jdbc, this.url, this.userName, this.password);
}
}
}
return jdbcDataSource;
}

protected LeaseInfo queryLease(String name, String sql) {
try {
List<Map<String, Object>> rows = getOrCreateJDBCDataSource().queryForList(sql);
if (rows == null || rows.size() == 0) {
return null;
}
return convert(rows.get(0));
} catch (Exception e) {
throw new RuntimeException("execute sql error " + sql, e);
}
}

protected LeaseInfo convert(Map<String, Object> map) {
LeaseInfo leaseInfo = new LeaseInfo();
leaseInfo.setId(getMapLongValue("id", map));
leaseInfo.setCreateTime(getMapDateValue("gmt_create", map));
leaseInfo.setLeaseEndDate(getMapDateValue("lease_end_time", map));
leaseInfo.setLeaseName(getMapValue("lease_name", map, String.class));
leaseInfo.setLeaseUserIp(getMapValue("lease_user_ip", map, String.class));
Integer status = getMapValue("status", map, Integer.class);
if (status != null) {
leaseInfo.setStatus(status);
}
leaseInfo.setUpdateTime(getMapDateValue("gmt_modified", map));
Long version = getMapLongValue("version", map);
if (version != null) {
leaseInfo.setVersion(version);
}
return leaseInfo;
}

@SuppressWarnings("unchecked")
private <T> T getMapValue(String fieldName, Map<String, Object> map, Class<T> integerClass) {
Object value = map.get(fieldName);
if (value == null) {
return null;
}
return (T) value;
}

private Long getMapLongValue(String fieldName, Map<String, Object> map) {
Object value = map.get(fieldName);
if (value == null) {
return null;
}
if (value instanceof Long) {
return (Long) value;
}
if (value instanceof BigInteger) {
return ((BigInteger) value).longValue();
}
return null;
}

private Date getMapDateValue(String fieldName, Map<String, Object> map) {
Object value = map.get(fieldName);
if (value == null) {
return null;
}
if (value instanceof Date) {
return (Date) value;
}
if (value instanceof String) {
return DateUtil.parseTime(((String) value));
}
return null;

}

}
DBLeaseStorage實現了ILeaseStorage接口,使用jdbc實現了其方法小結

rocketmq-streams的LeaseService基于db實現了租約和鎖,可用于主備場景切換。

拓展知識:

前沿拓展:


本文主要研究一下rocketmq-streams的ILeaseService

ILeaseService/**
* 通過db實現租約和鎖,可以更輕量級,減少其他中間件的依賴 使用主備場景,只有一個實例運行,當當前實例掛掉,在一定時間內,會被其他實例接手 也可以用于全局鎖
*/
public interface ILeaseService {

/**
* 默認鎖定時間
*/
static final int DE**T_LOCK_TIME = 60 * 5;

/**
* 檢查某用戶當前時間是否具有租約。這個方法是純內存**作,無性能開銷
*
* @return true,租約有效;false,租約無效
*/
boolean hasLease(String name);

/**
* 申請租約,會啟動一個線程,不停申請租約,直到申請成功。 申請成功后,每 租期/2 續(xù)約。 如果目前被其他租戶獲取租約,只有在對方租約失效,后才允許新的租戶獲取租約
*
* @param name 租約名稱,無特殊要求,相同名稱會競爭租約
*/
void startLeaseTask(String name);

/**
* 申請租約,會啟動一個線程,不停申請租約,直到申請成功。 申請成功后,每 租期/2 續(xù)約。 如果目前被其他租戶獲取租約,只有在對方租約失效,后才允許新的租戶獲取租約
*
* @param name 租約名稱,無特殊要求,相同名稱會競爭租約
* @param callback 當第一獲取租約時,回調此函數
*/
void startLeaseTask(final String name, ILeaseGetCallback callback);

/**
* 申請租約,會啟動一個線程,不停申請租約,直到申請成功。 申請成功后,每 租期/2 續(xù)約。 如果目前被其他租戶獲取租約,只有在對方租約失效,后才允許新的租戶獲取租約
*
* @param name 租約名稱,無特殊要求,相同名稱會競爭租約
* @param leaseTermSecond 租期,在租期內可以做業(yè)務處理,單位是秒
* @param callback 當第一獲取租約時,回調此函數
*/
void startLeaseTask(final String name, int leaseTermSecond, ILeaseGetCallback callback);

/**
* 申請鎖,無論成功與否,立刻返回。如果不釋放,最大鎖定時間是5分鐘
*
* @param name 業(yè)務名稱
* @param lockerName 鎖名稱
* @return 是否枷鎖成功
*/
boolean lock(String name, String lockerName);

/**
* 申請鎖,無論成功與否,立刻返回。默認鎖定時間是5分鐘
*
* @param name 業(yè)務名稱
* @param lockerName 鎖名稱
* @param lockTimeSecond 如果不釋放,鎖定的最大時間,單位是秒
* @return 是否枷鎖成功
* @return
*/
boolean lock(String name, String lockerName, int lockTimeSecond);

/**
* 申請鎖,如果沒有則等待,等待時間可以指定,如果是-1 則無限等待。如果不釋放,最大鎖定時間是5分鐘
*
* @param name 業(yè)務名稱
* @param lockerName 鎖名稱
* @param waitTime 沒獲取鎖時,最大等待多長時間,如果是-1 則無限等待
* @return 是否枷鎖成功
*/
boolean tryLocker(String name, String lockerName, long waitTime);

/**
* 申請鎖,如果沒有則等待,等待時間可以指定,如果是-1 則無限等待。如果不釋放,最大鎖定時間是lockTimeSecond
*
* @param name 業(yè)務名稱
* @param lockerName 鎖名稱
* @param waitTime 沒獲取鎖時,最大等待多長時間,如果是-1 則無限等待
* @param lockTimeSecond 如果不釋放,鎖定的最大時間,單位是秒
* @return 是否枷鎖成功
*/
boolean tryLocker(String name, String lockerName, long waitTime, int lockTimeSecond);

/**
* 釋放鎖
*
* @param name
* @param lockerName
* @return
*/
boolean unlock(String name, String lockerName);

/**
* 對于已經獲取鎖的,可以通過這個方法,一直持有鎖。 和租約的區(qū)別是,當釋放鎖后,無其他實例搶占。無法實現主備模式
*
* @param name 業(yè)務名稱
* @param lockerName 鎖名稱
* @param lockTimeSecond 租期,這個方**自動續(xù)約,如果不主動釋放,會一直持有鎖
* @return 是否成功獲取鎖
*/
boolean holdLock(String name, String lockerName, int lockTimeSecond);

/**
* 是否持有鎖,不會申請鎖。如果以前申請過,且未過期,返回true,否則返回false
*
* @param name 業(yè)務名稱
* @param lockerName 鎖名稱
* @return
*/
boolean hasHoldLock(String name, String lockerName);

List<LeaseInfo> queryLockedInstanceByNamePrefix(String name, String lockerNamePrefix);

}
ILeaseService接口定義了hasLease、startLeaseTask、lock、tryLocker、unlock、holdLock、hasHoldLock、queryLockedInstanceByNamePrefix方法BasedLesaseImplpublic abstract class BasedLesaseImpl implements ILeaseService {
private static final Log LOG = LogFactory.getLog(BasedLesaseImpl.class);

private static final String CONSISTENT_HASH_PREFIX = "consistent_hash_";
private static final AtomicBoolean syncStart = new AtomicBoolean(false);
private static final int synTime = 120; // 5分鐘的一致性hash同步時間太久了,改為2分鐘
protected ScheduledExecutorService taskExecutor = null;
protected int leaseTerm = 300 * 2; // 租約時間

// protected transient JDBCDriver jdbcDataSource = null;
protected ILeaseStorage leaseStorage;
protected volatile Map<String, Date> leaseName2Date = new ConcurrentHashMap<>(); // 每個lease name對應的租約到期時間

public BasedLesaseImpl() {

taskExecutor = new ScheduledThreadPoolExecutor(10);
}

/**
* lease_name: consistent_hash_ip, lease_user_ip: ip,定時刷新lease_info表,檢查一致性hash環(huán)的節(jié)點情況
*
* @param name
* @return
*/
@Override
public boolean hasLease(String name) {
// 內存中沒有租約信息則表示 沒有租約
Date leaseEndTime = leaseName2Date.get(name);
if (leaseEndTime == null) {
// LOG.info("內存中根據 " + name + "沒有查詢到租約信息,表示沒有租約");
return false;
}
// LOG.info("查詢是否有租約 name:" + name + " ,當前時間:" + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date())
// + " 租約到期時間 " + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(leaseEndTime));
// 有租約時間,并且租約時間大于當前時間,表示有租約信息
if (new Date().before(leaseEndTime)) {
return true;
}

return false;
}

private final Map<String, AtomicBoolean> startLeaseMap = new HashMap<>();

@Override
public void startLeaseTask(final String name) {
startLeaseTask(name, this.leaseTerm, null);
}

@Override
public void startLeaseTask(final String name, ILeaseGetCallback callback) {
startLeaseTask(name, this.leaseTerm, callback);
}

@Override
public void startLeaseTask(final String name, int leaseTerm, ILeaseGetCallback callback) {
ApplyTask applyTask = new ApplyTask(leaseTerm, name, callback);
startLeaseTask(name, applyTask, leaseTerm / 2, true);
}

/**
* 啟動定時器,定時執(zhí)行任務,確保任務可重入
*
* @param name
* @param runnable 具體任務
* @param scheduleTime 調度時間
* @param startNow 是否立刻啟動一次
*/
protected void startLeaseTask(final String name, Runnable runnable, int scheduleTime, boolean startNow) {
AtomicBoolean isStartLease = startLeaseMap.get(name);//多次調用,只啟動一次定時任務
if (isStartLease == null) {
synchronized (this) {
isStartLease = startLeaseMap.get(name);
if (isStartLease == null) {
isStartLease = new AtomicBoolean(false);
startLeaseMap.put(name, isStartLease);
}
}
}
if (isStartLease.compareAndSet(false, true)) {
if (startNow) {
runnable.run();
}
taskExecutor.scheduleWithFixedDelay(runnable, 0, scheduleTime, TimeUnit.SECONDS);
}
}

//……
}
BasedLesaseImpl聲明實現了ILeaseService,它依賴ILeaseStorage,startLeaseTask方**創(chuàng)建ApplyTask,第二以固定間隔調度執(zhí)行ApplyTask /**
* 續(xù)約任務
*/
protected class ApplyTask implements Runnable {

protected String name;
protected int leaseTerm;
protected ILeaseGetCallback callback;

public ApplyTask(int leaseTerm, String name) {
this(leaseTerm, name, null);
}

public ApplyTask(int leaseTerm, String name, ILeaseGetCallback callback) {
this.name = name;
this.leaseTerm = leaseTerm;
this.callback = callback;
}

@Override
public void run() {
try {
// LOG.info("LeaseServiceImpl name: " + name + "開始獲取租約…");
AtomicBoolean newApplyLease = new AtomicBoolean(false);
Date leaseDate = applyLeaseTask(leaseTerm, name, newApplyLease);
if (leaseDate != null) {
leaseName2Date.put(name, leaseDate);
LOG.info("LeaseServiceImpl, name: " + name + " " + getSelfUser() + " 獲取租約成功, 租約到期時間為 "
+ new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(leaseDate));
} else {
// fix.2020.08.13 這時name對應的租約可能還在有效期內,或者本機還持有租約,需要remove
// leaseName2Date.remove(name);
LOG.info("LeaseServiceImpl name: " + name + " " + getSelfUser() + " 獲取租約失敗 ");
}
if (newApplyLease.get() && callback != null) {
callback.callback(leaseDate);
}
} catch (Exception e) {
LOG.error(" LeaseServiceImpl name: " + name + " " + getSelfUser() + " 獲取租約出現異常 ", e);
}

}
}

/**
* 申請租約,如果當期租約有效,直接更新一個租約周期,如果當前租約無效,先查詢是否有有效的租約,如果有申請失敗,否則直接申請租約
*/
protected Date applyLeaseTask(int leaseTerm, String name, AtomicBoolean newApplyLease) {

// 計算下一次租約時間 = 當前時間 + 租約時長
Date nextLeaseDate = DateUtil.addSecond(new Date(), leaseTerm);

// 1 如果已經有租約,則更新租約時間(內存和數據庫)即可
if (hasLease(name)) {
// LOG.info("用戶已有租約,更新數據庫和內存中的租約信息");
// 更新數據庫
LeaseInfo leaseInfo = queryValidateLease(name);
if (leaseInfo == null) {
LOG.error("LeaseServiceImpl applyLeaseTask leaseInfo is null");
return null;
}
// fix.2020.08.13,與本機ip相等且滿足一致性hash分配策略,才續(xù)約,其他情況為null
String leaseUserIp = leaseInfo.getLeaseUserIp();
if (!leaseUserIp.equals(getSelfUser())) {
return null;
}
leaseInfo.setLeaseEndDate(nextLeaseDate);
updateLeaseInfo(leaseInfo);
return nextLeaseDate;
}

// 2 沒有租約情況 判斷是否可以獲取租約,只要租約沒有被其他人獲取,則說明有有效租約
boolean success = canGetLease(name);
if (!success) { // 表示被其他機器獲取到了有效的租約
// LOG.info("其他機器獲取到了有效的租約");
return null;
}

// 3 沒有租約而且可以獲取租約的情況,則嘗試使用數據庫原子更新的方式獲取租約,保證只有一臺機器成功獲取租約,而且可以運行
boolean flag = tryGetLease(name, nextLeaseDate);
if (flag) { // 獲取租約成功
newApplyLease.set(true);
return nextLeaseDate;
}
return null;

}
ApplyTask內部調用applyLeaseTask,如果已有租約則更新租約時間,沒有租約則判斷是否可以獲取租約,可以則執(zhí)行tryGetLeasetryGetLease /**
* 更新數據庫,占用租期并更新租期時間
*
* @param time
*/
protected boolean tryGetLease(String name, Date time) {
// LOG.info("嘗試獲取租約 lease name is : " + name + " 下次到期時間: "
// + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(time));
LeaseInfo validateLeaseInfo = queryValidateLease(name);

if (validateLeaseInfo == null) {// 這里有兩種情況 1 數據庫里面沒有租約信息 2 數據庫里面有租約信息但是已經過期
Integer count = countLeaseInfo(name);
if (count == null || count == 0) {// 表示現在數據庫里面沒有任何租約信息,插入租約成功則表示獲取成功,失敗表示在這一時刻其他機器獲取了租約
// LOG.info("數據庫中暫時沒有租約信息,嘗試原子插入租約:" + name);
// fix.2020.08.13,經過一致性hash計算,該名字的任務不應該在本機執(zhí)行,直接返回,無需插入。只有分配到hash執(zhí)行權限的機器才可以插入并獲取租約
if (!getSelfUser().equals(getConsistentHashHost(name))) {
return false;
}
validateLeaseInfo = new LeaseInfo();
validateLeaseInfo.setLeaseName(name);
validateLeaseInfo.setLeaseUserIp(getSelfUser());
validateLeaseInfo.setLeaseEndDate(time);
validateLeaseInfo.setStatus(1);
validateLeaseInfo.setVersion(1);
if (insert(validateLeaseInfo)) {
LOG.info("數據庫中暫時沒有租約信息,原子插入成功,獲取租約成功:" + name);
return true;
} else {
LOG.info("數據庫中暫時沒有租約信息,原子插入失敗,已經被其他機器獲取租約:" + name);
return false;
}
} else { // 表示數據庫里面有一條但是無效,這里需要兩臺機器按照version進行原子更新,更新成功的獲取租約
// LOG.info("數據庫中有一條無效的租約信息,嘗試根據版本號去原子更新租約信息:" + name);
LeaseInfo inValidateLeaseInfo = queryInValidateLease(name);
if (inValidateLeaseInfo == null) {// 說明這個時候另外一臺機器獲取成功了
LOG.info("另外一臺機器獲取成功了租約:" + name);
return false;
}
// fix.2020.08.13,機器重啟之后,該名字的任務已經不分配在此機器上執(zhí)行,直接返回,無需更新數據庫
if (!getSelfUser().equals(getConsistentHashHost(name))) {
return false;
}
inValidateLeaseInfo.setLeaseName(name);
inValidateLeaseInfo.setLeaseUserIp(getSelfUser());
inValidateLeaseInfo.setLeaseEndDate(time);
inValidateLeaseInfo.setStatus(1);
boolean success = updateDBLeaseInfo(inValidateLeaseInfo);
if (success) {
LOG.info("LeaseServiceImpl 原子更新租約成功,當前機器獲取到了租約信息:" + name);
} else {
LOG.info("LeaseServiceImpl 原子更新租約失敗,租約被其他機器獲取:" + name);
}
return success;
}

} else { // 判斷是否是自己獲取了租約,如果是自己獲取了租約則更新時間(內存和數據庫),
// 這里是為了解決機器重啟的情況,機器重啟,內存中沒有租約信息,但是實際上該用戶是有租約權限的
// fix.2020.08.13,租約的ip與本機ip相等,且滿足一致性hash策略,才會被本機執(zhí)行
String leaseUserIp = validateLeaseInfo.getLeaseUserIp();
if (leaseUserIp.equals(getSelfUser())) {
// 如果當期用戶有租約信息,則更新數據庫
validateLeaseInfo.setLeaseEndDate(time);
boolean hasUpdate = updateLeaseInfo(validateLeaseInfo);
if (hasUpdate) {
LOG.info(
"LeaseServiceImpl機器重啟情況,當前用戶有租約信息,并且更新數據庫成功,租約信息為 name :" + validateLeaseInfo.getLeaseName()
+ " ip : " + validateLeaseInfo.getLeaseUserIp() + " 到期時間 : " + new SimpleDateFormat(
"yyyy-MM-dd HH:mm:ss").format(validateLeaseInfo.getLeaseEndDate()));
return true;
} else {
LOG.info("LeaseServiceImpl 機器重啟情況,當前用戶有租約信息,并且更新數據庫失敗,表示失去租約:" + name);
return false;
}
}
// LOG.info("LeaseServiceImpl 租約被其他機器獲取,租約信息為 name :" + validateLeaseInfo.getLeaseName() + " ip : "
// + validateLeaseInfo.getLeaseUserIp() + " 到期時間 : "
// + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(validateLeaseInfo.getLeaseEndDate()));
return false;
}

}

protected LeaseInfo queryValidateLease(String name) {
//String sql = "SELECT * FROM lease_info WHERE lease_name ='" + name + "' and status=1 and lease_end_time>now()";
//// LOG.info("LeaseServiceImpl query validate lease sql:" + sql);
//return queryLease(name, sql);
return leaseStorage.queryValidateLease(name);
}
tryGetLease先通過queryValidateLease查詢租約信息,若沒有租約則插入,若過期則根據版本號更新,若已有租約則判斷是否是自己獲取了租約,是則更新租約信息LeaseServiceImplpublic class LeaseServiceImpl extends BasedLesaseImpl {

private static final Log LOG = LogFactory.getLog(LeaseServiceImpl.class);

private transient ConcurrentHashMap<String, HoldLockTask> holdLockTasks = new ConcurrentHashMap();

protected ConcurrentHashMap<String, HoldLockFunture> seizeLockingFuntures = new ConcurrentHashMap<>();
//如果是搶占鎖狀態(tài)中,則不允許申請鎖

public LeaseServiceImpl() {
super();
}

/**
* 嘗試獲取鎖,可以等待waitTime,如果到點未返回,則直接返回。如果是-1,則一直等待
*
* @param name 業(yè)務名稱
* @param lockerName 鎖名稱
* @param waitTime 等待時間,是微秒單位
* @return
*/
@Override
public boolean tryLocker(String name, String lockerName, long waitTime) {
return tryLocker(name, lockerName, waitTime, ILeaseService.DE**T_LOCK_TIME);
}

@Override
public boolean tryLocker(String name, String lockerName, long waitTime, int lockTimeSecond) {
long now = System.currentTimeMillis();
boolean success = lock(name, lockerName, lockTimeSecond);
while (!success) {
if (waitTime > -1 && (System.currentTimeMillis() – now > waitTime)) {
break;
}
success = lock(name, lockerName, lockTimeSecond);
if (success) {
return success;
}
try {
Thread.sleep(100);
} catch (InterruptedException e) {
LOG.error("LeaseServiceImpl try locker error", e);
}
}
return success;

}

@Override
public boolean lock(String name, String lockerName) {
return lock(name, lockerName, ILeaseService.DE**T_LOCK_TIME);
}

@Override
public boolean lock(String name, String lockerName, int leaseSecond) {
lockerName = createLockName(name, lockerName);
Future future = seizeLockingFuntures.get(lockerName);
if (future != null && ((HoldLockFunture)future).isDone == false) {
return false;
}
Date nextLeaseDate =
DateUtil.addSecond(new Date(), leaseSecond);// 默認鎖定5分鐘,用完需要立刻釋放.如果時間不同步,可能導致鎖失敗
return tryGetLease(lockerName, nextLeaseDate);
}

@Override
public boolean unlock(String name, String lockerName) {
// LOG.info("LeaseServiceImpl unlock,name:" + name);
lockerName = createLockName(name, lockerName);
LeaseInfo validateLeaseInfo = queryValidateLease(lockerName);
if (validateLeaseInfo == null) {
LOG.warn("LeaseServiceImpl unlock,validateLeaseInfo is null,lockerName:" + lockerName);
}
if (validateLeaseInfo != null && validateLeaseInfo.getLeaseUserIp().equals(getSelfUser())) {
validateLeaseInfo.setStatus(0);
updateDBLeaseInfo(validateLeaseInfo);
}
HoldLockTask holdLockTask = holdLockTasks.remove(lockerName);
if (holdLockTask != null) {
holdLockTask.close();
}
leaseName2Date.remove(lockerName);
return false;
}

/**
* 如果有鎖,則一直持有,如果不能獲取,則結束。和租約不同,租約是沒有也會嘗試重試,一備對方掛機,自己可以接手工作
*
* @param name
* @param secondeName
* @param lockTimeSecond 獲取鎖的時間
* @return
*/
@Override
public boolean holdLock(String name, String secondeName, int lockTimeSecond) {
if (hasHoldLock(name, secondeName)) {
return true;
}
synchronized (this) {
if (hasHoldLock(name, secondeName)) {
return true;
}
String lockerName = createLockName(name, secondeName);
Date nextLeaseDate =
DateUtil.addSecond(new Date(), lockTimeSecond);
boolean success = tryGetLease(lockerName, nextLeaseDate);// 申請鎖,鎖的時間是leaseTerm
if (!success) {
return false;
}
leaseName2Date.put(lockerName, nextLeaseDate);

if (!holdLockTasks.containsKey(lockerName)) {
HoldLockTask holdLockTask = new HoldLockTask(lockTimeSecond, lockerName, this);
holdLockTask.start();
holdLockTasks.putIfAbsent(lockerName, holdLockTask);
}
}

return true;
}

/**
* 是否持有鎖,不訪問數據庫,直接看本地
*
* @param name
* @param secondeName
* @return
*/
@Override
public boolean hasHoldLock(String name, String secondeName) {
String lockerName = createLockName(name, secondeName);
return hasLease(lockerName);
}

@Override
public List<LeaseInfo> queryLockedInstanceByNamePrefix(String name, String lockerNamePrefix) {
String leaseNamePrefix = MapKeyUtil.createKey(name, lockerNamePrefix);
return queryValidateLeaseByNamePrefix(leaseNamePrefix);
}

//……
}
LeaseServiceImpl繼承了BasedLesaseImpl,tryLocker方**根據等待時間循環(huán)執(zhí)行l(wèi)ock,lock方法則執(zhí)行tryGetLease,unlock方法則更新租約信息,同時移除內存記錄;holdLock則通過hasHoldLock判斷是否持有鎖,若有則返回,沒有則執(zhí)行tryGetLeaseILeaseStoragepublic interface ILeaseStorage {

/**
* 更新lease info,需要是原子**作,存儲保障多線程**作的原子性
*
* @param leaseInfo 租約表數據
* @return
*/
boolean updateLeaseInfo(LeaseInfo leaseInfo);

/**
* 統(tǒng)計這個租約名稱下,LeaseInfo對象個數
*
* @param leaseName 租約名稱,無特殊要求,相同名稱會競爭租約
* @return
*/
Integer countLeaseInfo(String leaseName);

/**
* 查詢無效的的租約
*
* @param leaseName 租約名稱,無特殊要求,相同名稱會競爭租約
* @return
*/
LeaseInfo queryInValidateLease(String leaseName);

/**
* 查詢有效的的租約
*
* @param leaseName 租約名稱,無特殊要求,相同名稱會競爭租約
* @return
*/
LeaseInfo queryValidateLease(String leaseName);

/**
* 按前綴查詢有效的租約信息
*
* @param namePrefix
* @return
*/
List<LeaseInfo> queryValidateLeaseByNamePrefix(String namePrefix);

/**
* 增加租約
*
* @param leaseInfo 租約名稱,無特殊要求,相同名稱會競爭租約
*/
void addLeaseInfo(LeaseInfo leaseInfo);

}
ILeaseStorage接口定義了updateLeaseInfo、countLeaseInfo、queryInValidateLease、queryValidateLease、queryValidateLeaseByNamePrefix、addLeaseInfo方法DBLeaseStoragepublic class DBLeaseStorage implements ILeaseStorage {
private static final Log LOG = LogFactory.getLog(DBLeaseStorage.class);
protected JDBCDriver jdbcDataSource;
private String url;
protected String userName;
protected String password;
protected String jdbc;

public DBLeaseStorage(String jdbc, String url, String userName, String password) {
this.jdbc = jdbc;
this.url = url;
this.userName = userName;
this.password = password;
jdbcDataSource = DriverBuilder.createDriver(jdbc, url, userName, password);
}

@Override
public boolean updateLeaseInfo(LeaseInfo leaseInfo) {
String sql = "UPDATE lease_info SET version=version+1,status=#{status},gmt_modified=now()";
String whereSQL = " WHERE id=#{id} and version=#{version}";

if (StringUtil.isNotEmpty(leaseInfo.getLeaseName())) {
sql += ",lease_name=#{leaseName}";
}
if (StringUtil.isNotEmpty(leaseInfo.getLeaseUserIp())) {
sql += ",lease_user_ip=#{leaseUserIp}";
}
if (leaseInfo.getLeaseEndDate() != null) {
sql += ",lease_end_time=#{leaseEndDate}";
}
sql += whereSQL;
sql = SQLUtil.parseIbatisSQL(leaseInfo, sql);
try {
int count = getOrCreateJDBCDataSource().update(sql);
boolean success = count > 0;
if (success) {
synchronized (this) {
leaseInfo.setVersion(leaseInfo.getVersion() + 1);
}
} else {
System.out.println(count);
}
return success;
} catch (Exception e) {
LOG.error("LeaseServiceImpl updateLeaseInfo excuteUpdate error", e);
throw new RuntimeException("execute sql error " + sql, e);
}
}

@Override
public Integer countLeaseInfo(String leaseName) {
String sql = "SELECT count(*) as c FROM lease_info WHERE lease_name = '" + leaseName + "' and status = 1";
try {

List<Map<String, Object>> rows = getOrCreateJDBCDataSource().queryForList(sql);
if (rows == null || rows.size() == 0) {
return null;
}
Long value = (Long) rows.get(0).get("c");
return value.intValue();
} catch (Exception e) {
throw new RuntimeException("execute sql error " + sql, e);
}
}

@Override
public LeaseInfo queryInValidateLease(String leaseName) {
String sql = "SELECT * FROM lease_info WHERE lease_name ='" + leaseName + "' and status=1 and lease_end_time<'" + DateUtil.getCurrentTimeString() + "'";
LOG.info("LeaseServiceImpl queryInValidateLease builder:" + sql);
return queryLease(leaseName, sql);
}

@Override
public LeaseInfo queryValidateLease(String leaseName) {
String sql = "SELECT * FROM lease_info WHERE lease_name ='" + leaseName + "' and status=1 and lease_end_time>now()";
return queryLease(leaseName, sql);
}

@Override
public List<LeaseInfo> queryValidateLeaseByNamePrefix(String namePrefix) {
String sql = "SELECT * FROM lease_info WHERE lease_name like '" + namePrefix + "%' and status=1 and lease_end_time>now()";
try {
List<LeaseInfo> leaseInfos = new ArrayList<>();
List<Map<String, Object>> rows = getOrCreateJDBCDataSource().queryForList(sql);
if (rows == null || rows.size() == 0) {
return null;
}
for (Map<String, Object> row : rows) {
LeaseInfo leaseInfo = convert(row);
leaseInfos.add(leaseInfo);
}

return leaseInfos;
} catch (Exception e) {
throw new RuntimeException("execute sql error " + sql, e);
}
}

@Override
public void addLeaseInfo(LeaseInfo leaseInfo) {
String sql =
" REPLACE INTO lease_info(lease_name,lease_user_ip,lease_end_time,status,version,gmt_create,gmt_modified)"
+ " VALUES (#{leaseName},#{leaseUserIp},#{leaseEndDate},#{status},#{version},now(),now())";
sql = SQLUtil.parseIbatisSQL(leaseInfo, sql);
try {

getOrCreateJDBCDataSource().execute(sql);
} catch (Exception e) {
LOG.error("LeaseServiceImpl execute sql error,sql:" + sql, e);
throw new RuntimeException("execute sql error " + sql, e);
}
}

protected JDBCDriver getOrCreateJDBCDataSource() {
if (this.jdbcDataSource == null || !this.jdbcDataSource.isValidate()) {
synchronized (this) {
if (this.jdbcDataSource == null || !this.jdbcDataSource.isValidate()) {
this.jdbcDataSource =
DriverBuilder.createDriver(this.jdbc, this.url, this.userName, this.password);
}
}
}
return jdbcDataSource;
}

protected LeaseInfo queryLease(String name, String sql) {
try {
List<Map<String, Object>> rows = getOrCreateJDBCDataSource().queryForList(sql);
if (rows == null || rows.size() == 0) {
return null;
}
return convert(rows.get(0));
} catch (Exception e) {
throw new RuntimeException("execute sql error " + sql, e);
}
}

protected LeaseInfo convert(Map<String, Object> map) {
LeaseInfo leaseInfo = new LeaseInfo();
leaseInfo.setId(getMapLongValue("id", map));
leaseInfo.setCreateTime(getMapDateValue("gmt_create", map));
leaseInfo.setLeaseEndDate(getMapDateValue("lease_end_time", map));
leaseInfo.setLeaseName(getMapValue("lease_name", map, String.class));
leaseInfo.setLeaseUserIp(getMapValue("lease_user_ip", map, String.class));
Integer status = getMapValue("status", map, Integer.class);
if (status != null) {
leaseInfo.setStatus(status);
}
leaseInfo.setUpdateTime(getMapDateValue("gmt_modified", map));
Long version = getMapLongValue("version", map);
if (version != null) {
leaseInfo.setVersion(version);
}
return leaseInfo;
}

@SuppressWarnings("unchecked")
private <T> T getMapValue(String fieldName, Map<String, Object> map, Class<T> integerClass) {
Object value = map.get(fieldName);
if (value == null) {
return null;
}
return (T) value;
}

private Long getMapLongValue(String fieldName, Map<String, Object> map) {
Object value = map.get(fieldName);
if (value == null) {
return null;
}
if (value instanceof Long) {
return (Long) value;
}
if (value instanceof BigInteger) {
return ((BigInteger) value).longValue();
}
return null;
}

private Date getMapDateValue(String fieldName, Map<String, Object> map) {
Object value = map.get(fieldName);
if (value == null) {
return null;
}
if (value instanceof Date) {
return (Date) value;
}
if (value instanceof String) {
return DateUtil.parseTime(((String) value));
}
return null;

}

}
DBLeaseStorage實現了ILeaseStorage接口,使用jdbc實現了其方法小結

rocketmq-streams的LeaseService基于db實現了租約和鎖,可用于主備場景切換。

拓展知識:

原創(chuàng)文章,作者:九賢生活小編,如若轉載,請注明出處:http://www.cddhlm.com/22716.html