Whamcloud - gitweb
b=17896
authornathan <nathan>
Wed, 25 Feb 2009 18:30:47 +0000 (18:30 +0000)
committernathan <nathan>
Wed, 25 Feb 2009 18:30:47 +0000 (18:30 +0000)
i=johann
i=manoj
- multiple changelog consumer registration
- changelog access from clients

39 files changed:
lustre/cmm/cmm_device.c
lustre/cmm/cmm_object.c
lustre/cmm/mdc_internal.h
lustre/doc/lctl.8
lustre/doc/lfs.1
lustre/include/lprocfs_status.h
lustre/include/lustre/liblustreapi.h
lustre/include/lustre/lustre_idl.h
lustre/include/lustre_disk.h
lustre/include/lustre_lib.h
lustre/include/lustre_log.h
lustre/include/lustre_req_layout.h
lustre/include/md_object.h
lustre/include/obd.h
lustre/include/obd_support.h
lustre/mdc/lproc_mdc.c
lustre/mdc/mdc_request.c
lustre/mdd/mdd_device.c
lustre/mdd/mdd_internal.h
lustre/mdd/mdd_lproc.c
lustre/mdd/mdd_object.c
lustre/mds/mds_log.c
lustre/mdt/mdt_handler.c
lustre/obdclass/llog_swab.c
lustre/obdclass/lprocfs_status.c
lustre/obdclass/obd_mount.c
lustre/ptlrpc/layout.c
lustre/ptlrpc/llog_server.c
lustre/ptlrpc/lproc_ptlrpc.c
lustre/ptlrpc/pack_generic.c
lustre/ptlrpc/wiretest.c
lustre/tests/sanity.sh
lustre/utils/lctl.c
lustre/utils/lfs.c
lustre/utils/liblustreapi.c
lustre/utils/obd.c
lustre/utils/obdctl.h
lustre/utils/wirecheck.c
lustre/utils/wiretest.c

