知方号

知方号

hdfs auditlog(审计日志)

hdfs审计日志(Auditlog)记录了用户针对hdfs的所有操作,详细信息包括操作成功与否、用户名称、客户机地址、操作命令、操作的目录等。对于用户的每一个操作,namenode都会将这些信息以key-value对的形式组织成固定格式的一条日志,然后记录到audit.log文件中。通过审计日志,我们可以实时查看hdfs的各种操作状况、可以追踪各种误操作、可以做一些指标监控等等。

hdfs的审计日志功能是可插拔的,用户可以通过实现默认接口扩展出满足自己所需的插件来替换hdfs默认提供的审计日志功能,或者与之并用。

启用审计日志

如果仅仅只启用默认的AuditLogger(DefaultAuditLogger),则在log4j.properties添加如下配置(hdfs.audit.logger必须配置为INFO级别)即可,审计日志会与namenode的系统日志独立开来保存,log4j.appender.RFAAUDIT.File可配置保存的位置及文件。 FSNamesystem根据log4j.properties中hdfs.audit.logger是否为INFO,以及是否配置了DefaultAuditLogger之外的其他AuditLogger,来决定是否启用审计日志功能。

## hdfs audit logging#hdfs.audit.logger=INFO,NullAppenderhdfs.audit.log.maxfilesize=256MBhdfs.audit.log.maxbackupindex=20log4j.logger.org.apache.hadoop.hdfs.server.namenode.FSNamesystem.audit=${hdfs.audit.logger}log4j.additivity.org.apache.hadoop.hdfs.server.namenode.FSNamesystem.audit=falselog4j.appender.RFAAUDIT=org.apache.log4j.RollingFileAppenderlog4j.appender.RFAAUDIT.File=${hadoop.log.dir}/hdfs-audit.loglog4j.appender.RFAAUDIT.layout=org.apache.log4j.PatternLayoutlog4j.appender.RFAAUDIT.layout.ConversionPattern=%d{ISO8601} %p %c{2}: %m%nlog4j.appender.RFAAUDIT.MaxFileSize=${hdfs.audit.log.maxfilesize}log4j.appender.RFAAUDIT.MaxBackupIndex=${hdfs.audit.log.maxbackupindex}审计日志的接口及实现

Namenode开放了AuditLogger接口,并定义抽象类HdfsAuditLogger 实现AuditLogger,默认提供实现类DefaultAuditLogger。构造FSNamesystem时通过initAuditLoggers(Configuration conf)方法创建AuditLogger列表。在记录用户操作时,会将操作信息逐一传给列表中的每一个AuditLogger,由其做对应的审计处理。

通过实现Auditloger接口或者扩展HdfsAuditLogger类,用户可以实现自己的AuditLogger来满足所需,例如有针对性的记录审计日志(当集群、访问量上规模之后疯狂刷日志必然对namenode有影响,有针对性的记录有必要的日志是缓解此状况的一种可选方案)、扩展功能、将日志接入实时系统做实时分析或监控等。用户通过配置项dfs.namenode.audit.loggers在hdfs-site.xml中配置Auditloger的实现类,多个实现可以通过逗号分开,更改配置后重启namenode接口生效。FSNamesystem的initAuditLoggers(Configuration conf)方法通过该配置项加载并实例化实现类,初始化后存入集合。如果用户没有配置,那么默认使用DefaultAuditLogger。如果启动了nntop功能,还会使用TopAuditLogger。

FSNamesystem 初始化所有的AuditLogger:

