feat: 整合 multi-tenant-task 库,增加 payapi.multitenant 属性配置
diff --git a/payapi/src/main/java/com/supwisdom/dlpay/framework/core/DayendSettleTask.java b/payapi/src/main/java/com/supwisdom/dlpay/framework/core/DayendSettleTask.java
index 1eb9422..bf31c52 100644
--- a/payapi/src/main/java/com/supwisdom/dlpay/framework/core/DayendSettleTask.java
+++ b/payapi/src/main/java/com/supwisdom/dlpay/framework/core/DayendSettleTask.java
@@ -3,33 +3,48 @@
 import com.supwisdom.dlpay.exception.TransactionException;
 import com.supwisdom.dlpay.framework.domain.TSettleLog;
 import com.supwisdom.dlpay.framework.service.DayendSettleService;
-import com.supwisdom.dlpay.framework.util.Constants;
+import com.supwisdom.dlpay.framework.tenant.TenantWorkerTask;
 import com.supwisdom.dlpay.framework.util.StringUtil;
-import com.supwisdom.multitenant.TenantContextHolder;
-import com.supwisdom.multitenant.TenantDetailsProvider;
-import net.javacrumbs.shedlock.core.SchedulerLock;
+import com.supwisdom.multitenant.task.ProcessingStatus;
+import com.supwisdom.multitenant.task.WorkerResult;
+import com.supwisdom.multitenant.task.support.TaskQueue;
+import net.javacrumbs.shedlock.spring.annotation.SchedulerLock;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.scheduling.annotation.Scheduled;
 import org.springframework.stereotype.Component;
 