index bae6967..8245d2f 100644 (file)
@@ -65,7 +65,7 @@ static const struct lu_device_operations cmm_lu_ops;
 
 static inline int lu_device_is_cmm(struct lu_device *d)
 {
 
 static inline int lu_device_is_cmm(struct lu_device *d)
 {
-       return ergo(d != NULL && d->ld_ops != NULL, d->ld_ops == &cmm_lu_ops);
+        return ergo(d != NULL && d->ld_ops != NULL, d->ld_ops == &cmm_lu_ops);
 }
 
 int cmm_root_get(const struct lu_env *env, struct md_device *md,
 }
 
 int cmm_root_get(const struct lu_env *env, struct md_device *md,
@@ -84,7 +84,7 @@ static int cmm_statfs(const struct lu_env *env, struct md_device *md,
                       struct kstatfs *sfs)
 {
         struct cmm_device *cmm_dev = md2cmm_dev(md);
                       struct kstatfs *sfs)
 {
         struct cmm_device *cmm_dev = md2cmm_dev(md);
-       int rc;
+        int rc;
 
         ENTRY;
         rc = cmm_child_ops(cmm_dev)->mdo_statfs(env,
 
         ENTRY;
         rc = cmm_child_ops(cmm_dev)->mdo_statfs(env,
@@ -130,6 +130,18 @@ static int cmm_update_capa_key(const struct lu_env *env,
         RETURN(rc);
 }
 
         RETURN(rc);
 }
 
+static int cmm_llog_ctxt_get(const struct lu_env *env, struct md_device *m,
+                             int idx, void **h)
+{
+        struct cmm_device *cmm_dev = md2cmm_dev(m);
+        int rc;
+        ENTRY;
+
+        rc = cmm_child_ops(cmm_dev)->mdo_llog_ctxt_get(env, cmm_dev->cmm_child,
+                                                       idx, h);
+        RETURN(rc);
+}
+
 #ifdef HAVE_QUOTA_SUPPORT
 static int cmm_quota_notify(const struct lu_env *env, struct md_device *m)
 {
 #ifdef HAVE_QUOTA_SUPPORT
 static int cmm_quota_notify(const struct lu_env *env, struct md_device *m)
 {
@@ -369,12 +381,26 @@ static int cmm_quota_finvalidate(const struct lu_env *env, struct md_device *m,
 }
 #endif
 
 }
 #endif
 
+int cmm_iocontrol(const struct lu_env *env, struct md_device *m,
+                  unsigned int cmd, int len, void *data)
+{
+        struct md_device *next = md2cmm_dev(m)->cmm_child;
+        int rc;
+
+        ENTRY;
+        rc = next->md_ops->mdo_iocontrol(env, next, cmd, len, data);
+        RETURN(rc);
+}
+
+
 static const struct md_device_operations cmm_md_ops = {
         .mdo_statfs          = cmm_statfs,
         .mdo_root_get        = cmm_root_get,
         .mdo_maxsize_get     = cmm_maxsize_get,
         .mdo_init_capa_ctxt  = cmm_init_capa_ctxt,
         .mdo_update_capa_key = cmm_update_capa_key,
 static const struct md_device_operations cmm_md_ops = {
         .mdo_statfs          = cmm_statfs,
         .mdo_root_get        = cmm_root_get,
         .mdo_maxsize_get     = cmm_maxsize_get,
         .mdo_init_capa_ctxt  = cmm_init_capa_ctxt,
         .mdo_update_capa_key = cmm_update_capa_key,
+        .mdo_llog_ctxt_get   = cmm_llog_ctxt_get,
+        .mdo_iocontrol       = cmm_iocontrol,
 #ifdef HAVE_QUOTA_SUPPORT
         .mdo_quota           = {
                 .mqo_notify      = cmm_quota_notify,
 #ifdef HAVE_QUOTA_SUPPORT
         .mdo_quota           = {
                 .mqo_notify      = cmm_quota_notify,
@@ -632,7 +658,7 @@ static int cmm_prepare(const struct lu_env *env,
 }
 
 static const struct lu_device_operations cmm_lu_ops = {
 }
 
 static const struct lu_device_operations cmm_lu_ops = {
-       .ldo_object_alloc      = cmm_object_alloc,
+        .ldo_object_alloc      = cmm_object_alloc,
         .ldo_process_config    = cmm_process_config,
         .ldo_recovery_complete = cmm_recovery_complete,
         .ldo_prepare           = cmm_prepare,
         .ldo_process_config    = cmm_process_config,
         .ldo_recovery_complete = cmm_recovery_complete,
         .ldo_prepare           = cmm_prepare,
index 6f5a78a..fa52599 100644 (file)
@@ -346,7 +346,7 @@ static int cml_capa_get(const struct lu_env *env, struct md_object *mo,
 }
 
 static int cml_path(const struct lu_env *env, struct md_object *mo,
 }
 
 static int cml_path(const struct lu_env *env, struct md_object *mo,
-                    char *path, int pathlen, __u64 recno, int *linkno)
+                    char *path, int pathlen, __u64 *recno, int *linkno)
 {
         int rc;
         ENTRY;
 {
         int rc;
         ENTRY;
@@ -943,7 +943,7 @@ static int cmr_capa_get(const struct lu_env *env, struct md_object *mo,
 }
 
 static int cmr_path(const struct lu_env *env, struct md_object *obj,
 }
 
 static int cmr_path(const struct lu_env *env, struct md_object *obj,
-                    char *path, int pathlen, __u64 recno, int *linkno)
+                    char *path, int pathlen, __u64 *recno, int *linkno)
 {
         return -EREMOTE;
 }
 {
         return -EREMOTE;
 }
index e7a1d13..bcd5f3f 100644 (file)
@@ -73,12 +73,12 @@ struct mdc_thread_info {
 };
 
 struct mdc_object {
 };
 
 struct mdc_object {
-       struct md_object        mco_obj;
+        struct md_object        mco_obj;
 };
 
 static inline struct lu_device *mdc2lu_dev(struct mdc_device *mc)
 {
 };
 
 static inline struct lu_device *mdc2lu_dev(struct mdc_device *mc)
 {
-       return (&mc->mc_md_dev.md_lu_dev);
+        return (&mc->mc_md_dev.md_lu_dev);
 }
 
 static inline struct mdc_device *md2mdc_dev(struct md_device *md)
 }
 
 static inline struct mdc_device *md2mdc_dev(struct md_device *md)
@@ -88,22 +88,22 @@ static inline struct mdc_device *md2mdc_dev(struct md_device *md)
 
 static inline struct mdc_device *mdc_obj2dev(struct mdc_object *mco)
 {
 
 static inline struct mdc_device *mdc_obj2dev(struct mdc_object *mco)
 {
-       return (md2mdc_dev(md_obj2dev(&mco->mco_obj)));
+        return (md2mdc_dev(md_obj2dev(&mco->mco_obj)));
 }
 
 static inline struct mdc_object *lu2mdc_obj(struct lu_object *lo)
 {
 }
 
 static inline struct mdc_object *lu2mdc_obj(struct lu_object *lo)
 {
-       return container_of0(lo, struct mdc_object, mco_obj.mo_lu);
+        return container_of0(lo, struct mdc_object, mco_obj.mo_lu);
 }
 
 static inline struct mdc_object *md2mdc_obj(struct md_object *mo)
 {
 }
 
 static inline struct mdc_object *md2mdc_obj(struct md_object *mo)
 {
-       return container_of0(mo, struct mdc_object, mco_obj);
+        return container_of0(mo, struct mdc_object, mco_obj);
 }
 
 static inline struct mdc_device *lu2mdc_dev(struct lu_device *ld)
 {
 }
 
 static inline struct mdc_device *lu2mdc_dev(struct lu_device *ld)
 {
-       return container_of0(ld, struct mdc_device, mc_md_dev.md_lu_dev);
+        return container_of0(ld, struct mdc_device, mc_md_dev.md_lu_dev);
 }
 
 struct lu_object *mdc_object_alloc(const struct lu_env *,
 }
 
 struct lu_object *mdc_object_alloc(const struct lu_env *,
index 79c5812..fca7631 100644 (file)
@@ -120,6 +120,17 @@ Detach the virtual block device.
 .BI blockdev_info " <device node>"
 Acquire which lustre file was attached to the device node.
 .PP
 .BI blockdev_info " <device node>"
 Acquire which lustre file was attached to the device node.
 .PP
+.SS Changelogs
+.TP
+.BI changelog_register
+Register a new changelog user for a particular device.  Changelog entries
+will not be purged beyond any registered users' set point. (See lfs changelog_clear.)
+.TP
+.BI changelog_deregister " <id>"
+Unregister an existing changelog user.  If the user's "clear" record number
+is the minimum for the device, changelog records will be purged until the
+next minimum.  
+.PP
 .SS Debug
 .TP 
 .BI debug_daemon 
 .SS Debug
 .TP 
 .BI debug_daemon 
index 532a60b..4d6df5f 100644 (file)
@@ -1,10 +1,14 @@
-.TH lfs 1 "2008 Mar 15" Lustre "user utilities"
+.TH lfs 1 "2009 Jan 29" Lustre "user utilities"
 .SH NAME
 lfs \- Lustre utility to create a file with specific striping pattern, find the striping pattern of exiting files
 .SH SYNOPSIS
 .br
 .B lfs
 .br
 .SH NAME
 lfs \- Lustre utility to create a file with specific striping pattern, find the striping pattern of exiting files
 .SH SYNOPSIS
 .br
 .B lfs
 .br
+.B lfs changelog [--follow] <mdtname> [startrec [endrec]]
+.br
+.B lfs changelog_clear <mdtname> <id> <endrec>
+.br
 .B lfs check <mds|osts|servers>
 .br
 .B lfs df [-i] [-h] [path]
 .B lfs check <mds|osts|servers>
 .br
 .B lfs df [-i] [-h] [path]
@@ -30,7 +34,7 @@ lfs \- Lustre utility to create a file with specific striping pattern, find the
 .br
 .B lfs poollist <filesystem>[.<pool>] | <pathname>
 .br
 .br
 .B lfs poollist <filesystem>[.<pool>] | <pathname>
 .br
-.B lfs quota [-v] [-o obd_uuid] [-u|-g] <username|groupname> <filesystem>
+.B lfs quota [-v] [-o obd_uuid|-I ost_idx|-i mdt_idx] [-u|-g] <username|groupname> <filesystem>
 .br
 .B lfs quota <filesystem>
 .br
 .br
 .B lfs quota <filesystem>
 .br
@@ -67,13 +71,6 @@ lfs \- Lustre utility to create a file with specific striping pattern, find the
              \fB[-b <block-grace>] [-i <inode-grace>]
              \fB<filesystem>\fR
 .br
              \fB[-b <block-grace>] [-i <inode-grace>]
              \fB<filesystem>\fR
 .br
-
-.B lfs quota [-v] [-o obd_uuid|-i mdt_idx|-I ost_idx] [-u|-g] <username|groupname> <filesystem>
-.br
-.B lfs quota <filesystem>
-.br
-.B lfs quota -t [-u|-g] <filesystem>
-.br
 .B lfs help
 .SH DESCRIPTION
 .B lfs
 .B lfs help
 .SH DESCRIPTION
 .B lfs
@@ -81,6 +78,15 @@ can be used to create a new file with a specific striping pattern, determine the
 .SH OPTIONS
 The various options supported by lctl are listed and explained below:
 .TP
 .SH OPTIONS
 The various options supported by lctl are listed and explained below:
 .TP
+.B changelog
+Show the metadata changes on an MDT.  Start and end points are optional.  The --follow option will block on new changes; this option is only valid when run direclty on the MDT node.
+.TP
+.B changelog_clear
+Indicate that changelog records previous to <endrec> are no longer of
+interest to a particular consumer <id>, potentially allowing the MDT to
+free up disk space. An <endrec> of 0 indicates the current last record.
+Changelog consumers must be registered on the MDT node using \fBlctl\fR.
+.TP
 .B check 
 Display the status of MDS or OSTs (as specified in the command) or all the servers (MDS and OSTs)
 .TP
 .B check 
 Display the status of MDS or OSTs (as specified in the command) or all the servers (MDS and OSTs)
 .TP
@@ -129,6 +135,12 @@ Delete the default striping on the specified directory.
 .B poollist <filesystem>[.<pool>] | <pathname>
 List the pools in \fBfilesystem\fR or \fBpathname\fR, or the OSTs in \fBfilesystem.pool\fR
 .TP
 .B poollist <filesystem>[.<pool>] | <pathname>
 List the pools in \fBfilesystem\fR or \fBpathname\fR, or the OSTs in \fBfilesystem.pool\fR
 .TP
+.B quota [-v] [-o obd_uuid|-i mdt_idx|-I ost_idx] [-u|-g] <username|groupname> <filesystem>
+To display disk usage and limits, either for the full filesystem, or for objects on a specific obd. A user or group name can be specified. If both user and group are omitted quotas for current uid/gid are shown. -v provides more verbose (with per-obd statistics) output.
+.TP
+.B quota -t [-u|-g] <filesystem>
+To display block and inode grace times for user (-u) or group (-g) quotas
+.TP
 .B quotachown
 To change files' owner and group on OSTs of the specified filesystem
 .TP
 .B quotachown
 To change files' owner and group on OSTs of the specified filesystem
 .TP
@@ -150,12 +162,6 @@ To set filesystem quotas for users or groups. Limits can be specified with -b, -
 .B setquota -t [-u|-g] [--block-grace <block-grace>] [--inode-grace <inode-grace>] <filesystem>
 To set filesystem quota grace times for users or groups. Grace time is specified in "XXwXXdXXhXXmXXs" format or as an integer seconds value, see EXAMPLES
 .TP
 .B setquota -t [-u|-g] [--block-grace <block-grace>] [--inode-grace <inode-grace>] <filesystem>
 To set filesystem quota grace times for users or groups. Grace time is specified in "XXwXXdXXhXXmXXs" format or as an integer seconds value, see EXAMPLES
 .TP
-.B quota [-v] [-o obd_uuid|-i mdt_idx|-I ost_idx] [-u|-g] <username|groupname> <filesystem>
-To display disk usage and limits, either for the full filesystem, or for objects on a specific obd. A user or group name can be specified. If both user and group are omitted quotas for current uid/gid are shown. -v provides more verbose (with per-obd statistics) output.
-.TP
-.B quota -t [-u|-g] <filesystem>
-To display block and inode grace times for user (-u) or group (-g) quotas
-.TP
 .B help 
 Provides brief help on the various arguments
 .TP
 .B help 
 Provides brief help on the various arguments
 .TP
@@ -193,6 +199,12 @@ Lists space usage per OST and MDT in human readable format.
 .B $ lfs df -i 
 Lists inode usage per OST and MDT
 .TP
 .B $ lfs df -i 
 Lists inode usage per OST and MDT
 .TP
+.B $ lfs quota -u bob /mnt/lustre
+List quotas of user `bob'
+.TP
+.B $ lfs quota -t -u /mnt/lustre
+Show grace times for user quotas on /mnt/lustre
+.TP
 .B $ lfs quotachown -i /mnt/lustre
 Change file owner and group
 .TP
 .B $ lfs quotachown -i /mnt/lustre
 Change file owner and group
 .TP
@@ -210,12 +222,6 @@ Set quotas of user `bob': 1GB block quota hardlimit and 2 GB block quota softlim
 .TP
 .B $ lfs setquota -t -u --block-grace 1000 --inode-grace 1w4d /mnt/lustre
 Set grace times for user quotas: 1000 seconds for block quotas, 1 week and 4 days for inode quotas
 .TP
 .B $ lfs setquota -t -u --block-grace 1000 --inode-grace 1w4d /mnt/lustre
 Set grace times for user quotas: 1000 seconds for block quotas, 1 week and 4 days for inode quotas
-.TP
-.B $ lfs quota -u bob /mnt/lustre
-List quotas of user `bob'
-.TP
-.B $ lfs quota -t -u /mnt/lustre
-Show grace times for user quotas on /mnt/lustre
 .SH BUGS
 The \fBlfs find\fR command isn't as comprehensive as \fBfind\fR(1).
 Report bugs using http://bugzilla.lustre.org.
 .SH BUGS
 The \fBlfs find\fR command isn't as comprehensive as \fBfind\fR(1).
 Report bugs using http://bugzilla.lustre.org.
index f20dae2..3056bd0 100644 (file)
@@ -682,8 +682,33 @@ extern int lprocfs_quota_rd_qs_factor(char *page, char **start, off_t off,
                                       int count, int *eof, void *data);
 extern int lprocfs_quota_wr_qs_factor(struct file *file, const char *buffer,
                                       unsigned long count, void *data);
                                       int count, int *eof, void *data);
 extern int lprocfs_quota_wr_qs_factor(struct file *file, const char *buffer,
                                       unsigned long count, void *data);
+
+/** struct for holding changelog data for seq_file processing */
+struct changelog_seq_iter {
+        void *csi_dev;
+        struct llog_ctxt *csi_ctxt;
+        struct llog_handle *csi_llh;
+        __u64 csi_startrec;
+        __u64 csi_endrec;
+        loff_t csi_pos;
+        int csi_wrote;
+        int csi_startcat;
+        int csi_startidx;
+        int csi_fill:1;
+        int csi_done:1;
+};
+int changelog_seq_open(struct inode *inode, struct file *file,
+                       struct changelog_seq_iter **csih);
+int changelog_seq_release(struct inode *inode, struct file *file);
+loff_t changelog_seq_lseek(struct file *file, loff_t offset, int origin);
+
+
+
 #else
 /* LPROCFS is not defined */
 #else
 /* LPROCFS is not defined */
+
+
+
 static inline void lprocfs_counter_add(struct lprocfs_stats *stats,
                                        int index, long amount) { return; }
 static inline void lprocfs_counter_incr(struct lprocfs_stats *stats,
 static inline void lprocfs_counter_add(struct lprocfs_stats *stats,
                                        int index, long amount) { return; }
 static inline void lprocfs_counter_incr(struct lprocfs_stats *stats,
index 7dbe8c7..f667c00 100644 (file)
@@ -174,9 +174,12 @@ extern int llapi_rgetfacl(int argc, char *argv[]);
 extern int llapi_cp(int argc, char *argv[]);
 extern int llapi_ls(int argc, char *argv[]);
 extern int llapi_changelog_open(const char *mdtname, long long startrec);
 extern int llapi_cp(int argc, char *argv[]);
 extern int llapi_ls(int argc, char *argv[]);
 extern int llapi_changelog_open(const char *mdtname, long long startrec);
-extern int llapi_changelog_clear(const char *mdtname, long long endrec);
+extern int llapi_changelog_clear(const char *mdtname, const char *idstr,
+                                 long long endrec);
+extern int llapi_changelog_register(const char *mdtname);
+extern int llapi_changelog_unregister(const char *mdtname, int id);
 struct lu_fid;
 extern int llapi_fid2path(char *device, char *fid, char *path, int pathlen,
 struct lu_fid;
 extern int llapi_fid2path(char *device, char *fid, char *path, int pathlen,
-                          __u64 recno, int *linkno);
+                          long long *recno, int *linkno);
 #endif
 
 #endif
 
index 24091b5..8c4fccc 100644 (file)
@@ -2188,20 +2188,20 @@ struct lov_mds_md_join {
 #define LLOG_OP_MASK  0xfff00000
 
 typedef enum {
 #define LLOG_OP_MASK  0xfff00000
 
 typedef enum {
-        LLOG_PAD_MAGIC   = LLOG_OP_MAGIC | 0x00000,
-        OST_SZ_REC       = LLOG_OP_MAGIC | 0x00f00,
-        OST_RAID1_REC    = LLOG_OP_MAGIC | 0x01000,
-        MDS_UNLINK_REC   = LLOG_OP_MAGIC | 0x10000 | (MDS_REINT << 8) | REINT_UNLINK,
-        MDS_SETATTR_REC  = LLOG_OP_MAGIC | 0x10000 | (MDS_REINT << 8) | REINT_SETATTR,
-        MDS_SETATTR64_REC= LLOG_OP_MAGIC | 0x90000 | (MDS_REINT << 8) | REINT_SETATTR,
-        OBD_CFG_REC      = LLOG_OP_MAGIC | 0x20000,
-        PTL_CFG_REC      = LLOG_OP_MAGIC | 0x30000, /* obsolete */
-        LLOG_GEN_REC     = LLOG_OP_MAGIC | 0x40000,
-        LLOG_JOIN_REC    = LLOG_OP_MAGIC | 0x50000,
-         /** changelog record type */
-        CHANGELOG_REC    = LLOG_OP_MAGIC | 0x60000,
-        LLOG_HDR_MAGIC   = LLOG_OP_MAGIC | 0x45539,
-        LLOG_LOGID_MAGIC = LLOG_OP_MAGIC | 0x4553b,
+        LLOG_PAD_MAGIC     = LLOG_OP_MAGIC | 0x00000,
+        OST_SZ_REC         = LLOG_OP_MAGIC | 0x00f00,
+        OST_RAID1_REC      = LLOG_OP_MAGIC | 0x01000,
+        MDS_UNLINK_REC     = LLOG_OP_MAGIC | 0x10000 | (MDS_REINT << 8) | REINT_UNLINK,
+        MDS_SETATTR_REC    = LLOG_OP_MAGIC | 0x10000 | (MDS_REINT << 8) | REINT_SETATTR,
+        MDS_SETATTR64_REC  = LLOG_OP_MAGIC | 0x90000 | (MDS_REINT << 8) | REINT_SETATTR,
+        OBD_CFG_REC        = LLOG_OP_MAGIC | 0x20000,
+        PTL_CFG_REC        = LLOG_OP_MAGIC | 0x30000, /* obsolete */
+        LLOG_GEN_REC       = LLOG_OP_MAGIC | 0x40000,
+        LLOG_JOIN_REC      = LLOG_OP_MAGIC | 0x50000,
+        CHANGELOG_REC      = LLOG_OP_MAGIC | 0x60000,
+        CHANGELOG_USER_REC = LLOG_OP_MAGIC | 0x70000,
+        LLOG_HDR_MAGIC     = LLOG_OP_MAGIC | 0x45539,
+        LLOG_LOGID_MAGIC   = LLOG_OP_MAGIC | 0x4553b,
 } llog_op_type;
 
 /*
 } llog_op_type;
 
 /*
@@ -2336,18 +2336,33 @@ enum changelog_rec_type {
         CL_LAST
 };
 
         CL_LAST
 };
 
+/** Changelog entry type names. Must be defined in the same order as the
+ * \a changelog_rec_type enum.
+ */
+#define DECLARE_CHANGELOG_NAMES static const char *changelog_str[] =         \
+       {"MARK","CREAT","MKDIR","HLINK","SLINK","MKNOD","UNLNK","RMDIR",      \
+        "RNMFM","RNMTO","OPEN","CLOSE","IOCTL","TRUNC","SATTR","XATTR"}
+
 /** \a changelog_rec_type's that can't be masked */
 /** \a changelog_rec_type's that can't be masked */
-#define CL_MINMASK (1 << CL_MARK)
+#define CHANGELOG_MINMASK (1 << CL_MARK)
 /** bits covering all \a changelog_rec_type's */
 /** bits covering all \a changelog_rec_type's */
-#define CL_ALLMASK 0XFFFF
+#define CHANGELOG_ALLMASK 0XFFFF
 /** default \a changelog_rec_type mask */
 /** default \a changelog_rec_type mask */
-#define CL_DEFMASK CL_ALLMASK
+#define CHANGELOG_DEFMASK CHANGELOG_ALLMASK
 
 /* per-record flags */
 #define CLF_VERSION  0x1000
 #define CLF_FLAGMASK 0x0FFF
 #define CLF_HSM      0x0001
 
 
 /* per-record flags */
 #define CLF_VERSION  0x1000
 #define CLF_FLAGMASK 0x0FFF
 #define CLF_HSM      0x0001
 
+/* changelog llog name, needed by client replicators */
+#define CHANGELOG_CATALOG "changelog_catalog"
+
+struct changelog_setinfo {
+        __u64 cs_recno;
+        __u32 cs_id;
+};
+
 /** changelog record */
 struct llog_changelog_rec {
         struct llog_rec_hdr   cr_hdr;
 /** changelog record */
 struct llog_changelog_rec {
         struct llog_rec_hdr   cr_hdr;
@@ -2368,6 +2383,16 @@ struct llog_changelog_rec {
         };
 } __attribute__((packed));
 
         };
 } __attribute__((packed));
 
+#define CHANGELOG_USER_PREFIX "cl"
+
+struct llog_changelog_user_rec {
+        struct llog_rec_hdr   cur_hdr;
+        __u32                 cur_id;
+        __u32                 cur_padding;
+        __u64                 cur_endrec;
+        struct llog_rec_tail  cur_tail;
+} __attribute__((packed));
+
 struct llog_gen {
         __u64 mnt_cnt;
         __u64 conn_cnt;
 struct llog_gen {
         __u64 mnt_cnt;
         __u64 conn_cnt;
index 4914390..df9fddd 100644 (file)
@@ -57,7 +57,7 @@
 #define LOV_OBJID         "lov_objid"
 #define HEALTH_CHECK      "health_check"
 #define CAPA_KEYS         "capa_keys"
 #define LOV_OBJID         "lov_objid"
 #define HEALTH_CHECK      "health_check"
 #define CAPA_KEYS         "capa_keys"
-#define CHANGELOG_CATALOG "changelog_catalog"
+#define CHANGELOG_USERS   "changelog_users"
 
 
 /****************** persistent mount data *********************/
 
 
 /****************** persistent mount data *********************/
index a058fda..c6c9bc6 100644 (file)
@@ -139,9 +139,18 @@ struct obd_ioctl_data {
         __u32 ioc_len;
         __u32 ioc_version;
 
         __u32 ioc_len;
         __u32 ioc_version;
 
-        __u64 ioc_cookie;
-        __u32 ioc_conn1;
-        __u32 ioc_conn2;
+        union {
+                __u64 ioc_cookie;
+                __u64 ioc_u64_1;
+        };
+        union {
+                __u32 ioc_conn1;
+                __u32 ioc_u32_1;
+        };
+        union {
+                __u32 ioc_conn2;
+                __u32 ioc_u32_2;
+        };
 
         struct obdo ioc_obdo1;
         struct obdo ioc_obdo2;
 
         struct obdo ioc_obdo1;
         struct obdo ioc_obdo2;
@@ -478,6 +487,9 @@ static inline void obd_ioctl_freedata(char *buf, int len)
 
 #define OBD_IOC_GETDEVICE              _IOWR ('f', 149, OBD_IOC_DATA_TYPE)
 #define OBD_IOC_FID2PATH               _IOWR ('f', 150, OBD_IOC_DATA_TYPE)
 
 #define OBD_IOC_GETDEVICE              _IOWR ('f', 149, OBD_IOC_DATA_TYPE)
 #define OBD_IOC_FID2PATH               _IOWR ('f', 150, OBD_IOC_DATA_TYPE)
+#define OBD_IOC_CHANGELOG_REG          _IOW ('f', 151, OBD_IOC_DATA_TYPE)
+#define OBD_IOC_CHANGELOG_DEREG        _IOW ('f', 152, OBD_IOC_DATA_TYPE)
+#define OBD_IOC_CHANGELOG_CLEAR        _IOW ('f', 153, OBD_IOC_DATA_TYPE)
 
 #define OBD_IOC_LOV_SETSTRIPE          _IOW ('f', 154, OBD_IOC_DATA_TYPE)
 #define OBD_IOC_LOV_GETSTRIPE          _IOW ('f', 155, OBD_IOC_DATA_TYPE)
 
 #define OBD_IOC_LOV_SETSTRIPE          _IOW ('f', 154, OBD_IOC_DATA_TYPE)
 #define OBD_IOC_LOV_GETSTRIPE          _IOW ('f', 155, OBD_IOC_DATA_TYPE)
index 167b366..34ca742 100644 (file)
@@ -509,6 +509,7 @@ static inline int llog_write_rec(struct llog_handle *handle,
         if (lop->lop_write_rec == NULL)
                 RETURN(-EOPNOTSUPP);
 
         if (lop->lop_write_rec == NULL)
                 RETURN(-EOPNOTSUPP);
 
+        /* FIXME:  Why doesn't caller just set the right lrh_len itself? */
         if (buf)
                 buflen = rec->lrh_len + sizeof(struct llog_rec_hdr)
                                 + sizeof(struct llog_rec_tail);
         if (buf)
                 buflen = rec->lrh_len + sizeof(struct llog_rec_hdr)
                                 + sizeof(struct llog_rec_tail);
index 4f0c777..bec9def 100644 (file)
@@ -226,6 +226,7 @@ extern const struct req_msg_field RMF_TGTUUID;
 extern const struct req_msg_field RMF_CLUUID;
 extern const struct req_msg_field RMF_SETINFO_VAL;
 extern const struct req_msg_field RMF_SETINFO_KEY;
 extern const struct req_msg_field RMF_CLUUID;
 extern const struct req_msg_field RMF_SETINFO_VAL;
 extern const struct req_msg_field RMF_SETINFO_KEY;
+
 /*
  * connection handle received in MDS_CONNECT request.
  */
 /*
  * connection handle received in MDS_CONNECT request.
  */
index 985550b..8832552 100644 (file)
@@ -248,7 +248,7 @@ struct md_object_operations {
         int (*moo_object_sync)(const struct lu_env *, struct md_object *);
 
         int (*moo_path)(const struct lu_env *env, struct md_object *obj,
         int (*moo_object_sync)(const struct lu_env *, struct md_object *);
 
         int (*moo_path)(const struct lu_env *env, struct md_object *obj,
-                        char *path, int pathlen, __u64 recno, int *linkno);
+                        char *path, int pathlen, __u64 *recno, int *linkno);
 };
 
 /**
 };
 
 /**
@@ -326,6 +326,12 @@ struct md_device_operations {
                                    struct md_device *m,
                                    struct lustre_capa_key *key);
 
                                    struct md_device *m,
                                    struct lustre_capa_key *key);
 
+        int (*mdo_llog_ctxt_get)(const struct lu_env *env,
+                                 struct md_device *m, int idx, void **h);
+
+        int (*mdo_iocontrol)(const struct lu_env *env, struct md_device *m,
+                             unsigned int cmd, int len, void *data);
+
 #ifdef HAVE_QUOTA_SUPPORT
         struct md_quota_operations {
                 int (*mqo_notify)(const struct lu_env *env,
 #ifdef HAVE_QUOTA_SUPPORT
         struct md_quota_operations {
                 int (*mqo_notify)(const struct lu_env *env,
@@ -685,9 +691,10 @@ static inline int mo_capa_get(const struct lu_env *env,
 }
 
 static inline int mo_path(const struct lu_env *env, struct md_object *m,
 }
 
 static inline int mo_path(const struct lu_env *env, struct md_object *m,
-                          char *path, int pathlen, __u64 recno, int *linkno)
+                          char *path, int pathlen, __u64 *recno, int *linkno)
 {
 {
-        LASSERT(m->mo_ops->moo_path);
+        if (m->mo_ops->moo_path == NULL)
+                return -ENOSYS;
         return m->mo_ops->moo_path(env, m, path, pathlen, recno, linkno);
 }
 
         return m->mo_ops->moo_path(env, m, path, pathlen, recno, linkno);
 }
 
index 868e86f..b699209 100644 (file)
@@ -266,7 +266,9 @@ enum llog_ctxt_id {
         LLOG_TEST_REPL_CTXT,
         LLOG_LOVEA_ORIG_CTXT,
         LLOG_LOVEA_REPL_CTXT,
         LLOG_TEST_REPL_CTXT,
         LLOG_LOVEA_ORIG_CTXT,
         LLOG_LOVEA_REPL_CTXT,
-        LLOG_CHANGELOG_ORIG_CTXT,   /**< changelog context */
+        LLOG_CHANGELOG_ORIG_CTXT,      /**< changelog generation on mdd */
+        LLOG_CHANGELOG_REPL_CTXT,      /**< changelog access on clients */
+        LLOG_CHANGELOG_USER_ORIG_CTXT, /**< for multiple changelog consumers */
         LLOG_MAX_CTXTS
 };
 
         LLOG_MAX_CTXTS
 };
 
@@ -1089,32 +1091,32 @@ enum obd_cleanup_stage {
 };
 
 /* get/set_info keys */
 };
 
 /* get/set_info keys */
-#define KEY_READ_ONLY           "read-only"
-#define KEY_MDS_CONN            "mds_conn"
-#define KEY_NEXT_ID             "next_id"
-#define KEY_LOVDESC             "lovdesc"
-#define KEY_INIT_RECOV          "initial_recov"
-#define KEY_INIT_RECOV_BACKUP   "init_recov_bk"
-#define KEY_FLUSH_CTX           "flush_ctx"
+#define KEY_BLOCKSIZE_BITS      "blocksize_bits"
+#define KEY_BLOCKSIZE           "blocksize"
 #define KEY_CAPA_KEY            "capa_key"
 #define KEY_CAPA_KEY            "capa_key"
+#define KEY_CHANGELOG_CLEAR     "changelog_clear"
+#define KEY_CHECKSUM            "checksum"
+#define KEY_CLEAR_FS            "clear_fs"
 #define KEY_CONN_DATA           "conn_data"
 #define KEY_CONN_DATA           "conn_data"
-#define KEY_MAX_EASIZE          "max_easize"
-#define KEY_REVIMP_UPD          "revimp_update"
-#define KEY_LOV_IDX             "lov_idx"
+#define KEY_EVICT_BY_NID        "evict_by_nid"
+#define KEY_FIEMAP              "fiemap"
+#define KEY_FLUSH_CTX           "flush_ctx"
+#define KEY_INIT_RECOV_BACKUP   "init_recov_bk"
+#define KEY_INIT_RECOV          "initial_recov"
 #define KEY_LAST_ID             "last_id"
 #define KEY_LAST_ID             "last_id"
-#define KEY_READONLY            "read-only"
 #define KEY_LOCK_TO_STRIPE      "lock_to_stripe"
 #define KEY_LOCK_TO_STRIPE      "lock_to_stripe"
-#define KEY_CHECKSUM            "checksum"
-#define KEY_UNLINKED            "unlinked"
-#define KEY_EVICT_BY_NID        "evict_by_nid"
+#define KEY_LOVDESC             "lovdesc"
+#define KEY_LOV_IDX             "lov_idx"
+#define KEY_MAX_EASIZE          "max_easize"
+#define KEY_MDS_CONN            "mds_conn"
+#define KEY_MGSSEC              "mgssec"
+#define KEY_NEXT_ID             "next_id"
+#define KEY_READ_ONLY           "read-only"
 #define KEY_REGISTER_TARGET     "register_target"
 #define KEY_REGISTER_TARGET     "register_target"
+#define KEY_REVIMP_UPD          "revimp_update"
 #define KEY_SET_FS              "set_fs"
 #define KEY_SET_FS              "set_fs"
-#define KEY_CLEAR_FS            "clear_fs"
-#define KEY_BLOCKSIZE           "blocksize"
-#define KEY_BLOCKSIZE_BITS      "blocksize_bits"
-#define KEY_FIEMAP              "fiemap"
 #define KEY_SPTLRPC_CONF        "sptlrpc_conf"
 #define KEY_SPTLRPC_CONF        "sptlrpc_conf"
-#define KEY_MGSSEC              "mgssec"
+#define KEY_UNLINKED            "unlinked"
 /* XXX unused ?*/
 #define KEY_INTERMDS            "inter_mds"
 #define KEY_ASYNC               "async"
 /* XXX unused ?*/
 #define KEY_INTERMDS            "inter_mds"
 #define KEY_ASYNC               "async"
index 5ac5ee7..5ef9fed 100644 (file)
@@ -353,6 +353,18 @@ int obd_alloc_fail(const void *ptr, const char *name, const char *type,
 #define OBD_FAIL_SEC_CTX_FINI_NET        0x1203
 #define OBD_FAIL_SEC_CTX_HDL_PAUSE       0x1204
 
 #define OBD_FAIL_SEC_CTX_FINI_NET        0x1203
 #define OBD_FAIL_SEC_CTX_HDL_PAUSE       0x1204
 
+#define OBD_FAIL_LLOG                               0x1300
+#define OBD_FAIL_LLOG_ORIGIN_CONNECT_NET            0x1301
+#define OBD_FAIL_LLOG_ORIGIN_HANDLE_CREATE_NET      0x1302
+#define OBD_FAIL_LLOG_ORIGIN_HANDLE_DESTROY_NET     0x1303
+#define OBD_FAIL_LLOG_ORIGIN_HANDLE_READ_HEADER_NET 0x1304
+#define OBD_FAIL_LLOG_ORIGIN_HANDLE_NEXT_BLOCK_NET  0x1305
+#define OBD_FAIL_LLOG_ORIGIN_HANDLE_PREV_BLOCK_NET  0x1306
+#define OBD_FAIL_LLOG_ORIGIN_HANDLE_WRITE_REC_NET   0x1307
+#define OBD_FAIL_LLOG_ORIGIN_HANDLE_CLOSE_NET       0x1308
+#define OBD_FAIL_LLOG_CATINFO_NET                   0x1309
+
+
 /* Failure injection control */
 #define OBD_FAIL_MASK_SYS    0x0000FF00
 #define OBD_FAIL_MASK_LOC   (0x000000FF | OBD_FAIL_MASK_SYS)
 /* Failure injection control */
 #define OBD_FAIL_MASK_SYS    0x0000FF00
 #define OBD_FAIL_MASK_LOC   (0x000000FF | OBD_FAIL_MASK_SYS)
index accb538..b8b1437 100644 (file)
@@ -39,6 +39,7 @@
 #include <linux/vfs.h>
 #include <obd_class.h>
 #include <lprocfs_status.h>
 #include <linux/vfs.h>
 #include <obd_class.h>
 #include <lprocfs_status.h>
+#include <lustre_log.h>
 
 #ifdef LPROCFS
 
 
 #ifdef LPROCFS
 
@@ -75,6 +76,61 @@ static int mdc_wr_max_rpcs_in_flight(struct file *file, const char *buffer,
 
         return count;
 }
 
         return count;
 }
+
+static int mdc_changelog_seq_release(struct inode *inode, struct file *file)
+{
+        struct seq_file *seq = file->private_data;
+        struct changelog_seq_iter *csi = seq->private;
+
+        if (csi && csi->csi_llh)
+                llog_cat_put(csi->csi_llh);
+        if (csi && csi->csi_ctxt)
+                llog_ctxt_put(csi->csi_ctxt);
+
+        return (changelog_seq_release(inode, file));
+}
+
+static int mdc_changelog_seq_open(struct inode *inode, struct file *file)
+{
+        struct changelog_seq_iter *csi;
+        int rc;
+        ENTRY;
+
+        rc = changelog_seq_open(inode, file, &csi);
+        if (rc)
+                RETURN(rc);
+
+        /* Set up the remote catalog handle */
+        /* Note the proc file is set up with obd in data, not mdc_device */
+        csi->csi_ctxt = llog_get_context((struct obd_device *)csi->csi_dev,
+                                         LLOG_CHANGELOG_REPL_CTXT);
+        if (csi->csi_ctxt == NULL)
+                GOTO(out, rc = -ENOENT);
+        rc = llog_create(csi->csi_ctxt, &csi->csi_llh, NULL, CHANGELOG_CATALOG);
+        if (rc) {
+                CERROR("llog_create() failed %d\n", rc);
+                GOTO(out, rc);
+        }
+        rc = llog_init_handle(csi->csi_llh, LLOG_F_IS_CAT, NULL);
+        if (rc) {
+                CERROR("llog_init_handle failed %d\n", rc);
+                GOTO(out, rc);
+        }
+
+out:
+        if (rc)
+                mdc_changelog_seq_release(inode, file);
+        RETURN(rc);
+}
+
+static struct file_operations mdc_changelog_fops = {
+        .owner   = THIS_MODULE,
+        .open    = mdc_changelog_seq_open,
+        .read    = seq_read,
+        .llseek  = changelog_seq_lseek,
+        .release = mdc_changelog_seq_release,
+};
+
 static struct lprocfs_vars lprocfs_mdc_obd_vars[] = {
         { "uuid",            lprocfs_rd_uuid,        0, 0 },
         { "ping",            0, lprocfs_wr_ping,     0, 0, 0222 },
 static struct lprocfs_vars lprocfs_mdc_obd_vars[] = {
         { "uuid",            lprocfs_rd_uuid,        0, 0 },
         { "ping",            0, lprocfs_wr_ping,     0, 0, 0222 },
@@ -92,6 +148,7 @@ static struct lprocfs_vars lprocfs_mdc_obd_vars[] = {
                                 mdc_wr_max_rpcs_in_flight, 0 },
         { "timeouts",        lprocfs_rd_timeouts,    0, 0 },
         { "import",          lprocfs_rd_import,    0, 0 },
                                 mdc_wr_max_rpcs_in_flight, 0 },
         { "timeouts",        lprocfs_rd_timeouts,    0, 0 },
         { "import",          lprocfs_rd_import,    0, 0 },
+        { "changelog",       0, 0, 0, &mdc_changelog_fops, 0400 },
         { 0 }
 };
 
         { 0 }
 };
 
index a5698b5..2823f10 100644 (file)
@@ -1047,6 +1047,14 @@ static int mdc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
                 return -EINVAL;
         }
         switch (cmd) {
                 return -EINVAL;
         }
         switch (cmd) {
+        case OBD_IOC_CHANGELOG_CLEAR: {
+                struct changelog_setinfo cs =
+                        {data->ioc_u64_1, data->ioc_u32_1};
+                rc = obd_set_info_async(exp, strlen(KEY_CHANGELOG_CLEAR),
+                                        KEY_CHANGELOG_CLEAR, sizeof(cs), &cs,
+                                        NULL);
+                GOTO(out, rc);
+        }
         case OBD_IOC_CLIENT_RECOVER:
                 rc = ptlrpc_recover_import(imp, data->ioc_inlbuf1);
                 if (rc < 0)
         case OBD_IOC_CLIENT_RECOVER:
                 rc = ptlrpc_recover_import(imp, data->ioc_inlbuf1);
                 if (rc < 0)
@@ -1098,19 +1106,6 @@ static int do_set_info_async(struct obd_export *exp,
         int                    rc;
         ENTRY;
 
         int                    rc;
         ENTRY;
 
-        if (vallen != sizeof(int))
-                RETURN(-EINVAL);
-
-        spin_lock(&imp->imp_lock);
-        if (*((int *)val)) {
-                imp->imp_connect_flags_orig |= OBD_CONNECT_RDONLY;
-                imp->imp_connect_data.ocd_connect_flags |= OBD_CONNECT_RDONLY;
-        } else {
-                imp->imp_connect_flags_orig &= ~OBD_CONNECT_RDONLY;
-                imp->imp_connect_data.ocd_connect_flags &= ~OBD_CONNECT_RDONLY;
-        }
-        spin_unlock(&imp->imp_lock);
-
         req = ptlrpc_request_alloc(imp, &RQF_MDS_SET_INFO);
         if (req == NULL)
                 RETURN(-ENOMEM);
         req = ptlrpc_request_alloc(imp, &RQF_MDS_SET_INFO);
         if (req == NULL)
                 RETURN(-ENOMEM);
@@ -1176,6 +1171,19 @@ int mdc_set_info_async(struct obd_export *exp,
                 RETURN(0);
         }
         if (KEY_IS(KEY_READ_ONLY)) {
                 RETURN(0);
         }
         if (KEY_IS(KEY_READ_ONLY)) {
+                if (vallen != sizeof(int))
+                        RETURN(-EINVAL);
+
+                spin_lock(&imp->imp_lock);
+                if (*((int *)val)) {
+                        imp->imp_connect_flags_orig |= OBD_CONNECT_RDONLY;
+                        imp->imp_connect_data.ocd_connect_flags |= OBD_CONNECT_RDONLY;
+                } else {
+                        imp->imp_connect_flags_orig &= ~OBD_CONNECT_RDONLY;
+                        imp->imp_connect_data.ocd_connect_flags &= ~OBD_CONNECT_RDONLY;
+                }
+                spin_unlock(&imp->imp_lock);
+
                 rc = do_set_info_async(exp, keylen, key, vallen, val, set);
                 RETURN(rc);
         }
                 rc = do_set_info_async(exp, keylen, key, vallen, val, set);
                 RETURN(rc);
         }
@@ -1195,9 +1203,13 @@ int mdc_set_info_async(struct obd_export *exp,
                 imp->imp_server_timeout = 1;
                 spin_unlock(&imp->imp_lock);
                 imp->imp_client->cli_request_portal = MDS_MDS_PORTAL;
                 imp->imp_server_timeout = 1;
                 spin_unlock(&imp->imp_lock);
                 imp->imp_client->cli_request_portal = MDS_MDS_PORTAL;
-                CDEBUG(D_OTHER|D_WARNING, "%s: timeout / 2\n", exp->exp_obd->obd_name);
+                CDEBUG(D_OTHER, "%s: timeout / 2\n", exp->exp_obd->obd_name);
                 RETURN(0);
         }
                 RETURN(0);
         }
+        if (KEY_IS(KEY_CHANGELOG_CLEAR)) {
+                rc = do_set_info_async(exp, keylen, key, vallen, val, set);
+                RETURN(rc);
+        }
 
         RETURN(rc);
 }
 
         RETURN(rc);
 }
@@ -1665,8 +1677,16 @@ static int mdc_llog_init(struct obd_device *obd, struct obd_llog_group *olg,
 
         rc = llog_setup(obd, olg, LLOG_LOVEA_REPL_CTXT, tgt, 0, NULL,
                         &llog_client_ops);
 
         rc = llog_setup(obd, olg, LLOG_LOVEA_REPL_CTXT, tgt, 0, NULL,
                         &llog_client_ops);
+        if (rc)
+                RETURN(rc);
+        ctxt = llog_get_context(obd, LLOG_LOVEA_REPL_CTXT);
+        llog_initiator_connect(ctxt);
+        llog_ctxt_put(ctxt);
+
+        rc = llog_setup(obd, olg, LLOG_CHANGELOG_REPL_CTXT, tgt, 0, NULL,
+                        &llog_client_ops);
         if (rc == 0) {
         if (rc == 0) {
-                ctxt = llog_get_context(obd, LLOG_LOVEA_REPL_CTXT);
+                ctxt = llog_group_get_ctxt(olg, LLOG_CHANGELOG_REPL_CTXT);
                 llog_initiator_connect(ctxt);
                 llog_ctxt_put(ctxt);
         }
                 llog_initiator_connect(ctxt);
                 llog_ctxt_put(ctxt);
         }
@@ -1684,6 +1704,10 @@ static int mdc_llog_finish(struct obd_device *obd, int count)
         if (ctxt)
                 rc = llog_cleanup(ctxt);
 
         if (ctxt)
                 rc = llog_cleanup(ctxt);
 
+        ctxt = llog_get_context(obd, LLOG_CHANGELOG_REPL_CTXT);
+        if (ctxt)
+                rc = llog_cleanup(ctxt);
+
         RETURN(rc);
 }
 
         RETURN(rc);
 }
 
@@ -1698,8 +1722,8 @@ static int mdc_process_config(struct obd_device *obd, obd_count len, void *buf)
         default:
                 rc = class_process_proc_param(PARAM_MDC, lvars.obd_vars,
                                               lcfg, obd);
         default:
                 rc = class_process_proc_param(PARAM_MDC, lvars.obd_vars,
                                               lcfg, obd);
-               if (rc > 0)
-                       rc = 0;
+                if (rc > 0)
+                        rc = 0;
                 break;
         }
         return(rc);
                 break;
         }
         return(rc);
index c47a0b5..9de0d80 100644 (file)
@@ -132,16 +132,10 @@ static int changelog_init_cb(struct llog_handle *llh, struct llog_rec_hdr *hdr,
         struct llog_changelog_rec *rec = (struct llog_changelog_rec *)hdr;
         ENTRY;
 
         struct llog_changelog_rec *rec = (struct llog_changelog_rec *)hdr;
         ENTRY;
 
-        if (!(llh->lgh_hdr->llh_flags & LLOG_F_IS_PLAIN)) {
-                CERROR("log is not plain\n");
-                RETURN(-EINVAL);
-        }
-        if (rec->cr_hdr.lrh_type != CHANGELOG_REC) {
-                CERROR("Not a changelog rec? %d\n", rec->cr_hdr.lrh_type);
-                RETURN(-EINVAL);
-        }
+        LASSERT(llh->lgh_hdr->llh_flags & LLOG_F_IS_PLAIN);
+        LASSERT(rec->cr_hdr.lrh_type == CHANGELOG_REC);
 
 
-        CDEBUG(D_INODE,
+        CDEBUG(D_INFO,
                "seeing record at index %d/%d/"LPU64" t=%x %.*s in log "LPX64"\n",
                hdr->lrh_index, rec->cr_hdr.lrh_index, rec->cr_index,
                rec->cr_type, rec->cr_namelen, rec->cr_name,
                "seeing record at index %d/%d/"LPU64" t=%x %.*s in log "LPX64"\n",
                hdr->lrh_index, rec->cr_hdr.lrh_index, rec->cr_index,
                rec->cr_type, rec->cr_namelen, rec->cr_name,
@@ -151,32 +145,76 @@ static int changelog_init_cb(struct llog_handle *llh, struct llog_rec_hdr *hdr,
         RETURN(LLOG_PROC_BREAK);
 }
 
         RETURN(LLOG_PROC_BREAK);
 }
 
+static int changelog_user_init_cb(struct llog_handle *llh,
+                                  struct llog_rec_hdr *hdr, void *data)
+{
+        struct mdd_device *mdd = (struct mdd_device *)data;
+        struct llog_changelog_user_rec *rec =
+                (struct llog_changelog_user_rec *)hdr;
+        ENTRY;
+
+        LASSERT(llh->lgh_hdr->llh_flags & LLOG_F_IS_PLAIN);
+        LASSERT(rec->cur_hdr.lrh_type == CHANGELOG_USER_REC);
+
+        CDEBUG(D_INFO, "seeing user at index %d/%d id=%d endrec="LPU64
+               " in log "LPX64"\n", hdr->lrh_index, rec->cur_hdr.lrh_index,
+               rec->cur_id, rec->cur_endrec, llh->lgh_id.lgl_oid);
+
+        spin_lock(&mdd->mdd_cl.mc_user_lock);
+        mdd->mdd_cl.mc_lastuser = rec->cur_id;
+        spin_unlock(&mdd->mdd_cl.mc_user_lock);
+
+        RETURN(LLOG_PROC_BREAK);
+}
+
+
 static int mdd_changelog_llog_init(struct mdd_device *mdd)
 {
         struct obd_device *obd = mdd2obd_dev(mdd);
         struct llog_ctxt *ctxt;
         int rc;
 
 static int mdd_changelog_llog_init(struct mdd_device *mdd)
 {
         struct obd_device *obd = mdd2obd_dev(mdd);
         struct llog_ctxt *ctxt;
         int rc;
 
+        /* Find last changelog entry number */
         ctxt = llog_get_context(obd, LLOG_CHANGELOG_ORIG_CTXT);
         if (ctxt == NULL) {
         ctxt = llog_get_context(obd, LLOG_CHANGELOG_ORIG_CTXT);
         if (ctxt == NULL) {
-                CERROR("no context\n");
+                CERROR("no changelog context\n");
                 return -EINVAL;
         }
         if (!ctxt->loc_handle) {
                 return -EINVAL;
         }
         if (!ctxt->loc_handle) {
-                CERROR("no handle\n");
+                llog_ctxt_put(ctxt);
                 return -EINVAL;
         }
                 return -EINVAL;
         }
+
         rc = llog_cat_reverse_process(ctxt->loc_handle, changelog_init_cb, mdd);
         llog_ctxt_put(ctxt);
 
         rc = llog_cat_reverse_process(ctxt->loc_handle, changelog_init_cb, mdd);
         llog_ctxt_put(ctxt);
 
-        if (rc < 0)
+        if (rc < 0) {
                 CERROR("changelog init failed: %d\n", rc);
                 CERROR("changelog init failed: %d\n", rc);
-        else
-                rc = 0; /* llog_proc_break is ok */
+                return rc;
+        }
+        CDEBUG(D_INODE, "changelog starting index="LPU64"\n",
+               mdd->mdd_cl.mc_index);
 
 
-        CDEBUG(D_INODE, "changelog_init index="LPU64"\n", mdd->mdd_cl.mc_index);
+        /* Find last changelog user id */
+        ctxt = llog_get_context(obd, LLOG_CHANGELOG_USER_ORIG_CTXT);
+        if (ctxt == NULL) {
+                CERROR("no changelog user context\n");
+                return -EINVAL;
+        }
+        if (!ctxt->loc_handle) {
+                llog_ctxt_put(ctxt);
+                return -EINVAL;
+        }
 
 
-        return rc;
+        rc = llog_cat_reverse_process(ctxt->loc_handle, changelog_user_init_cb,
+                                      mdd);
+        llog_ctxt_put(ctxt);
+
+        if (rc < 0) {
+                CERROR("changelog user init failed: %d\n", rc);
+                return rc;
+        }
+        return 0;
 }
 
 static int mdd_changelog_init(const struct lu_env *env, struct mdd_device *mdd)
 }
 
 static int mdd_changelog_init(const struct lu_env *env, struct mdd_device *mdd)
@@ -186,15 +224,18 @@ static int mdd_changelog_init(const struct lu_env *env, struct mdd_device *mdd)
         mdd->mdd_cl.mc_index = 0;
         spin_lock_init(&mdd->mdd_cl.mc_lock);
         cfs_waitq_init(&mdd->mdd_cl.mc_waitq);
         mdd->mdd_cl.mc_index = 0;
         spin_lock_init(&mdd->mdd_cl.mc_lock);
         cfs_waitq_init(&mdd->mdd_cl.mc_waitq);
-
         mdd->mdd_cl.mc_starttime = cfs_time_current_64();
         mdd->mdd_cl.mc_flags = 0; /* off by default */
         mdd->mdd_cl.mc_starttime = cfs_time_current_64();
         mdd->mdd_cl.mc_flags = 0; /* off by default */
-        mdd->mdd_cl.mc_mask = CL_DEFMASK;
+        mdd->mdd_cl.mc_mask = CHANGELOG_DEFMASK;
+        spin_lock_init(&mdd->mdd_cl.mc_user_lock);
+        mdd->mdd_cl.mc_lastuser = 0;
+
         rc = mdd_changelog_llog_init(mdd);
         if (rc) {
                 CERROR("Changelog setup during init failed %d\n", rc);
                 mdd->mdd_cl.mc_flags |= CLM_ERR;
         }
         rc = mdd_changelog_llog_init(mdd);
         if (rc) {
                 CERROR("Changelog setup during init failed %d\n", rc);
                 mdd->mdd_cl.mc_flags |= CLM_ERR;
         }
+
         return rc;
 }
 
         return rc;
 }
 
@@ -254,19 +295,41 @@ int mdd_changelog_llog_cancel(struct mdd_device *mdd, long long endrec)
 {
         struct obd_device *obd = mdd2obd_dev(mdd);
         struct llog_ctxt *ctxt;
 {
         struct obd_device *obd = mdd2obd_dev(mdd);
         struct llog_ctxt *ctxt;
+        long long unsigned cur;
         int rc;
 
         ctxt = llog_get_context(obd, LLOG_CHANGELOG_ORIG_CTXT);
         if (ctxt == NULL)
                 return -ENXIO;
 
         int rc;
 
         ctxt = llog_get_context(obd, LLOG_CHANGELOG_ORIG_CTXT);
         if (ctxt == NULL)
                 return -ENXIO;
 
-        /* Some records purged; reset repeat-access time */
+        spin_lock(&mdd->mdd_cl.mc_lock);
+        cur = (long long)mdd->mdd_cl.mc_index;
+        spin_unlock(&mdd->mdd_cl.mc_lock);
+        if (endrec > cur)
+                endrec = cur;
+
+        /* purge to "0" is shorthand for everything */
+        if (endrec == 0)
+                endrec = cur;
+
+        /* If purging all records, write a header entry so we don't have an
+           empty catalog and we're sure to have a valid starting index next
+           time.  In case of crash, we just restart with old log so we're
+           allright. */
+        if (endrec == cur) {
+                rc = mdd_changelog_write_header(mdd, CLM_PURGE);
+                if (rc)
+                      goto out;
+        }
+
+        /* Some records were purged, so reset repeat-access time (so we
+           record new mtime update records, so users can see a file has been
+           changed since the last purge) */
         mdd->mdd_cl.mc_starttime = cfs_time_current_64();
 
         rc = llog_cancel(ctxt, NULL, 1, (struct llog_cookie *)&endrec, 0);
         mdd->mdd_cl.mc_starttime = cfs_time_current_64();
 
         rc = llog_cancel(ctxt, NULL, 1, (struct llog_cookie *)&endrec, 0);
-
+out:
         llog_ctxt_put(ctxt);
         llog_ctxt_put(ctxt);
-
         return rc;
 }
 
         return rc;
 }
 
@@ -366,7 +429,7 @@ static int dot_lustre_mdd_open(const struct lu_env *env, struct md_object *obj,
 }
 
 static int dot_lustre_path(const struct lu_env *env, struct md_object *obj,
 }
 
 static int dot_lustre_path(const struct lu_env *env, struct md_object *obj,
-                           char *path, int pathlen, __u64 recno, int *linkno)
+                           char *path, int pathlen, __u64 *recno, int *linkno)
 {
         return -ENOSYS;
 }
 {
         return -ENOSYS;
 }
@@ -532,7 +595,7 @@ static int obf_mdd_readpage(const struct lu_env *env, struct md_object *obj,
 }
 
 static int obf_path(const struct lu_env *env, struct md_object *obj,
 }
 
 static int obf_path(const struct lu_env *env, struct md_object *obj,
-                    char *path, int pathlen, __u64 recno, int *linkno)
+                    char *path, int pathlen, __u64 *recno, int *linkno)
 {
         return -ENOSYS;
 }
 {
         return -ENOSYS;
 }
@@ -917,6 +980,15 @@ static int mdd_update_capa_key(const struct lu_env *env,
         RETURN(rc);
 }
 
         RETURN(rc);
 }
 
+static int mdd_llog_ctxt_get(const struct lu_env *env, struct md_device *m,
+                             int idx, void **h)
+{
+        struct mdd_device *mdd = lu2mdd_dev(&m->md_lu_dev);
+
+        *h = llog_group_get_ctxt(&mdd2obd_dev(mdd)->obd_olg, idx);
+        return (*h == NULL ? -ENOENT : 0);
+}
+
 static struct lu_device *mdd_device_alloc(const struct lu_env *env,
                                           struct lu_device_type *t,
                                           struct lustre_cfg *lcfg)
 static struct lu_device *mdd_device_alloc(const struct lu_env *env,
                                           struct lu_device_type *t,
                                           struct lustre_cfg *lcfg)
@@ -992,6 +1064,202 @@ struct md_capainfo *md_capainfo(const struct lu_env *env)
 }
 EXPORT_SYMBOL(md_capainfo);
 
 }
 EXPORT_SYMBOL(md_capainfo);
 
+static int mdd_changelog_user_register(struct mdd_device *mdd, int *id)
+{
+        struct llog_ctxt *ctxt;
+        struct llog_changelog_user_rec *rec;
+        int rc;
+        ENTRY;
+
+        ctxt = llog_get_context(mdd2obd_dev(mdd),LLOG_CHANGELOG_USER_ORIG_CTXT);
+        if (ctxt == NULL)
+                RETURN(-ENXIO);
+
+        OBD_ALLOC_PTR(rec);
+        if (rec == NULL) {
+                llog_ctxt_put(ctxt);
+                RETURN(-ENOMEM);
+        }
+
+        rec->cur_hdr.lrh_len = sizeof(*rec);
+        rec->cur_hdr.lrh_type = CHANGELOG_USER_REC;
+        rec->cur_endrec = 0ULL;
+        spin_lock(&mdd->mdd_cl.mc_user_lock);
+        if (mdd->mdd_cl.mc_lastuser == (unsigned int)(-1)) {
+                spin_unlock(&mdd->mdd_cl.mc_user_lock);
+                CERROR("Maximum number of changelog users exceeded!\n");
+                GOTO(out, rc = -EOVERFLOW);
+        }
+        *id = rec->cur_id = ++mdd->mdd_cl.mc_lastuser;
+        spin_unlock(&mdd->mdd_cl.mc_user_lock);
+        rc = llog_add(ctxt, &rec->cur_hdr, NULL, NULL, 0);
+
+        CDEBUG(D_INODE, "Registered changelog user %d\n", *id);
+out:
+        OBD_FREE_PTR(rec);
+        llog_ctxt_put(ctxt);
+        RETURN(rc);
+}
+
+struct mdd_changelog_user_data {
+        __u64 mcud_endrec; /**< purge record for this user */
+        __u64 mcud_minrec; /**< lowest changelog recno still referenced */
+        __u32 mcud_id;
+        __u32 mcud_minid;  /**< user id with lowest rec reference */
+        int   mcud_found:1;
+};
+
+/** Two things:
+ * 1. Find the smallest record everyone is willing to purge
+ * 2. Update the last purgeable record for this user
+ */
+static int mdd_changelog_user_purge_cb(struct llog_handle *llh,
+                                       struct llog_rec_hdr *hdr, void *data)
+{
+        struct llog_changelog_user_rec *rec;
+        struct mdd_changelog_user_data *mcud =
+                (struct mdd_changelog_user_data *)data;
+        int rc;
+        ENTRY;
+
+        LASSERT(llh->lgh_hdr->llh_flags & LLOG_F_IS_PLAIN);
+
+        rec = (struct llog_changelog_user_rec *)hdr;
+
+        /* If we have a new endrec for this id, use it for the min check */
+        if (rec->cur_id == mcud->mcud_id)
+                rec->cur_endrec = max(rec->cur_endrec, mcud->mcud_endrec);
+
+        /* Track the minimum referenced record */
+        if (mcud->mcud_minid == 0 || mcud->mcud_minrec > rec->cur_endrec) {
+                mcud->mcud_minid = rec->cur_id;
+                mcud->mcud_minrec = rec->cur_endrec;
+        }
+
+        if (rec->cur_id != mcud->mcud_id)
+                RETURN(0);
+
+        /* Update this user's record */
+        mcud->mcud_found = 1;
+
+        /* Special case: unregister this user if endrec == -1 */
+        if (mcud->mcud_endrec == -1) {
+                struct llog_cookie cookie;
+                cookie.lgc_lgl = llh->lgh_id;
+                cookie.lgc_index = hdr->lrh_index;
+                rc = llog_cat_cancel_records(llh->u.phd.phd_cat_handle,
+                                             1, &cookie);
+                RETURN(rc);
+        }
+
+        /* Update the endrec */
+        CDEBUG(D_IOCTL, "Rewriting changelog user %d endrec to "LPU64"\n",
+               mcud->mcud_id, rec->cur_endrec);
+
+        /* hdr+1 is loc of data */
+        hdr->lrh_len -= sizeof(*hdr) + sizeof(struct llog_rec_tail);
+        rc = llog_write_rec(llh, hdr, NULL, 0, (void *)(hdr + 1),
+                            hdr->lrh_index);
+
+        RETURN(rc);
+}
+
+static int mdd_changelog_user_purge(struct mdd_device *mdd, int id,
+                                    long long endrec)
+{
+        struct mdd_changelog_user_data data;
+        struct llog_ctxt *ctxt;
+        int rc;
+        ENTRY;
+
+        CDEBUG(D_IOCTL, "Purge request: id=%d, endrec="LPD64"\n", id, endrec);
+
+        ctxt = llog_get_context(mdd2obd_dev(mdd),LLOG_CHANGELOG_USER_ORIG_CTXT);
+        if (ctxt == NULL)
+                return -ENXIO;
+        LASSERT(ctxt->loc_handle->lgh_hdr->llh_flags & LLOG_F_IS_CAT);
+
+        data.mcud_id = id;
+        data.mcud_endrec = endrec;
+        data.mcud_minid = 0;
+        data.mcud_minrec = 0;
+        rc = llog_cat_process(ctxt->loc_handle, mdd_changelog_user_purge_cb,
+                              (void *)&data, 0, 0);
+        if ((rc >= 0) && (data.mcud_minrec > 0)) {
+                CDEBUG(D_INODE, "Purging CL entries up to "LPD64
+                       ", referenced by "CHANGELOG_USER_PREFIX"%d\n",
+                       data.mcud_minrec, data.mcud_minid);
+                rc = mdd_changelog_llog_cancel(mdd, data.mcud_minrec);
+        } else {
+                CWARN("Could not determine changelog records to purge; rc=%d\n",
+                      rc);
+        }
+
+        if (!data.mcud_found) {
+                CWARN("No entry for user %d.  Last changelog reference is "
+                      LPD64" by changelog user %d\n", data.mcud_id,
+                      data.mcud_minrec, data.mcud_minid);
+               rc = -ENOENT;
+        }
+
+        llog_ctxt_put(ctxt);
+        RETURN (rc);
+}
+
+/** mdd_iocontrol
+ * May be called remotely from mdt_iocontrol_handle or locally from
+ * mdt_iocontrol. Data may be freeform - remote handling doesn't enforce or
+ * swab an obd_ioctl_data format (but local ioctl handler does).
+ * \param cmd - ioc
+ * \param len - data len
+ * \param karg - ioctl data, in kernel space
+ */
+static int mdd_iocontrol(const struct lu_env *env, struct md_device *m,
+                         unsigned int cmd, int len, void *karg)
+{
+        struct mdd_device *mdd;
+        struct obd_ioctl_data *data = karg;
+        int rc;
+        ENTRY;
+
+        mdd = lu2mdd_dev(&m->md_lu_dev);
+
+        /* Doesn't use obd_ioctl_data */
+        if (cmd == OBD_IOC_CHANGELOG_CLEAR) {
+                struct changelog_setinfo *cs = karg;
+                if (len != sizeof(*cs)) {
+                        CERROR("Bad changelog_clear ioctl size %d\n", len);
+                        RETURN(-EINVAL);
+                }
+                rc = mdd_changelog_user_purge(mdd, cs->cs_id, cs->cs_recno);
+                RETURN(rc);
+        }
+
+        /* Below ioctls use obd_ioctl_data */
+        if (len != sizeof(*data)) {
+                CERROR("Bad ioctl size %d\n", len);
+                RETURN(-EINVAL);
+        }
+        if (data->ioc_version != OBD_IOCTL_VERSION) {
+                CERROR("Bad magic %x != %x\n", data->ioc_version,
+                       OBD_IOCTL_VERSION);
+                RETURN(-EINVAL);
+        }
+
+        switch (cmd) {
+        case OBD_IOC_CHANGELOG_REG:
+                rc = mdd_changelog_user_register(mdd, &data->ioc_u32_1);
+                break;
+        case OBD_IOC_CHANGELOG_DEREG:
+                rc = mdd_changelog_user_purge(mdd, data->ioc_u32_1, -1);
+                break;
+        default:
+                rc = -EOPNOTSUPP;
+        }
+
+        RETURN (rc);
+}
+
 /* type constructor/destructor: mdd_type_init, mdd_type_fini */
 LU_TYPE_INIT_FINI(mdd, &mdd_thread_key, &mdd_ucred_key, &mdd_capainfo_key);
 
 /* type constructor/destructor: mdd_type_init, mdd_type_fini */
 LU_TYPE_INIT_FINI(mdd, &mdd_thread_key, &mdd_ucred_key, &mdd_capainfo_key);
 
@@ -1001,6 +1269,8 @@ const struct md_device_operations mdd_ops = {
         .mdo_maxsize_get    = mdd_maxsize_get,
         .mdo_init_capa_ctxt = mdd_init_capa_ctxt,
         .mdo_update_capa_key= mdd_update_capa_key,
         .mdo_maxsize_get    = mdd_maxsize_get,
         .mdo_init_capa_ctxt = mdd_init_capa_ctxt,
         .mdo_update_capa_key= mdd_update_capa_key,
+        .mdo_llog_ctxt_get  = mdd_llog_ctxt_get,
+        .mdo_iocontrol      = mdd_iocontrol,
 #ifdef HAVE_QUOTA_SUPPORT
         .mdo_quota          = {
                 .mqo_notify      = mdd_quota_notify,
 #ifdef HAVE_QUOTA_SUPPORT
         .mdo_quota          = {
                 .mqo_notify      = mdd_quota_notify,
index 6785411..4c6bb0e 100644 (file)
@@ -108,6 +108,8 @@ struct mdd_changelog {
         int                              mc_mask;
         __u64                            mc_index;
         __u64                            mc_starttime;
         int                              mc_mask;
         __u64                            mc_index;
         __u64                            mc_starttime;
+        spinlock_t                       mc_user_lock;
+        int                              mc_lastuser;
 };
 
 /** Objects in .lustre dir */
 };
 
 /** Objects in .lustre dir */
index 4cdc048..1e5d86b 100644 (file)
@@ -67,9 +67,6 @@
 static const char *mdd_counter_names[LPROC_MDD_NR] = {
 };
 
 static const char *mdd_counter_names[LPROC_MDD_NR] = {
 };
 
-/* from LPROC_SEQ_FOPS(mdd_changelog) below */
-extern struct file_operations mdd_changelog_fops;
-
 int mdd_procfs_init(struct mdd_device *mdd, const char *name)
 {
         struct lprocfs_static_vars lvars;
 int mdd_procfs_init(struct mdd_device *mdd, const char *name)
 {
         struct lprocfs_static_vars lvars;
@@ -161,10 +158,9 @@ static int lprocfs_rd_atime_diff(char *page, char **start, off_t off,
         return snprintf(page, count, "%lu\n", mdd->mdd_atime_diff);
 }
 
         return snprintf(page, count, "%lu\n", mdd->mdd_atime_diff);
 }
 
-/* match enum changelog_rec_type */
-static const char *changelog_str[] = {"MARK","CREAT","MKDIR","HLINK","SLINK",
-        "MKNOD","UNLNK","RMDIR","RNMFM","RNMTO","OPEN","CLOSE","IOCTL",
-        "TRUNC","SATTR","XATTR"};
+
+/**** changelogs ****/
+DECLARE_CHANGELOG_NAMES;
 
 const char *changelog_bit2str(int bit)
 {
 
 const char *changelog_bit2str(int bit)
 {
@@ -173,8 +169,8 @@ const char *changelog_bit2str(int bit)
         return NULL;
 }
 
         return NULL;
 }
 
-static int lprocfs_rd_cl_mask(char *page, char **start, off_t off,
-                              int count, int *eof, void *data)
+static int lprocfs_rd_changelog_mask(char *page, char **start, off_t off,
+                                     int count, int *eof, void *data)
 {
         struct mdd_device *mdd = data;
         int i = 0, rc = 0;
 {
         struct mdd_device *mdd = data;
         int i = 0, rc = 0;
@@ -189,8 +185,8 @@ static int lprocfs_rd_cl_mask(char *page, char **start, off_t off,
         return rc;
 }
 
         return rc;
 }
 
-static int lprocfs_wr_cl_mask(struct file *file, const char *buffer,
-                              unsigned long count, void *data)
+static int lprocfs_wr_changelog_mask(struct file *file, const char *buffer,
+                                     unsigned long count, void *data)
 {
         struct mdd_device *mdd = data;
         char *kernbuf;
 {
         struct mdd_device *mdd = data;
         char *kernbuf;
@@ -206,8 +202,8 @@ static int lprocfs_wr_cl_mask(struct file *file, const char *buffer,
                 GOTO(out, rc = -EFAULT);
         kernbuf[count] = 0;
 
                 GOTO(out, rc = -EFAULT);
         kernbuf[count] = 0;
 
-        rc = libcfs_str2mask(kernbuf, changelog_bit2str,
-                             &mdd->mdd_cl.mc_mask, CL_MINMASK, CL_ALLMASK);
+        rc = libcfs_str2mask(kernbuf, changelog_bit2str, &mdd->mdd_cl.mc_mask,
+                             CHANGELOG_MINMASK, CHANGELOG_ALLMASK);
         if (rc == 0)
                 rc = count;
 out:
         if (rc == 0)
                 rc = count;
 out:
@@ -215,21 +211,70 @@ out:
         return rc;
 }
 
         return rc;
 }
 
-/** struct for holding changelog data for seq_file processing */
-struct cl_seq_iter {
-        struct mdd_device *csi_mdd;
-        __u64 csi_startrec;
-        __u64 csi_endrec;
-        loff_t csi_pos;
-        int csi_wrote;
-        int csi_startcat;
-        int csi_startidx;
-        int csi_fill:1;
+struct cucb_data {
+        char *page;
+        int count;
+        int idx;
 };
 
 };
 
+static int lprocfs_changelog_users_cb(struct llog_handle *llh,
+                                      struct llog_rec_hdr *hdr, void *data)
+{
+        struct llog_changelog_user_rec *rec;
+        struct cucb_data *cucb = (struct cucb_data *)data;
+
+        LASSERT(llh->lgh_hdr->llh_flags & LLOG_F_IS_PLAIN);
+
+        rec = (struct llog_changelog_user_rec *)hdr;
+
+        cucb->idx += snprintf(cucb->page + cucb->idx, cucb->count - cucb->idx,
+                              CHANGELOG_USER_PREFIX"%-3d "LPU64"\n",
+                              rec->cur_id, rec->cur_endrec);
+        if (cucb->idx >= cucb->count)
+                return -ENOSPC;
+
+        return 0;
+}
+
+static int lprocfs_rd_changelog_users(char *page, char **start, off_t off,
+                                      int count, int *eof, void *data)
+{
+        struct mdd_device *mdd = data;
+        struct llog_ctxt *ctxt;
+        struct cucb_data cucb;
+        __u64 cur;
+
+        *eof = 1;
+
+        ctxt = llog_get_context(mdd2obd_dev(mdd),LLOG_CHANGELOG_USER_ORIG_CTXT);
+        if (ctxt == NULL)
+                return -ENXIO;
+        LASSERT(ctxt->loc_handle->lgh_hdr->llh_flags & LLOG_F_IS_CAT);
+
+        spin_lock(&mdd->mdd_cl.mc_lock);
+        cur = mdd->mdd_cl.mc_index;
+        spin_unlock(&mdd->mdd_cl.mc_lock);
+
+        cucb.count = count;
+        cucb.page = page;
+        cucb.idx = 0;
+
+        cucb.idx += snprintf(cucb.page + cucb.idx, cucb.count - cucb.idx,
+                              "current index: "LPU64"\n", cur);
+
+        cucb.idx += snprintf(cucb.page + cucb.idx, cucb.count - cucb.idx,
+                              "%-5s %s\n", "ID", "index");
+
+        llog_cat_process(ctxt->loc_handle, lprocfs_changelog_users_cb,
+                         &cucb, 0, 0);
+
+        llog_ctxt_put(ctxt);
+        return cucb.idx;
+}
+
 /* non-seq version for direct calling by class_process_proc_param */
 /* non-seq version for direct calling by class_process_proc_param */
-static int lprocfs_wr_cl(struct file *file, const char *buffer,
-                         unsigned long count, void *data)
+static int mdd_changelog_write(struct file *file, const char *buffer,
+                               unsigned long count, void *data)
 {
         struct mdd_device *mdd = (struct mdd_device *)data;
         char kernbuf[32];
 {
         struct mdd_device *mdd = (struct mdd_device *)data;
         char kernbuf[32];
@@ -271,32 +316,11 @@ static int lprocfs_wr_cl(struct file *file, const char *buffer,
                 spin_unlock(&mdd->mdd_cl.mc_lock);
         } else {
                 /* purge to an index */
                 spin_unlock(&mdd->mdd_cl.mc_lock);
         } else {
                 /* purge to an index */
-                long long unsigned endrec, cur;
+                long long unsigned endrec;
 
 
-                spin_lock(&mdd->mdd_cl.mc_lock);
-                cur = (long long)mdd->mdd_cl.mc_index;
-                spin_unlock(&mdd->mdd_cl.mc_lock);
-
-                if (strcmp(kernbuf, "0") == 0)
-                        /* purge to "0" is shorthand for everything */
-                        endrec = cur;
-                else
-                        endrec = (long long)simple_strtoull(kernbuf, &end, 0);
-                if ((kernbuf == end) || (endrec == 0))
+                endrec = (long long)simple_strtoull(kernbuf, &end, 0);
+                if (end == kernbuf)
                         goto out_usage;
                         goto out_usage;
-                if (endrec > cur)
-                        endrec = cur;
-
-                /* If purging all records, write a header entry so we
-                   don't have an empty catalog and
-                   we're sure to have a valid starting index next time.  In
-                   case of crash, we just restart with old log so we're
-                   allright. */
-                if (endrec == cur) {
-                        rc = mdd_changelog_write_header(mdd, CLM_PURGE);
-                        if (rc)
-                              return rc;
-                }
 
                 LCONSOLE_INFO("changelog purge to %llu\n", endrec);
 
 
                 LCONSOLE_INFO("changelog purge to %llu\n", endrec);
 
@@ -312,303 +336,103 @@ out_usage:
         return -EINVAL;
 }
 
         return -EINVAL;
 }
 
-static ssize_t mdd_cl_seq_write(struct file *file, const char *buffer,
-                                size_t count, loff_t *off)
+static ssize_t mdd_changelog_seq_write(struct file *file, const char *buffer,
+                                       size_t count, loff_t *off)
 {
         struct seq_file *seq = file->private_data;
 {
         struct seq_file *seq = file->private_data;
-        struct cl_seq_iter *csi = seq->private;
-        struct mdd_device *mdd = csi->csi_mdd;
-
-        return lprocfs_wr_cl(file, buffer, count, mdd);
-}
-
-#define D_CL 0
-
-/* How many records per seq_show.  Too small, we spawn llog_process threads
-   too often; too large, we run out of buffer space */
-#define CL_CHUNK_SIZE 100
-
-static int changelog_show_cb(struct llog_handle *llh, struct llog_rec_hdr *hdr,
-                             void *data)
-{
-        struct seq_file *seq = (struct seq_file *)data;
-        struct cl_seq_iter *csi = seq->private;
-        struct llog_changelog_rec *rec = (struct llog_changelog_rec *)hdr;
-        int rc;
-        ENTRY;
-
-        if ((rec->cr_hdr.lrh_type != CHANGELOG_REC) ||
-            (rec->cr_type >= CL_LAST)) {
-                CERROR("Not a changelog rec? %d/%d\n", rec->cr_hdr.lrh_type,
-                       rec->cr_type);
-                RETURN(-EINVAL);
-        }
-
-        CDEBUG(D_CL, "rec="LPU64" start="LPU64" cat=%d:%d start=%d:%d\n",
-               rec->cr_index, csi->csi_startrec,
-               llh->lgh_hdr->llh_cat_idx, llh->lgh_cur_idx,
-               csi->csi_startcat, csi->csi_startidx);
-
-        if (rec->cr_index < csi->csi_startrec)
-                RETURN(0);
-        if (rec->cr_index == csi->csi_startrec) {
-                /* Remember where we started, since seq_read will re-read
-                 * the data when it reallocs space.  Sigh, if only there was
-                 * a way to tell seq_file how big the buf should be in the
-                 * first place... */
-                csi->csi_startcat = llh->lgh_hdr->llh_cat_idx;
-                csi->csi_startidx = rec->cr_hdr.lrh_index - 1;
-        }
-        if (csi->csi_wrote > CL_CHUNK_SIZE) {
-                /* Stop at some point with a reasonable seq_file buffer size.
-                 * Start from here the next time.
-                 */
-                csi->csi_endrec = rec->cr_index - 1;
-                csi->csi_startcat = llh->lgh_hdr->llh_cat_idx;
-                csi->csi_startidx = rec->cr_hdr.lrh_index - 1;
-                csi->csi_wrote = 0;
-                RETURN(LLOG_PROC_BREAK);
-        }
-
-        rc = seq_printf(seq, LPU64" %02d%-5s "LPU64" 0x%x t="DFID,
-                        rec->cr_index, rec->cr_type,
-                        changelog_str[rec->cr_type], rec->cr_time,
-                        rec->cr_flags & CLF_FLAGMASK, PFID(&rec->cr_tfid));
-
-        if (rec->cr_namelen)
-                /* namespace rec includes parent and filename */
-                rc += seq_printf(seq, " p="DFID" %.*s\n", PFID(&rec->cr_pfid),
-                                 rec->cr_namelen, rec->cr_name);
-        else
-                rc += seq_puts(seq, "\n");
-
-        if (rc < 0) {
-                /* seq_read will dump the whole buffer and re-seq_start with a
-                   larger one; no point in continuing the llog_process */
-                CDEBUG(D_CL, "rec="LPU64" overflow "LPU64"<-"LPU64"\n",
-                       rec->cr_index, csi->csi_startrec, csi->csi_endrec);
-                csi->csi_endrec = csi->csi_startrec - 1;
-                csi->csi_wrote = 0;
-                RETURN(LLOG_PROC_BREAK);
-        }
-
-        csi->csi_wrote++;
-        csi->csi_endrec = rec->cr_index;
-
-        RETURN(0);
-}
-
-static int mdd_cl_seq_show(struct seq_file *seq, void *v)
-{
-        struct cl_seq_iter *csi = seq->private;
-        struct obd_device *obd = mdd2obd_dev(csi->csi_mdd);
-        struct llog_ctxt *ctxt;
-        int rc;
-
-        if (csi->csi_fill) {
-                /* seq_read wants more data to fill his buffer. But we already
-                   filled the buf as much as we cared to; force seq_read to
-                   accept that. */
-                while ((rc = seq_putc(seq, 0)) == 0);
-                return 0;
-        }
-
-        ctxt = llog_get_context(obd, LLOG_CHANGELOG_ORIG_CTXT);
-        if (ctxt == NULL)
-                return -ENOENT;
-
-        /* Since we have to restart the llog_cat_process for each chunk of the
-           seq_ functions, start from where we left off. */
-        rc = llog_cat_process(ctxt->loc_handle, changelog_show_cb, seq,
-                              csi->csi_startcat, csi->csi_startidx);
+        struct changelog_seq_iter *csi = seq->private;
+        struct mdd_device *mdd = (struct mdd_device *)csi->csi_dev;
 
 
-        CDEBUG(D_CL, "seq_show "LPU64"-"LPU64" cat=%d:%d wrote=%d rc=%d\n",
-               csi->csi_startrec, csi->csi_endrec, csi->csi_startcat,
-               csi->csi_startidx, csi->csi_wrote, rc);
-
-        llog_ctxt_put(ctxt);
-
-        if (rc == LLOG_PROC_BREAK)
-                rc = 0;
-
-        return rc;
+        return mdd_changelog_write(file, buffer, count, mdd);
 }
 
 }
 
-static int mdd_cl_done(struct cl_seq_iter *csi)
+static int mdd_changelog_done(struct changelog_seq_iter *csi)
 {
 {
+        struct mdd_device *mdd = (struct mdd_device *)csi->csi_dev;
         int done = 0;
         int done = 0;
-        spin_lock(&csi->csi_mdd->mdd_cl.mc_lock);
-        done = (csi->csi_endrec >= csi->csi_mdd->mdd_cl.mc_index);
-        spin_unlock(&csi->csi_mdd->mdd_cl.mc_lock);
+
+        spin_lock(&mdd->mdd_cl.mc_lock);
+        done = (csi->csi_endrec >= mdd->mdd_cl.mc_index);
+        spin_unlock(&mdd->mdd_cl.mc_lock);
         return done;
 }
 
         return done;
 }
 
-
-static void *mdd_cl_seq_start(struct seq_file *seq, loff_t *pos)
+/* handle nonblocking */
+static ssize_t mdd_changelog_seq_read(struct file *file, char __user *buf,
+                                      size_t count, loff_t *ppos)
 {
 {
-        struct cl_seq_iter *csi = seq->private;
-        LASSERT(csi);
-
-        CDEBUG(D_CL, "start "LPU64"-"LPU64" pos="LPU64"\n",
-               csi->csi_startrec, csi->csi_endrec, *pos);
-
-        csi->csi_fill = 0;
-
-        if (mdd_cl_done(csi))
-                /* no more records, seq_read should return 0 if buffer
-                   is empty */
-                return NULL;
-
-        if (*pos > csi->csi_pos) {
-                /* The seq_read implementation sucks.  It may call start
-                   multiple times, using pos to indicate advances, if any,
-                   by arbitrarily increasing it by 1. So ignore the actual
-                   value of pos, and just register any increase as
-                   "seq_read wants the next values". */
-                csi->csi_startrec = csi->csi_endrec + 1;
-                csi->csi_pos = *pos;
-        }
-        /* else use old startrec/startidx */
-
-        return csi;
-}
+        struct seq_file *seq = (struct seq_file *)file->private_data;
+        struct changelog_seq_iter *csi = seq->private;
+        int rc;
+        ENTRY;
 
 
-static void mdd_cl_seq_stop(struct seq_file *seq, void *v)
-{
-        struct cl_seq_iter *csi = seq->private;
+        if ((file->f_flags & O_NONBLOCK) && mdd_changelog_done(csi))
+                RETURN(-EAGAIN);
 
 
-        CDEBUG(D_CL, "stop "LPU64"-"LPU64"\n",
-               csi->csi_startrec, csi->csi_endrec);
+        csi->csi_done = 0;
+        rc = seq_read(file, buf, count, ppos);
+        RETURN(rc);
 }
 
 }
 
-static void *mdd_cl_seq_next(struct seq_file *seq, void *v, loff_t *pos)
+/* handle nonblocking */
+static unsigned int mdd_changelog_seq_poll(struct file *file, poll_table *wait)
 {
 {
-        struct cl_seq_iter *csi = seq->private;
-
-        CDEBUG(D_CL, "next "LPU64"-"LPU64" pos="LPU64"\n",
-               csi->csi_startrec, csi->csi_endrec, *pos);
+        struct seq_file *seq = (struct seq_file *)file->private_data;
+        struct changelog_seq_iter *csi = seq->private;
+        struct mdd_device *mdd = (struct mdd_device *)csi->csi_dev;
+        ENTRY;
 
 
-        csi->csi_fill = 1;
+        csi->csi_done = 0;
+        poll_wait(file, &mdd->mdd_cl.mc_waitq, wait);
+        if (!mdd_changelog_done(csi))
+                RETURN(POLLIN | POLLRDNORM);
 
 
-        return csi;
+        RETURN(0);
 }
 
 }
 
-struct seq_operations mdd_cl_sops = {
-        .start = mdd_cl_seq_start,
-        .stop = mdd_cl_seq_stop,
-        .next = mdd_cl_seq_next,
-        .show = mdd_cl_seq_show,
-};
-
-static int mdd_cl_seq_open(struct inode *inode, struct file *file)
+static int mdd_changelog_seq_open(struct inode *inode, struct file *file)
 {
 {
-        struct cl_seq_iter *csi;
-        struct proc_dir_entry *dp = PDE(inode);
-        struct seq_file *seq;
+        struct changelog_seq_iter *csi;
+        struct obd_device *obd;
         int rc;
         int rc;
+        ENTRY;
 
 
-        LPROCFS_ENTRY_AND_CHECK(dp);
-
-        rc = seq_open(file, &mdd_cl_sops);
+        rc = changelog_seq_open(inode, file, &csi);
         if (rc)
         if (rc)
-                goto out;
-
-        OBD_ALLOC_PTR(csi);
-        if (csi == NULL) {
-                rc = -ENOMEM;
-                goto out;
+                RETURN(rc);
+
+        /* The proc file is set up with mdd in data, not obd */
+        obd = mdd2obd_dev((struct mdd_device *)csi->csi_dev);
+        csi->csi_ctxt = llog_get_context(obd, LLOG_CHANGELOG_ORIG_CTXT);
+        if (csi->csi_ctxt == NULL) {
+                changelog_seq_release(inode, file);
+                RETURN(-ENOENT);
         }
         }
-        csi->csi_mdd = dp->data;
-        seq = file->private_data;
-        seq->private = csi;
-
-out:
-        if (rc)
-                LPROCFS_EXIT();
-        return rc;
+        /* The handle is set up in llog_obd_origin_setup */
+        csi->csi_llh = csi->csi_ctxt->loc_handle;
+        RETURN(rc);
 }
 
 }
 
-static int mdd_cl_seq_release(struct inode *inode, struct file *file)
+static int mdd_changelog_seq_release(struct inode *inode, struct file *file)
 {
         struct seq_file *seq = file->private_data;
 {
         struct seq_file *seq = file->private_data;
-        struct cl_seq_iter *csi = seq->private;
-
-        OBD_FREE_PTR(csi);
-
-        return lprocfs_seq_release(inode, file);
-}
-
-static loff_t mdd_cl_seq_lseek(struct file *file, loff_t offset, int origin)
-{
-        struct seq_file *seq = (struct seq_file *)file->private_data;
-        struct cl_seq_iter *csi = seq->private;
-
-        CDEBUG(D_CL, "seek "LPU64"-"LPU64" off="LPU64":%d fpos="LPU64"\n",
-               csi->csi_startrec, csi->csi_endrec, offset, origin, file->f_pos);
-
-        LL_SEQ_LOCK(seq);
-
-        switch (origin) {
-                case SEEK_CUR:
-                        offset += csi->csi_endrec;
-                        break;
-                case SEEK_END:
-                        spin_lock(&csi->csi_mdd->mdd_cl.mc_lock);
-                        offset += csi->csi_mdd->mdd_cl.mc_index;
-                        spin_unlock(&csi->csi_mdd->mdd_cl.mc_lock);
-                        break;
-        }
-
-        /* SEEK_SET */
+        struct changelog_seq_iter *csi = seq->private;
 
 
-        if (offset < 0) {
-                LL_SEQ_UNLOCK(seq);
-                return -EINVAL;
-        }
-
-        csi->csi_startrec = offset;
-        csi->csi_endrec = offset ? offset - 1 : 0;
-
-        /* drop whatever is left in sucky seq_read's buffer */
-        seq->count = 0;
-        seq->from = 0;
-        seq->index++;
-        LL_SEQ_UNLOCK(seq);
-        file->f_pos = csi->csi_startrec;
-        return csi->csi_startrec;
-}
-
-static ssize_t mdd_cl_seq_read(struct file *file, char __user *buf,
-                               size_t count, loff_t *ppos)
-{
-        struct seq_file *seq = (struct seq_file *)file->private_data;
-        struct cl_seq_iter *csi = seq->private;
-
-        if ((file->f_flags & O_NONBLOCK) && mdd_cl_done(csi))
-                return -EAGAIN;
-        return seq_read(file, buf, count, ppos);
-}
+        if (csi && csi->csi_ctxt)
+                llog_ctxt_put(csi->csi_ctxt);
 
 
-static unsigned int mdd_cl_seq_poll(struct file *file, poll_table *wait)
-{   /* based on kmsg_poll */
-        struct seq_file *seq = (struct seq_file *)file->private_data;
-        struct cl_seq_iter *csi = seq->private;
-
-        poll_wait(file, &csi->csi_mdd->mdd_cl.mc_waitq, wait);
-        if (!mdd_cl_done(csi))
-                return POLLIN | POLLRDNORM;
-
-        return 0;
+        return (changelog_seq_release(inode, file));
 }
 
 }
 
+/* mdd changelog proc can handle nonblocking ops and writing to purge recs */
 struct file_operations mdd_changelog_fops = {
         .owner   = THIS_MODULE,
 struct file_operations mdd_changelog_fops = {
         .owner   = THIS_MODULE,
-        .open    = mdd_cl_seq_open,
-        .read    = mdd_cl_seq_read,
-        .write   = mdd_cl_seq_write,
-        .llseek  = mdd_cl_seq_lseek,
-        .poll    = mdd_cl_seq_poll,
-        .release = mdd_cl_seq_release,
+        .open    = mdd_changelog_seq_open,
+        .read    = mdd_changelog_seq_read,
+        .write   = mdd_changelog_seq_write,
+        .llseek  = changelog_seq_lseek,
+        .poll    = mdd_changelog_seq_poll,
+        .release = mdd_changelog_seq_release,
 };
 
 #ifdef HAVE_QUOTA_SUPPORT
 };
 
 #ifdef HAVE_QUOTA_SUPPORT
@@ -629,9 +453,11 @@ static int mdd_lprocfs_quota_wr_type(struct file *file, const char *buffer,
 #endif
 
 static struct lprocfs_vars lprocfs_mdd_obd_vars[] = {
 #endif
 
 static struct lprocfs_vars lprocfs_mdd_obd_vars[] = {
-        { "atime_diff", lprocfs_rd_atime_diff, lprocfs_wr_atime_diff, 0 },
-        { "changelog_mask", lprocfs_rd_cl_mask, lprocfs_wr_cl_mask, 0 },
-        { "changelog", 0, lprocfs_wr_cl, 0, &mdd_changelog_fops, 0600 },
+        { "atime_diff",      lprocfs_rd_atime_diff, lprocfs_wr_atime_diff, 0 },
+        { "changelog_mask",  lprocfs_rd_changelog_mask,
+                             lprocfs_wr_changelog_mask, 0 },
+        { "changelog_users", lprocfs_rd_changelog_users, 0, 0},
+        { "changelog", 0, mdd_changelog_write, 0, &mdd_changelog_fops, 0600 },
 #ifdef HAVE_QUOTA_SUPPORT
         { "quota_type",      mdd_lprocfs_quota_rd_type,
                              mdd_lprocfs_quota_wr_type, 0 },
 #ifdef HAVE_QUOTA_SUPPORT
         { "quota_type",      mdd_lprocfs_quota_rd_type,
                              mdd_lprocfs_quota_wr_type, 0 },
index c6bc349..e4f917b 100644 (file)
@@ -389,6 +389,7 @@ out:
 /** mdd_path() lookup structure. */
 struct path_lookup_info {
         __u64                pli_recno;        /**< history point */
 /** mdd_path() lookup structure. */
 struct path_lookup_info {
         __u64                pli_recno;        /**< history point */
+        __u64                pli_currec;       /**< current record */
         struct lu_fid        pli_fid;
         struct lu_fid        pli_fids[MAX_PATH_DEPTH]; /**< path, in fids */
         struct mdd_object   *pli_mdd_obj;
         struct lu_fid        pli_fid;
         struct lu_fid        pli_fids[MAX_PATH_DEPTH]; /**< path, in fids */
         struct mdd_object   *pli_mdd_obj;
@@ -477,7 +478,12 @@ static int mdd_path_current(const struct lu_env *env,
                 pli->pli_fids[pli->pli_fidcount] = *tmpfid;
         }
 
                 pli->pli_fids[pli->pli_fidcount] = *tmpfid;
         }
 
-        /* Verify that our path hasn't changed since we started the lookup */
+        /* Verify that our path hasn't changed since we started the lookup.
+           Record the current index, and verify the path resolves to the
+           same fid. If it does, then the path is correct as of this index. */
+        spin_lock(&mdd->mdd_cl.mc_lock);
+        pli->pli_currec = mdd->mdd_cl.mc_index;
+        spin_unlock(&mdd->mdd_cl.mc_lock);
         rc = mdd_path2fid(env, mdd, ptr, &pli->pli_fid);
         if (rc) {
                 CDEBUG(D_INFO, "mdd_path2fid(%s) failed %d\n", ptr, rc);
         rc = mdd_path2fid(env, mdd, ptr, &pli->pli_fid);
         if (rc) {
                 CDEBUG(D_INFO, "mdd_path2fid(%s) failed %d\n", ptr, rc);
@@ -501,9 +507,15 @@ out:
         return rc;
 }
 
         return rc;
 }
 
+static int mdd_path_historic(const struct lu_env *env,
+                             struct path_lookup_info *pli)
+{
+        return 0;
+}
+
 /* Returns the full path to this fid, as of changelog record recno. */
 static int mdd_path(const struct lu_env *env, struct md_object *obj,
 /* Returns the full path to this fid, as of changelog record recno. */
 static int mdd_path(const struct lu_env *env, struct md_object *obj,
-                    char *path, int pathlen, __u64 recno, int *linkno)
+                    char *path, int pathlen, __u64 *recno, int *linkno)
 {
         struct path_lookup_info *pli;
         int tries = 3;
 {
         struct path_lookup_info *pli;
         int tries = 3;
@@ -524,7 +536,7 @@ static int mdd_path(const struct lu_env *env, struct md_object *obj,
                 RETURN(-ENOMEM);
 
         pli->pli_mdd_obj = md2mdd_obj(obj);
                 RETURN(-ENOMEM);
 
         pli->pli_mdd_obj = md2mdd_obj(obj);
-        pli->pli_recno = recno;
+        pli->pli_recno = *recno;
         pli->pli_path = path;
         pli->pli_pathlen = pathlen;
         pli->pli_linkno = *linkno;
         pli->pli_path = path;
         pli->pli_pathlen = pathlen;
         pli->pli_linkno = *linkno;
@@ -533,7 +545,6 @@ static int mdd_path(const struct lu_env *env, struct md_object *obj,
         while (tries-- && rc == -EAGAIN)
                 rc = mdd_path_current(env, pli);
 
         while (tries-- && rc == -EAGAIN)
                 rc = mdd_path_current(env, pli);
 
-#if 0   /* We need old path names only for replication */
         /* For historical path lookup, the current links may not have existed
          * at "recno" time.  We must switch over to earlier links/parents
          * by using the changelog records.  If the earlier parent doesn't
         /* For historical path lookup, the current links may not have existed
          * at "recno" time.  We must switch over to earlier links/parents
          * by using the changelog records.  If the earlier parent doesn't
@@ -542,12 +553,13 @@ static int mdd_path(const struct lu_env *env, struct md_object *obj,
          * We may ignore this problem for the initial implementation and
          * state that an "original" hardlink must still exist for us to find
          * historic path name. */
          * We may ignore this problem for the initial implementation and
          * state that an "original" hardlink must still exist for us to find
          * historic path name. */
-        if (pli->pli_recno != -1)
+        if (pli->pli_recno != -1) {
                 rc = mdd_path_historic(env, pli);
                 rc = mdd_path_historic(env, pli);
-#endif
-
-        /* return next link index to caller */
-        *linkno = pli->pli_linkno;
+        } else {
+                *recno = pli->pli_currec;
+                /* Return next link index to caller */
+                *linkno = pli->pli_linkno;
+        }
 
         OBD_FREE_PTR(pli);
 
 
         OBD_FREE_PTR(pli);
 
index ab925e0..dd7a7ca 100644 (file)
@@ -178,8 +178,18 @@ int mds_changelog_llog_init(struct obd_device *obd, struct obd_device *tgt)
         rc = llog_setup_named(obd, &obd->obd_olg, LLOG_CHANGELOG_ORIG_CTXT,
                               tgt, 1, NULL, CHANGELOG_CATALOG,
                               &changelog_orig_logops);
         rc = llog_setup_named(obd, &obd->obd_olg, LLOG_CHANGELOG_ORIG_CTXT,
                               tgt, 1, NULL, CHANGELOG_CATALOG,
                               &changelog_orig_logops);
-        if (rc)
+        if (rc) {
                 CERROR("changelog llog setup failed %d\n", rc);
                 CERROR("changelog llog setup failed %d\n", rc);
+                RETURN(rc);
+        }
+
+        rc = llog_setup_named(obd, &obd->obd_olg, LLOG_CHANGELOG_USER_ORIG_CTXT,
+                              tgt, 1, NULL, CHANGELOG_USERS,
+                              &changelog_orig_logops);
+        if (rc) {
+                CERROR("changelog users llog setup failed %d\n", rc);
+                RETURN(rc);
+        }
 
         RETURN(rc);
 }
 
         RETURN(rc);
 }
@@ -245,5 +255,11 @@ int mds_llog_finish(struct obd_device *obd, int count)
         if (!rc)
                 rc = rc2;
 
         if (!rc)
                 rc = rc2;
 
+        ctxt = llog_get_context(obd, LLOG_CHANGELOG_USER_ORIG_CTXT);
+        if (ctxt)
+                rc2 = llog_cleanup(ctxt);
+        if (!rc)
+                rc = rc2;
+
         RETURN(rc);
 }
         RETURN(rc);
 }
index d164c65..74cfc0e 100644 (file)
@@ -1068,12 +1068,15 @@ static int lu_device_is_mdt(struct lu_device *d)
         return ergo(d != NULL && d->ld_ops != NULL, d->ld_ops == &mdt_lu_ops);
 }
 
         return ergo(d != NULL && d->ld_ops != NULL, d->ld_ops == &mdt_lu_ops);
 }
 
+static int mdt_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
+                         void *karg, void *uarg);
+
 static int mdt_set_info(struct mdt_thread_info *info)
 {
         struct ptlrpc_request *req = mdt_info_req(info);
         char *key;
 static int mdt_set_info(struct mdt_thread_info *info)
 {
         struct ptlrpc_request *req = mdt_info_req(info);
         char *key;
-        __u32 *val;
-        int keylen, rc = 0;
+        void *val;
+        int keylen, vallen, rc = 0;
         ENTRY;
 
         rc = req_capsule_server_pack(info->mti_pill);
         ENTRY;
 
         rc = req_capsule_server_pack(info->mti_pill);
@@ -1095,19 +1098,35 @@ static int mdt_set_info(struct mdt_thread_info *info)
                 RETURN(-EFAULT);
         }
 
                 RETURN(-EFAULT);
         }
 
-        if (!KEY_IS(KEY_READ_ONLY))
-                RETURN(-EINVAL);
+        vallen = req_capsule_get_size(info->mti_pill, &RMF_SETINFO_VAL,
+                                      RCL_CLIENT);
 
 
-        req->rq_status = 0;
-        lustre_msg_set_status(req->rq_repmsg, 0);
+        if (KEY_IS(KEY_READ_ONLY)) {
+                req->rq_status = 0;
+                lustre_msg_set_status(req->rq_repmsg, 0);
 
 
-        spin_lock(&req->rq_export->exp_lock);
-        if (*val)
-                req->rq_export->exp_connect_flags |= OBD_CONNECT_RDONLY;
-        else
-                req->rq_export->exp_connect_flags &= ~OBD_CONNECT_RDONLY;
-        spin_unlock(&req->rq_export->exp_lock);
+                spin_lock(&req->rq_export->exp_lock);
+                if (*(__u32 *)val)
+                        req->rq_export->exp_connect_flags |= OBD_CONNECT_RDONLY;
+                else
+                        req->rq_export->exp_connect_flags &=~OBD_CONNECT_RDONLY;
+                spin_unlock(&req->rq_export->exp_lock);
+
+        } else if (KEY_IS(KEY_CHANGELOG_CLEAR)) {
+                if (lustre_msg_swabbed(req->rq_reqmsg)) {
+                        struct changelog_setinfo *cs =
+                                (struct changelog_setinfo *)val;
+                        __swab64s(&cs->cs_recno);
+                        __swab32s(&cs->cs_id);
+                }
+
+                rc = mdt_iocontrol(OBD_IOC_CHANGELOG_CLEAR, info->mti_exp,
+                                   vallen, val, NULL);
+                lustre_msg_set_status(req->rq_repmsg, rc);
 
 
+        } else {
+                RETURN(-EINVAL);
+        }
         RETURN(0);
 }
 
         RETURN(0);
 }
 
@@ -1784,6 +1803,7 @@ static int mdt_quotactl_handle(struct mdt_thread_info *info)
 }
 #endif
 
 }
 #endif
 
+
 /*
  * OBD PING and other handlers.
  */
 /*
  * OBD PING and other handlers.
  */
@@ -1812,6 +1832,101 @@ static int mdt_obd_qc_callback(struct mdt_thread_info *info)
 
 
 /*
 
 
 /*
+ * LLOG handlers.
+ */
+
+/** clone llog ctxt from child (mdd)
+ * This allows remote llog (replicator) access.
+ * We can either pass all llog RPCs (eg mdt_llog_create) on to child where the
+ * context was originally set up, or we can handle them directly.
+ * I choose the latter, but that means I need any llog
+ * contexts set up by child to be accessable by the mdt.  So we clone the
+ * context into our context list here.
+ */
+static int mdt_llog_ctxt_clone(const struct lu_env *env, struct mdt_device *mdt,
+                               int idx)
+{
+        struct md_device  *next = mdt->mdt_child;
+        struct llog_ctxt *ctxt;
+        int rc;
+
+        if (!llog_ctxt_null(mdt2obd_dev(mdt), idx))
+                return 0;
+
+        rc = next->md_ops->mdo_llog_ctxt_get(env, next, idx, (void **)&ctxt);
+        if (rc || ctxt == NULL) {
+                CERROR("Can't get mdd ctxt %d\n", rc);
+                return rc;
+        }
+
+        rc = llog_group_set_ctxt(&mdt2obd_dev(mdt)->obd_olg, ctxt, idx);
+        if (rc)
+                CERROR("Can't set mdt ctxt %d\n", rc);
+
+        return rc;
+}
+
+static int mdt_llog_ctxt_unclone(const struct lu_env *env,
+                                 struct mdt_device *mdt, int idx)
+{
+        struct llog_ctxt *ctxt;
+
+        ctxt = llog_get_context(mdt2obd_dev(mdt), idx);
+        if (ctxt == NULL)
+                return 0;
+        /* Put once for the get we just did, and once for the clone */
+        llog_ctxt_put(ctxt);
+        llog_ctxt_put(ctxt);
+        return 0;
+}
+
+static int mdt_llog_create(struct mdt_thread_info *info)
+{
+        int rc;
+
+        req_capsule_set(info->mti_pill, &RQF_LLOG_ORIGIN_HANDLE_CREATE);
+        rc = llog_origin_handle_create(mdt_info_req(info));
+        return (rc < 0 ? err_serious(rc) : rc);
+}
+
+static int mdt_llog_destroy(struct mdt_thread_info *info)
+{
+        int rc;
+
+        req_capsule_set(info->mti_pill, &RQF_LLOG_ORIGIN_HANDLE_DESTROY);
+        rc = llog_origin_handle_destroy(mdt_info_req(info));
+        return (rc < 0 ? err_serious(rc) : rc);
+}
+
+static int mdt_llog_read_header(struct mdt_thread_info *info)
+{
+        int rc;
+
+        req_capsule_set(info->mti_pill, &RQF_LLOG_ORIGIN_HANDLE_READ_HEADER);
+        rc = llog_origin_handle_read_header(mdt_info_req(info));
+        return (rc < 0 ? err_serious(rc) : rc);
+}
+
+static int mdt_llog_next_block(struct mdt_thread_info *info)
+{
+        int rc;
+
+        req_capsule_set(info->mti_pill, &RQF_LLOG_ORIGIN_HANDLE_NEXT_BLOCK);
+        rc = llog_origin_handle_next_block(mdt_info_req(info));
+        return (rc < 0 ? err_serious(rc) : rc);
+}
+
+static int mdt_llog_prev_block(struct mdt_thread_info *info)
+{
+        int rc;
+
+        req_capsule_set(info->mti_pill, &RQF_LLOG_ORIGIN_HANDLE_PREV_BLOCK);
+        rc = llog_origin_handle_prev_block(mdt_info_req(info));
+        return (rc < 0 ? err_serious(rc) : rc);
+}
+
+
+/*
  * DLM handlers.
  */
 static struct ldlm_callback_suite cbs = {
  * DLM handlers.
  */
 static struct ldlm_callback_suite cbs = {
@@ -2231,7 +2346,9 @@ static struct mdt_handler *mdt_handler_find(__u32 opc,
                 if (s->mos_opc_start <= opc && opc < s->mos_opc_end) {
                         h = s->mos_hs + (opc - s->mos_opc_start);
                         if (likely(h->mh_opc != 0))
                 if (s->mos_opc_start <= opc && opc < s->mos_opc_end) {
                         h = s->mos_hs + (opc - s->mos_opc_start);
                         if (likely(h->mh_opc != 0))
-                                LASSERT(h->mh_opc == opc);
+                                LASSERTF(h->mh_opc == opc,
+                                         "opcode mismatch %d != %d\n",
+                                         h->mh_opc, opc);
                         else
                                 h = NULL; /* unsupported opc */
                         break;
                         else
                                 h = NULL; /* unsupported opc */
                         break;
@@ -2335,6 +2452,7 @@ static int mdt_unpack_req_pack_rep(struct mdt_thread_info *info, __u32 flags)
                 struct mdt_device *mdt = info->mti_mdt;
 
                 /* Pack reply. */
                 struct mdt_device *mdt = info->mti_mdt;
 
                 /* Pack reply. */
+
                 if (req_capsule_has_field(pill, &RMF_MDT_MD, RCL_SERVER))
                         req_capsule_set_size(pill, &RMF_MDT_MD, RCL_SERVER,
                                              mdt->mdt_max_mdsize);
                 if (req_capsule_has_field(pill, &RMF_MDT_MD, RCL_SERVER))
                         req_capsule_set_size(pill, &RMF_MDT_MD, RCL_SERVER,
                                              mdt->mdt_max_mdsize);
@@ -2794,7 +2912,8 @@ static int mdt_handle0(struct ptlrpc_request *req,
                         if (likely(h != NULL)) {
                                 rc = mdt_req_handle(info, h, req);
                         } else {
                         if (likely(h != NULL)) {
                                 rc = mdt_req_handle(info, h, req);
                         } else {
-                                CERROR("The unsupported opc: 0x%x\n", lustre_msg_get_opc(msg) );
+                                CERROR("The unsupported opc: 0x%x\n",
+                                       lustre_msg_get_opc(msg) );
                                 req->rq_status = -ENOTSUPP;
                                 rc = ptlrpc_error(req);
                                 RETURN(rc);
                                 req->rq_status = -ENOTSUPP;
                                 rc = ptlrpc_error(req);
                                 RETURN(rc);
@@ -4204,6 +4323,7 @@ static void mdt_fini(const struct lu_env *env, struct mdt_device *m)
 
         target_recovery_fini(obd);
         mdt_stop_ptlrpc_service(m);
 
         target_recovery_fini(obd);
         mdt_stop_ptlrpc_service(m);
+        mdt_llog_ctxt_unclone(env, m, LLOG_CHANGELOG_ORIG_CTXT);
         mdt_obd_llog_cleanup(obd);
         obd_zombie_barrier();
 #ifdef HAVE_QUOTA_SUPPORT
         mdt_obd_llog_cleanup(obd);
         obd_zombie_barrier();
 #ifdef HAVE_QUOTA_SUPPORT
@@ -4244,8 +4364,8 @@ static void mdt_fini(const struct lu_env *env, struct mdt_device *m)
         cfs_timer_disarm(&m->mdt_ck_timer);
         mdt_ck_thread_stop(m);
 
         cfs_timer_disarm(&m->mdt_ck_timer);
         mdt_ck_thread_stop(m);
 
-        /* 
-         * Finish the stack 
+        /*
+         * Finish the stack
          */
         mdt_stack_fini(env, m, md2lu_dev(m->mdt_child));
 
          */
         mdt_stack_fini(env, m, md2lu_dev(m->mdt_child));
 
@@ -4524,6 +4644,10 @@ static int mdt_init0(const struct lu_env *env, struct mdt_device *m,
         if (rc)
                 GOTO(err_fs_cleanup, rc);
 
         if (rc)
                 GOTO(err_fs_cleanup, rc);
 
+        rc = mdt_llog_ctxt_clone(env, m, LLOG_CHANGELOG_ORIG_CTXT);
+        if (rc)
+                GOTO(err_llog_cleanup, rc);
+
         mdt_adapt_sptlrpc_conf(obd, 1);
 
 #ifdef HAVE_QUOTA_SUPPORT
         mdt_adapt_sptlrpc_conf(obd, 1);
 
 #ifdef HAVE_QUOTA_SUPPORT
@@ -4568,8 +4692,9 @@ err_recovery:
         target_recovery_fini(obd);
 #ifdef HAVE_QUOTA_SUPPORT
         next->md_ops->mdo_quota.mqo_cleanup(env, next);
         target_recovery_fini(obd);
 #ifdef HAVE_QUOTA_SUPPORT
         next->md_ops->mdo_quota.mqo_cleanup(env, next);
-err_llog_cleanup:
 #endif
 #endif
+err_llog_cleanup:
+        mdt_llog_ctxt_unclone(env, m, LLOG_CHANGELOG_ORIG_CTXT);
         mdt_obd_llog_cleanup(obd);
 err_fs_cleanup:
         mdt_fs_cleanup(env, m);
         mdt_obd_llog_cleanup(obd);
 err_fs_cleanup:
         mdt_fs_cleanup(env, m);
@@ -4600,7 +4725,7 @@ err_fini_site:
 err_free_site:
         OBD_FREE_PTR(mite);
 err_lmi:
 err_free_site:
         OBD_FREE_PTR(mite);
 err_lmi:
-        if (lmi) 
+        if (lmi)
                 server_put_mount_2(dev, lmi->lmi_mnt);
         return (rc);
 }
                 server_put_mount_2(dev, lmi->lmi_mnt);
         return (rc);
 }
@@ -5234,7 +5359,7 @@ static int mdt_ioc_fid2path(struct lu_env *env, struct mdt_device *mdt,
                 GOTO(out_free, rc);
         }
 
                 GOTO(out_free, rc);
         }
 
-        rc = mo_path(env, md_object_next(&obj->mot_obj), path, pathlen, recno,
+        rc = mo_path(env, md_object_next(&obj->mot_obj), path, pathlen, &recno,
                      &linkno);
         mdt_object_put(env, obj);
         if (rc)
                      &linkno);
         mdt_object_put(env, obj);
         if (rc)
@@ -5243,6 +5368,7 @@ static int mdt_ioc_fid2path(struct lu_env *env, struct mdt_device *mdt,
         if (copy_to_user(data->ioc_pbuf1, path, pathlen))
                 rc = -EFAULT;
 
         if (copy_to_user(data->ioc_pbuf1, path, pathlen))
                 rc = -EFAULT;
 
+        memcpy(data->ioc_inlbuf2, &recno, sizeof(recno));
         memcpy(data->ioc_inlbuf3, &linkno, sizeof(linkno));
 
         EXIT;
         memcpy(data->ioc_inlbuf3, &linkno, sizeof(linkno));
 
         EXIT;
@@ -5254,6 +5380,31 @@ out_context:
         return rc;
 }
 
         return rc;
 }
 
+/* Pass the ioc down */
+static int mdt_ioc_child(struct lu_env *env, struct mdt_device *mdt,
+                         unsigned int cmd, int len, void *data)
+{
+        struct lu_context ioctl_session;
+        struct md_device *next = mdt->mdt_child;
+        int rc;
+        ENTRY;
+
+        rc = lu_context_init(&ioctl_session, LCT_SESSION);
+        if (rc)
+                RETURN(rc);
+        ioctl_session.lc_thread = (struct ptlrpc_thread *)cfs_current();
+        lu_context_enter(&ioctl_session);
+        env->le_ses = &ioctl_session;
+
+        LASSERT(next->md_ops->mdo_iocontrol);
+        rc = next->md_ops->mdo_iocontrol(env, next, cmd, len, data);
+
+        lu_context_exit(&ioctl_session);
+        lu_context_fini(&ioctl_session);
+        RETURN(rc);
+}
+
+/* ioctls on obd dev */
 static int mdt_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
                          void *karg, void *uarg)
 {
 static int mdt_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
                          void *karg, void *uarg)
 {
@@ -5284,6 +5435,11 @@ static int mdt_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
         case OBD_IOC_FID2PATH:
                 rc = mdt_ioc_fid2path(&env, mdt, karg);
                 break;
         case OBD_IOC_FID2PATH:
                 rc = mdt_ioc_fid2path(&env, mdt, karg);
                 break;
+        case OBD_IOC_CHANGELOG_REG:
+        case OBD_IOC_CHANGELOG_DEREG:
+        case OBD_IOC_CHANGELOG_CLEAR:
+                rc = mdt_ioc_child(&env, mdt, cmd, len, karg);
+                break;
         default:
                 CERROR("Not supported cmd = %d for device %s\n",
                        cmd, obd->obd_name);
         default:
                 CERROR("Not supported cmd = %d for device %s\n",
                        cmd, obd->obd_name);
@@ -5571,7 +5727,19 @@ static struct mdt_handler mdt_dlm_ops[] = {
         DEF_DLM_HNDL_0(0,            CP_CALLBACK,    mdt_cp_callback)
 };
 
         DEF_DLM_HNDL_0(0,            CP_CALLBACK,    mdt_cp_callback)
 };
 
+#define DEF_LLOG_HNDL(flags, name, fn)                   \
+        DEF_HNDL(LLOG, ORIGIN_HANDLE_CREATE, _NET, flags, name, fn, NULL)
+
 static struct mdt_handler mdt_llog_ops[] = {
 static struct mdt_handler mdt_llog_ops[] = {
+        DEF_LLOG_HNDL(0, ORIGIN_HANDLE_CREATE,      mdt_llog_create),
+        DEF_LLOG_HNDL(0, ORIGIN_HANDLE_NEXT_BLOCK,  mdt_llog_next_block),
+        DEF_LLOG_HNDL(0, ORIGIN_HANDLE_READ_HEADER, mdt_llog_read_header),
+        DEF_LLOG_HNDL(0, ORIGIN_HANDLE_WRITE_REC,   NULL),
+        DEF_LLOG_HNDL(0, ORIGIN_HANDLE_CLOSE,       NULL),
+        DEF_LLOG_HNDL(0, ORIGIN_CONNECT,            NULL),
+        DEF_LLOG_HNDL(0, CATINFO,                   NULL),
+        DEF_LLOG_HNDL(0, ORIGIN_HANDLE_PREV_BLOCK,  mdt_llog_prev_block),
+        DEF_LLOG_HNDL(0, ORIGIN_HANDLE_DESTROY,     mdt_llog_destroy),
 };
 
 #define DEF_SEC_CTX_HNDL(name, fn)                      \
 };
 
 #define DEF_SEC_CTX_HNDL(name, fn)                      \
index 4ccb6ba..a6a430d 100644 (file)
@@ -166,6 +166,15 @@ void lustre_swab_llog_rec(struct llog_rec_hdr *rec, struct llog_rec_tail *tail)
                 break;
         }
 
                 break;
         }
 
+        case CHANGELOG_USER_REC: {
+                struct llog_changelog_user_rec *cur =
+                        (struct llog_changelog_user_rec*)rec;
+
+                __swab32s(&cur->cur_id);
+                __swab64s(&cur->cur_endrec);
+                break;
+        }
+
         case MDS_SETATTR64_REC: {
                 struct llog_setattr64_rec *lsr = (struct llog_setattr64_rec *)rec;
 
         case MDS_SETATTR64_REC: {
                 struct llog_setattr64_rec *lsr = (struct llog_setattr64_rec *)rec;
 
index 043ba88..a8caa47 100644 (file)
@@ -50,6 +50,8 @@
 #include <obd_class.h>
 #include <lprocfs_status.h>
 #include <lustre_fsfilt.h>
 #include <obd_class.h>
 #include <lprocfs_status.h>
 #include <lustre_fsfilt.h>
+#include <lustre_log.h>
+#include <lustre/lustre_idl.h>
 
 #if defined(LPROCFS)
 
 
 #if defined(LPROCFS)
 
@@ -2127,6 +2129,258 @@ int lprocfs_obd_wr_recovery_maxtime(struct file *file, const char *buffer,
 }
 EXPORT_SYMBOL(lprocfs_obd_wr_recovery_maxtime);
 
 }
 EXPORT_SYMBOL(lprocfs_obd_wr_recovery_maxtime);
 
+
+/**** Changelogs *****/
+#define D_CHANGELOG 0
+
+DECLARE_CHANGELOG_NAMES;
+
+/* How many records per seq_show.  Too small, we spawn llog_process threads
+   too often; too large, we run out of buffer space */
+#define CHANGELOG_CHUNK_SIZE 100
+
+static int changelog_show_cb(struct llog_handle *llh, struct llog_rec_hdr *hdr,
+                             void *data)
+{
+        struct seq_file *seq = (struct seq_file *)data;
+        struct changelog_seq_iter *csi = seq->private;
+        struct llog_changelog_rec *rec = (struct llog_changelog_rec *)hdr;
+        int rc;
+        ENTRY;
+
+        if ((rec->cr_hdr.lrh_type != CHANGELOG_REC) ||
+            (rec->cr_type >= CL_LAST)) {
+                CERROR("Not a changelog rec %d/%d\n", rec->cr_hdr.lrh_type,
+                       rec->cr_type);
+                RETURN(-EINVAL);
+        }
+
+        CDEBUG(D_CHANGELOG, "rec="LPU64" start="LPU64" cat=%d:%d start=%d:%d\n",
+               rec->cr_index, csi->csi_startrec,
+               llh->lgh_hdr->llh_cat_idx, llh->lgh_cur_idx,
+               csi->csi_startcat, csi->csi_startidx);
+
+        if (rec->cr_index < csi->csi_startrec)
+                /* Skip entries earlier than what we are interested in */
+                RETURN(0);
+        if (rec->cr_index == csi->csi_startrec) {
+                /* Remember where we started, since seq_read will re-read
+                 * the data when it reallocs space.  Sigh, if only there was
+                 * a way to tell seq_file how big the buf should be in the
+                 * first place...
+                 */
+                csi->csi_startcat = llh->lgh_hdr->llh_cat_idx;
+                csi->csi_startidx = rec->cr_hdr.lrh_index - 1;
+        }
+        if (csi->csi_wrote > CHANGELOG_CHUNK_SIZE) {
+                /* Stop at some point with a reasonable seq_file buffer size.
+                 * Start from here the next time.
+                 */
+                csi->csi_endrec = rec->cr_index - 1;
+                csi->csi_startcat = llh->lgh_hdr->llh_cat_idx;
+                csi->csi_startidx = rec->cr_hdr.lrh_index - 1;
+                csi->csi_wrote = 0;
+                RETURN(LLOG_PROC_BREAK);
+        }
+
+        rc = seq_printf(seq, LPU64" %02d%-5s "LPU64" 0x%x t="DFID,
+                        rec->cr_index, rec->cr_type,
+                        changelog_str[rec->cr_type], rec->cr_time,
+                        rec->cr_flags & CLF_FLAGMASK, PFID(&rec->cr_tfid));
+
+        if (rec->cr_namelen)
+                /* namespace rec includes parent and filename */
+                rc += seq_printf(seq, " p="DFID" %.*s\n", PFID(&rec->cr_pfid),
+                                 rec->cr_namelen, rec->cr_name);
+        else
+                rc += seq_puts(seq, "\n");
+
+        if (rc < 0) {
+                /* Ran out of room in the seq buffer. seq_read will dump
+                 * the whole buffer and re-seq_start with a larger one;
+                 * no point in continuing the llog_process */
+                CDEBUG(D_CHANGELOG, "rec="LPU64" overflow "LPU64"<-"LPU64"\n",
+                       rec->cr_index, csi->csi_startrec, csi->csi_endrec);
+                csi->csi_endrec = csi->csi_startrec - 1;
+                csi->csi_wrote = 0;
+                RETURN(LLOG_PROC_BREAK);
+        }
+
+        csi->csi_wrote++;
+        csi->csi_endrec = rec->cr_index;
+
+        RETURN(0);
+}
+
+static int changelog_seq_show(struct seq_file *seq, void *v)
+{
+        struct changelog_seq_iter *csi = seq->private;
+        int rc;
+        ENTRY;
+
+        if (csi->csi_fill) {
+                /* seq_read wants more data to fill his buffer. But we already
+                   filled the buf as much as we cared to; force seq_read to
+                   accept that by padding with 0's */
+                while (seq_putc(seq, 0) == 0);
+                RETURN(0);
+        }
+
+        /* Since we have to restart the llog_cat_process for each chunk of the
+           seq_ functions, start from where we left off. */
+        rc = llog_cat_process(csi->csi_llh, changelog_show_cb, seq,
+                              csi->csi_startcat, csi->csi_startidx);
+
+        CDEBUG(D_CHANGELOG,"seq_show "LPU64"-"LPU64" cat=%d:%d wrote=%d rc=%d\n",
+               csi->csi_startrec, csi->csi_endrec, csi->csi_startcat,
+               csi->csi_startidx, csi->csi_wrote, rc);
+
+        if (rc == 0)
+                csi->csi_done = 1;
+        if (rc == LLOG_PROC_BREAK)
+                /* more records left, but seq_show must return 0 */
+                rc = 0;
+        RETURN(rc);
+}
+
+static void *changelog_seq_start(struct seq_file *seq, loff_t *pos)
+{
+        struct changelog_seq_iter *csi = seq->private;
+        LASSERT(csi);
+
+        CDEBUG(D_CHANGELOG, "start "LPU64"-"LPU64" pos="LPU64"\n",
+               csi->csi_startrec, csi->csi_endrec, *pos);
+
+        csi->csi_fill = 0;
+
+        if (csi->csi_done)
+                /* no more records, seq_read should return 0 if buffer
+                   is empty */
+                return NULL;
+
+        if (*pos > csi->csi_pos) {
+                /* The seq_read implementation sucks.  It may call start
+                   multiple times, using pos to indicate advances, if any,
+                   by arbitrarily increasing it by 1. So ignore the actual
+                   value of pos, and just register any increase as
+                   "seq_read wants the next values". */
+                csi->csi_startrec = csi->csi_endrec + 1;
+                csi->csi_pos = *pos;
+        }
+        /* else use old startrec/startidx */
+
+        return csi;
+}
+
+static void changelog_seq_stop(struct seq_file *seq, void *v)
+{
+        struct changelog_seq_iter *csi = seq->private;
+
+        CDEBUG(D_CHANGELOG, "stop "LPU64"-"LPU64"\n",
+               csi->csi_startrec, csi->csi_endrec);
+}
+
+static void *changelog_seq_next(struct seq_file *seq, void *v, loff_t *pos)
+{
+        struct changelog_seq_iter *csi = seq->private;
+
+        CDEBUG(D_CHANGELOG, "next "LPU64"-"LPU64" pos="LPU64"\n",
+               csi->csi_startrec, csi->csi_endrec, *pos);
+
+        csi->csi_fill = 1;
+
+        return csi;
+}
+
+static struct seq_operations changelog_sops = {
+        .start = changelog_seq_start,
+        .stop = changelog_seq_stop,
+        .next = changelog_seq_next,
+        .show = changelog_seq_show,
+};
+
+int changelog_seq_open(struct inode *inode, struct file *file,
+                       struct changelog_seq_iter **csih)
+{
+        struct changelog_seq_iter *csi;
+        struct proc_dir_entry *dp = PDE(inode);
+        struct seq_file *seq;
+        int rc;
+
+        LPROCFS_ENTRY_AND_CHECK(dp);
+
+        rc = seq_open(file, &changelog_sops);
+        if (rc) {
+                LPROCFS_EXIT();
+                return rc;
+        }
+
+        OBD_ALLOC_PTR(csi);
+        if (csi == NULL) {
+                lprocfs_seq_release(inode, file);
+                return -ENOMEM;
+        }
+
+        csi->csi_dev = dp->data;
+        seq = file->private_data;
+        seq->private = csi;
+        *csih = csi;
+
+        return rc;
+}
+EXPORT_SYMBOL(changelog_seq_open);
+
+int changelog_seq_release(struct inode *inode, struct file *file)
+{
+        struct seq_file *seq = file->private_data;
+        struct changelog_seq_iter *csi = seq->private;
+
+        if (csi)
+                OBD_FREE_PTR(csi);
+
+        return lprocfs_seq_release(inode, file);
+}
+EXPORT_SYMBOL(changelog_seq_release);
+
+loff_t changelog_seq_lseek(struct file *file, loff_t offset, int origin)
+{
+        struct seq_file *seq = (struct seq_file *)file->private_data;
+        struct changelog_seq_iter *csi = seq->private;
+
+        CDEBUG(D_CHANGELOG,"seek "LPU64"-"LPU64" off="LPU64":%d fpos="LPU64"\n",
+               csi->csi_startrec, csi->csi_endrec, offset, origin, file->f_pos);
+
+        LL_SEQ_LOCK(seq);
+
+        switch (origin) {
+                case SEEK_CUR:
+                        offset += csi->csi_endrec;
+                        break;
+                case SEEK_END:
+                        /* we don't know the last rec */
+                        offset = -1;
+        }
+
+        /* SEEK_SET */
+
+        if (offset < 0) {
+                LL_SEQ_UNLOCK(seq);
+                return -EINVAL;
+        }
+
+        csi->csi_startrec = offset;
+        csi->csi_endrec = offset ? offset - 1 : 0;
+
+        /* drop whatever is left in sucky seq_read's buffer */
+        seq->count = 0;
+        seq->from = 0;
+        seq->index++;
+        LL_SEQ_UNLOCK(seq);
+        file->f_pos = csi->csi_startrec;
+        return csi->csi_startrec;
+}
+EXPORT_SYMBOL(changelog_seq_lseek);
+
 EXPORT_SYMBOL(lprocfs_register);
 EXPORT_SYMBOL(lprocfs_srch);
 EXPORT_SYMBOL(lprocfs_remove);
 EXPORT_SYMBOL(lprocfs_register);
 EXPORT_SYMBOL(lprocfs_srch);
 EXPORT_SYMBOL(lprocfs_remove);
index fbd047a..5158bcb 100644 (file)
@@ -1382,8 +1382,8 @@ static void server_wait_finished(struct vfsmount *mnt)
         init_waitqueue_head(&waitq);
 
         while ((atomic_read(&mnt->mnt_count) > 1) && (retries > 0)) {
         init_waitqueue_head(&waitq);
 
         while ((atomic_read(&mnt->mnt_count) > 1) && (retries > 0)) {
-                LCONSOLE_WARN("Mount still busy with %d refs, waiting for "
-                              "%d secs...\n",
+                LCONSOLE_WARN("%s: Mount still busy with %d refs, waiting for "
+                              "%d secs...\n", mnt->mnt_devname,
                               atomic_read(&mnt->mnt_count), retries);
 
                 /* Wait for a bit */
                               atomic_read(&mnt->mnt_count), retries);
 
                 /* Wait for a bit */
@@ -1392,8 +1392,8 @@ static void server_wait_finished(struct vfsmount *mnt)
                 l_wait_event(waitq, 0, &lwi);
         }
         if (atomic_read(&mnt->mnt_count) > 1) {
                 l_wait_event(waitq, 0, &lwi);
         }
         if (atomic_read(&mnt->mnt_count) > 1) {
-                CERROR("Mount %p is still busy (%d refs), giving up.\n",
-                       mnt, atomic_read(&mnt->mnt_count));
+                CERROR("%s: Mount still busy (%d refs), giving up.\n",
+                       mnt->mnt_devname, atomic_read(&mnt->mnt_count));
         }
 }
 
         }
 }
 
index 764957e..86f44a2 100644 (file)
@@ -889,7 +889,7 @@ const struct req_format RQF_MGS_TARGET_REG =
 EXPORT_SYMBOL(RQF_MGS_TARGET_REG);
 
 const struct req_format RQF_MGS_SET_INFO =
 EXPORT_SYMBOL(RQF_MGS_TARGET_REG);
 
 const struct req_format RQF_MGS_SET_INFO =
-        DEFINE_REQ_FMT0("MGS_SET_INTO", mgs_set_info,
+        DEFINE_REQ_FMT0("MGS_SET_INFO", mgs_set_info,
                          mgs_set_info);
 EXPORT_SYMBOL(RQF_MGS_SET_INFO);
 
                          mgs_set_info);
 EXPORT_SYMBOL(RQF_MGS_SET_INFO);
 
index 8259bd1..fd88361 100644 (file)
@@ -83,12 +83,15 @@ int llog_origin_handle_create(struct ptlrpc_request *req)
                 name = req_capsule_client_get(&req->rq_pill, &RMF_NAME);
                 if (name == NULL)
                         RETURN(-EFAULT);
                 name = req_capsule_client_get(&req->rq_pill, &RMF_NAME);
                 if (name == NULL)
                         RETURN(-EFAULT);
-                CDEBUG(D_INFO, "opening log %s\n", name);
+                CDEBUG(D_INFO, "%s: opening log %s\n", obd->obd_name, name);
         }
 
         ctxt = llog_get_context(obd, body->lgd_ctxt_idx);
         }
 
         ctxt = llog_get_context(obd, body->lgd_ctxt_idx);
-        if (ctxt == NULL)
+        if (ctxt == NULL) {
+                CDEBUG(D_WARNING, "%s: no ctxt. group=%p idx=%d name=%s\n",
+                       obd->obd_name, &obd->obd_olg, body->lgd_ctxt_idx, name);
                 RETURN(-ENODEV);
                 RETURN(-ENODEV);
+        }
         disk_obd = ctxt->loc_exp->exp_obd;
         push_ctxt(&saved, &disk_obd->obd_lvfs_ctxt, NULL);
 
         disk_obd = ctxt->loc_exp->exp_obd;
         push_ctxt(&saved, &disk_obd->obd_lvfs_ctxt, NULL);
 
@@ -340,8 +343,8 @@ int llog_origin_handle_read_header(struct ptlrpc_request *req)
         if (rc)
                 GOTO(out_pop, rc);
 
         if (rc)
                 GOTO(out_pop, rc);
 
-        /* 
-         * llog_init_handle() reads the llog header 
+        /*
+         * llog_init_handle() reads the llog header
          */
         flags = body->lgd_llh_flags;
         rc = llog_init_handle(loghandle, flags, NULL);
          */
         flags = body->lgd_llh_flags;
         rc = llog_init_handle(loghandle, flags, NULL);
@@ -407,28 +410,28 @@ int llog_origin_handle_cancel(struct ptlrpc_request *req)
                 handle = fsfilt_start_log(disk_obd, inode,
                                           FSFILT_OP_CANCEL_UNLINK, NULL, 1);
                 if (IS_ERR(handle)) {
                 handle = fsfilt_start_log(disk_obd, inode,
                                           FSFILT_OP_CANCEL_UNLINK, NULL, 1);
                 if (IS_ERR(handle)) {
-                        CERROR("fsfilt_start_log() failed: %ld\n", 
+                        CERROR("fsfilt_start_log() failed: %ld\n",
                                PTR_ERR(handle));
                         GOTO(pop_ctxt, rc = PTR_ERR(handle));
                 }
 
                 rc = llog_cat_cancel_records(cathandle, 1, logcookies);
 
                                PTR_ERR(handle));
                         GOTO(pop_ctxt, rc = PTR_ERR(handle));
                 }
 
                 rc = llog_cat_cancel_records(cathandle, 1, logcookies);
 
-                /* 
+                /*
                  * Do not raise -ENOENT errors for resent rpcs. This rec already
                  * Do not raise -ENOENT errors for resent rpcs. This rec already
-                 * might be killed. 
+                 * might be killed.
                  */
                  */
-                if (rc == -ENOENT && 
+                if (rc == -ENOENT &&
                     (lustre_msg_get_flags(req->rq_reqmsg) & MSG_RESENT)) {
                     (lustre_msg_get_flags(req->rq_reqmsg) & MSG_RESENT)) {
-                        /* 
+                        /*
                          * Do not change this message, reply-single.sh test_59b
                          * Do not change this message, reply-single.sh test_59b
-                         * expects to find this in log. 
+                         * expects to find this in log.
                          */
                         CDEBUG(D_RPCTRACE, "RESENT cancel req %p - ignored\n",
                                req);
                         rc = 0;
                 } else if (rc == 0) {
                          */
                         CDEBUG(D_RPCTRACE, "RESENT cancel req %p - ignored\n",
                                req);
                         rc = 0;
                 } else if (rc == 0) {
-                        CDEBUG(D_RPCTRACE, "Canceled %d llog-records\n", 
+                        CDEBUG(D_RPCTRACE, "Canceled %d llog-records\n",
                                num_cookies);
                 }
 
                                num_cookies);
                 }
 
@@ -446,7 +449,7 @@ int llog_origin_handle_cancel(struct ptlrpc_request *req)
 pop_ctxt:
         pop_ctxt(&saved, &disk_obd->obd_lvfs_ctxt, NULL);
         if (rc)
 pop_ctxt:
         pop_ctxt(&saved, &disk_obd->obd_lvfs_ctxt, NULL);
         if (rc)
-                CERROR("Cancel %d of %d llog-records failed: %d\n", 
+                CERROR("Cancel %d of %d llog-records failed: %d\n",
                        failed, num_cookies, rc);
 
         llog_ctxt_put(ctxt);
                        failed, num_cookies, rc);
 
         llog_ctxt_put(ctxt);
@@ -543,7 +546,7 @@ static int llog_catinfo_cb(struct llog_handle *cat,
 
         if (!cbd->ctxt)
                 RETURN(-ENODEV);
 
         if (!cbd->ctxt)
                 RETURN(-ENODEV);
-        
+
         lir = (struct llog_logid_rec *)rec;
         logid = &lir->lid_id;
         rc = llog_create(ctxt, &handle, logid, NULL);
         lir = (struct llog_logid_rec *)rec;
         logid = &lir->lid_id;
         rc = llog_create(ctxt, &handle, logid, NULL);
index 7303382..021df48 100644 (file)
@@ -527,7 +527,7 @@ static int ptlrpc_lprocfs_wr_hp_ratio(struct file *file, const char *buffer,
 {
         struct ptlrpc_service *svc = data;
         int rc, val;
 {
         struct ptlrpc_service *svc = data;
         int rc, val;
-        
+
         rc = lprocfs_write_helper(buffer, count, &val);
         if (rc < 0)
                 return rc;
         rc = lprocfs_write_helper(buffer, count, &val);
         if (rc < 0)
                 return rc;
index e85951a..ce2e237 100644 (file)
@@ -2312,3 +2312,5 @@ void lustre_swab_lustre_capa_key (struct lustre_capa_key *k)
         __swab32s (&k->lk_keyid);
         __swab32s (&k->lk_padding);
 }
         __swab32s (&k->lk_keyid);
         __swab32s (&k->lk_padding);
 }
+
+
index c93be5d..b5199d4 100644 (file)
@@ -65,8 +65,8 @@ void lustre_assert_wire_constants(void)
 {
         /* Wire protocol assertions generated by 'wirecheck'
          * (make -C lustre/utils newwiretest)
 {
         /* Wire protocol assertions generated by 'wirecheck'
          * (make -C lustre/utils newwiretest)
-         * running on Linux localhost.localdomain 2.6.18-prep #3 SMP Sun Nov 23 08:04:44 EST 2008 i68
-         * with gcc version 4.1.1 20061011 (Red Hat 4.1.1-30) */
+         * running on Linux cfs21 2.6.18-92.el5xen #1 SMP Tue Jun 10 19:55:54 EDT 2008 i686 i686 i386
+         * with gcc version 4.1.2 20071124 (Red Hat 4.1.2-42) */
 
 
         /* Constants... */
 
 
         /* Constants... */
@@ -166,6 +166,14 @@ void lustre_assert_wire_constants(void)
                  (long long)MDS_QUOTACHECK);
         LASSERTF(MDS_QUOTACTL == 48, " found %lld\n",
                  (long long)MDS_QUOTACTL);
                  (long long)MDS_QUOTACHECK);
         LASSERTF(MDS_QUOTACTL == 48, " found %lld\n",
                  (long long)MDS_QUOTACTL);
+        LASSERTF(MDS_GETXATTR == 49, " found %lld\n",
+                 (long long)MDS_GETXATTR);
+        LASSERTF(MDS_SETXATTR == 50, " found %lld\n",
+                 (long long)MDS_SETXATTR);
+        LASSERTF(MDS_WRITEPAGE == 51, " found %lld\n",
+                 (long long)MDS_WRITEPAGE);
+        LASSERTF(MDS_IS_SUBDIR == 52, " found %lld\n",
+                 (long long)MDS_IS_SUBDIR);
         LASSERTF(MDS_LAST_OPC == 53, " found %lld\n",
                  (long long)MDS_LAST_OPC);
         LASSERTF(REINT_SETATTR == 1, " found %lld\n",
         LASSERTF(MDS_LAST_OPC == 53, " found %lld\n",
                  (long long)MDS_LAST_OPC);
         LASSERTF(REINT_SETATTR == 1, " found %lld\n",
@@ -473,6 +481,7 @@ void lustre_assert_wire_constants(void)
         CLASSERT(OBD_CONNECT_AT == 0x01000000ULL);
         CLASSERT(OBD_CONNECT_CANCELSET == 0x400000ULL);
         CLASSERT(OBD_CONNECT_LRU_RESIZE == 0x02000000ULL);
         CLASSERT(OBD_CONNECT_AT == 0x01000000ULL);
         CLASSERT(OBD_CONNECT_CANCELSET == 0x400000ULL);
         CLASSERT(OBD_CONNECT_LRU_RESIZE == 0x02000000ULL);
+        CLASSERT(OBD_CONNECT_SKIP_ORPHAN == 0x400000000ULL);
 
         /* Checks for struct obdo */
         LASSERTF((int)sizeof(struct obdo) == 208, " found %lld\n",
 
         /* Checks for struct obdo */
         LASSERTF((int)sizeof(struct obdo) == 208, " found %lld\n",
index 6aadd4f..b4b15ed 100644 (file)
@@ -111,11 +111,13 @@ check_and_setup_lustre
 DIR=${DIR:-$MOUNT}
 assert_DIR
 
 DIR=${DIR:-$MOUNT}
 assert_DIR
 
-LOVNAME=`lctl get_param -n llite.*.lov.common_name | tail -n 1`
-OSTCOUNT=`lctl get_param -n lov.$LOVNAME.numobd`
-STRIPECOUNT=`lctl get_param -n lov.$LOVNAME.stripecount`
-STRIPESIZE=`lctl get_param -n lov.$LOVNAME.stripesize`
-ORIGFREE=`lctl get_param -n lov.$LOVNAME.kbytesavail`
+MDT0=$($LCTL get_param -n mdc.*.mds_server_uuid | \
+    awk '{gsub(/_UUID/,""); print $1}' | head -1)
+LOVNAME=$($LCTL get_param -n llite.*.lov.common_name | tail -n 1)
+OSTCOUNT=$($LCTL get_param -n lov.$LOVNAME.numobd)
+STRIPECOUNT=$($LCTL get_param -n lov.$LOVNAME.stripecount)
+STRIPESIZE=$($LCTL get_param -n lov.$LOVNAME.stripesize)
+ORIGFREE=$($LCTL get_param -n lov.$LOVNAME.kbytesavail)
 MAXFREE=${MAXFREE:-$((200000 * $OSTCOUNT))}
 
 [ -f $DIR/d52a/foo ] && chattr -a $DIR/d52a/foo
 MAXFREE=${MAXFREE:-$((200000 * $OSTCOUNT))}
 
 [ -f $DIR/d52a/foo ] && chattr -a $DIR/d52a/foo
@@ -5960,32 +5962,34 @@ test_153() {
 }
 run_test 153 "test if fdatasync does not crash ======================="
 
 }
 run_test 153 "test if fdatasync does not crash ======================="
 
-err17935 () {
-    if [ $MDSCOUNT -gt 1 ]; then
-       error_ignore 17935 $*
-    else
-       error $*
-    fi
-}
-
 test_154() {
        cp /etc/hosts $DIR/$tfile
 
 test_154() {
        cp /etc/hosts $DIR/$tfile
 
-       fid=`$LFS path2fid $DIR/$tfile`
+       fid=$($LFS path2fid $DIR/$tfile)
        rc=$?
        [ $rc -ne 0 ] && error "error: could not get fid for $DIR/$tfile."
 
        rc=$?
        [ $rc -ne 0 ] && error "error: could not get fid for $DIR/$tfile."
 
-       diff $DIR/$tfile $DIR/.lustre/fid/$fid || error "open by fid failed: did not find expected data in file."
+       echo "open fid $fid"
+       diff /etc/hosts $DIR/.lustre/fid/$fid || error "open by fid failed: did not find expected data in file."
 
        echo "Opening a file by FID succeeded"
 }
 run_test 154 "Opening a file by FID"
 
 #Changelogs
 
        echo "Opening a file by FID succeeded"
 }
 run_test 154 "Opening a file by FID"
 
 #Changelogs
+err17935 () {
+    if [ $MDSCOUNT -gt 1 ]; then
+       error_ignore 17935 $*
+    else
+       error $*
+    fi
+}
 test_160() {
 test_160() {
-    remote_mds && skip "remote MDS" && return
-    lctl set_param -n mdd.*.changelog on
-    $LFS changelog_clear $FSNAME 0
+    do_facet $SINGLEMDS lctl set_param mdd.$MDT0.changelog on
+    USER=$(do_facet $SINGLEMDS lctl --device $MDT0 changelog_register -n)
+    echo "Registered as changelog user $USER"
+    do_facet $SINGLEMDS lctl get_param -n mdd.$MDT0.changelog_users | \
+       grep -q $USER || error "User $USER not found in changelog_users"
 
     # change something
     mkdir -p $DIR/$tdir/pics/2008/zachy
 
     # change something
     mkdir -p $DIR/$tdir/pics/2008/zachy
@@ -5997,29 +6001,40 @@ test_160() {
     rm $DIR/$tdir/pics/desktop.jpg
 
     # verify contents
     rm $DIR/$tdir/pics/desktop.jpg
 
     # verify contents
-    $LFS changelog $FSNAME
-    # check target fid
-    fidc=$($LFS changelog $FSNAME | grep timestamp | grep "CREAT" | tail -1 | \
-       awk '{print $5}')
+    $LFS changelog $MDT0 | tail -5
+    echo "verifying target fid"
+    fidc=$($LFS changelog $MDT0 | grep timestamp | grep "CREAT" | \
+       tail -1 | awk '{print $5}')
     fidf=$($LFS path2fid $DIR/$tdir/pics/zach/timestamp)
     [ "$fidc" == "t=$fidf" ] || \
        err17935 "fid in changelog $fidc != file fid $fidf"
     fidf=$($LFS path2fid $DIR/$tdir/pics/zach/timestamp)
     [ "$fidc" == "t=$fidf" ] || \
        err17935 "fid in changelog $fidc != file fid $fidf"
-    # check parent fid
-    fidc=$($LFS changelog $FSNAME | grep timestamp | grep "CREAT" | tail -1 | \
-       awk '{print $6}')
+    echo "verifying parent fid"
+    fidc=$($LFS changelog $MDT0 | grep timestamp | grep "CREAT" | \
+       tail -1 | awk '{print $6}')
     fidf=$($LFS path2fid $DIR/$tdir/pics/zach)
     [ "$fidc" == "p=$fidf" ] || \
        err17935 "pfid in changelog $fidc != dir fid $fidf" 
 
     fidf=$($LFS path2fid $DIR/$tdir/pics/zach)
     [ "$fidc" == "p=$fidf" ] || \
        err17935 "pfid in changelog $fidc != dir fid $fidf" 
 
-    # verify purge
-    FIRST_REC=$($LFS changelog $FSNAME | head -1 | awk '{print $1}')
-    $LFS changelog_clear $FSNAME $(($FIRST_REC + 5)) 
-    PURGE_REC=$($LFS changelog $FSNAME | head -1 | awk '{print $1}')
-    [ $PURGE_REC == $(($FIRST_REC + 6)) ] || \
-     err17935 "first rec after purge should be $(($FIRST_REC + 6)); is $PURGE_REC"
-    # purge all
-    $LFS changelog_clear $FSNAME 0
-    lctl set_param -n mdd.*.changelog off
+    echo "verifying user clear"
+    USERS=$(( $(do_facet $SINGLEMDS lctl get_param -n \
+       mdd.$MDT0.changelog_users | wc -l) - 2 ))
+    FIRST_REC=$($LFS changelog $MDT0 | head -1 | awk '{print $1}')
+    $LFS changelog_clear $MDT0 $USER $(($FIRST_REC + 5))  
+    USER_REC=$(do_facet $SINGLEMDS lctl get_param -n \
+       mdd.$MDT0.changelog_users | grep $USER | awk '{print $2}')
+    [ $USER_REC == $(($FIRST_REC + 5)) ] || \
+       err17935 "user index should be $(($FIRST_REC + 5)); is $USER_REC"
+    CLEAR_REC=$($LFS changelog $MDT0 | head -1 | awk '{print $1}')
+    [ $CLEAR_REC == $(($FIRST_REC + 6)) -o $USERS -gt 1 ] || \
+       err17935 "first index should be $(($FIRST_REC + 6)); is $PURGE_REC"
+
+    echo "verifying user deregister"
+    do_facet $SINGLEMDS lctl --device $MDT0 changelog_deregister $USER
+    do_facet $SINGLEMDS lctl get_param -n mdd.$MDT0.changelog_users | \
+       grep -q $USER && error "User $USER still found in changelog_users"
+
+    [ $USERS -eq 1 ] && \
+       do_facet $SINGLEMDS lctl set_param mdd.$MDT0.changelog off || true
 }
 run_test 160 "changelog sanity"
 
 }
 run_test 160 "changelog sanity"
 
@@ -6035,7 +6050,7 @@ test_161() {
     ln $DIR/$tdir/$tfile $DIR/$tdir/foo2/zachary
     ln $DIR/$tdir/$tfile $DIR/$tdir/foo1/luna
     ln $DIR/$tdir/$tfile $DIR/$tdir/foo2/thor
     ln $DIR/$tdir/$tfile $DIR/$tdir/foo2/zachary
     ln $DIR/$tdir/$tfile $DIR/$tdir/foo1/luna
     ln $DIR/$tdir/$tfile $DIR/$tdir/foo2/thor
-    local FID=$($LFS path2fid $DIR/$tdir/$tfile)
+    local FID=$($LFS path2fid $DIR/$tdir/$tfile | tr -d '[')
     if [ "$($LFS fid2path ${mds1_svc} $FID | wc -l)" != "5" ]; then
        $LFS fid2path ${mds1_svc} $FID
        err17935 "bad link ea"
     if [ "$($LFS fid2path ${mds1_svc} $FID | wc -l)" != "5" ]; then
        $LFS fid2path ${mds1_svc} $FID
        err17935 "bad link ea"
@@ -6096,19 +6111,19 @@ test_162() {
     touch $DIR/$tdir/d2/x2
     mkdir -p $DIR/$tdir/d2/a/b/c
     mkdir -p $DIR/$tdir/d2/p/q/r
     touch $DIR/$tdir/d2/x2
     mkdir -p $DIR/$tdir/d2/a/b/c
     mkdir -p $DIR/$tdir/d2/p/q/r
-    fid=$($LFS path2fid $DIR/$tdir/d2/$tfile)
-    check_path "/$tdir/d2/$tfile" ${mds1_svc} $fid --link 0
+    FID=$($LFS path2fid $DIR/$tdir/d2/$tfile | tr -d '[')
+    check_path "/$tdir/d2/$tfile" ${mds1_svc} $FID --link 0
     ln $DIR/$tdir/d2/$tfile $DIR/$tdir/d2/p/q/r/hlink
     mv $DIR/$tdir/d2/$tfile $DIR/$tdir/d2/a/b/c/new_file
     ln $DIR/$tdir/d2/$tfile $DIR/$tdir/d2/p/q/r/hlink
     mv $DIR/$tdir/d2/$tfile $DIR/$tdir/d2/a/b/c/new_file
-    fid=$($LFS path2fid $DIR/$tdir/d2/a/b/c/new_file)
-    check_path "/$tdir/d2/a/b/c/new_file" ${mds1_svc} $fid --link 1
-    check_path "/$tdir/d2/p/q/r/hlink" ${mds1_svc} $fid --link 0
-    # check that there are 2 links, and that --rec doesnt break anything
-    ${LFS} fid2path ${mds1_svc} $fid --rec 20 | wc -l | grep -q 2 || \
+    FID=$($LFS path2fid $DIR/$tdir/d2/a/b/c/new_file | tr -d '[')
+    check_path "/$tdir/d2/a/b/c/new_file" ${mds1_svc} $FID --link 1
+    check_path "/$tdir/d2/p/q/r/hlink" ${mds1_svc} $FID --link 0
+    # check that there are 2 links
+    ${LFS} fid2path ${mds1_svc} $FID | wc -l | grep -q 2 || \
        err17935 "expected 2 links" 
 
     rm $DIR/$tdir/d2/p/q/r/hlink
        err17935 "expected 2 links" 
 
     rm $DIR/$tdir/d2/p/q/r/hlink
-    check_path "/$tdir/d2/a/b/c/new_file" ${mds1_svc} $fid --link 0
+    check_path "/$tdir/d2/a/b/c/new_file" ${mds1_svc} $FID --link 0
     # Doesnt work with CMD yet: 17935 
     return 0
 }
     # Doesnt work with CMD yet: 17935 
     return 0
 }
index 264ffcc..aae7494 100644 (file)
@@ -178,24 +178,6 @@ command_t cmdlist[] = {
          "provide gdb-friendly module information\n"
          "usage: modules <path>"},
 
          "provide gdb-friendly module information\n"
          "usage: modules <path>"},
 
-        /* Device configuration commands */
-        {"== device setup (these are not normally used post 1.4) ==",
-                jt_noop, 0, "device config"},
-        {"attach", jt_lcfg_attach, 0,
-         "set the type, name, and uuid of the current device\n"
-         "usage: attach type name uuid"},
-        {"detach", jt_obd_detach, 0,
-         "remove driver (and name and uuid) from current device\n"
-         "usage: detach"},
-        {"setup", jt_lcfg_setup, 0,
-         "type specific device configuration information\n"
-         "usage: setup <args...>"},
-        {"cleanup", jt_obd_cleanup, 0, "cleanup previously setup device\n"
-         "usage: cleanup [force | failover]"},
-        {"dump_cfg", jt_cfg_dump_log, 0,
-         "print log of recorded commands for this config to kernel debug log\n"
-         "usage: dump_cfg config-uuid-name"},
-
         /* virtual block operations */
         {"==== virtual block device ====", jt_noop, 0, "virtual block device"},
         {"blockdev_attach", jt_blockdev_attach, 0,
         /* virtual block operations */
         {"==== virtual block device ====", jt_noop, 0, "virtual block device"},
         {"blockdev_attach", jt_blockdev_attach, 0,
@@ -226,6 +208,33 @@ command_t cmdlist[] = {
          "list pools and pools members\n"
          "usage pool_list  <fsname>[.<poolname>] | <pathname>"},
 
          "list pools and pools members\n"
          "usage pool_list  <fsname>[.<poolname>] | <pathname>"},
 
+        /* Changelog commands */
+        {"===  Changelogs ==", jt_noop, 0, "changelog user management"},
+        {"changelog_register", jt_changelog_register, 0,
+         "register a new persistent changelog user, returns id\n"
+         "usage:\tdevice <mdtname>\n\tchangelog_register [-n]"},
+        {"changelog_deregister", jt_changelog_deregister, 0,
+         "deregister an existing changelog user\n"
+         "usage:\tdevice <mdtname>\n\tchangelog_deregister <id>"},
+
+        /* Device configuration commands */
+        {"== device setup (these are not normally used post 1.4) ==",
+                jt_noop, 0, "device config"},
+        {"attach", jt_lcfg_attach, 0,
+         "set the type, name, and uuid of the current device\n"
+         "usage: attach type name uuid"},
+        {"detach", jt_obd_detach, 0,
+         "remove driver (and name and uuid) from current device\n"
+         "usage: detach"},
+        {"setup", jt_lcfg_setup, 0,
+         "type specific device configuration information\n"
+         "usage: setup <args...>"},
+        {"cleanup", jt_obd_cleanup, 0, "cleanup previously setup device\n"
+         "usage: cleanup [force | failover]"},
+        {"dump_cfg", jt_cfg_dump_log, 0,
+         "print log of recorded commands for this config to kernel debug log\n"
+         "usage: dump_cfg config-uuid-name"},
+
         /* Test only commands */
         {"==== testing (DANGEROUS) ====", jt_noop, 0, "testing (DANGEROUS)"},
         {"--threads", jt_opt_threads, 0,
         /* Test only commands */
         {"==== testing (DANGEROUS) ====", jt_noop, 0, "testing (DANGEROUS)"},
         {"--threads", jt_opt_threads, 0,
index 36c1fc3..03c8628 100644 (file)
@@ -219,12 +219,14 @@ command_t cmdlist[] = {
          "Remote user list directory contents.\n"
          "usage: ls [OPTION]... [FILE]..."},
         {"changelog", lfs_changelog, 0,
          "Remote user list directory contents.\n"
          "usage: ls [OPTION]... [FILE]..."},
         {"changelog", lfs_changelog, 0,
-         "Show the metadata changes in a filesystem between two snapshot times."
-         "\nusage: changelog [--follow] <mdtname> [startrec [endrec]]"},
+         "Show the metadata changes on an MDT."
+         "\nusage: changelog [--follow] <mdtname> [startrec [endrec]]"
+         "\n(note: --follow is only valid when run on MDT node)"},
         {"changelog_clear", lfs_changelog_clear, 0,
         {"changelog_clear", lfs_changelog_clear, 0,
-         "Purge old changelog records up to <endrec> to free up space.\n"
+         "Indicate that old changelog records up to <endrec> are no longer of "
+         "interest to consumer <id>, allowing the system to free up space.\n"
          "An <endrec> of 0 means all records.\n"
          "An <endrec> of 0 means all records.\n"
-         "usage: changelog_clear <mdtname> <endrec>"},
+         "usage: changelog_clear <mdtname> <id> <endrec>"},
         {"fid2path", lfs_fid2path, 0,
          "Resolve the full path to a given FID. For a specific hardlink "
          "specify link number <linkno>.\n"
         {"fid2path", lfs_fid2path, 0,
          "Resolve the full path to a given FID. For a specific hardlink "
          "specify link number <linkno>.\n"
@@ -2366,7 +2368,7 @@ static int lfs_changelog(int argc, char **argv)
         int fd, len;
         char c, *mdd, *ptr = NULL;
         struct option long_opts[] = {
         int fd, len;
         char c, *mdd, *ptr = NULL;
         struct option long_opts[] = {
-                {"follow", 0, 0, 'f'},
+                {"follow", no_argument, 0, 'f'},
                 {0, 0, 0, 0}
         };
         char short_opts[] = "f";
                 {0, 0, 0, 0}
         };
         char short_opts[] = "f";
@@ -2426,7 +2428,7 @@ static int lfs_changelog(int argc, char **argv)
         close(fd);
 
         if (len < 0) {
         close(fd);
 
         if (len < 0) {
-                printf("read err %d\n", errno);
+                fprintf(stderr, "read err %d\n", errno);
                 return -errno;
         }
 
                 return -errno;
         }
 
@@ -2437,32 +2439,37 @@ static int lfs_changelog_clear(int argc, char **argv)
 {
         long long endrec;
 
 {
         long long endrec;
 
-        if (argc != 3)
+        if (argc != 4)
                 return CMD_HELP;
 
                 return CMD_HELP;
 
-        endrec = strtoll(argv[2], NULL, 10);
+        endrec = strtoll(argv[3], NULL, 10);
 
 
-        return(llapi_changelog_clear(argv[1], endrec));
+        return(llapi_changelog_clear(argv[1], argv[2], endrec));
 }
 
 static int lfs_fid2path(int argc, char **argv)
 {
         struct option long_opts[] = {
 }
 
 static int lfs_fid2path(int argc, char **argv)
 {
         struct option long_opts[] = {
-                {"link", 1, 0, 'l'},
-                {"rec", 1, 0, 'r'},
+                {"cur", no_argument, 0, 'c'},
+                {"link", required_argument, 0, 'l'},
+                {"rec", required_argument, 0, 'r'},
                 {0, 0, 0, 0}
         };
                 {0, 0, 0, 0}
         };
-        char c, short_opts[] = "l:r:";
+        char c, short_opts[] = "cl:r:";
         char *device, *fid, *path;
         long long recno = -1;
         int linkno = -1;
         int lnktmp;
         char *device, *fid, *path;
         long long recno = -1;
         int linkno = -1;
         int lnktmp;
+        int printcur = 0;
         int rc;
 
         optind = 0;
         while ((c = getopt_long(argc, argv, short_opts,
                                 long_opts, NULL)) != -1) {
                 switch (c) {
         int rc;
 
         optind = 0;
         while ((c = getopt_long(argc, argv, short_opts,
                                 long_opts, NULL)) != -1) {
                 switch (c) {
+                case 'c':
+                        printcur++;
+                        break;
                 case 'l':
                         linkno = strtol(optarg, NULL, 10);
                         break;
                 case 'l':
                         linkno = strtol(optarg, NULL, 10);
                         break;
@@ -2488,15 +2495,20 @@ static int lfs_fid2path(int argc, char **argv)
         lnktmp = (linkno >= 0) ? linkno : 0;
         while (1) {
                 int oldtmp = lnktmp;
         lnktmp = (linkno >= 0) ? linkno : 0;
         while (1) {
                 int oldtmp = lnktmp;
-                rc = llapi_fid2path(device, fid, path, PATH_MAX, recno,
+                long long rectmp = recno;
+                rc = llapi_fid2path(device, fid, path, PATH_MAX, &rectmp,
                                     &lnktmp);
                 if (rc < 0) {
                         fprintf(stderr, "%s error: %s\n", argv[0],
                                 strerror(errno = -rc));
                         break;
                                     &lnktmp);
                 if (rc < 0) {
                         fprintf(stderr, "%s error: %s\n", argv[0],
                                 strerror(errno = -rc));
                         break;
-                } else {
-                        fprintf(stdout, "%s\n", path);
                 }
                 }
+
+                if (printcur)
+                        fprintf(stdout, "%lld %s\n", recno, path);
+                else
+                        fprintf(stdout, "%s\n", path);
+
                 if (linkno >= 0)
                         /* specified linkno */
                         break;
                 if (linkno >= 0)
                         /* specified linkno */
                         break;
index 9ef5f2c..26f3a5e 100644 (file)
@@ -303,7 +303,7 @@ int llapi_file_open_pool(const char *name, int flags, int mode,
                 ptr = strchr(pool_name, '.');
                 if (ptr != NULL) {
                         strncpy(fsname, pool_name, ptr - pool_name);
                 ptr = strchr(pool_name, '.');
                 if (ptr != NULL) {
                         strncpy(fsname, pool_name, ptr - pool_name);
-                        fsname[ptr - pool_name] = '\0';
+                        *ptr = '\0';
                         /* if fsname matches a filesystem skip it
                          * if not keep the poolname as is */
                         if (poolpath(fsname, NULL, NULL) == 0)
                         /* if fsname matches a filesystem skip it
                          * if not keep the poolname as is */
                         if (poolpath(fsname, NULL, NULL) == 0)
@@ -395,8 +395,7 @@ static int print_pool_members(char *fs, char *pool_dir, char *pool_file)
 }
 
 /*
 }
 
 /*
- * search lustre fsname from pathname
- *
+ * Resolve lustre fsname from pathname
  */
 static int search_fsname(char *pathname, char *fsname)
 {
  */
 static int search_fsname(char *pathname, char *fsname)
 {
@@ -433,6 +432,25 @@ static int search_fsname(char *pathname, char *fsname)
         return -ENOENT;
 }
 
         return -ENOENT;
 }
 
+/* return the first file matching this pattern */
+static int first_match(char *pattern, char *buffer)
+{
+        glob_t glob_info;
+
+        if (glob(pattern, GLOB_BRACE, NULL, &glob_info))
+                return -ENOENT;
+
+        if (glob_info.gl_pathc < 1) {
+                globfree(&glob_info);
+                return -ENOENT;
+        }
+
+        strcpy(buffer, glob_info.gl_pathv[0]);
+
+        globfree(&glob_info);
+        return 0;
+}
+
 /*
  * find the pool directory path under /proc
  * (can be also used to test if a fsname is known)
 /*
  * find the pool directory path under /proc
  * (can be also used to test if a fsname is known)
@@ -440,7 +458,6 @@ static int search_fsname(char *pathname, char *fsname)
 static int poolpath(char *fsname, char *pathname, char *pool_pathname)
 {
         int rc = 0;
 static int poolpath(char *fsname, char *pathname, char *pool_pathname)
 {
         int rc = 0;
-        glob_t glob_info;
         char pattern[PATH_MAX + 1];
         char buffer[PATH_MAX];
 
         char pattern[PATH_MAX + 1];
         char buffer[PATH_MAX];
 
@@ -455,18 +472,13 @@ static int poolpath(char *fsname, char *pathname, char *pool_pathname)
         snprintf(pattern, PATH_MAX,
                  "/proc/fs/lustre/lov/%s-*/pools",
                  fsname);
         snprintf(pattern, PATH_MAX,
                  "/proc/fs/lustre/lov/%s-*/pools",
                  fsname);
-        rc = glob(pattern, GLOB_BRACE, NULL, &glob_info);
+        rc = first_match(pattern, buffer);
         if (rc)
         if (rc)
-                return -ENOENT;
-
-        if (glob_info.gl_pathc == 0) {
-                globfree(&glob_info);
-                return -ENOENT;
-        }
+                return rc;
 
         /* in fsname test mode, pool_pathname is NULL */
         if (pool_pathname != NULL)
 
         /* in fsname test mode, pool_pathname is NULL */
         if (pool_pathname != NULL)
-                strcpy(pool_pathname, glob_info.gl_pathv[0]);
+                strcpy(pool_pathname, buffer);
 
         return 0;
 }
 
         return 0;
 }
@@ -2396,16 +2408,21 @@ static int get_mdtname(const char *name, char *format, char *buf)
         return sprintf(buf, format, name, suffix);
 }
 
         return sprintf(buf, format, name, suffix);
 }
 
-#define CHANGELOG_FILE "/proc/fs/lustre/mdd/%s%s/changelog"
 
 
-/* return a file desc to readable changelog */
+/* Return a file descriptor to a readable changelog */
 int llapi_changelog_open(const char *mdtname, long long startrec)
 {
         char path[256];
         int rc, fd;
 
 int llapi_changelog_open(const char *mdtname, long long startrec)
 {
         char path[256];
         int rc, fd;
 
-        if (get_mdtname(mdtname, CHANGELOG_FILE, path) <0)
+        /* Use either the mdd changelog (preferred) or a client mdc changelog */
+        if (get_mdtname(mdtname,
+                        "/proc/fs/lustre/md[cd]/%s%s{,-mdc-*}/changelog",
+                        path) < 0)
                 return -EINVAL;
                 return -EINVAL;
+        rc = first_match(path, path);
+        if (rc)
+                return rc;
 
         if ((fd = open(path, O_RDONLY)) < 0) {
                 llapi_err(LLAPI_MSG_ERROR, "error: can't open |%s|\n", path);
 
         if ((fd = open(path, O_RDONLY)) < 0) {
                 llapi_err(LLAPI_MSG_ERROR, "error: can't open |%s|\n", path);
@@ -2421,42 +2438,11 @@ int llapi_changelog_open(const char *mdtname, long long startrec)
         return fd;
 }
 
         return fd;
 }
 
-int llapi_changelog_clear(const char *mdtname, long long endrec)
-{
-        char path[256];
-        char val[20];
-        int fd, len;
-
-        if (endrec < 0) {
-                llapi_err(LLAPI_MSG_ERROR | LLAPI_MSG_NO_ERRNO,
-                          "can't purge negative records\n");
-                return -EINVAL;
-        }
-
-        if (get_mdtname(mdtname, CHANGELOG_FILE, path) <0)
-                return -EINVAL;
-
-        if ((fd = open(path, O_WRONLY)) < 0) {
-                llapi_err(LLAPI_MSG_ERROR, "error: can't open |%s|\n", path);
-                return errno;
-        }
-
-        snprintf(val, sizeof(val), "%llu", endrec);
-        len = write(fd, val, strlen(val));
-        close(fd);
-        if (len != strlen(val)) {
-                llapi_err(LLAPI_MSG_ERROR, "purge err\n");
-                return errno;
-        }
-
-        return 0;
-}
-
 static int dev_ioctl(struct obd_ioctl_data *data, int dev, int cmd)
 {
 static int dev_ioctl(struct obd_ioctl_data *data, int dev, int cmd)
 {
-        int rc;
         static char rawbuf[8192];
         static char *buf = rawbuf;
         static char rawbuf[8192];
         static char *buf = rawbuf;
+        int rc;
 
         data->ioc_dev = dev;
         memset(buf, 0, sizeof(rawbuf));
 
         data->ioc_dev = dev;
         memset(buf, 0, sizeof(rawbuf));
@@ -2482,7 +2468,6 @@ static int dev_ioctl(struct obd_ioctl_data *data, int dev, int cmd)
         return rc;
 }
 
         return rc;
 }
 
-/* should we just grep it from proc? */
 static int dev_name2dev(char *name)
 {
         struct obd_ioctl_data data;
 static int dev_name2dev(char *name)
 {
         struct obd_ioctl_data data;
@@ -2491,8 +2476,8 @@ static int dev_name2dev(char *name)
         memset(&data, 0, sizeof(data));
         data.ioc_inllen1 = strlen(name) + 1;
         data.ioc_inlbuf1 = name;
         memset(&data, 0, sizeof(data));
         data.ioc_inllen1 = strlen(name) + 1;
         data.ioc_inlbuf1 = name;
-        rc = dev_ioctl(&data, -1, OBD_IOC_NAME2DEV);
 
 
+        rc = dev_ioctl(&data, -1, OBD_IOC_NAME2DEV);
         if (rc < 0) {
                 llapi_err(LLAPI_MSG_ERROR, "Device %s not found %d\n", name,rc);
                 return rc;
         if (rc < 0) {
                 llapi_err(LLAPI_MSG_ERROR, "Device %s not found %d\n", name,rc);
                 return rc;
@@ -2500,11 +2485,75 @@ static int dev_name2dev(char *name)
         return data.ioc_dev;
 }
 
         return data.ioc_dev;
 }
 
+/* We need the full mdc name, and we shouldn't just grep from proc... */
+static void do_get_mdcname(char *obd_type_name, char *obd_name,
+                           char *obd_uuid, void *name)
+{
+        if (strncmp(obd_name, (char *)name, strlen((char *)name)) == 0)
+                strcpy((char *)name, obd_name);
+}
+
+static int get_mdcdev(const char *mdtname)
+{
+        char name[MAX_OBD_NAME];
+        char *type[] = { "mdc" };
+        int rc;
+
+        strcpy(name, mdtname);
+        rc = llapi_target_iterate(1, type, (void *)name, do_get_mdcname);
+        rc = rc < 0 ? : -rc;
+        if (rc < 0) {
+                llapi_err(LLAPI_MSG_ERROR, "Device %s not found %d\n", name,rc);
+                return rc;
+        }
+        return dev_name2dev(name);
+}
+
+int llapi_changelog_clear(const char *mdtname, const char *idstr,
+                          long long endrec)
+{
+        struct obd_ioctl_data data;
+        int dev, id, rc;
+
+        if (endrec < 0) {
+                llapi_err(LLAPI_MSG_ERROR | LLAPI_MSG_NO_ERRNO,
+                          "can't purge negative records\n");
+                return -EINVAL;
+        }
+
+        id = strtol(idstr + strlen(CHANGELOG_USER_PREFIX), NULL, 10);
+        if ((id == 0) || (strncmp(idstr, CHANGELOG_USER_PREFIX,
+                                  strlen(CHANGELOG_USER_PREFIX)) != 0)) {
+                llapi_err(LLAPI_MSG_ERROR | LLAPI_MSG_NO_ERRNO,
+                          "expecting id of the form '"CHANGELOG_USER_PREFIX
+                          "<num>'; got '%s'\n", idstr);
+                return -EINVAL;
+        }
+
+        dev = get_mdcdev(mdtname);
+        if (dev < 0) {
+                llapi_err(LLAPI_MSG_ERROR | LLAPI_MSG_NO_ERRNO,
+                          "can't find mdc for '%s'\n", mdtname);
+                return dev;
+        }
+
+        memset(&data, 0, sizeof(data));
+        data.ioc_u32_1 = id;
+        data.ioc_u64_1 = endrec;
+        rc = dev_ioctl(&data, dev, OBD_IOC_CHANGELOG_CLEAR);
+        if (rc)
+                llapi_err(LLAPI_MSG_ERROR | LLAPI_MSG_NO_ERRNO,
+                          "ioctl err %d", rc);
+        return rc;
+}
+
+
 int llapi_fid2path(char *device, char *fidstr, char *buf, int buflen,
 int llapi_fid2path(char *device, char *fidstr, char *buf, int buflen,
-                   __u64 recno, int *linkno)
+                   long long *recno, int *linkno)
 {
         struct lu_fid fid;
         struct obd_ioctl_data data;
 {
         struct lu_fid fid;
         struct obd_ioctl_data data;
+        char buffer[256];
         int dev, rc;
 
         while (*fidstr == '[')
         int dev, rc;
 
         while (*fidstr == '[')
@@ -2519,14 +2568,18 @@ int llapi_fid2path(char *device, char *fidstr, char *buf, int buflen,
                 return -EINVAL;
         }
 
                 return -EINVAL;
         }
 
-        dev = dev_name2dev(device);
+        rc = get_mdtname(device, "%s%s", buffer);
+        if (rc < 0)
+                return rc;
+
+        dev = dev_name2dev(buffer);
         if (dev < 0)
                 return dev;
 
         memset(&data, 0, sizeof(data));
         data.ioc_inlbuf1 = (char *)&fid;
         data.ioc_inllen1 = sizeof(fid);
         if (dev < 0)
                 return dev;
 
         memset(&data, 0, sizeof(data));
         data.ioc_inlbuf1 = (char *)&fid;
         data.ioc_inllen1 = sizeof(fid);
-        data.ioc_inlbuf2 = (char *)&recno;
+        data.ioc_inlbuf2 = (char *)recno;
         data.ioc_inllen2 = sizeof(__u64);
         data.ioc_inlbuf3 = (char *)linkno;
         data.ioc_inllen3 = sizeof(int);
         data.ioc_inllen2 = sizeof(__u64);
         data.ioc_inlbuf3 = (char *)linkno;
         data.ioc_inllen3 = sizeof(int);
@@ -2556,3 +2609,4 @@ int llapi_path2fid(const char *path, unsigned long long *seq,
         return rc;
 }
 
         return rc;
 }
 
+
index a60d2a1..8d1b671 100644 (file)
@@ -3156,5 +3156,111 @@ void  llapi_ping_target(char *obd_type, char *obd_name,
         } else {
                 printf("%s active.\n", obd_name);
         }
         } else {
                 printf("%s active.\n", obd_name);
         }
+}
+
+int jt_changelog_register(int argc, char **argv)
+{
+        char rawbuf[MAX_IOC_BUFLEN], *buf = rawbuf;
+        struct obd_ioctl_data data;
+        char devname[30];
+        int rc;
+
+        if (argc > 2)
+                return CMD_HELP;
+        else if (argc == 2 && strcmp(argv[1], "-n") != 0)
+                return CMD_HELP;
+        if (cur_device < 0)
+                return CMD_HELP;
 
 
+        memset(&data, 0x00, sizeof(data));
+        data.ioc_dev = cur_device;
+        memset(buf, 0, sizeof(rawbuf));
+        rc = obd_ioctl_pack(&data, &buf, sizeof(rawbuf));
+        if (rc) {
+                fprintf(stderr, "error: %s: invalid ioctl\n",
+                        jt_cmdname(argv[0]));
+               return rc;
+        }
+
+        rc = l2_ioctl(OBD_DEV_ID, OBD_IOC_CHANGELOG_REG, buf);
+        if (rc < 0) {
+                fprintf(stderr, "error: %s: %s\n", jt_cmdname(argv[0]),
+                        strerror(rc = errno));
+                return rc;
+        }
+        obd_ioctl_unpack(&data, buf, sizeof(rawbuf));
+
+        if (data.ioc_u32_1 == 0) {
+                fprintf(stderr, "received invalid userid!\n");
+                return EPROTO;
+        }
+
+        if (lcfg_get_devname() != NULL)
+                strcpy(devname, lcfg_get_devname());
+        else
+                sprintf(devname, "dev %d", cur_device);
+
+        if (argc == 2)
+                /* -n means bare name */
+                printf(CHANGELOG_USER_PREFIX"%u\n", data.ioc_u32_1);
+        else
+                printf("%s: Registered changelog userid '"CHANGELOG_USER_PREFIX
+                       "%u'\n", devname, data.ioc_u32_1);
+        return 0;
 }
 }
+
+int jt_changelog_deregister(int argc, char **argv)
+{
+        char rawbuf[MAX_IOC_BUFLEN], *buf = rawbuf;
+        struct obd_ioctl_data data;
+        char devname[30];
+        int id, rc;
+
+        if (argc != 2 || cur_device < 0)
+                return CMD_HELP;
+
+        id = strtol(argv[1] + strlen(CHANGELOG_USER_PREFIX), NULL, 10);
+        if ((id == 0) || (strncmp(argv[1], CHANGELOG_USER_PREFIX,
+                                  strlen(CHANGELOG_USER_PREFIX)) != 0)) {
+                fprintf(stderr, "expecting id of the form '"
+                        CHANGELOG_USER_PREFIX"<num>'; got '%s'\n", argv[1]);
+                return CMD_HELP;
+        }
+
+        memset(&data, 0x00, sizeof(data));
+        data.ioc_dev = cur_device;
+        data.ioc_u32_1 = id;
+        memset(buf, 0, sizeof(rawbuf));
+        rc = obd_ioctl_pack(&data, &buf, sizeof(rawbuf));
+        if (rc) {
+                fprintf(stderr, "error: %s: invalid ioctl\n",
+                        jt_cmdname(argv[0]));
+                return rc;
+        }
+
+        rc = l2_ioctl(OBD_DEV_ID, OBD_IOC_CHANGELOG_DEREG, buf);
+        if (rc < 0) {
+                fprintf(stderr, "error: %s: %s\n", jt_cmdname(argv[0]),
+                        strerror(rc = errno));
+                return rc;
+        }
+        obd_ioctl_unpack(&data, buf, sizeof(rawbuf));
+
+        if (data.ioc_u32_1 != id) {
+                fprintf(stderr, "No changelog user '%s'.  Blocking user"
+                        " is '"CHANGELOG_USER_PREFIX"%d'.\n", argv[1],
+                        data.ioc_u32_1);
+                return ENOENT;
+        }
+
+        if (lcfg_get_devname() != NULL)
+                strcpy(devname, lcfg_get_devname());
+        else
+                sprintf(devname, "dev %d", cur_device);
+
+        printf("%s: Deregistered changelog user '"CHANGELOG_USER_PREFIX"%d'\n",
+               devname, data.ioc_u32_1);
+        return 0;
+}
+
+
index eeb1bb8..dccd213 100644 (file)
@@ -120,5 +120,7 @@ int jt_blockdev_detach(int argc, char **argv);
 int jt_blockdev_info(int argc, char **argv);
 
 int jt_pool_cmd(int argc, char **argv);
 int jt_blockdev_info(int argc, char **argv);
 
 int jt_pool_cmd(int argc, char **argv);
+int jt_changelog_register(int argc, char **argv);
+int jt_changelog_deregister(int argc, char **argv);
 
 #endif
 
 #endif
index 490c6a9..ed189d9 100644 (file)
@@ -1342,6 +1342,10 @@ main(int argc, char **argv)
         CHECK_VALUE(MDS_SET_INFO);
         CHECK_VALUE(MDS_QUOTACHECK);
         CHECK_VALUE(MDS_QUOTACTL);
         CHECK_VALUE(MDS_SET_INFO);
         CHECK_VALUE(MDS_QUOTACHECK);
         CHECK_VALUE(MDS_QUOTACTL);
+        CHECK_VALUE(MDS_GETXATTR);
+        CHECK_VALUE(MDS_SETXATTR);
+        CHECK_VALUE(MDS_WRITEPAGE);
+        CHECK_VALUE(MDS_IS_SUBDIR);
         CHECK_VALUE(MDS_LAST_OPC);
 
         CHECK_VALUE(REINT_SETATTR);
         CHECK_VALUE(MDS_LAST_OPC);
 
         CHECK_VALUE(REINT_SETATTR);
index 1a1fa03..b2c591b 100644 (file)
@@ -62,8 +62,8 @@ void lustre_assert_wire_constants(void)
 {
         /* Wire protocol assertions generated by 'wirecheck'
          * (make -C lustre/utils newwiretest)
 {
         /* Wire protocol assertions generated by 'wirecheck'
          * (make -C lustre/utils newwiretest)
-         * running on Linux localhost.localdomain 2.6.18-prep #3 SMP Sun Nov 23 08:04:44 EST 2008 i68
-         * with gcc version 4.1.1 20061011 (Red Hat 4.1.1-30) */
+         * running on Linux cfs21 2.6.18-92.el5xen #1 SMP Tue Jun 10 19:55:54 EDT 2008 i686 i686 i386
+         * with gcc version 4.1.2 20071124 (Red Hat 4.1.2-42) */
 
 
         /* Constants... */
 
 
         /* Constants... */
@@ -163,6 +163,14 @@ void lustre_assert_wire_constants(void)
                  (long long)MDS_QUOTACHECK);
         LASSERTF(MDS_QUOTACTL == 48, " found %lld\n",
                  (long long)MDS_QUOTACTL);
                  (long long)MDS_QUOTACHECK);
         LASSERTF(MDS_QUOTACTL == 48, " found %lld\n",
                  (long long)MDS_QUOTACTL);
+        LASSERTF(MDS_GETXATTR == 49, " found %lld\n",
+                 (long long)MDS_GETXATTR);
+        LASSERTF(MDS_SETXATTR == 50, " found %lld\n",
+                 (long long)MDS_SETXATTR);
+        LASSERTF(MDS_WRITEPAGE == 51, " found %lld\n",
+                 (long long)MDS_WRITEPAGE);
+        LASSERTF(MDS_IS_SUBDIR == 52, " found %lld\n",
+                 (long long)MDS_IS_SUBDIR);
         LASSERTF(MDS_LAST_OPC == 53, " found %lld\n",
                  (long long)MDS_LAST_OPC);
         LASSERTF(REINT_SETATTR == 1, " found %lld\n",
         LASSERTF(MDS_LAST_OPC == 53, " found %lld\n",
                  (long long)MDS_LAST_OPC);
         LASSERTF(REINT_SETATTR == 1, " found %lld\n",
@@ -470,6 +478,7 @@ void lustre_assert_wire_constants(void)
         CLASSERT(OBD_CONNECT_AT == 0x01000000ULL);
         CLASSERT(OBD_CONNECT_CANCELSET == 0x400000ULL);
         CLASSERT(OBD_CONNECT_LRU_RESIZE == 0x02000000ULL);
         CLASSERT(OBD_CONNECT_AT == 0x01000000ULL);
         CLASSERT(OBD_CONNECT_CANCELSET == 0x400000ULL);
         CLASSERT(OBD_CONNECT_LRU_RESIZE == 0x02000000ULL);
+        CLASSERT(OBD_CONNECT_SKIP_ORPHAN == 0x400000000ULL);
 
         /* Checks for struct obdo */
         LASSERTF((int)sizeof(struct obdo) == 208, " found %lld\n",
 
         /* Checks for struct obdo */
         LASSERTF((int)sizeof(struct obdo) == 208, " found %lld\n",