1private List initAuditLoggers(Configuration conf) { 2 // Initialize the custom access loggers if configured. 3 //DFS_NAMENODE_AUDIT_LOGGERS_KEY=dfs.namenode.audit.loggers 4 Collection alClasses = conf.getStringCollection(DFS_NAMENODE_AUDIT_LOGGERS_KEY); 5 List auditLoggers = Lists.newArrayList(); 6 if (alClasses != null && !alClasses.isEmpty()) { 7 for (String className : alClasses) { 8 try { 9 AuditLogger logger;10 if (DFS_NAMENODE_DEFAULT_AUDIT_LOGGER_NAME.equals(className)) {11 logger = new DefaultAuditLogger();12 } else {13 logger = (AuditLogger) Class.forName(className).newInstance();14 }15 logger.initialize(conf);16 auditLoggers.add(logger);17 } catch (RuntimeException re) {18 throw re;19 } catch (Exception e) {20 throw new RuntimeException(e);21 }22 }23 }2425 // Make sure there is at least one logger installed.26 // 如果用户没有提供AuditLoggers,则默认使用DefaultAuditLogger27 if (auditLoggers.isEmpty()) {28 auditLoggers.add(new DefaultAuditLogger());29 }3031 // Add audit logger to calculate top users32 // 默认topConf.isEnabled是开启的,用于指标聚合、上报33 // TopAuditLogger类似 top命令34 if (topConf.isEnabled) {35 topMetrics = new TopMetrics(conf, topConf.nntopReportingPeriodsMs);36 auditLoggers.add(new TopAuditLogger(topMetrics));37 }3839 return Collections.unmodifiableList(auditLoggers);40 }

DefaultAuditLogger记录日志:

1@Override 2 public void logAuditEvent(boolean succeeded, String userName, 3 InetAddress addr, String cmd, String src, String dst, 4 FileStatus status, UserGroupInformation ugi, 5 DelegationTokenSecretManager dtSecretManager) { 6 if (auditLog.isInfoEnabled()) { 7 final StringBuilder sb = auditBuffer.get(); 8 sb.setLength(0); 9 sb.append("allowed=").append(succeeded).append("t");10 sb.append("ugi=").append(userName).append("t");11 sb.append("ip=").append(addr).append("t");12 sb.append("cmd=").append(cmd).append("t");13 sb.append("src=").append(src).append("t");14 sb.append("dst=").append(dst).append("t");15 if (null == status) {16 sb.append("perm=null");17 } else {18 sb.append("perm=");19 sb.append(status.getOwner()).append(":");20 sb.append(status.getGroup()).append(":");21 sb.append(status.getPermission());22 }23 if (logTokenTrackingId) {24 sb.append("t").append("trackingId=");25 String trackingId = null;26 if (ugi != null && dtSecretManager != null27 && ugi.getAuthenticationMethod() == AuthenticationMethod.TOKEN) {28 for (TokenIdentifier tid: ugi.getTokenIdentifiers()) {29 if (tid instanceof DelegationTokenIdentifier) {30 DelegationTokenIdentifier dtid =31 (DelegationTokenIdentifier)tid;32 trackingId = dtSecretManager.getTokenTrackingId(dtid);33 break;34 }35 }36 }37 sb.append(trackingId);38 }39 sb.append("t").append("proto=");40 sb.append(NamenodeWebHdfsMethods.isWebHdfsInvocation() ? "webhdfs" : "rpc");41 logAuditMessage(sb.toString());42 }43 }4445 public void logAuditMessage(String message) {46 auditLog.info(message);47 }审计过程

客户端对hdfs的所有操作,不管成功与否都会由FSNamesystem记录下。以删除操作为例,FSNamesystem在正常删除给定src后调用logAuditEvent(true, "delete", src)记录此次成功的delete操作,如果删除失败抛出异常,则调用logAuditEvent(false, "delete", src)记录此次失败的delete操作。

1boolean delete(String src, boolean recursive, boolean logRetryCache) 2 throws IOException { 3 waitForLoadingFSImage(); 4 BlocksMapUpdateInfo toRemovedBlocks = null; 5 writeLock(); 6 boolean ret = false; 7 try { 8 checkOperation(OperationCategory.WRITE); 9 checkNameNodeSafeMode("Cannot delete " + src);10 toRemovedBlocks = FSDirDeleteOp.delete(11 this, src, recursive, logRetryCache);12 ret = toRemovedBlocks != null;13 } catch (AccessControlException e) {14 logAuditEvent(false, "delete", src);15 throw e;16 } finally {17 writeUnlock();18 }19 getEditLog().logSync();20 if (toRemovedBlocks != null) {21 removeBlocks(toRemovedBlocks); // Incremental deletion of blocks22 }23 logAuditEvent(true, "delete", src);24 return ret;25 }26272829//判断是否是外部调用,只对rpc调用和webHdfs调用做审计30 boolean isExternalInvocation() {31 return Server.isRpcInvocation() || NamenodeWebHdfsMethods.isWebHdfsInvocation();32 } 3334 //判断是否启用审计日志功能35 public boolean isAuditEnabled() {36 return !isDefaultAuditLogger || auditLog.isInfoEnabled();37 }3839 //succeeded:操作是否成功 cmd:操作命令 src:操作对象40 private void logAuditEvent(boolean succeeded, String cmd, String src)41 throws IOException {42 logAuditEvent(succeeded, cmd, src, null, null);43 }4445 private void logAuditEvent(boolean succeeded, String cmd, String src,46 String dst, HdfsFileStatus stat) throws IOException {47 if (isAuditEnabled() && isExternalInvocation()) {48 logAuditEvent(succeeded, getRemoteUser(), getRemoteIp(),49 cmd, src, dst, stat);50 }51 }5253 //获取操作对象的信息,调用所有的auditloger 做审计54 private void logAuditEvent(boolean succeeded,55 UserGroupInformation ugi, InetAddress addr, String cmd, String src,56 String dst, HdfsFileStatus stat) {57 FileStatus status = null;58 if (stat != null) {59 Path symlink = stat.isSymlink() ? new Path(stat.getSymlink()) : null;60 Path path = dst != null ? new Path(dst) : new Path(src);61 status = new FileStatus(stat.getLen(), stat.isDir(),62 stat.getReplication(), stat.getBlockSize(), stat.getModificationTime(),63 stat.getAccessTime(), stat.getPermission(), stat.getOwner(),64 stat.getGroup(), symlink, path);65 }66 for (AuditLogger logger : auditLoggers) {67 if (logger instanceof HdfsAuditLogger) {68 HdfsAuditLogger hdfsLogger = (HdfsAuditLogger) logger;69 hdfsLogger.logAuditEvent(succeeded, ugi.toString(), addr, cmd, src, dst,70 status, ugi, dtSecretManager);71 } else {72 logger.logAuditEvent(succeeded, ugi.toString(), addr,73 cmd, src, dst, status);74 }75 }76 }审计日志接入实时系统的方法方法1:扩展Log4J的appender,由appender将日志发送到kafka。方法2:直接让kafka的producer读取日志文件。

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至lizi9903@foxmail.com举报,一经查实,本站将立刻删除。