加入收藏 | 设为首页 | 会员中心 | 我要投稿 开发网_开封站长网 (http://www.0378zz.com/)- 科技、AI行业应用、媒体智能、低代码、办公协同!
当前位置: 首页 > 教程 > 正文

监控服务程序调度算法达成

发布时间:2021-11-22 10:12:02 所属栏目:教程 来源:互联网
导读:监控服务程序实现调度算法 完成nginx服务监控(从nginx配置解析出对应的服务作为监控对象之五,还有可以从数据库里读出待监控的服务)与更新服务后的监控算法: 处理休眠队列---------将所有的待监控服务记录放入一个优先级队列里(休眠队列,最小堆的数据结

 监控服务程序实现调度算法
 
完成nginx服务监控(从nginx配置解析出对应的服务作为监控对象之五,还有可以从数据库里读出待监控的服务)与更新服务后的监控算法:
 
处理休眠队列---------将所有的待监控服务记录放入一个优先级队列里(休眠队列,最小堆的数据结构,堆顶为绝对间隔时间最小的,优先执行),每次只需要检查堆顶就可以了,需要执行的放进执行队列里,删除的不加入执行队列
 
执行线程---------将执行列里的记录抛给异步执行的池里,每一个都是异步调用运行
 
回收线程----------运行完成的请求回收休眠队列,不回收已删除的。
 
更新线程---------定时加载新的数据,设置好绝对间隔时间,放入休眠队列
 
废话少说,主要实现代码如下。。
 
package com.wole.monitor;
 
import Java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
 
import org.eclipse.jetty.util.ConcurrentHashSet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
 
import com.wole.monitor.dao.ServiceDao;
import com.wole.servicemonitor.util.ServiceUtils;
 
/**
 * 管理并调度某一个服务数据源的监控池
 * @author yzygenuine
 *
 */
public class MonitorsManage {
 private final static Logger logger = LoggerFactory.getLogger(MonitorsManage.class);
 
 private ServiceDao dao;
 
 /**
  * 执行的一个并发池
  */
 private Executor commExecutor = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 5, TimeUnit.SECONDS,
   new SynchronousQueue<Runnable>());
 
 /**
  *
  */
 private CompletionService<Response> completionService = new ExecutorCompletionService<Response>(commExecutor);
 
 /**
  * 正在执行中的MonitorService集合
  */
 private ConcurrentHashSet<MonitorService> currentSet = new ConcurrentHashSet<MonitorService>();
 
 /**
  * 等待优先级队列
  */
 private Queue<MonitorService> sleepQueue = new PriorityBlockingQueue<MonitorService>();
 
 /**
  * 执行队列
  */
 private Queue<MonitorService> executeQueue = new LinkedBlockingQueue<MonitorService>();
 
 /**
  * 是否关闭
  */
 private AtomicBoolean isClose = new AtomicBoolean(false);
 
 /**
  * 生产者启动时间
  */
 private AtomicLong startTime = new AtomicLong(0);
 /**
  * 相对于启动的间隔时间
  */
 private AtomicLong intervalTime = new AtomicLong(0);
 
 public void close() {
  logger.info("closing................");
  isClose.compareAndSet(false, true);
 }
 
 
 
 public void init() {
  logger.info("初始化");
 
 }
 
 public void work() {
  logger.info("开始工作");
  // 生产者启动工作
 
  Thread productThread = new Thread(new ProductMonitor(1000));
  // 消费者启动工作
  Thread consumerThread = new Thread(new ConsumerMonitor(1000));
  // 回收者启动工作
  Thread recoverThread = new Thread(new RecoverMonitor(1000));
 
  // 启动定时加载数据工作
  Thread refreshThread = new Thread(new RefreshMonitorService(60000, dao));
  productThread.start();
  consumerThread.start();
  recoverThread.start();
  refreshThread.start();
 
 }
 
 /**
  * 生产者
  *
  * @author yzygenuine
  *
  */
 class ProductMonitor implements Runnable {
  long sleepTime = 1000;
 
  public ProductMonitor(long sleepTime) {
   this.sleepTime = sleepTime;
  }
 
  @Override
  public void run() {
   logger.info("生产者开启工作");
   // 开始进行定时监控
   long now = System.currentTimeMillis();
   long lastTime = now;
   startTime.addAndGet(now);
   try {
    do {
     Thread.sleep(sleepTime);
     logger.debug("生产者休息{}ms", sleepTime);
     now = System.currentTimeMillis();
     intervalTime.addAndGet(now - lastTime);
     while (sleepQueue.size() > 0) {
      MonitorService service = sleepQueue.peek();
      if (service.getCurrentTime() - intervalTime.get() < 1) {
       service = sleepQueue.poll();// 出队并检查是否被删除,如果没被删除则进入执行队列
       if (!currentSet.contains(service)) {
        logger.info("service {} 已被删除,不加入执行队列了", service.toString());
        continue;
       }
       executeQueue.add(service);
      } else {
       logger.debug("还有{}秒可执行", service.getCurrentTime() - intervalTime.get());
       break;
      }
     }
 
     if (sleepQueue.size() <= 0) {
      logger.debug("生产队列为空");
     }
     lastTime = now;
    } while (!isClose.get());
   } catch (Exception e) {
    logger.error("", e);
   }
 
  }
 
 }
 
 /**
  * 消费者
  *
  * @author yzygenuine
  *
  */
 class ConsumerMonitor implements Runnable {
  long sleepTime = 1000;
 
  public ConsumerMonitor(long sleepTime) {
   this.sleepTime = sleepTime;
   if (sleepTime < 1000) {
    throw new RuntimeException("请配置sleepTime值大一些");
   }
  }
 
  @Override
  public void run() {
   logger.info("消费者开启工作");
   try {
    do {
     Thread.sleep(sleepTime);
     logger.debug("消费者休息{}ms", sleepTime);
     while (executeQueue.size() > 0) {
      final MonitorService service = executeQueue.poll();
      completionService.submit(new ExecuteCallable(service));
     }
     logger.debug("消费队列为空");
    } while (!isClose.get());
   } catch (Exception e) {
    logger.error("", e);
   }
  }
 
 }
 
 /**
  * 执行回调类
  *
  * @author yzygenuine
  *
  */
 class ExecuteCallable implements Callable<Response> {
  final MonitorService service;
 
  public ExecuteCallable(MonitorService service) {
   this.service = service;
  }
 
  @Override
  public Response call() throws Exception {
   logger.debug("执行");
   Map<String, String> r = new HashMap<String, String>();
   Response response = new Response();
   response.service = service;
   response.response = r;
   Monitor m = MonitorFactory.getMonitor(service);
   response.isNeedWarn = m.isNeedWarnging(service, r);
   if (response.isNeedWarn) {
    response.isSucToNotify = m.sendNotify(service, r);
   }
   return response;
  }
 
 }
 
 /**
  * 回收者
  *
  * @author yzygenuine
  *
  */
 class RecoverMonitor implements Runnable {
  private long sleepTime = 1000;
 
  private long count = 0;
 
  public RecoverMonitor(long sleepTime) {
   this.sleepTime = sleepTime;
   if (sleepTime < 1000) {
    throw new RuntimeException("请配置sleepTime值大一些");
   }
  }
 
  @Override
  public void run() {
   logger.info("回收者开启工作");
   try {
    do {
     // Thread.sleep(sleepTime);
     Future<Response> response = completionService.take();
     // 重置后进入休眠队列
     MonitorService s = response.get().service;
     if (!currentSet.contains(s)) {
      logger.info("service {} 已被删除,不回收了", s.toString());
      continue;
     }
     // 当前程序已运动的时间+相对间隔时间=绝对的间隔时间
     s.setCurrentTime(s.getIntervalTime() + intervalTime.get());
     sleepQueue.add(s);
     count++;
     logger.info("回收,当前回收数量:" + count);
    } while (!isClose.get());
   } catch (Exception e) {
    logger.error("", e);
   }
  }
 }
 
 /**
  * 加载新的数据
  *
  * @author yzygenuine
  *
  */
 class RefreshMonitorService implements Runnable {
  private long sleepTime = 1000;
  private ServiceDao dao;
 
  public RefreshMonitorService(long sleepTime, ServiceDao dao) {
   this.sleepTime = sleepTime;
   if (sleepTime < 60000) {
    logger.warn("刷新加载数据的间隔时间不能太短");
    throw new RuntimeException("刷新加载数据的间隔时间不能太短");
   }
   this.dao = dao;
  }
 
  private void firstLoad() {
   List<MonitorService> monitorService = dao.getService();
   logger.info("加载记录:" + monitorService.size());
 
   // 将被监控服务加入优先级队列里
   for (int j = 0; j < monitorService.size(); j++) {
    MonitorService service = monitorService.get(j);
    // 初始化好时间
    service.setCurrentTime(service.getIntervalTime() + intervalTime.get());
    currentSet.add(service);
    sleepQueue.add(service);
   }
 
  }
 
  @Override
  public void run() {
   logger.info("读取新的service开启工作");
   firstLoad();
   try {
    do {
     logger.info("定时加载新的数据监听者休息{}ms", sleepTime);
     Thread.sleep(sleepTime);
     logger.info("##########开始执行更新数据############");
     // 加载新的所有所数据 ,与当前的数据比较
     List<MonitorService> deleteList = dao.deleteService();
     List<MonitorService> addList = dao.incrementalService();
     logger.info("删除旧的数据共:{}", deleteList.size());
     currentSet.removeAll(deleteList);
     logger.info("增加新的数据共:{}", addList.size());
     currentSet.addAll(addList);
     logger.info("更新后的currentSet size:{}", currentSet.size());
 
     for (MonitorService service : addList) {
      // 初始化绝对间隔时间
      service.setCurrentTime(service.getIntervalTime() + intervalTime.get());
      sleepQueue.add(service);
     }
     logger.info("########这一轮更新结束");
    } while (!isClose.get());
   } catch (Exception e) {
    logger.error("", e);
   }
  }
 }
 
 /**
  * 响应的封装类
  *
  * @author yzygenuine
  *
  */
 class Response {
  public Map<String, String> response;
  public MonitorService service;
  public boolean isNeedWarn;
  public boolean isSucToNotify;
 
 }
 
 public void setDao(ServiceDao dao) {
  this.dao = dao;
 }
 
}.

(编辑:开发网_开封站长网)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

    热点阅读