序
本文主要研究一下rocketmq的FileAppender
WriterAppender
org/apache/rocketmq/logging/inner/LoggingBuilder.java
public static class WriterAppender extends Appender { protected boolean immediateFlush = true; protected String encoding; protected QuietWriter qw; public WriterAppender() { } public void setImmediateFlush(boolean value) { immediateFlush = value; } public boolean getImmediateFlush() { return immediateFlush; } public void activateOptions() { } public void append(LoggingEvent event) { if (!checkEntryConditions()) { return; } subAppend(event); } protected boolean checkEntryConditions() { if (this.closed) { SysLogger.warn("Not allowed to write to a closed appender."); return false; } if (this.qw == null) { handleError("No output stream or file set for the appender named [" + name + "]."); return false; } if (this.layout == null) { handleError("No layout set for the appender named [" + name + "]."); return false; } return true; } public synchronized void close() { if (this.closed) { return; } this.closed = true; writeFooter(); reset(); } protected void closeWriter() { if (qw != null) { try { qw.close(); } catch (IOException e) { handleError("Could not close " + qw, e, CODE_CLOSE_FAILURE); } } } protected OutputStreamWriter createWriter(OutputStream os) { OutputStreamWriter retval = null; String enc = getEncoding(); if (enc != null) { try { retval = new OutputStreamWriter(os, enc); } catch (IOException e) { SysLogger.warn("Error initializing output writer."); SysLogger.warn("Unsupported encoding?"); } } if (retval == null) { retval = new OutputStreamWriter(os); } return retval; } public String getEncoding() { return encoding; } public void setEncoding(String value) { encoding = value; } public synchronized void setWriter(Writer writer) { reset(); this.qw = new QuietWriter(writer, this); writeHeader(); } protected void subAppend(LoggingEvent event) { this.qw.write(this.layout.format(event)); if (layout.ignoresThrowable()) { String[] s = event.getThrowableStr(); if (s != null) { for (String s1 : s) { this.qw.write(s1); this.qw.write(LINE_SEP); } } } if (shouldFlush(event)) 
{ this.qw.flush(); } } protected void reset() { closeWriter(); this.qw = null; } protected void writeFooter() { if (layout != null) { String f = layout.getFooter(); if (f != null && this.qw != null) { this.qw.write(f); this.qw.flush(); } } } protected void writeHeader() { if (layout != null) { String h = layout.getHeader(); if (h != null && this.qw != null) { this.qw.write(h); } } } protected boolean shouldFlush(final LoggingEvent event) { return event != null && immediateFlush; } }
- WriterAppender是一个继承自Appender的类(并非接口),定义了writeHeader、writeFooter、append等方法
- append方法这里参数是LoggingEvent,内部委托给subAppend
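- subAppend方法先调用layout对event进行格式化并写入qw;如果layout.ignoresThrowable()返回true,还会把event的异常堆栈逐行写入qw;最后如果需要flush(shouldFlush返回true,即event不为null且immediateFlush为true),则会直接对qw进行flush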
FileAppender
org/apache/rocketmq/logging/inner/LoggingBuilder.java
public static class FileAppender extends WriterAppender { protected boolean fileAppend = true; protected String fileName = null; protected boolean bufferedIO = false; protected int bufferSize = 8 * 1024; public FileAppender() { } public FileAppender(Layout layout, String filename, boolean append) throws IOException { this.layout = layout; this.setFile(filename, append, false, bufferSize); } public void setFile(String file) { fileName = file.trim(); } public boolean getAppend() { return fileAppend; } public String getFile() { return fileName; } public void activateOptions() { if (fileName != null) { try { setFile(fileName, fileAppend, bufferedIO, bufferSize); } catch (IOException e) { handleError("setFile(" + fileName + "," + fileAppend + ") call failed.", e, CODE_FILE_OPEN_FAILURE); } } else { SysLogger.warn("File option not set for appender [" + name + "]."); SysLogger.warn("Are you using FileAppender instead of ConsoleAppender?"); } } protected void closeFile() { if (this.qw != null) { try { this.qw.close(); } catch (IOException e) { if (e instanceof InterruptedIOException) { Thread.currentThread().interrupt(); } SysLogger.error("Could not close " + qw, e); } } } public boolean getBufferedIO() { return this.bufferedIO; } public int getBufferSize() { return this.bufferSize; } public void setAppend(boolean flag) { fileAppend = flag; } public void setBufferedIO(boolean bufferedIO) { this.bufferedIO = bufferedIO; if (bufferedIO) { immediateFlush = false; } } public void setBufferSize(int bufferSize) { this.bufferSize = bufferSize; } public synchronized void setFile(String fileName, boolean append, boolean bufferedIO, int bufferSize) throws IOException { SysLogger.debug("setFile called: " + fileName + ", " + append); if (bufferedIO) { setImmediateFlush(false); } reset(); FileOutputStream ostream; try { ostream = new FileOutputStream(fileName, append); } catch (FileNotFoundException ex) { String parentName = new File(fileName).getParent(); if (parentName != null) { 
File parentDir = new File(parentName); if (!parentDir.exists() && parentDir.mkdirs()) { ostream = new FileOutputStream(fileName, append); } else { throw ex; } } else { throw ex; } } Writer fw = createWriter(ostream); if (bufferedIO) { fw = new BufferedWriter(fw, bufferSize); } this.setQWForFiles(fw); this.fileName = fileName; this.fileAppend = append; this.bufferedIO = bufferedIO; this.bufferSize = bufferSize; writeHeader(); SysLogger.debug("setFile ended"); } protected void setQWForFiles(Writer writer) { this.qw = new QuietWriter(writer, this); } protected void reset() { closeFile(); this.fileName = null; super.reset(); } }
- FileAppender继承自WriterAppender,用于将日志写入文件;这里定义了bufferSize默认为8 * 1024,如果开启bufferedIO,则会用BufferedWriter包装writer,同时将immediateFlush设置为false
- setQWForFiles方法根据指定的writer创建了QuietWriter
- setFile方法设置了qw之后,就直接writeHeader
QuietWriter
org/apache/rocketmq/logging/inner/LoggingBuilder.java
/**
 * {@link FilterWriter} whose {@link #write(String)} and {@link #flush()} hand any failure to
 * the owning appender's {@code handleError} instead of throwing it.
 */
private static class QuietWriter extends FilterWriter {

    /** Appender that receives failures raised by the wrapped writer. */
    protected Appender appender;

    /**
     * @param writer   the writer to delegate to
     * @param appender the appender notified when a write or flush fails
     */
    public QuietWriter(Writer writer, Appender appender) {
        super(writer);
        this.appender = appender;
    }

    /**
     * Writes the string to the wrapped writer; a {@code null} string is silently skipped.
     *
     * @param string the text to write, may be {@code null}
     */
    @Override
    public void write(String string) {
        if (string == null) {
            return;
        }
        try {
            out.write(string);
        } catch (Exception failure) {
            final String message = "Failed to write [" + string + "].";
            appender.handleError(message, failure, Appender.CODE_WRITE_FAILURE);
        }
    }

    /** Flushes the wrapped writer, reporting any failure to the appender. */
    @Override
    public void flush() {
        try {
            out.flush();
        } catch (Exception failure) {
            final String message = "Failed to flush writer,";
            appender.handleError(message, failure, Appender.CODE_FLUSH_FAILURE);
        }
    }
}
- QuietWriter继承自jdk的FilterWriter,重写了write(String)方法和flush方法;二者都会捕获异常并交给appender.handleError处理,而不是向上抛出
- FilterWriter实现了write(int c),write(char cbuf[], int off, int len),write(String str, int off, int len)方法,这些方法默认只是直接委托给被包装的out,本身并不做过滤,而是供子类按需重写以实现过滤逻辑
小结
rocketmq的FileAppender继承自WriterAppender,先通过layout对LoggingEvent进行格式化,然后写入QuietWriter,最终由QuietWriter包装的Writer写入到文件。