之前在实施一个监控项目时。客户由于买了IBM的小机。当前就赠送了TIVOLI的系统监控软件一套,客户也在他们的生产环境中部署了ITM的监控。由于没有购买IBM的netcool,无法集中管理告警事件,请要求我们直接把ITM的告警接受过来处理,当前我研究了一个ITM与netcool的接口eif,发现它是通过socket实现的,而且数据是文件格式,所以当时就开发了一个程序从ITM中直接接收事件,好东东要分享嘛,发给大家參考。实用的能够拿过去使用。
TECSocketServer.java,程序的主方法类,启动本地port接收ITM的告警事件。
import java.io.BufferedReader;
import java.io.IOException; import java.io.InputStreamReader; import java.net.ServerSocket; import java.net.Socket; import java.util.concurrent.LinkedBlockingQueue; import org.apache.log4j.Logger; /** * TECSocketServer採集TEMS转发过来的事件信息。 * * @author James Gao * @since iMon 2.0 2011-11-6 */ public class TECSocketServer extends Thread { /** * 绑定本地网络port。接收TEC发送过来的事件。 */ private ServerSocket ss; /** * 是否执行。 */ boolean runFlag = false; /** * 缓存数据的队列的数据结构. */ LinkedBlockingQueue queue; /** * 日志记录。 */ static final Logger logger = Logger .getLogger(TECSocketServer.class); TECEventParser eventParser; private TECEventCachedHandler handler; private int port; /** * 默认事件缓存1000000条。 */ private static final int EVNET_CACHE_SIZE = 1000000; public TECSocketServer(int port, int eventCacheSize) { // netcoolEventHandler = new TECEventCachedHandler(); this.eventParser = new TECEventParser(","); this.port = port; if (eventCacheSize <= 1000) { this.queue = new LinkedBlockingQueue(EVNET_CACHE_SIZE); } else { this.queue = new LinkedBlockingQueue(eventCacheSize); } } /** * 须要在线程里启动绑定本地port的服务,这样便于在后台执行。 */ public void run() { try { ss = new ServerSocket(port); while (runFlag) { // 得到client连接 Socket socket = ss.accept(); socket.setKeepAlive(true); // 启动接收服务 CollectorWork work = new CollectorWork(socket); work.start(); } } catch (Exception e) { logger.error(e.getMessage(), e); } finally { if (ss != null) { try { ss.close(); } catch (IOException e) { } } } } /** * 启动採集服务 * * @param eventParser事件解析 */ public void startCollectServer() { runFlag = true; handler = new TECEventCachedHandler(this, eventParser); handler.setDaemon(true); handler.start(); logger.info("启动TEC事件处理线程完成。");
// 绑定本地网络port。開始接收TEMS转发过来的事件。 this.start(); logger.info("TEC接收TEMS转发事件服务启动完成。"); } public void stopCollectServer() { runFlag = false; try { ss.close(); } catch (Exception ex) { logger.error(ex.getMessage(), ex); } try { handler.interrupt(); } catch (Exception ex) { } logger.info("TEC接收TEMS转发事件服务关闭。释放绑定port。"); } /** * 事件接收线程 * */ class CollectorWork extends Thread { private BufferedReader in; private Socket socket; public CollectorWork() { } public CollectorWork(Socket socket) { this.socket = socket; } public void run() { try { // 15分钟内无事件则关闭client socket.setSoTimeout(15 * 60 * 1000); logger.info("*******************************************************************"); logger.info("A TEMS client come in:" + socket); logger.info("*******************************************************************"); StringBuffer sb = new StringBuffer(); // String event = null; in = new BufferedReader(new InputStreamReader( socket.getInputStream())); String line = ""; boolean endflag = false; while (runFlag && (line = in.readLine()) != null) { line = line.trim(); System.out.println(line); System.out.println(); if (line.equals("")) continue; // tems转发过来的每条事件以Start开头 if (line.startsWith("<START>")&&line.endsWith("END")) { endflag = true; sb = new StringBuffer(); // sb.append(line).append(";"); sb.append(line); } // // SocketGateway转发过来的每条事件以End开头 // if (line.indexOf("end") == 0) { // endflag = true; // // } // if (!endflag) { // if (event != null) { // sb.append(line); // } // } if (endflag) { logger.info("**********receives data==*************" + sb.toString()); // 放入缓存队件中。 queue.put(sb.toString()); } } } catch (Exception e) { logger.info(e.getMessage(), e); } finally { try { if (in != null) { in.close(); } } catch (IOException e) { logger.info(e.getMessage(), e); } try { if (socket != null) { socket.close(); } } catch (IOException e) { logger.info(e.getMessage(), e); } } } } public static void main(String[] args){ TECSocketServer server = new TECSocketServer(5529,100000); server.startCollectServer(); } }TECEventCachedHandler.java
import java.util.HashMap;
/** * * 针对TEC发送过来的事件进行缓存队列。并从缓存队列中取出并解析使事件成为标准化事件后进行处理。 * * @author James Gao,create on 2010-2-9 * @version v1.0 */ class TECEventCachedHandler extends Thread { /** * */ private final TECSocketServer tecSocketServer; private TECEventParser parseHandler; public TECEventCachedHandler(TECSocketServer tecSocketServer, TECEventParser parseHandler) { this.tecSocketServer = tecSocketServer; this.parseHandler = parseHandler; } public void run() { while (this.tecSocketServer.runFlag) { String data = null; try { // 从缓存队列中取出进行处理。 data = (String) this.tecSocketServer.queue.take(); if (TECSocketServer.logger.isDebugEnabled()) { TECSocketServer.logger.debug("Take a event data from CacheQueue,data=[" + data.toString() + "]"); } HashMap eventMap = parseHandler .parserStr( data); if (TECSocketServer.logger.isDebugEnabled()) { TECSocketServer.logger.debug("Process completely."); } } catch (InterruptedException ex) { TECSocketServer.logger.warn("Read data from cache queue error ,cause : " + ex.getMessage(), ex); } catch (Exception ex) { TECSocketServer.logger.warn("Process event data fail, data=[" + data + "],cause by: " + ex.getMessage(), ex); } } } }TECEventParser.java主要用来解析文本为数组。
import java.util.HashMap;
import org.apache.log4j.Logger; /** * 事件解析处理 * * @author James Gao, 2011-11-6 * @since SOP iMon 2.0 */ public class TECEventParser { private static final Logger logger = Logger .getLogger(TECEventParser.class); private String separator; public TECEventParser(String separator) { this.separator = separator; } /** * 解析组合字符串 * * @param initStr */ HashMap mapForData = new HashMap(); public HashMap parserStr(String initStr) { long t1 = System.currentTimeMillis(); String event = initStr; HashMap mapForValue = new HashMap(); if (initStr != null && !initStr.equals("")) { for(int a,b=0,c,d,i=0;i<initStr.length();i=d ){ a = initStr.indexOf("='"); b = initStr.indexOf("';"); c = initStr.indexOf(";"); String x = initStr.substring(c+1, b+1); System.out.println(x); if(!"".equals(x)&&(x!=null)){ int e = x.indexOf("='"); String key = x.substring(0, e); System.out.println(key); String firstValue = x.substring(e+2, x.length()); String lastValue = firstValue.substring(0, firstValue.length()-1); if(lastValue.equals("")){ lastValue ="no value"; } System.out.println(lastValue); if(key!=null&&!"".equals(key)&&key.equals("source")){ mapForValue.put(key, lastValue); } else if(key!=null&&!"".equals(key)&&key.equals("sub_source")){ mapForValue.put(key, lastValue); } else if(key!=null&&!"".equals(key)&&key.equals("severity")){ mapForValue.put(key, lastValue); } else if(key!=null&&!"".equals(key)&&key.equals("cms_hostname")){ mapForValue.put(key, lastValue); } else if(key!=null&&!"".equals(key)&&key.equals("situation_name")){ mapForValue.put(key, lastValue); } else if(key!=null&&!"".equals(key)&&key.equals("situation_fullname")){ mapForValue.put(key, lastValue); } else if(key!=null&&!"".equals(key)&&key.equals("situation_displayitem")){ mapForValue.put(key, lastValue); } else if(key!=null&&!"".equals(key)&&key.equals("situation_origin")){ mapForValue.put(key, lastValue); } else if(key!=null&&!"".equals(key)&&key.equals("situation_time")){ mapForValue.put(key, lastValue); } else if(key!=null&&!"".equals(key)&&key.equals("situation_group")){ mapForValue.put(key, lastValue); } else if(key!=null&&!"".equals(key)&&key.equals("situation_status")){ mapForValue.put(key, lastValue); } else if(key!=null&&!"".equals(key)&&key.equals("origin")){ mapForValue.put(key, lastValue); } else if(key!=null&&!"".equals(key)&&key.equals("hostname")){ mapForValue.put(key, lastValue); } else if(key!=null&&!"".equals(key)&&key.equals("master_reset_flag")){ mapForValue.put(key, lastValue); } else if(key!=null&&!"".equals(key)&&key.equals("integration_type")){ mapForValue.put(key, lastValue); } else{ mapForValue.put(key, lastValue); } } System.out.println("~~~~~~~~~~"); String x1 = initStr.substring(a+2, b); // System.out.println(x1); String x2 = initStr.substring(b+1, initStr.length()); if(x2.indexOf("END")==1){ break; } c = x2.indexOf("';"); String x3 = x2.substring(1, c+1); initStr =x2; d =(initStr.length())-b; } } else { logger.info("接收到的原始数据 initStr is NULL"); } long t4 = System.currentTimeMillis(); logger.info("解析告警" + event + "耗时:" + (t4 - t1) + "ms"); return mapForValue; } }