-@Component
-public class DayendSettleTask {
+@Component("dayendSettleTenantTask")
+public class DayendSettleTask implements TenantWorkerTask {
   @Autowired
   private DayendSettleService dayendSettleService;
 
-  @Autowired
-  private TenantDetailsProvider tenantDetailsProvider;
   private TSettleLog settleLog;
 
   private static final Logger logger = LoggerFactory.getLogger(DayendSettleTask.class);
 
-  @Scheduled(cron = "${dayend.settletask.cron}")
-  @SchedulerLock(name = "DayendSettleTask", lockAtMostForString = "PT30M")
-  public void doSettleTask() {
+  @Scheduled(cron = "${dayend.settletask.cron:-}")
+  @SchedulerLock(name = "DayendSettleTask", lockAtMostFor = "PT30M")
+  public void settleTask() {
+    try {
+      doSettleTask();
+    } catch (Exception ex) {
+      ex.printStackTrace();
+    }
+  }
+
+  @Override
+  public WorkerResult execute(TaskQueue task) {
+    doSettleTask();
+    WorkerResult result = new WorkerResult(task);
+    result.setResult(ProcessingStatus.SUCCESS);
+    result.setMessage("成功");
+    return result;
+  }
+
+  private void doSettleTask() {
     if (logger.isDebugEnabled()) logger.debug("进入日结算任务!");
-    TenantContextHolder.getContext().setTenant(tenantDetailsProvider.defaultTenant());
 
     settleLog = dayendSettleService.doCreateSettleLog(); //记录日志
     try {
diff --git a/payapi/src/main/java/com/supwisdom/dlpay/framework/tenant/TenantWorkerExecutor.java b/payapi/src/main/java/com/supwisdom/dlpay/framework/tenant/TenantWorkerExecutor.java
new file mode 100644
index 0000000..4eaea96
--- /dev/null
+++ b/payapi/src/main/java/com/supwisdom/dlpay/framework/tenant/TenantWorkerExecutor.java
@@ -0,0 +1,43 @@
+package com.supwisdom.dlpay.framework.tenant;
+
+import com.supwisdom.multitenant.task.WorkerExecutor;
+import com.supwisdom.multitenant.task.WorkerResult;
+import com.supwisdom.multitenant.task.support.TaskQueue;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.BeansException;
+import org.springframework.context.ApplicationContext;
+import org.springframework.context.ApplicationContextAware;
+import org.springframework.stereotype.Component;
+
+import java.util.Optional;
+
+@Component
+@Slf4j
+public class TenantWorkerExecutor implements WorkerExecutor, ApplicationContextAware {
+  private ApplicationContext applicationContext;
+
+  @Override
+  public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
+    this.applicationContext = applicationContext;
+  }
+
+  private String taskBeanName(TaskQueue task) {
+    return String.format("%sTenantTask", task.getTaskName());
+  }
+
+  private Optional<TenantWorkerTask> findTask(TaskQueue task) {
+    try {
+      TenantWorkerTask workerTask = applicationContext.getBean(taskBeanName(task), TenantWorkerTask.class);
+      return Optional.of(workerTask);
+    } catch (BeansException ex) {
+      log.error("task <{}> not found", taskBeanName(task));
+      throw ex;
+    }
+  }
+
+  @Override
+  public WorkerResult execute(TaskQueue task) {
+    Optional<TenantWorkerTask> workerTask = findTask(task);
+    return workerTask.get().execute(task);
+  }
+}
diff --git a/payapi/src/main/java/com/supwisdom/dlpay/framework/tenant/TenantWorkerTask.java b/payapi/src/main/java/com/supwisdom/dlpay/framework/tenant/TenantWorkerTask.java
new file mode 100644
index 0000000..aacb581
--- /dev/null
+++ b/payapi/src/main/java/com/supwisdom/dlpay/framework/tenant/TenantWorkerTask.java
@@ -0,0 +1,8 @@
+package com.supwisdom.dlpay.framework.tenant;
+
+import com.supwisdom.multitenant.task.WorkerResult;
+import com.supwisdom.multitenant.task.support.TaskQueue;
+
+public interface TenantWorkerTask {
+  WorkerResult execute(TaskQueue task);
+}
diff --git a/payapi/src/main/kotlin/com/supwisdom/dlpay/PayApiApplication.kt b/payapi/src/main/kotlin/com/supwisdom/dlpay/PayApiApplication.kt
index 6118efb..27b9c1c 100644
--- a/payapi/src/main/kotlin/com/supwisdom/dlpay/PayApiApplication.kt
+++ b/payapi/src/main/kotlin/com/supwisdom/dlpay/PayApiApplication.kt
@@ -1,11 +1,7 @@
 package com.supwisdom.dlpay
 
-import com.supwisdom.dlpay.framework.service.TenantService
 import com.supwisdom.dlpay.framework.tenant.TenantCacheKeyGen
-import com.supwisdom.dlpay.framework.util.Constants
 import com.supwisdom.multitenant.TenantContextHolder
-import com.supwisdom.multitenant.TenantDetails
-import com.supwisdom.multitenant.TenantDetailsProvider
 import com.supwisdom.multitenant.annotations.EnableHttpHeaderTenantInterceptor
 import com.supwisdom.multitenant.annotations.EnableSessionTenantInterceptor
 import com.supwisdom.multitenant.exceptions.TenantNotDefException
@@ -13,6 +9,8 @@
 import com.supwisdom.multitenant.jwt.annotations.EnableJwtTenantInterceptor
 import com.supwisdom.multitenant.jwt.config.JwtTenantConfig
 import com.supwisdom.multitenant.jwt.config.JwtToken
+import com.supwisdom.multitenant.storage.annotations.EnableTenantDetailsStorage
+import com.supwisdom.multitenant.task.annotations.EnableMultiTenantTaskWorker
 import io.lettuce.core.ReadFrom
 import mu.KotlinLogging
 import net.javacrumbs.shedlock.core.LockProvider
@@ -188,37 +186,37 @@
     }
 }
 
-@Component
-class MyTenantDetailsProvider : TenantDetailsProvider {
-    @Autowired
-    private lateinit var tenantService: TenantService
-    @Autowired
-    private lateinit var redisTemplate: RedisTemplate<String, String>
-
-    private val logger = KotlinLogging.logger { }
-
-    private val defaultTenant = TenantDetails().apply {
-        id = Constants.DEFAULT_TENANTID
-        dbSchema = "public"
-        dataCenter = "default"
-        enabled = true
-    }
-
-    override fun defaultTenant(): TenantDetails {
-        return defaultTenant
-    }
-
-    override fun createDetailsById(id: String): TenantDetails {
-        logger.debug { "find tenant id <$id> ..." }
-        val schema = redisTemplate.opsForValue().get(id) ?: return defaultTenant
-        return TenantDetails().apply {
-            this.id = id
-            dbSchema = schema
-            dataCenter = "default"
-            enabled = true
-        }
-    }
-}
+//@Component
+//class MyTenantDetailsProvider : TenantDetailsProvider {
+//    @Autowired
+//    private lateinit var tenantService: TenantService
+//    @Autowired
+//    private lateinit var redisTemplate: RedisTemplate<String, String>
+//
+//    private val logger = KotlinLogging.logger { }
+//
+//    private val defaultTenant = TenantDetails().apply {
+//        id = Constants.DEFAULT_TENANTID
+//        dbSchema = "public"
+//        dataCenter = "default"
+//        enabled = true
+//    }
+//
+//    override fun defaultTenant(): TenantDetails {
+//        return defaultTenant
+//    }
+//
+//    override fun createDetailsById(id: String): TenantDetails {
+//        logger.debug { "find tenant id <$id> ..." }
+//        val schema = redisTemplate.opsForValue().get(id) ?: return defaultTenant
+//        return TenantDetails().apply {
+//            this.id = id
+//            dbSchema = schema
+//            dataCenter = "default"
+//            enabled = true
+//        }
+//    }
+//}
 
 @Component
 class MyTenantJwtConfigAdapter : JwtTenantConfigAdapter {
@@ -244,9 +242,11 @@
 @EnableHttpHeaderTenantInterceptor
 @EnableSessionTenantInterceptor
 @EnableJwtTenantInterceptor
+@EnableTenantDetailsStorage
+@EnableMultiTenantTaskWorker(propertyName = "payapi.multitenant")
 @ServletComponentScan
-@EnableRetry
 @EnableAsync
+@EnableRetry
 class PayApiApplication : SpringBootServletInitializer() {
 
     override fun configure(builder: SpringApplicationBuilder): SpringApplicationBuilder {
diff --git a/payapi/src/main/kotlin/com/supwisdom/dlpay/api/scheduler_sourcetype_chk.kt b/payapi/src/main/kotlin/com/supwisdom/dlpay/api/scheduler_sourcetype_chk.kt
index c57ddb1..cb11234 100644
--- a/payapi/src/main/kotlin/com/supwisdom/dlpay/api/scheduler_sourcetype_chk.kt
+++ b/payapi/src/main/kotlin/com/supwisdom/dlpay/api/scheduler_sourcetype_chk.kt
@@ -11,14 +11,16 @@
 import com.supwisdom.dlpay.api.service.TransactionReconciliationService
 import com.supwisdom.dlpay.exception.TransactionException
 import com.supwisdom.dlpay.framework.service.SystemUtilService
-import com.supwisdom.dlpay.framework.util.Constants
+import com.supwisdom.dlpay.framework.tenant.TenantWorkerTask
 import com.supwisdom.dlpay.framework.util.DateUtil
 import com.supwisdom.dlpay.util.ConstantUtil
 import com.supwisdom.multitenant.TenantContextHolder
 import com.supwisdom.multitenant.TenantDetails
-import com.supwisdom.multitenant.TenantDetailsProvider
+import com.supwisdom.multitenant.task.ProcessingStatus
+import com.supwisdom.multitenant.task.WorkerResult
+import com.supwisdom.multitenant.task.support.TaskQueue
 import mu.KotlinLogging
-import net.javacrumbs.shedlock.core.SchedulerLock
+import net.javacrumbs.shedlock.spring.annotation.SchedulerLock
 import org.springframework.beans.BeansException
 import org.springframework.beans.factory.annotation.Autowired
 import org.springframework.scheduling.annotation.Async
@@ -69,8 +71,8 @@
  *   2. 一致("equal") , 表示一致
  *   3. 挂起("hangup"), 表示补帐出现异常
  */
-@Component
-class SourceTypeCheck {
+@Component("sourcetypeCheckTenantTask")
+class SourceTypeCheck : TenantWorkerTask {
     private val logger = KotlinLogging.logger { }
 
     @Autowired
@@ -82,9 +84,6 @@
     @Autowired
     private lateinit var sourceTypeCheckExecutor: SourceTypeCheckExecutor
 
-    @Autowired
-    private lateinit var tenantDetailsProvider: TenantDetailsProvider
-
     private fun newSourceTypeStatus(sourceType: TSourceType, accdate: String): TSourceTypeCheckStatus {
         return sourceTypeService.saveOrUpdateSourceTypeCheckStatus(TSourceTypeCheckStatus().apply {
             this.sourceType = sourceType.sourceType
@@ -161,10 +160,25 @@
         }
     }
 
+    override fun execute(task: TaskQueue?): WorkerResult {
+        doExecute()
+        return WorkerResult(task).apply {
+            result = ProcessingStatus.SUCCESS
+            message = "成功"
+        }
+    }
+
     @Scheduled(cron = "\${payapi.sourcetype.checker.scheduler:-}")
-    @SchedulerLock(name = "payapiSourceTypeCheckLock", lockAtMostForString = "PT30M")
+    @SchedulerLock(name = "payapiSourceTypeCheckLock", lockAtMostFor = "PT30M")
     fun runCheck() {
-        TenantContextHolder.getContext().tenant = tenantDetailsProvider.defaultTenant()
+        try {
+            doExecute()
+        } catch (ex: Exception) {
+            ex.printStackTrace()
+        }
+    }
+
+    private fun doExecute() {
 
         val allSourcetype = sourceTypeService.allEnabledSourcetype
                 ?: return
diff --git a/payapi/src/main/kotlin/com/supwisdom/dlpay/api/scheduler_task.kt b/payapi/src/main/kotlin/com/supwisdom/dlpay/api/scheduler_task.kt
index ba4b5e9..699bea3 100644
--- a/payapi/src/main/kotlin/com/supwisdom/dlpay/api/scheduler_task.kt
+++ b/payapi/src/main/kotlin/com/supwisdom/dlpay/api/scheduler_task.kt
@@ -10,16 +10,20 @@
 import com.supwisdom.dlpay.api.service.DtlQueryResultService
 import com.supwisdom.dlpay.api.service.TransactionServiceProxy
 import com.supwisdom.dlpay.framework.service.SystemUtilService
+import com.supwisdom.dlpay.framework.tenant.TenantWorkerTask
 import com.supwisdom.dlpay.framework.util.TradeDict
 import com.supwisdom.dlpay.util.ConstantUtil
-import net.javacrumbs.shedlock.core.SchedulerLock
+import com.supwisdom.multitenant.task.ProcessingStatus
+import com.supwisdom.multitenant.task.WorkerResult
+import com.supwisdom.multitenant.task.support.TaskQueue
+import net.javacrumbs.shedlock.spring.annotation.SchedulerLock
 import org.springframework.beans.factory.annotation.Autowired
 import org.springframework.scheduling.annotation.Scheduled
 import org.springframework.stereotype.Component
 import org.springframework.stereotype.Service
 
-@Service
-class MySchedulerTask {
+@Service("shopbalanceUpdaterTenantTask")
+class MySchedulerTask : TenantWorkerTask {
     @Autowired
     private lateinit var shopaccService: ShopaccService
 
@@ -28,23 +32,38 @@
     }
 
     @Scheduled(cron = "\${shopbalance.updater.cron:-}")
-    @SchedulerLock(name = "dealShopUnupdatedDtl", lockAtMostForString = "PT10M")
+    @SchedulerLock(name = "dealShopUnupdatedDtl", lockAtMostFor = "PT10M")
     fun dealShopUnupdatedDtl() {
         try {
-            shopaccService.findUnupdatedShopDtl(100).forEach {
-                doShopBlanceUpdate(it)
-            }
+            doExecute()
         } catch (ex: Exception) {
             ex.printStackTrace()
         }
     }
+
+    private fun doExecute() {
+        shopaccService.findUnupdatedShopDtl(100).forEach {
+            doShopBlanceUpdate(it)
+        }
+    }
+
+    /**
+     * tenant task 入口
+     */
+    override fun execute(task: TaskQueue?): WorkerResult {
+        doExecute()
+        return WorkerResult(task).apply {
+            this.result = ProcessingStatus.SUCCESS
+            this.message = "成功"
+        }
+    }
 }
 
 /**
  * 定时扫描TDtlQuery,查询第三方流水状态
  * */
-@Component
-class DtlQueryResultSchedulerTask {
+@Component("dtlQueryResultTenantTask")
+class DtlQueryResultSchedulerTask : TenantWorkerTask {
     @Autowired
     private lateinit var systemUtilService: SystemUtilService
     @Autowired
@@ -57,23 +76,35 @@
     private lateinit var agentPayServiceContext: AgentPayServiceContext
 
     @Scheduled(cron = "\${query.third.transdtl.result.cron:-}")
-    @SchedulerLock(name = "DtlQueryResultSchedulerTask", lockAtMostForString = "PT10M")
+    @SchedulerLock(name = "DtlQueryResultSchedulerTask", lockAtMostFor = "PT10M")
     fun queryThirdTransdtlResult() {
         try {
-            //仅查询当天数据,查询次数在规定次数之下
-            dtlQueryResultService.getNeedQueryRecords(systemUtilService.accdate, ConstantUtil.QUERY_MAX_COUNT).forEach {
-                try {
-                    doQuery(it)
-                } catch (exp: Exception) {
-                    it.qcnt = it.qcnt + 1
-                    dtlQueryResultService.saveOrUpdateDtlQuery(it) //次数加一
-                }
-            }
+            doExecute()
         } catch (ex: Exception) {
             ex.printStackTrace()
         }
     }
 
+    override fun execute(task: TaskQueue?): WorkerResult {
+        doExecute()
+        return WorkerResult(task).apply {
+            result = ProcessingStatus.SUCCESS
+            message = "成功"
+        }
+    }
+
+    private fun doExecute() {
+        //仅查询当天数据,查询次数在规定次数之下
+        dtlQueryResultService.getNeedQueryRecords(systemUtilService.accdate, ConstantUtil.QUERY_MAX_COUNT).forEach {
+            try {
+                doQuery(it)
+            } catch (exp: Exception) {
+                it.qcnt = it.qcnt + 1
+                dtlQueryResultService.saveOrUpdateDtlQuery(it) //次数加一
+            }
+        }
+    }
+
     private fun doQuery(dtlQuery: TDtlQuery) {
         if (ConstantUtil.QUERYTYPE_NEED_QUERY != dtlQuery.status) {
             return //已结束
diff --git a/payapi/src/main/kotlin/com/supwisdom/dlpay/api/service/impl/kafka_service_impl.kt b/payapi/src/main/kotlin/com/supwisdom/dlpay/api/service/impl/kafka_service_impl.kt
index 290bfaa..6e50d79 100644
--- a/payapi/src/main/kotlin/com/supwisdom/dlpay/api/service/impl/kafka_service_impl.kt
+++ b/payapi/src/main/kotlin/com/supwisdom/dlpay/api/service/impl/kafka_service_impl.kt
@@ -15,10 +15,10 @@
 import org.springframework.stereotype.Service
 
 @Service
-class KafkaSendMsgServiceImpl:KafkaSendMsgService{
+class KafkaSendMsgServiceImpl : KafkaSendMsgService {
     val logger = KotlinLogging.logger { }
     @Autowired
-    private lateinit var kafkaTemplate: KafkaTemplate<String, String>
+    private lateinit var kafkaTemplate: KafkaTemplate<Any, Any>
     @Autowired
     private lateinit var msgDao: MsgDao
     @Autowired
@@ -63,13 +63,13 @@
             message.custom = gson.toJson(extras)
             message.expiretime = DateUtil.getNewTime(DateUtil.getNow(), 300)
             message.gids = it.uid
-            if(it.lastloginplatform.isNullOrEmpty()){
-                message.platform="ios"
-                kafkaTemplate.send(topic, msg.msgid, gson.toJson(message))
-                message.platform="android"
-                kafkaTemplate.send(topic, msg.msgid, gson.toJson(message))
-            }else{
-                kafkaTemplate.send(topic, msg.msgid, gson.toJson(message))
+            if (it.lastloginplatform.isNullOrEmpty()) {
+                message.platform = "ios"
+                kafkaTemplate.send(topic, msg.msgid, message)
+                message.platform = "android"
+                kafkaTemplate.send(topic, msg.msgid, message)
+            } else {
+                kafkaTemplate.send(topic, msg.msgid, message)
             }
         }
         msg.pusheduids = uids
diff --git a/payapi/src/main/resources/application.properties b/payapi/src/main/resources/application.properties
index efef870..4c93ce7 100644
--- a/payapi/src/main/resources/application.properties
+++ b/payapi/src/main/resources/application.properties
@@ -28,6 +28,14 @@
 spring.http.encoding.charset=UTF-8
 spring.http.encoding.enabled=true
 server.tomcat.uri-encoding=UTF-8
+########################################################
+# 指定消息key和消息体的编解码方式
+spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
+spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
+spring.kafka.consumer.properties.spring.json.trusted.packages=com.supwisdom.multitenant.task.support
+# 指定消息key和消息体的编解码方式
+spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
+spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
 ##################################################
 ## quartz task scheduler
 shopbalance.updater.cron=*/10 * * * * ?
@@ -44,5 +52,7 @@
 multi-tenant.header.key=X-TENANT-ID
 multi-tenant.session.name=tenant-id
 multi-tenant.session.enableSessionScopedBean=false
-multi-tenant.dbschema=public
 multi-tenant.datasource.base-package=com.supwisdom.dlpay
+multi-tenant.task.datacenter=default
+multi-tenant.environment=default
+multi-tenant.task.worker.name=payapi