Whamcloud - gitweb
Land b_orphan on HEAD (20040130_1601)
authoradilger <adilger>
Sat, 31 Jan 2004 00:33:58 +0000 (00:33 +0000)
committeradilger <adilger>
Sat, 31 Jan 2004 00:33:58 +0000 (00:33 +0000)
36 files changed:
lnet/include/config.h.in
lustre/ChangeLog
lustre/configure.in
lustre/include/linux/lustre_commit_confd.h
lustre/include/linux/lustre_fsfilt.h
lustre/include/linux/lustre_idl.h
lustre/include/linux/lustre_log.h
lustre/include/linux/lustre_mds.h
lustre/ldlm/ldlm_lib.c
lustre/ldlm/ldlm_lockd.c
lustre/lov/lov_log.c
lustre/lvfs/fsfilt_ext3.c
lustre/mds/handler.c
lustre/mds/mds_log.c
lustre/mds/mds_lov.c
lustre/mds/mds_open.c
lustre/mds/mds_reint.c
lustre/mds/mds_unlink_open.c
lustre/obdclass/llog.c
lustre/obdclass/llog_cat.c
lustre/obdclass/llog_ioctl.c
lustre/obdclass/llog_lvfs.c
lustre/obdclass/llog_obd.c
lustre/obdclass/llog_test.c
lustre/obdclass/obd_config.c
lustre/obdfilter/filter.c
lustre/obdfilter/filter_log.c
lustre/ost/ost_handler.c
lustre/portals/include/config.h.in
lustre/ptlrpc/llog_client.c
lustre/ptlrpc/llog_net.c
lustre/ptlrpc/llog_server.c
lustre/ptlrpc/recov_thread.c
lustre/tests/replay-single.sh
lustre/utils/wirecheck.c
lustre/utils/wiretest.c

index f9605ab..f295154 100644 (file)
@@ -1,8 +1,5 @@
 /* portals/include/config.h.in.  Generated from configure.in by autoheader.  */
 
-/* Compile with orphan support */
-#undef ENABLE_ORPHANS
-
 /* Use the Pinger */
 #undef ENABLE_PINGER
 
index 6cf3e48..5588b2e 100644 (file)
@@ -1,6 +1,7 @@
 tbd         Cluster File Systems, Inc. <info@clusterfs.com>
        * version 1.2.0
        * bug fixes
+       - orphan handling to avoid losing space on client/server crashes
 
 2004-01-27  Cluster File Systems, Inc. <info@clusterfs.com>
        * version 1.0.3
@@ -89,7 +90,7 @@ tbd         Cluster File Systems, Inc. <info@clusterfs.com>
        - ENOMEM detection and retry on socknal sends (2230)
        - use GFP_NOFS throughout Lustre, to combat ENOMEM (2230)
        - move osc_rpcd into ptlrpc, for use in MDC and others (2329)
-       - protect MDS inode fsdata with stronger locking (2313)
+       - protect MDS inode fsdata with stronger locking; fixes assertion (2313)
        - better error messages when a client is rejected during recovery (1505)
        - avoid cancelling locks which were never granted, after failure (2330)
        - fix i_sem/journal inversion in mds_client_add (2333)
index 9eda4b6..66a1840 100644 (file)
@@ -33,12 +33,6 @@ if test x$enable_pinger != xno ; then
   AC_DEFINE(ENABLE_PINGER, 1, Use the Pinger)
 fi
 
-# very experimental orphan support
-AC_ARG_ENABLE(orphans, [  --enable-orphans very experimental orphan recovery support])
-if test x$enable_orphans = xyes ; then
-  AC_DEFINE(ENABLE_ORPHANS, 1, Compile with orphan support)
-fi
-
 AC_ARG_WITH(obd-buffer-size, [  --with-obd-buffer-size=[size] set lctl ioctl maximum bytes (default=8192)],OBD_BUFFER_SIZE=$with_obd_buffer_size,OBD_BUFFER_SIZE=8192)
 AC_DEFINE_UNQUOTED(OBD_MAX_IOCTL_BUFFER, $OBD_BUFFER_SIZE, [IOCTL Buffer Size])
 
index a749911..6183596 100644 (file)
 
 struct llog_canceld_ctxt {
         struct list_head           llcd_list;  /* free or pending struct list */
-        struct obd_import         *llcd_import;
+        struct llog_ctxt          *llcd_ctxt;
         struct llog_commit_master *llcd_lcm;
         int                        llcd_tries; /* number of tries to send */
-        struct llog_ctxt_gen       llcd_gen; 
         int                        llcd_cookiebytes;
         struct llog_cookie         llcd_cookies[0];
 };
index 84c44ac..0d62e90 100644 (file)
@@ -91,6 +91,7 @@ extern void fsfilt_put_ops(struct fsfilt_operations *fs_ops);
 #define FSFILT_OP_LINK           9
 #define FSFILT_OP_CREATE_LOG    10
 #define FSFILT_OP_UNLINK_LOG    11
+#define FSFILT_OP_CANCEL_UNLINK_LOG    12
 
 static inline void *fsfilt_start(struct obd_device *obd, struct inode *inode,
                                  int op, struct obd_trans_info *oti)
index 4b6ad12..da9bd52 100644 (file)
@@ -387,7 +387,7 @@ static inline struct llog_cookie *obdo_logcookie(struct obdo *oa)
         return (struct llog_cookie *)(oa->o_inline +
                                       sizeof(struct lustre_handle));
 }
-/* don't forget obdo_fid which is way down at the bottom so it can 
+/* don't forget obdo_fid which is way down at the bottom so it can
  * come after the definition of llog_cookie */
 
 struct obd_statfs {
@@ -491,12 +491,6 @@ typedef enum {
 #define DISP_OPEN_OPEN    (1 << 5)
 #define DISP_ENQ_COMPLETE (1<<6)
 
-
-struct ll_uctxt {
-        __u32 gid1;
-        __u32 gid2;
-};
-
 struct ll_fid {
         __u64 id;
         __u32 generation;
@@ -824,17 +818,23 @@ extern void lustre_swab_ptlbd_rsp (struct ptlbd_rsp *r);
 /*
  * Opcodes for management/monitoring node.
  */
-#define MGMT_CONNECT    250
-#define MGMT_DISCONNECT 251
-#define MGMT_EXCEPTION  252 /* node died, etc. */
+typedef enum {
+        MGMT_CONNECT = 250,
+        MGMT_DISCONNECT,
+        MGMT_EXCEPTION,         /* node died, etc. */
+        MGMT_LAST_OPC
+} mgmt_cmd_t;
+#define MGMT_FIRST_OPC MGMT_CONNECT
 
 /*
  * Opcodes for multiple servers.
  */
 
-#define OBD_PING       400
-#define OBD_LOG_CANCEL 401
-#define OBD_LAST_OPC  (OBD_LOG_CANCEL + 1)
+typedef enum {
+        OBD_PING = 400,
+        OBD_LOG_CANCEL,
+        OBD_LAST_OPC
+} obd_cmd_t;
 #define OBD_FIRST_OPC OBD_PING
 
 /* catalog of log objects */
@@ -855,6 +855,7 @@ typedef enum {
         MDS_UNLINK_REC   = 0x10610000 | (MDS_REINT << 8) | REINT_UNLINK,
         OBD_CFG_REC      = 0x10620000,
         PTL_CFG_REC      = 0x10630000,
+        LLOG_GEN_REC     = 0x10640000,
         LLOG_HDR_MAGIC   = 0x10645539,
         LLOG_LOGID_MAGIC = 0x1064553a,
 } llog_op_type;
@@ -915,6 +916,16 @@ struct llog_size_change_rec {
         struct llog_rec_tail    lsc_tail;
 } __attribute__((packed));
 
+struct llog_gen {
+        __u64 mnt_cnt;
+        __u64 conn_cnt;
+} __attribute__((packed));
+
+struct llog_gen_rec {
+        struct llog_rec_hdr     lgr_hdr;
+        struct llog_gen         lgr_gen;
+        struct llog_rec_tail    lgr_tail;
+};
 /* On-disk header structure of each log object, stored in little endian order */
 #define LLOG_CHUNK_SIZE         4096
 #define LLOG_HEADER_SIZE        (96)
@@ -935,6 +946,7 @@ struct llog_log_hdr {
         __u32                   llh_size;
         __u32                   llh_flags;
         __u32                   llh_cat_idx;
+        /* for a catlog the first plain slot is next to it */
         struct obd_uuid         llh_tgtuuid;
         __u32                   llh_reserved[LLOG_HEADER_SIZE/sizeof(__u32) - 23];
         __u32                   llh_bitmap[LLOG_BITMAP_BYTES/sizeof(__u32)];
@@ -970,13 +982,8 @@ struct llogd_body {
         __u64 lgd_cur_offset;
 } __attribute__((packed));
 
-struct llog_ctxt_gen {
-        __u64 mnt_cnt;
-        __u64 conn_cnt;
-};
-
 struct llogd_conn_body {
-        struct llog_ctxt_gen    lgdc_gen;
+        struct llog_gen         lgdc_gen;
         struct llog_logid       lgdc_logid;
         __u32                   lgdc_ctxt_idx;
 } __attribute__((packed));
index c3ff249..2b62378 100644 (file)
         snprintf(logname, sizeof(logname), "LOGS/%s", name)
 
 struct plain_handle_data {
-        struct list_head   phd_entry;
-        struct llog_handle     *phd_cat_handle; 
-        struct llog_cookie phd_cookie; /* cookie of this log in its cat */
-        int                phd_last_idx;
+        struct list_head    phd_entry;
+        struct llog_handle *phd_cat_handle;
+        struct llog_cookie  phd_cookie; /* cookie of this log in its cat */
+        int                 phd_last_idx;
 };
 
 struct cat_handle_data {
@@ -71,9 +71,10 @@ struct llog_handle {
 
 /* llog.c  -  general API */
 typedef int (*llog_cb_t)(struct llog_handle *, struct llog_rec_hdr *, void *);
-int llog_init_handle(struct llog_handle *handle, int flags, 
+int llog_init_handle(struct llog_handle *handle, int flags,
                      struct obd_uuid *uuid);
-int llog_process(struct llog_handle *loghandle, llog_cb_t cb, void *data);
+int llog_process(struct llog_handle *loghandle, llog_cb_t cb,
+                 void *data, void *catdata);
 extern struct llog_handle *llog_alloc_handle(void);
 extern void llog_free_handle(struct llog_handle *handle);
 extern int llog_close(struct llog_handle *cathandle);
@@ -84,26 +85,34 @@ struct llog_process_data {
         void *lpd_data;
         llog_cb_t lpd_cb;
 };
+
+struct llog_process_cat_data {
+        int     first_idx;
+        int     last_idx;
+        /* to process catlog across zero record */
+};
+
 int llog_cat_put(struct llog_handle *cathandle);
 int llog_cat_add_rec(struct llog_handle *cathandle, struct llog_rec_hdr *rec,
                      struct llog_cookie *reccookie, void *buf);
 int llog_cat_cancel_records(struct llog_handle *cathandle, int count,
                             struct llog_cookie *cookies);
 int llog_cat_process(struct llog_handle *cat_llh, llog_cb_t cb, void *data);
+int llog_cat_set_first_idx(struct llog_handle *cathandle, int index);
 
 /* llog_obd.c */
 int llog_setup(struct obd_device *obd, int index, struct obd_device *disk_obd,
-               int count,  struct llog_logid *logid, struct llog_operations *op);
+               int count,  struct llog_logid *logid,struct llog_operations *op);
 int llog_cleanup(struct llog_ctxt *);
 int llog_sync(struct llog_ctxt *ctxt, struct obd_export *exp);
-int llog_add(struct llog_ctxt *ctxt,
-                        struct llog_rec_hdr *rec, struct lov_stripe_md *lsm,
-                        struct llog_cookie *logcookies, int numcookies);
+int llog_add(struct llog_ctxt *ctxt, struct llog_rec_hdr *rec,
+             struct lov_stripe_md *lsm, struct llog_cookie *logcookies,
+             int numcookies);
 int llog_cancel(struct llog_ctxt *, struct lov_stripe_md *lsm,
-                    int count, struct llog_cookie *cookies, int flags);
+                int count, struct llog_cookie *cookies, int flags);
 
-int llog_obd_origin_setup(struct obd_device *obd, int index, 
-                          struct obd_device *disk_obd, int count, 
+int llog_obd_origin_setup(struct obd_device *obd, int index,
+                          struct obd_device *disk_obd, int count,
                           struct llog_logid *logid);
 int llog_obd_origin_cleanup(struct llog_ctxt *ctxt);
 int llog_obd_origin_add(struct llog_ctxt *ctxt,
@@ -118,14 +127,14 @@ int obd_llog_finish(struct obd_device *obd, int count);
 
 /* llog_ioctl.c */
 int llog_ioctl(struct llog_ctxt *ctxt, int cmd, struct obd_ioctl_data *data);
-int llog_catlog_list(struct obd_device *obd, int count, 
+int llog_catlog_list(struct obd_device *obd, int count,
                      struct obd_ioctl_data *data);
 
 /* llog_net.c */
 int llog_initiator_connect(struct llog_ctxt *ctxt);
 int llog_receptor_accept(struct llog_ctxt *ctxt, struct obd_import *imp);
 int llog_origin_connect(struct llog_ctxt *ctxt, int count,
-                        struct llog_logid *logid, struct llog_ctxt_gen *gen);
+                        struct llog_logid *logid, struct llog_gen *gen);
 int llog_handle_connect(struct ptlrpc_request *req);
 
 /* recov_thread.c */
@@ -133,40 +142,34 @@ int llog_obd_repl_cancel(struct llog_ctxt *ctxt,
                          struct lov_stripe_md *lsm, int count,
                          struct llog_cookie *cookies, int flags);
 int llog_obd_repl_sync(struct llog_ctxt *ctxt, struct obd_export *exp);
-int llog_repl_connect(struct llog_ctxt *ctxt, int count, 
-                      struct llog_logid *logid, struct llog_ctxt_gen *gen);
+int llog_repl_connect(struct llog_ctxt *ctxt, int count,
+                      struct llog_logid *logid, struct llog_gen *gen);
 
 struct llog_operations {
         int (*lop_write_rec)(struct llog_handle *loghandle,
-                             struct llog_rec_hdr *rec, 
-                             struct llog_cookie *logcookies, 
-                             int numcookies, 
-                             void *,
-                             int idx);
+                             struct llog_rec_hdr *rec,
+                             struct llog_cookie *logcookies, int numcookies,
+                             void *, int idx);
         int (*lop_destroy)(struct llog_handle *handle);
-        int (*lop_next_block)(struct llog_handle *h, 
-                              int *curr_idx,  
-                              int next_idx, 
-                              __u64 *offset, 
-                              void *buf, 
-                              int len);
+        int (*lop_next_block)(struct llog_handle *h, int *curr_idx,
+                              int next_idx, __u64 *offset, void *buf, int len);
         int (*lop_create)(struct llog_ctxt *ctxt, struct llog_handle **,
                           struct llog_logid *logid, char *name);
         int (*lop_close)(struct llog_handle *handle);
         int (*lop_read_header)(struct llog_handle *handle);
 
-        int (*lop_setup)(struct obd_device *obd, int ctxt_idx, 
-                         struct obd_device *disk_obd, int count, 
+        int (*lop_setup)(struct obd_device *obd, int ctxt_idx,
+                         struct obd_device *disk_obd, int count,
                          struct llog_logid *logid);
         int (*lop_sync)(struct llog_ctxt *ctxt, struct obd_export *exp);
         int (*lop_cleanup)(struct llog_ctxt *ctxt);
-        int (*lop_add)(struct llog_ctxt *ctxt, struct llog_rec_hdr *rec, 
-                       struct lov_stripe_md *lsm, 
+        int (*lop_add)(struct llog_ctxt *ctxt, struct llog_rec_hdr *rec,
+                       struct lov_stripe_md *lsm,
                        struct llog_cookie *logcookies, int numcookies);
         int (*lop_cancel)(struct llog_ctxt *ctxt, struct lov_stripe_md *lsm,
                           int count, struct llog_cookie *cookies, int flags);
         int (*lop_connect)(struct llog_ctxt *ctxt, int count,
-                           struct llog_logid *logid, struct llog_ctxt_gen *gen);
+                           struct llog_logid *logid, struct llog_gen *gen);
         /* XXX add 2 more: commit callbacks and llog recovery functions */
 };
 
@@ -178,10 +181,10 @@ extern struct llog_operations llog_lvfs_ops;
 
 struct llog_ctxt {
         int                      loc_idx; /* my index the obd array of ctxt's */
-        struct llog_ctxt_gen     loc_gen; 
+        struct llog_gen          loc_gen;
         struct obd_device       *loc_obd; /* points back to the containing obd*/
         struct obd_export       *loc_exp;
-        struct obd_import       *loc_imp; /* to use in RPC's: can be backward 
+        struct obd_import       *loc_imp; /* to use in RPC's: can be backward
                                              pointing import */
         struct llog_operations  *loc_logops;
         struct llog_handle      *loc_handle;
@@ -190,20 +193,19 @@ struct llog_ctxt {
         void                    *llog_proc_cb;
 };
 
-static inline void log_gen_init(struct llog_ctxt *ctxt)
+static inline void llog_gen_init(struct llog_ctxt *ctxt)
 {
         struct obd_device *obd = ctxt->loc_exp->exp_obd;
 
         if (!strcmp(obd->obd_type->typ_name, "mds"))
                 ctxt->loc_gen.mnt_cnt = obd->u.mds.mds_mount_count;
-        else if (!strstr(obd->obd_type->typ_name, "filter")) {
-                ctxt->loc_gen.mnt_cnt = obd->u.filter.fo_mount_count; 
-        }
+        else if (!strstr(obd->obd_type->typ_name, "filter"))
+                ctxt->loc_gen.mnt_cnt = obd->u.filter.fo_mount_count;
         else
-                ctxt->loc_gen.mnt_cnt = 0; 
+                ctxt->loc_gen.mnt_cnt = 0;
 }
 
-static inline int log_gen_lt(struct llog_ctxt_gen a, struct llog_ctxt_gen b)
+static inline int llog_gen_lt(struct llog_gen a, struct llog_gen b)
 {
         if (a.mnt_cnt < b.mnt_cnt)
                 return 1;
@@ -212,15 +214,19 @@ static inline int log_gen_lt(struct llog_ctxt_gen a, struct llog_ctxt_gen b)
         return(a.conn_cnt < b.conn_cnt ? 1 : 0);
 }
 
+#define LLOG_GEN_INC(gen)  ((gen).conn_cnt) ++
+#define LLOG_PROC_BREAK 0x0001
 
 static inline int llog_obd2ops(struct llog_ctxt *ctxt,
                                struct llog_operations **lop)
 {
         if (ctxt == NULL)
                 return -ENOTCONN;
+
         *lop = ctxt->loc_logops;
         if (*lop == NULL)
                 return -EOPNOTSUPP;
+
         return 0;
 }
 
@@ -229,6 +235,7 @@ static inline int llog_handle2ops(struct llog_handle *loghandle,
 {
         if (loghandle == NULL)
                 return -EINVAL;
+
         return llog_obd2ops(loghandle->lgh_ctxt, lop);
 }
 
@@ -242,8 +249,8 @@ static inline struct llog_ctxt *llog_get_context(struct obd_device *obd,
 {
         if (index < 0 || index >= LLOG_MAX_CTXTS)
                 return NULL;
-        else
-                return obd->obd_llog_ctxt[index];
+
+        return obd->obd_llog_ctxt[index];
 }
 
 static inline int llog_write_rec(struct llog_handle *handle,
@@ -254,7 +261,7 @@ static inline int llog_write_rec(struct llog_handle *handle,
         struct llog_operations *lop;
         int rc, buflen;
         ENTRY;
-        
+
         rc = llog_handle2ops(handle, &lop);
         if (rc)
                 RETURN(rc);
@@ -277,7 +284,7 @@ static inline int llog_read_header(struct llog_handle *handle)
         struct llog_operations *lop;
         int rc;
         ENTRY;
-        
+
         rc = llog_handle2ops(handle, &lop);
         if (rc)
                 RETURN(rc);
@@ -293,7 +300,7 @@ static inline int llog_destroy(struct llog_handle *handle)
         struct llog_operations *lop;
         int rc;
         ENTRY;
-        
+
         rc = llog_handle2ops(handle, &lop);
         if (rc)
                 RETURN(rc);
@@ -322,7 +329,7 @@ static inline int llog_cancel(struct obd_export *exp,
         rc = lop->lop_cancel(exp, lsm, count, cookies, flags);
         RETURN(rc);
 }
-#endif 
+#endif
 
 static inline int llog_next_block(struct llog_handle *loghandle, int *cur_idx,
                                   int next_idx, __u64 *cur_offset, void *buf,
@@ -343,8 +350,7 @@ static inline int llog_next_block(struct llog_handle *loghandle, int *cur_idx,
         RETURN(rc);
 }
 
-static inline int llog_create(struct llog_ctxt *ctxt, 
-                              struct llog_handle **res,
+static inline int llog_create(struct llog_ctxt *ctxt, struct llog_handle **res,
                               struct llog_logid *logid, char *name)
 {
         struct llog_operations *lop;
@@ -362,8 +368,7 @@ static inline int llog_create(struct llog_ctxt *ctxt,
 }
 
 static inline int llog_connect(struct llog_ctxt *ctxt, int count,
-                               struct llog_logid *logid,
-                               struct llog_ctxt_gen *gen)
+                               struct llog_logid *logid, struct llog_gen *gen)
 {
         struct llog_operations *lop;
         int rc;
index d303272..a1f333a 100644 (file)
@@ -56,6 +56,11 @@ struct lustre_md {
         struct lov_stripe_md *lsm;
 };
 
+struct ll_uctxt {
+        __u32 gid1;
+        __u32 gid2;
+};
+
 struct mdc_op_data {
         struct ll_fid fid1;
         struct ll_fid fid2;
index 1ca54d3..f4c0fc5 100644 (file)
@@ -642,7 +642,7 @@ void target_abort_recovery(void *data)
 
         class_disconnect_exports(obd, 0);
 
-        /* when recovery was abort, cleanup orphans for mds */
+        /* when recovery was aborted, cleanup orphans on mds and ost */
         if (OBT(obd) && OBP(obd, postrecov)) {
                 rc = OBP(obd, postrecov)(obd);
                 if (rc >= 0)
@@ -930,12 +930,12 @@ int target_queue_final_reply(struct ptlrpc_request *req, int rc)
                        obd->obd_name);
                 obd->obd_recovering = 0;
 
-                /* when recovering finished, cleanup orphans for mds */
+                /* when recovery finished, cleanup orphans on mds and ost */
                 if (OBT(obd) && OBP(obd, postrecov)) {
                         rc2 = OBP(obd, postrecov)(obd);
                         if (rc2 >= 0)
-                                CWARN("%s: all clients recovered, %d MDS orphans "
-                                       "deleted\n", obd->obd_name, rc2);
+                                CWARN("%s: all clients recovered, %d MDS "
+                                      "orphans deleted\n", obd->obd_name, rc2);
                         else
                                 CERROR("postrecov failed %d\n", rc2);
                 }
@@ -1133,4 +1133,3 @@ void *ldlm_put_lock_into_req(struct ptlrpc_request *req,
         LBUG();
         return NULL;
 }
-
index 95cb950..042a383 100644 (file)
@@ -855,41 +855,36 @@ static int ldlm_callback_handler(struct ptlrpc_request *req)
         LASSERT(req->rq_export != NULL);
         LASSERT(req->rq_export->exp_obd != NULL);
 
-#ifdef ENABLE_ORPHANS
         /* FIXME - how to send reply */
         if (req->rq_reqmsg->opc == OBD_LOG_CANCEL) {
                 int rc = llog_origin_handle_cancel(req);
                 ldlm_callback_reply(req, rc);
                 RETURN(0);
         }
-
         if (req->rq_reqmsg->opc == LLOG_ORIGIN_HANDLE_CREATE) {
                 int rc = llog_origin_handle_create(req);
                 req->rq_status = rc;
                 ptlrpc_reply(req);
                 RETURN(0);
         }
-
         if (req->rq_reqmsg->opc == LLOG_ORIGIN_HANDLE_NEXT_BLOCK) {
                 int rc = llog_origin_handle_next_block(req);
                 req->rq_status = rc;
                 ptlrpc_reply(req);
                 RETURN(0);
         }
-
         if (req->rq_reqmsg->opc == LLOG_ORIGIN_HANDLE_READ_HEADER) {
                 int rc = llog_origin_handle_read_header(req);
                 req->rq_status = rc;
                 ptlrpc_reply(req);
                 RETURN(0);
         }
-
         if (req->rq_reqmsg->opc == LLOG_ORIGIN_HANDLE_CLOSE) {
                 int rc = llog_origin_handle_close(req);
                 ldlm_callback_reply(req, rc);
                 RETURN(0);
         }
-#endif
+
         ns = req->rq_export->exp_obd->obd_namespace;
         LASSERT(ns != NULL);
 
index 0b9f6f3..59dc29e 100644 (file)
 
 #include "lov_internal.h"
 
-#if 0
-static int lov_logop_cleanup(struct llog_ctxt *ctxt)
-{
-        struct lov_obd *lov = &ctxt->loc_obd->u.lov;
-        int i, rc = 0;
-
-        ENTRY;
-        for (i = 0; i < lov->desc.ld_tgt_count; i++) {
-                struct obd_device *child = lov->tgts[i].ltd_exp->exp_obd;
-                struct llog_ctxt *cctxt = llog_get_context(child, ctxt->loc_idx);
-                rc = llog_cleanup(cctxt);
-                if (rc) {
-                        CERROR("error lov_llog_open %d\n", i);
-                        break;
-                }
-        }
-        RETURN(rc);
-}
-#endif
-
 /* Add log records for each OSC that this object is striped over, and return
  * cookies for each one.  We _would_ have nice abstraction here, except that
  * we need to keep cookies in stripe order, even if some are NULL, so that
@@ -101,7 +81,8 @@ static int lov_llog_origin_add(struct llog_ctxt *ctxt,
 
                 lur->lur_oid = loi->loi_id;
                 lur->lur_ogen = loi->loi_gr;
-                rc += llog_add(cctxt, &lur->lur_hdr, NULL, logcookies + rc, numcookies - rc);
+                rc += llog_add(cctxt, &lur->lur_hdr, NULL, logcookies + rc,
+                                numcookies - rc);
 
         }
         OBD_FREE(lur, sizeof(*lur));
@@ -110,8 +91,8 @@ static int lov_llog_origin_add(struct llog_ctxt *ctxt,
 }
 
 static int lov_llog_origin_connect(struct llog_ctxt *ctxt, int count,
-                                   struct llog_logid *logid,
-                                   struct llog_ctxt_gen *gen)
+                                   struct llog_logid *logid, 
+                                   struct llog_gen *gen)
 {
         struct obd_device *obd = ctxt->loc_obd;
         struct lov_obd *lov = &obd->u.lov;
index 5de8a9b..a45560a 100644 (file)
@@ -74,6 +74,7 @@ static void *fsfilt_ext3_start(struct inode *inode, int op, void *desc_private)
 {
         /* For updates to the last recieved file */
         int nblocks = EXT3_DATA_TRANS_BLOCKS;
+        int blocksize, block_count = 0;
         void *handle;
 
         if (current->journal_info) {
@@ -119,6 +120,13 @@ static void *fsfilt_ext3_start(struct inode *inode, int op, void *desc_private)
                 /* Setattr on inode */
                 nblocks += 1;
                 break;
+        case FSFILT_OP_CANCEL_UNLINK_LOG:
+                blocksize = 1 << inode->i_blkbits;
+                block_count = (blocksize - 1) + LLOG_CHUNK_SIZE;
+                block_count = (block_count + blocksize - 1) >> inode->i_blkbits;
+                block_count = block_count * EXT3_DATA_TRANS_BLOCKS + 2;
+                nblocks = 2 * 2 * block_count;
+                break;
         default: CERROR("unknown transaction start op %d\n", op);
                  LBUG();
         }
index 8434ce0..b8ade94 100644 (file)
@@ -993,7 +993,7 @@ static char *reint_names[] = {
 
 int mds_handle(struct ptlrpc_request *req)
 {
-        int should_process;
+        int should_process, fail = OBD_FAIL_MDS_ALL_REPLY_NET;
         int rc = 0;
         struct mds_obd *mds = NULL; /* quell gcc overwarning */
         struct obd_device *obd = NULL;
@@ -1130,7 +1130,7 @@ int mds_handle(struct ptlrpc_request *req)
                         break;
 
                 rc = mds_reint(req, 0, NULL);
-                OBD_FAIL_RETURN(OBD_FAIL_MDS_REINT_NET_REP, 0);
+                fail = OBD_FAIL_MDS_REINT_NET_REP;
                 break;
         }
 
@@ -1253,7 +1253,7 @@ int mds_handle(struct ptlrpc_request *req)
                 rc = req->rq_status = -ENOTCONN;
         }
 
-        target_send_reply(req, rc, OBD_FAIL_MDS_ALL_REPLY_NET);
+        target_send_reply(req, rc, fail);
         return 0;
 }
 
@@ -1438,13 +1438,12 @@ static int mds_postrecov(struct obd_device *obd)
 
         LASSERT(!obd->obd_recovering);
 
-#ifdef ENABLE_ORPHANS
         rc = llog_connect(llog_get_context(obd, LLOG_UNLINK_ORIG_CTXT),
                           obd->u.mds.mds_lov_desc.ld_tgt_count, NULL, NULL);
         if (rc != 0) {
                 CERROR("faild at llog_origin_connect: %d\n", rc);
         }
-#endif
+
         rc = mds_cleanup_orphans(obd);
 
         rc2 = mds_lov_set_nextid(obd);
index a9b02ee..549c760 100644 (file)
@@ -54,7 +54,7 @@ static int mds_llog_origin_add(struct llog_ctxt *ctxt,
 
 static int mds_llog_origin_connect(struct llog_ctxt *ctxt, int count,
                                    struct llog_logid *logid,
-                                   struct llog_ctxt_gen *gen) 
+                                   struct llog_gen *gen)
 {
         struct obd_device *obd = ctxt->loc_obd;
         struct obd_device *lov_obd = obd->u.mds.mds_osc_obd;
@@ -86,9 +86,7 @@ int mds_log_op_unlink(struct obd_device *obd, struct inode *inode,
 {
         struct mds_obd *mds = &obd->u.mds;
         struct lov_stripe_md *lsm = NULL;
-#ifdef ENABLE_ORPHANS
         struct llog_ctxt *ctxt;
-#endif
         int rc;
         ENTRY;
 
@@ -101,11 +99,9 @@ int mds_log_op_unlink(struct obd_device *obd, struct inode *inode,
         if (rc < 0)
                 RETURN(rc);
 
-#ifdef ENABLE_ORPHANS
         ctxt = llog_get_context(obd, LLOG_UNLINK_ORIG_CTXT);
         rc = llog_add(ctxt, NULL, lsm, lustre_msg_buf(repmsg, offset + 1, 0),
                       repmsg->buflens[offset + 1] / sizeof(struct llog_cookie));
-#endif
 
         obd_free_memmd(mds->mds_osc_exp, &lsm);
 
index 5476684..d272b33 100644 (file)
@@ -227,9 +227,9 @@ int mds_lov_connect(struct obd_device *obd, char * lov_name)
         }
 
         valsize = sizeof(mds->mds_lov_desc);
-        rc = obd_get_info(mds->mds_osc_exp, strlen("lovdesc") + 1, "lovdesc", 
+        rc = obd_get_info(mds->mds_osc_exp, strlen("lovdesc") + 1, "lovdesc",
                           &valsize, &mds->mds_lov_desc);
-        if (rc) 
+        if (rc)
                 GOTO(err_reg, rc);
 
         mds->mds_max_mdsize = lov_mds_md_size(mds->mds_lov_desc.ld_tgt_count);
@@ -240,21 +240,20 @@ int mds_lov_connect(struct obd_device *obd, char * lov_name)
         if (rc) {
                 CERROR("cannot read %s: rc = %d\n", "lov_objids", rc);
                 GOTO(err_reg, rc);
-        } 
+        }
 
-#ifdef ENABLE_ORPHANS
         rc = llog_cat_initialize(obd, mds->mds_lov_desc.ld_tgt_count);
         if (rc) {
                 CERROR("failed to initialize catalog %d\n", rc);
                 GOTO(err_reg, rc);
         }
-#endif
-        /* FIXME before this set info call is made, we must initialize the logging */
+
+        /* FIXME before set info call is made, we must initialize logging */
         rc = obd_set_info(mds->mds_osc_exp, strlen("mds_conn"), "mds_conn",
                           0, NULL);
-        if (rc) 
+        if (rc)
                 GOTO(err_reg, rc);
-        
+
         /* If we're mounting this code for the first time on an existing FS,
          * we need to populate the objids array from the real OST values */
         if (!mds->mds_lov_objids_valid) {
@@ -279,13 +278,12 @@ int mds_lov_connect(struct obd_device *obd, char * lov_name)
          * it can use the obd_recovering flag to determine when the
          * the OBD is full available. */
         if (!obd->obd_recovering) {
-#ifdef ENABLE_ORPHANS
                 rc = llog_connect(llog_get_context(obd, LLOG_UNLINK_ORIG_CTXT),
-                                  obd->u.mds.mds_lov_desc.ld_tgt_count, NULL, NULL);
-                if (rc != 0) {
+                                  obd->u.mds.mds_lov_desc.ld_tgt_count, NULL,
+                                  NULL);
+                if (rc != 0)
                         CERROR("faild at llog_origin_connect: %d\n", rc);
-                }
-#endif
+
                 rc = mds_cleanup_orphans(obd);
                 if (rc > 0)
                         CERROR("Cleanup %d orphans while MDS isn't recovering\n", rc);
@@ -297,12 +295,10 @@ int mds_lov_connect(struct obd_device *obd, char * lov_name)
         RETURN(rc);
 
 err_llog:
-#ifdef ENABLE_ORPHANS
         /* cleanup all llogging subsystems */
         rc = obd_llog_finish(obd, mds->mds_lov_desc.ld_tgt_count);
-        if (rc) 
+        if (rc)
                 CERROR("failed to cleanup llogging subsystems\n");
-#endif
 err_reg:
         obd_register_observer(mds->mds_osc_obd, NULL);
 err_discon:
@@ -319,12 +315,10 @@ int mds_lov_disconnect(struct obd_device *obd, int flags)
         ENTRY;
 
         if (!IS_ERR(mds->mds_osc_obd) && mds->mds_osc_exp != NULL) {
-#ifdef ENABLE_ORPHANS
                 /* cleanup all llogging subsystems */
                 rc = obd_llog_finish(obd, mds->mds_lov_desc.ld_tgt_count);
-                if (rc) 
+                if (rc)
                         CERROR("failed to cleanup llogging subsystems\n");
-#endif
 
                 obd_register_observer(mds->mds_osc_obd, NULL);
 
@@ -358,10 +352,10 @@ int mds_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
                         RETURN(-EBUSY);
 
                 push_ctxt(&saved, &obd->obd_ctxt, NULL);
-                rc = llog_create(llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT), 
+                rc = llog_create(llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT),
                                  &mds->mds_cfg_llh, NULL, name);
                 if (rc == 0)
-                        llog_init_handle(mds->mds_cfg_llh, LLOG_F_IS_PLAIN, 
+                        llog_init_handle(mds->mds_cfg_llh, LLOG_F_IS_PLAIN,
                                          &cfg_uuid);
                 else
                         mds->mds_cfg_llh = NULL;
@@ -418,7 +412,7 @@ int mds_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
         }
 
         case OBD_IOC_PARSE: {
-                struct llog_ctxt *ctxt = 
+                struct llog_ctxt *ctxt =
                         llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT);
                 push_ctxt(&saved, &obd->obd_ctxt, NULL);
                 rc = class_config_parse_llog(ctxt, data->ioc_inlbuf1, NULL);
@@ -430,7 +424,7 @@ int mds_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
         }
 
         case OBD_IOC_DUMP_LOG: {
-                struct llog_ctxt *ctxt = 
+                struct llog_ctxt *ctxt =
                         llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT);
                 push_ctxt(&saved, &obd->obd_ctxt, NULL);
                 rc = class_config_dump_llog(ctxt, data->ioc_inlbuf1, NULL);
@@ -462,33 +456,29 @@ int mds_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
                 RETURN(rc);
 
         }
-        case OBD_IOC_LLOG_CHECK: 
+        case OBD_IOC_LLOG_CHECK:
         case OBD_IOC_LLOG_CANCEL:
-        case OBD_IOC_LLOG_REMOVE: { 
-                struct llog_ctxt *ctxt = 
+        case OBD_IOC_LLOG_REMOVE: {
+                struct llog_ctxt *ctxt =
                         llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT);
 
-#ifdef ENABLE_ORPHANS
                 obd_llog_finish(obd, mds->mds_lov_desc.ld_tgt_count);
-#endif
                 push_ctxt(&saved, &ctxt->loc_exp->exp_obd->obd_ctxt, NULL);
                 rc = llog_ioctl(ctxt, cmd, data);
                 pop_ctxt(&saved, &ctxt->loc_exp->exp_obd->obd_ctxt, NULL);
-      
-#ifdef ENABLE_ORPHANS                
                 llog_cat_initialize(obd, mds->mds_lov_desc.ld_tgt_count);
-#endif
+
                 RETURN(rc);
-        }                
+        }
         case OBD_IOC_LLOG_INFO:
         case OBD_IOC_LLOG_PRINT: {
-                struct llog_ctxt *ctxt = 
+                struct llog_ctxt *ctxt =
                         llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT);
-                
+
                 push_ctxt(&saved, &ctxt->loc_exp->exp_obd->obd_ctxt, NULL);
                 rc = llog_ioctl(ctxt, cmd, data);
                 pop_ctxt(&saved, &ctxt->loc_exp->exp_obd->obd_ctxt, NULL);
-                
+
                 RETURN(rc);
         }
 
index 457bbcf..f3298bf 100644 (file)
@@ -1035,14 +1035,13 @@ int mds_mfd_close(struct ptlrpc_request *req, struct obd_device *obd,
                         GOTO(cleanup, rc);
                 }
 
-#ifdef ENABLE_ORPHANS
                 if (req != NULL &&
                     (reply_body->valid & OBD_MD_FLEASIZE) &&
                     mds_log_op_unlink(obd, pending_child->d_inode,
                                       req->rq_repmsg, 1) > 0) {
                         reply_body->valid |= OBD_MD_FLCOOKIE;
                 }
-#endif
+
                 pending_child->d_fsdata = (void *) &dp;
                 dp.p_inum = 0;
                 dp.p_ptr = req;
index 626248d..20aa83d 100644 (file)
@@ -62,9 +62,7 @@ static void mds_cancel_cookies_cb(struct obd_device *obd, __u64 transno,
 {
         struct mds_logcancel_data *mlcd = cb_data;
         struct lov_stripe_md *lsm = NULL;
-#ifdef ENABLE_ORPHANS
         struct llog_ctxt *ctxt;
-#endif
         int rc;
 
         obd_transno_commit_cb(obd, transno, error);
@@ -79,7 +77,6 @@ static void mds_cancel_cookies_cb(struct obd_device *obd, __u64 transno,
                        (int)(mlcd->mlcd_cookielen/sizeof(*mlcd->mlcd_cookies)),
                        rc);
         } else {
-#ifdef ENABLE_ORPHANS
                 ///* XXX 0 normally, SENDNOW for debug */);
                 ctxt = llog_get_context(obd, mlcd->mlcd_cookies[0].lgc_subsys + 1);
                 rc = llog_cancel(ctxt, lsm,
@@ -90,7 +87,6 @@ static void mds_cancel_cookies_cb(struct obd_device *obd, __u64 transno,
                         CERROR("error cancelling %d log cookies: rc %d\n",
                                (int)(mlcd->mlcd_cookielen /
                                      sizeof(*mlcd->mlcd_cookies)), rc);
-#endif
         }
 
         OBD_FREE(mlcd, mlcd->mlcd_size);
@@ -1184,13 +1180,10 @@ static int mds_reint_unlink(struct mds_update_record *rec, int offset,
                 cleanup_phase = 4; /* transaction */
                 rc = vfs_unlink(dparent->d_inode, dchild);
 
-#ifdef ENABLE_ORPHANS
-
                 if (!rc && log_unlink)
                         if (mds_log_op_unlink(obd, child_inode, req->rq_repmsg,
                                               offset + 1) > 0)
                                 body->valid |= OBD_MD_FLCOOKIE;
-#endif
                 break;
         }
         case S_IFLNK:
index 330ef32..415b133 100644 (file)
@@ -139,7 +139,6 @@ static int mds_osc_destroy_orphan(struct mds_obd *mds,
         oa->o_mode = body->mode & S_IFMT;
         oa->o_valid = OBD_MD_FLID | OBD_MD_FLTYPE;
 
-#ifdef ENABLE_ORPHANS
         if (body->valid & OBD_MD_FLCOOKIE) {
                 oa->o_valid |= OBD_MD_FLCOOKIE;
                 oti.oti_logcookies = 
@@ -150,7 +149,6 @@ static int mds_osc_destroy_orphan(struct mds_obd *mds,
                         oa->o_valid &= ~OBD_MD_FLCOOKIE;
                         body->valid &= ~OBD_MD_FLCOOKIE;
         }
-#endif
 
         rc = obd_destroy(mds->mds_osc_exp, oa, lsm, &oti);
         obdo_free(oa);
@@ -209,12 +207,11 @@ static int mds_unlink_orphan(struct obd_device *obd, struct dentry *dchild,
                 CERROR("error %d unlinking orphan %*s from PENDING directory\n",
                        rc, dchild->d_name.len, dchild->d_name.name);
 
-#ifdef ENABLE_ORPHANS
         if ((body->valid & OBD_MD_FLEASIZE)) {
                 if (mds_log_op_unlink(obd, inode, req->rq_repmsg, 1) > 0)
                         body->valid |= OBD_MD_FLCOOKIE;
         }
-#endif
+
         if (handle) {
                 int err = fsfilt_commit(obd, pending_dir, handle, 0);
                 if (err) {
index ad7ddcd..82ceab4 100644 (file)
@@ -77,7 +77,7 @@ void llog_free_handle(struct llog_handle *loghandle)
 }
 EXPORT_SYMBOL(llog_free_handle);
 
-/* returns negative on error; 0 if success; 1 if success & log destroyed */ 
+/* returns negative on error; 0 if success; 1 if success & log destroyed */
 int llog_cancel_rec(struct llog_handle *loghandle, int index)
 {
         struct llog_log_hdr *llh = loghandle->lgh_hdr;
@@ -101,7 +101,7 @@ int llog_cancel_rec(struct llog_handle *loghandle, int index)
 
         if ((le32_to_cpu(llh->llh_flags) & LLOG_F_ZAP_WHEN_EMPTY) &&
             (le32_to_cpu(llh->llh_count) == 1) &&
-            (loghandle->lgh_last_idx == (LLOG_BITMAP_BYTES * 8) - 1)) { 
+            (loghandle->lgh_last_idx == (LLOG_BITMAP_BYTES * 8) - 1)) {
                 rc = llog_destroy(loghandle);
                 if (rc)
                         CERROR("failure destroying log after last cancel: %d\n",
@@ -111,7 +111,7 @@ int llog_cancel_rec(struct llog_handle *loghandle, int index)
         }
 
         rc = llog_write_rec(loghandle, &llh->llh_hdr, NULL, 0, NULL, 0);
-        if (rc) 
+        if (rc)
                 CERROR("failure re-writing header %d\n", rc);
         LASSERT(rc == 0);
         RETURN(rc);
@@ -144,16 +144,17 @@ int llog_init_handle(struct llog_handle *handle, int flags,
                 GOTO(out, rc);
         }
         rc = 0;
-        
+
         handle->lgh_last_idx = 0; /* header is record with index 0 */
         llh->llh_count = cpu_to_le32(1);         /* for the header record */
         llh->llh_hdr.lrh_type = cpu_to_le32(LLOG_HDR_MAGIC);
-        llh->llh_hdr.lrh_len = llh->llh_tail.lrt_len = cpu_to_le32(LLOG_CHUNK_SIZE);
+        llh->llh_hdr.lrh_len = llh->llh_tail.lrt_len =
+                cpu_to_le32(LLOG_CHUNK_SIZE);
         llh->llh_hdr.lrh_index = llh->llh_tail.lrt_index = 0;
         llh->llh_timestamp = cpu_to_le64(LTIME_S(CURRENT_TIME));
         if (uuid)
                 memcpy(&llh->llh_tgtuuid, uuid, sizeof(llh->llh_tgtuuid));
-        llh->llh_bitmap_offset = cpu_to_le32(offsetof(typeof(*llh), llh_bitmap));
+        llh->llh_bitmap_offset = cpu_to_le32(offsetof(typeof(*llh),llh_bitmap));
         ext2_set_bit(0, llh->llh_bitmap);
 
 out:
@@ -165,7 +166,7 @@ out:
                 INIT_LIST_HEAD(&handle->u.phd.phd_entry);
         else
                 LBUG();
-        
+
         if (rc) {
                 OBD_FREE(llh, sizeof(*llh));
                 handle->lgh_hdr = NULL;
@@ -192,12 +193,14 @@ int llog_close(struct llog_handle *loghandle)
 }
 EXPORT_SYMBOL(llog_close);
 
-int llog_process(struct llog_handle *loghandle, llog_cb_t cb, void *data)
+int llog_process(struct llog_handle *loghandle, llog_cb_t cb,
+                 void *data, void *catdata)
 {
         struct llog_log_hdr *llh = loghandle->lgh_hdr;
+        struct llog_process_cat_data *cd = catdata;
         void *buf;
         __u64 cur_offset = LLOG_CHUNK_SIZE;
-        int rc = 0, index = 1;
+        int rc = 0, index = 1, last_index, idx;
         int saved_index = 0;
         ENTRY;
 
@@ -205,27 +208,41 @@ int llog_process(struct llog_handle *loghandle, llog_cb_t cb, void *data)
         if (!buf)
                 RETURN(-ENOMEM);
 
+        if (cd != NULL)
+                index = cd->first_idx + 1;
+        if (cd != NULL && cd->last_idx)
+                last_index = cd->last_idx;
+        else
+                last_index = LLOG_BITMAP_BYTES * 8 - 1;
+
+
         while (rc == 0) {
                 struct llog_rec_hdr *rec;
-                
+
                 /* skip records not set in bitmap */
-                while (index < (LLOG_BITMAP_BYTES * 8) &&
+                while (index <= last_index &&
                        !ext2_test_bit(index, llh->llh_bitmap))
                         ++index;
 
-                LASSERT(index <= LLOG_BITMAP_BYTES * 8);
-                if (index == LLOG_BITMAP_BYTES * 8)
+                LASSERT(index <= last_index + 1);
+                if (index == last_index + 1)
                         break;
 
                 /* get the buf with our target record; avoid old garbage */
                 memset(buf, 0, LLOG_CHUNK_SIZE);
-                rc = llog_next_block(loghandle, &saved_index, index, 
+                rc = llog_next_block(loghandle, &saved_index, index,
                                      &cur_offset, buf, LLOG_CHUNK_SIZE);
                 if (rc)
                         GOTO(out, rc);
 
                 rec = buf;
-                index = le32_to_cpu(rec->lrh_index);
+                idx = le32_to_cpu(rec->lrh_index);
+                if (idx < index)
+                        CDEBUG(D_HA, "index %u : idx %u\n", index, idx);
+                while (idx < index) {
+                        rec = ((void *)rec + le32_to_cpu(rec->lrh_len));
+                        idx ++;
+                }
 
                 /* process records in buffer, starting where we found one */
                 while ((void *)rec < buf + LLOG_CHUNK_SIZE) {
@@ -235,13 +252,20 @@ int llog_process(struct llog_handle *loghandle, llog_cb_t cb, void *data)
                         /* if set, process the callback on this record */
                         if (ext2_test_bit(index, llh->llh_bitmap)) {
                                 rc = cb(loghandle, rec, data);
-                                if (rc) 
+                                if (rc == LLOG_PROC_BREAK) {
+                                        CWARN("recovery from log: "LPX64":%x"
+                                              " stopped\n",
+                                              loghandle->lgh_id.lgl_oid,
+                                              loghandle->lgh_id.lgl_ogen);
+                                        GOTO(out, rc);
+                                }
+                                if (rc)
                                         GOTO(out, rc);
                         }
 
                         /* next record, still in buffer? */
                         ++index;
-                        if (index > LLOG_BITMAP_BYTES * 8 - 1)
+                        if (index > last_index)
                                 GOTO(out, rc = 0);
                         rec = ((void *)rec + le32_to_cpu(rec->lrh_len));
                 }
index 4c49a10..a962797 100644 (file)
@@ -52,38 +52,41 @@ static struct llog_handle *llog_cat_new_log(struct llog_handle *cathandle)
         struct llog_handle *loghandle;
         struct llog_log_hdr *llh;
         struct llog_logid_rec rec;
-        int rc, index, bitmap_size, i;
+        int rc, index, bitmap_size;
         ENTRY;
 
+        llh = cathandle->lgh_hdr;
+        bitmap_size = sizeof(llh->llh_bitmap) * 8;
+
+        index = (cathandle->lgh_last_idx + 1) % bitmap_size;
+
+        /* maximum number of available slots in catlog is bitmap_size - 2 */
+        if (llh->llh_cat_idx == cpu_to_le32(index)) {
+                CERROR("no free catalog slots for log...\n");
+                RETURN(ERR_PTR(-ENOSPC));
+        } else {
+                if (index == 0)
+                        index = 1;
+                if (ext2_set_bit(index, llh->llh_bitmap)) {
+                        CERROR("argh, index %u already set in log bitmap?\n",
+                               index);
+                        LBUG(); /* should never happen */
+                }
+                cathandle->lgh_last_idx = index;
+                llh->llh_count = cpu_to_le32(le32_to_cpu(llh->llh_count) + 1);
+                llh->llh_tail.lrt_index = cpu_to_le32(index);
+        }
+
         rc = llog_create(cathandle->lgh_ctxt, &loghandle, NULL, NULL);
         if (rc)
                 RETURN(ERR_PTR(rc));
 
-        rc = llog_init_handle(loghandle, 
-                              LLOG_F_IS_PLAIN | LLOG_F_ZAP_WHEN_EMPTY, 
+        rc = llog_init_handle(loghandle,
+                              LLOG_F_IS_PLAIN | LLOG_F_ZAP_WHEN_EMPTY,
                               &cathandle->lgh_hdr->llh_tgtuuid);
         if (rc)
                 GOTO(out_destroy, rc);
 
-        /* Find first free entry */
-        llh = cathandle->lgh_hdr;
-        bitmap_size = sizeof(llh->llh_bitmap) * 8;
-        for (i = 0, index = le32_to_cpu(llh->llh_count); i < bitmap_size; 
-             i++, index++) {
-                index %= bitmap_size;
-                if (ext2_set_bit(index, llh->llh_bitmap)) {
-                        /* XXX This should trigger log clean up or similar */
-                        CERROR("catalog index %d is still in use\n", index);
-                } else {
-                        cathandle->lgh_last_idx = index;
-                        llh->llh_count = cpu_to_le32(le32_to_cpu(llh->llh_count) + 1);
-                        break;
-                }
-        }
-        if (i == bitmap_size) {
-                CERROR("no free catalog slots for log...\n");
-                GOTO(out_destroy, rc = -ENOSPC);
-        }
         CWARN("new recovery log "LPX64":%x for index %u of catalog "LPX64"\n",
                loghandle->lgh_id.lgl_oid, loghandle->lgh_id.lgl_ogen, index,
                cathandle->lgh_id.lgl_oid);
@@ -96,7 +99,7 @@ static struct llog_handle *llog_cat_new_log(struct llog_handle *cathandle)
         rec.lid_tail.lrt_index = cpu_to_le32(index);
 
         /* update the catalog: header and record */
-        rc = llog_write_rec(cathandle, &rec.lid_hdr, 
+        rc = llog_write_rec(cathandle, &rec.lid_hdr,
                             &loghandle->u.phd.phd_cookie, 1, NULL, index);
         if (rc < 0) {
                 GOTO(out_destroy, rc);
@@ -115,7 +118,10 @@ static struct llog_handle *llog_cat_new_log(struct llog_handle *cathandle)
 }
 EXPORT_SYMBOL(llog_cat_new_log);
 
-/* Assumes caller has already pushed us into the kernel context and is locking.
+/* Open an existent log handle and add it to the open list.
+ * This log handle will be closed when all of the records in it are removed.
+ *
+ * Assumes caller has already pushed us into the kernel context and is locking.
  * We return a lock on the handle to ensure nobody yanks it from us.
  */
 int llog_cat_id2handle(struct llog_handle *cathandle, struct llog_handle **res,
@@ -128,7 +134,7 @@ int llog_cat_id2handle(struct llog_handle *cathandle, struct llog_handle **res,
         if (cathandle == NULL)
                 RETURN(-EBADF);
 
-        list_for_each_entry(loghandle, &cathandle->u.chd.chd_head, 
+        list_for_each_entry(loghandle, &cathandle->u.chd.chd_head,
                             u.phd.phd_entry) {
                 struct llog_logid *cgl = &loghandle->lgh_id;
                 if (cgl->lgl_oid == logid->lgl_oid) {
@@ -139,7 +145,6 @@ int llog_cat_id2handle(struct llog_handle *cathandle, struct llog_handle **res,
                                 continue;
                         }
                         loghandle->u.phd.phd_cat_handle = cathandle;
-                        cathandle->u.chd.chd_current_log = loghandle;
                         GOTO(out, rc = 0);
                 }
         }
@@ -151,15 +156,14 @@ int llog_cat_id2handle(struct llog_handle *cathandle, struct llog_handle **res,
         } else {
                 rc = llog_init_handle(loghandle, LLOG_F_IS_PLAIN, NULL);
                 if (!rc) {
-                        list_add(&loghandle->u.phd.phd_entry, 
+                        list_add(&loghandle->u.phd.phd_entry,
                                  &cathandle->u.chd.chd_head);
-                        cathandle->u.chd.chd_current_log = loghandle;
                 }
         }
         if (!rc) {
                 loghandle->u.phd.phd_cat_handle = cathandle;
                 loghandle->u.phd.phd_cookie.lgc_lgl = cathandle->lgh_id;
-                loghandle->u.phd.phd_cookie.lgc_index = 
+                loghandle->u.phd.phd_cookie.lgc_index =
                         le32_to_cpu(loghandle->lgh_hdr->llh_cat_idx);
         }
 
@@ -174,7 +178,7 @@ int llog_cat_put(struct llog_handle *cathandle)
         int rc;
         ENTRY;
 
-        list_for_each_entry_safe(loghandle, n, &cathandle->u.chd.chd_head, 
+        list_for_each_entry_safe(loghandle, n, &cathandle->u.chd.chd_head,
                                  u.phd.phd_entry) {
                 int err = llog_close(loghandle);
                 if (err)
@@ -195,7 +199,7 @@ EXPORT_SYMBOL(llog_cat_put);
  *
  * NOTE: loghandle is write-locked upon successful return
  */
-static struct llog_handle *llog_cat_current_log(struct llog_handle *cathandle, 
+static struct llog_handle *llog_cat_current_log(struct llog_handle *cathandle,
                                                 int create)
 {
         struct llog_handle *loghandle = NULL;
@@ -205,7 +209,7 @@ static struct llog_handle *llog_cat_current_log(struct llog_handle *cathandle,
         loghandle = cathandle->u.chd.chd_current_log;
         if (loghandle) {
                 struct llog_log_hdr *llh = loghandle->lgh_hdr;
-                if (loghandle->lgh_last_idx < (sizeof(llh->llh_bitmap) * 8) - 1) {
+                if (loghandle->lgh_last_idx < (sizeof(llh->llh_bitmap)*8) - 1) {
                         down_write(&loghandle->lgh_lock);
                         up_read(&cathandle->lgh_lock);
                         RETURN(loghandle);
@@ -226,7 +230,7 @@ static struct llog_handle *llog_cat_current_log(struct llog_handle *cathandle,
         loghandle = cathandle->u.chd.chd_current_log;
         if (loghandle) {
                 struct llog_log_hdr *llh = loghandle->lgh_hdr;
-                if (loghandle->lgh_last_idx < (sizeof(llh->llh_bitmap) * 8) - 1) {
+                if (loghandle->lgh_last_idx < (sizeof(llh->llh_bitmap)*8) - 1) {
                         down_write(&loghandle->lgh_lock);
                         up_write(&cathandle->lgh_lock);
                         RETURN(loghandle);
@@ -247,7 +251,7 @@ static struct llog_handle *llog_cat_current_log(struct llog_handle *cathandle,
  * Assumes caller has already pushed us into the kernel context.
  */
 int llog_cat_add_rec(struct llog_handle *cathandle, struct llog_rec_hdr *rec,
-                    struct llog_cookie *reccookie, void *buf)
+                     struct llog_cookie *reccookie, void *buf)
 {
         struct llog_handle *loghandle;
         int rc;
@@ -260,6 +264,7 @@ int llog_cat_add_rec(struct llog_handle *cathandle, struct llog_rec_hdr *rec,
         /* loghandle is already locked by llog_cat_current_log() for us */
         rc = llog_write_rec(loghandle, rec, reccookie, 1, buf, -1);
         up_write(&loghandle->lgh_lock);
+
         RETURN(rc);
 }
 EXPORT_SYMBOL(llog_cat_add_rec);
@@ -293,15 +298,20 @@ int llog_cat_cancel_records(struct llog_handle *cathandle, int count,
                 down_write(&loghandle->lgh_lock);
                 rc = llog_cancel_rec(loghandle, cookies->lgc_index);
                 up_write(&loghandle->lgh_lock);
-                
+
                 if (rc == 1) {          /* log has been destroyed */
                         index = loghandle->u.phd.phd_cookie.lgc_index;
                         if (cathandle->u.chd.chd_current_log == loghandle)
                                 cathandle->u.chd.chd_current_log = NULL;
                         llog_free_handle(loghandle);
-                        
+
                         LASSERT(index);
+                        llog_cat_set_first_idx(cathandle, index);
                         rc = llog_cancel_rec(cathandle, index);
+                        if (rc == 0)
+                                CDEBUG(D_HA, "cancel plain log at index %u "
+                                       "of catalog "LPX64"\n",
+                                       index, cathandle->lgh_id.lgl_oid);
                 }
         }
         up_write(&cathandle->lgh_lock);
@@ -310,7 +320,8 @@ int llog_cat_cancel_records(struct llog_handle *cathandle, int count,
 }
 EXPORT_SYMBOL(llog_cat_cancel_records);
 
-int llog_cat_process_cb(struct llog_handle *cat_llh, struct llog_rec_hdr *rec, void *data)
+int llog_cat_process_cb(struct llog_handle *cat_llh, struct llog_rec_hdr *rec,
+                        void *data)
 {
         struct llog_process_data *d = data;
         struct llog_logid_rec *lir = (struct llog_logid_rec *)rec;
@@ -321,33 +332,85 @@ int llog_cat_process_cb(struct llog_handle *cat_llh, struct llog_rec_hdr *rec, v
                 CERROR("invalid record in catalog\n");
                 RETURN(-EINVAL);
         }
-        CWARN("processing log "LPX64":%x at index %u of catalog "LPX64"\n", 
+        CWARN("processing log "LPX64":%x at index %u of catalog "LPX64"\n",
                lir->lid_id.lgl_oid, lir->lid_id.lgl_ogen,
                le32_to_cpu(rec->lrh_index), cat_llh->lgh_id.lgl_oid);
 
         rc = llog_cat_id2handle(cat_llh, &llh, &lir->lid_id);
         if (rc) {
-                CERROR("Cannot find handle for log "LPX64"\n", lir->lid_id.lgl_oid);
+                CERROR("Cannot find handle for log "LPX64"\n",
+                       lir->lid_id.lgl_oid);
                 RETURN(rc);
-        }        
+        }
 
-        rc = llog_process(llh, d->lpd_cb, d->lpd_data);
+        rc = llog_process(llh, d->lpd_cb, d->lpd_data, NULL);
         RETURN(rc);
 }
 
 int llog_cat_process(struct llog_handle *cat_llh, llog_cb_t cb, void *data)
 {
         struct llog_process_data d;
+        struct llog_process_cat_data cd;
+        struct llog_log_hdr *llh = cat_llh->lgh_hdr;
         int rc;
         ENTRY;
+
+        LASSERT(llh->llh_flags &cpu_to_le32(LLOG_F_IS_CAT));
         d.lpd_data = data;
         d.lpd_cb = cb;
 
-        rc = llog_process(cat_llh, llog_cat_process_cb, &d);
+        if (llh->llh_cat_idx > cat_llh->lgh_last_idx) {
+                CWARN("catlog "LPX64" crosses index zero\n",
+                      cat_llh->lgh_id.lgl_oid);
+
+                cd.first_idx = le32_to_cpu(llh->llh_cat_idx);
+                cd.last_idx = 0;
+                rc = llog_process(cat_llh, llog_cat_process_cb, &d, &cd);
+                if (rc != 0)
+                        RETURN(rc);
+
+                cd.first_idx = 0;
+                cd.last_idx = cat_llh->lgh_last_idx;
+                rc = llog_process(cat_llh, llog_cat_process_cb, &d, &cd);
+        } else {
+                rc = llog_process(cat_llh, llog_cat_process_cb, &d, NULL);
+        }
+
         RETURN(rc);
 }
 EXPORT_SYMBOL(llog_cat_process);
 
+int llog_cat_set_first_idx(struct llog_handle *cathandle, int index)
+{
+        struct llog_log_hdr *llh = cathandle->lgh_hdr;
+        int i, bitmap_size, idx;
+        ENTRY;
+
+        bitmap_size = sizeof(llh->llh_bitmap) * 8;
+        if (llh->llh_cat_idx == cpu_to_le32(index - 1)) {
+                idx = le32_to_cpu(llh->llh_cat_idx) + 1;
+                llh->llh_cat_idx = cpu_to_le32(idx);
+                if (idx == cathandle->lgh_last_idx)
+                        goto out;
+                for (i = (index + 1) % bitmap_size;
+                     i != cathandle->lgh_last_idx;
+                     i = (i + 1) % bitmap_size) {
+                        if (!ext2_test_bit(i, llh->llh_bitmap)) {
+                                idx = le32_to_cpu(llh->llh_cat_idx) + 1;
+                                llh->llh_cat_idx = cpu_to_le32(idx);
+                        } else if (i == 0) {
+                                llh->llh_cat_idx = 0;
+                        } else {
+                                break;
+                        }
+                }
+out:
+                CDEBUG(D_HA, "set catlog "LPX64" first idx %u\n",
+                       cathandle->lgh_id.lgl_oid,le32_to_cpu(llh->llh_cat_idx));
+        }
+
+        RETURN(0);
+}
 
 #if 0
 /* Assumes caller has already pushed us into the kernel context. */
@@ -366,7 +429,7 @@ int llog_cat_init(struct llog_handle *cathandle, struct obd_uuid *tgtuuid)
         if (cathandle->lgh_file->f_dentry->d_inode->i_size == 0) {
                 llog_write_rec(cathandle, &llh->llh_hdr, NULL, 0, NULL, 0);
 
-write_hdr:    
+write_hdr:
                 rc = lustre_fwrite(cathandle->lgh_file, llh, LLOG_CHUNK_SIZE,
                                    &offset);
                 if (rc != LLOG_CHUNK_SIZE) {
index 3a6fb7d..785739f 100644 (file)
 static int str2logid(struct llog_logid *logid, char *str, int len)
 {
         char *start, *end, *endp;
-        
+
         start = str;
         if (*start != '#')
                 RETURN(-EINVAL);
-        
+
         start++;
         if (start - str >= len - 1)
                 RETURN(-EINVAL);
@@ -34,7 +34,7 @@ static int str2logid(struct llog_logid *logid, char *str, int len)
                 RETURN(-EINVAL);
 
         *end = '\0';
-        logid->lgl_oid = simple_strtoull(start, &endp, 16); 
+        logid->lgl_oid = simple_strtoull(start, &endp, 16);
         if (endp != end)
                 RETURN(-EINVAL);
 
@@ -60,7 +60,7 @@ static int str2logid(struct llog_logid *logid, char *str, int len)
         RETURN(0);
 }
 
-static int llog_check_cb(struct llog_handle *handle, struct llog_rec_hdr *rec, 
+static int llog_check_cb(struct llog_handle *handle, struct llog_rec_hdr *rec,
                          void *data)
 {
         struct obd_ioctl_data *ioc_data = (struct obd_ioctl_data *)data;
@@ -68,12 +68,12 @@ static int llog_check_cb(struct llog_handle *handle, struct llog_rec_hdr *rec,
         static char *out;
         char *endp;
         int cur_index, rc = 0;
-        
+
         cur_index = le32_to_cpu(rec->lrh_index);
-        
+
         if (ioc_data && (ioc_data->ioc_inllen1)) {
                 l = 0;
-                remains = ioc_data->ioc_inllen4 + 
+                remains = ioc_data->ioc_inllen4 +
                         size_round(ioc_data->ioc_inllen1) +
                         size_round(ioc_data->ioc_inllen2) +
                         size_round(ioc_data->ioc_inllen3);
@@ -92,58 +92,58 @@ static int llog_check_cb(struct llog_handle *handle, struct llog_rec_hdr *rec,
         }
         if (handle->lgh_hdr->llh_flags & cpu_to_le32(LLOG_F_IS_CAT)) {
                 struct llog_logid_rec *lir = (struct llog_logid_rec *)rec;
-                struct llog_handle *log_handle; 
-                
-                if (rec->lrh_type != cpu_to_le32(LLOG_LOGID_MAGIC)) { 
-                        l = snprintf(out, remains,
-                                     "[index]: %05d  [type]: %02x  [len]: %04d failed\n", 
+                struct llog_handle *log_handle;
+
+                if (rec->lrh_type != cpu_to_le32(LLOG_LOGID_MAGIC)) {
+                        l = snprintf(out, remains, "[index]: %05d  [type]: "
+                                     "%02x  [len]: %04d failed\n",
                                      cur_index, le32_to_cpu(rec->lrh_type),
                                      le32_to_cpu(rec->lrh_len));
                 }
                 if (handle->lgh_ctxt == NULL)
                         RETURN(-EOPNOTSUPP);
                 llog_cat_id2handle(handle, &log_handle, &lir->lid_id);
-                rc = llog_process(log_handle, llog_check_cb, NULL); 
+                rc = llog_process(log_handle, llog_check_cb, NULL, NULL);
                 llog_close(log_handle);
         } else {
                 switch (le32_to_cpu(rec->lrh_type)) {
                 case OST_SZ_REC:
-                case OST_RAID1_REC:    
+                case OST_RAID1_REC:
                 case MDS_UNLINK_REC:
-                case OBD_CFG_REC:      
-                case PTL_CFG_REC:      
-                case LLOG_HDR_MAGIC: { 
-                         l = snprintf(out, remains,
-                                     "[index]: %05d  [type]: %02x  [len]: %04d ok\n", 
-                                     cur_index, le32_to_cpu(rec->lrh_type),
-                                     le32_to_cpu(rec->lrh_len));
+                case OBD_CFG_REC:
+                case PTL_CFG_REC:
+                case LLOG_HDR_MAGIC: {
+                         l = snprintf(out, remains, "[index]: %05d  [type]: "
+                                      "%02x  [len]: %04d ok\n",
+                                      cur_index, le32_to_cpu(rec->lrh_type),
+                                      le32_to_cpu(rec->lrh_len));
                          out += l;
                          remains -= l;
                          if (remains <= 0) {
-                                CERROR("not enough space for print log records\n");
+                                CERROR("no space to print log records\n");
                                 RETURN(-LLOG_EEMPTY);
                          }
                          RETURN(0);
                 }
                 default: {
-                         l = snprintf(out, remains,
-                                     "[index]: %05d  [type]: %02x  [len]: %04d failed\n", 
-                                     cur_index, le32_to_cpu(rec->lrh_type),
-                                     le32_to_cpu(rec->lrh_len));
+                         l = snprintf(out, remains, "[index]: %05d  [type]: "
+                                      "%02x  [len]: %04d failed\n",
+                                      cur_index, le32_to_cpu(rec->lrh_type),
+                                      le32_to_cpu(rec->lrh_len));
                          out += l;
                          remains -= l;
                          if (remains <= 0) {
-                                CERROR("not enough space for print log records\n");
+                                CERROR("no space to print log records\n");
                                 RETURN(-LLOG_EEMPTY);
                          }
                          RETURN(0);
-                } 
+                }
                 }
         }
         RETURN(rc);
 }
 
-static int llog_print_cb(struct llog_handle *handle, struct llog_rec_hdr *rec, 
+static int llog_print_cb(struct llog_handle *handle, struct llog_rec_hdr *rec,
                          void *data)
 {
         struct obd_ioctl_data *ioc_data = (struct obd_ioctl_data *)data;
@@ -151,10 +151,10 @@ static int llog_print_cb(struct llog_handle *handle, struct llog_rec_hdr *rec,
         static char *out;
         char *endp;
         int cur_index;
-        
+
         if (ioc_data->ioc_inllen1) {
                 l = 0;
-                remains = ioc_data->ioc_inllen4 + 
+                remains = ioc_data->ioc_inllen4 +
                         size_round(ioc_data->ioc_inllen1) +
                         size_round(ioc_data->ioc_inllen2) +
                         size_round(ioc_data->ioc_inllen3);
@@ -183,11 +183,11 @@ static int llog_print_cb(struct llog_handle *handle, struct llog_rec_hdr *rec,
 
                 l = snprintf(out, remains,
                              "[index]: %05d  [logid]: #%llx#%llx#%08x\n",
-                             cur_index, lir->lid_id.lgl_oid, 
+                             cur_index, lir->lid_id.lgl_oid,
                              lir->lid_id.lgl_ogr, lir->lid_id.lgl_ogen);
         } else {
                 l = snprintf(out, remains,
-                             "[index]: %05d  [type]: %02x  [len]: %04d\n", 
+                             "[index]: %05d  [type]: %02x  [len]: %04d\n",
                              cur_index, le32_to_cpu(rec->lrh_type),
                              le32_to_cpu(rec->lrh_len));
         }
@@ -204,7 +204,7 @@ static int llog_remove_log(struct llog_handle *cat, struct llog_logid *logid)
 {
         struct llog_handle *log;
         int rc, index = 0;
-        
+
         down_write(&cat->lgh_lock);
         rc = llog_cat_id2handle(cat, &log, logid);
         if (rc) {
@@ -212,7 +212,7 @@ static int llog_remove_log(struct llog_handle *cat, struct llog_logid *logid)
                        logid->lgl_oid, logid->lgl_ogr, logid->lgl_ogen);
                 GOTO(out, rc = -ENOENT);
         }
-        
+
         index = log->u.phd.phd_cookie.lgc_index;
         LASSERT(index);
         rc = llog_destroy(log);
@@ -220,6 +220,7 @@ static int llog_remove_log(struct llog_handle *cat, struct llog_logid *logid)
                 CDEBUG(D_IOCTL, "cannot destroy log\n");
                 GOTO(out, rc);
         }
+        llog_cat_set_first_idx(cat, index);
         rc = llog_cancel_rec(cat, index);
 out:
         llog_free_handle(log);
@@ -227,16 +228,17 @@ out:
         RETURN(rc);
 
 }
-static int llog_delete_cb(struct llog_handle *handle, struct llog_rec_hdr *rec, 
-                         void *data)
+
+static int llog_delete_cb(struct llog_handle *handle, struct llog_rec_hdr *rec,
+                          void *data)
 {
         struct  llog_logid_rec *lir = (struct llog_logid_rec*)rec;
         int     rc;
-        
+
         if (rec->lrh_type != cpu_to_le32(LLOG_LOGID_MAGIC))
-              return (-EINVAL); 
+              return (-EINVAL);
         rc = llog_remove_log(handle, &lir->lid_id);
-        
+
         RETURN(rc);
 }
 
@@ -246,14 +248,14 @@ int llog_ioctl(struct llog_ctxt *ctxt, int cmd, struct obd_ioctl_data *data)
         struct llog_logid logid;
         int err = 0;
         struct llog_handle *handle = NULL;
+
         if (*data->ioc_inlbuf1 == '#') {
                 err = str2logid(&logid, data->ioc_inlbuf1, data->ioc_inllen1);
                 if (err)
                         GOTO(out, err);
                 err = llog_create(ctxt, &handle, &logid, NULL);
                 if (err)
-                        GOTO(out, err);        
+                        GOTO(out, err);
         } else if (*data->ioc_inlbuf1 == '$') {
                 char *name = data->ioc_inlbuf1 + 1;
                 err = llog_create(ctxt, &handle, NULL, name);
@@ -264,17 +266,17 @@ int llog_ioctl(struct llog_ctxt *ctxt, int cmd, struct obd_ioctl_data *data)
         }
 
         err = llog_init_handle(handle, 0, NULL);
-        if (err) 
+        if (err)
                 GOTO(out_close, err = -ENOENT);
-       
+
         switch (cmd) {
         case OBD_IOC_LLOG_INFO: {
                 int l;
-                int remains = data->ioc_inllen2 + 
+                int remains = data->ioc_inllen2 +
                         size_round(data->ioc_inllen1);
                 char *out = data->ioc_bulk;
 
-                l = snprintf(out, remains, 
+                l = snprintf(out, remains,
                              "logid:            #%llx#%llx#%08x\n"
                              "flags:            %x (%s)\n"
                              "records count:    %d\n"
@@ -282,20 +284,20 @@ int llog_ioctl(struct llog_ctxt *ctxt, int cmd, struct obd_ioctl_data *data)
                              handle->lgh_id.lgl_oid, handle->lgh_id.lgl_ogr,
                              handle->lgh_id.lgl_ogen,
                              le32_to_cpu(handle->lgh_hdr->llh_flags),
-                             le32_to_cpu(handle->lgh_hdr->llh_flags) & 
+                             le32_to_cpu(handle->lgh_hdr->llh_flags) &
                              LLOG_F_IS_CAT ? "cat" : "plain",
                              le32_to_cpu(handle->lgh_hdr->llh_count),
                              handle->lgh_last_idx);
                 out += l;
                 remains -= l;
-                if (remains <= 0) 
+                if (remains <= 0)
                         CERROR("not enough space for log header info\n");
 
                 GOTO(out_close, err);
         }
         case OBD_IOC_LLOG_CHECK: {
                 LASSERT(data->ioc_inllen1);
-                err = llog_process(handle, llog_check_cb, data);
+                err = llog_process(handle, llog_check_cb, data, NULL);
                 if (err == -LLOG_EEMPTY)
                         err = 0;
                 GOTO(out_close, err);
@@ -303,7 +305,7 @@ int llog_ioctl(struct llog_ctxt *ctxt, int cmd, struct obd_ioctl_data *data)
 
         case OBD_IOC_LLOG_PRINT: {
                 LASSERT(data->ioc_inllen1);
-                err = llog_process(handle, llog_print_cb, data);
+                err = llog_process(handle, llog_print_cb, data, NULL);
                 if (err == -LLOG_EEMPTY)
                         err = 0;
 
@@ -313,16 +315,15 @@ int llog_ioctl(struct llog_ctxt *ctxt, int cmd, struct obd_ioctl_data *data)
                 struct llog_cookie cookie;
                 struct llog_logid plain;
                 char *endp;
-                
+
                 if (!(handle->lgh_hdr->llh_flags & cpu_to_le32(LLOG_F_IS_CAT)))
                         GOTO(out_close, err = -EINVAL);
-        
+
                 err = str2logid(&plain, data->ioc_inlbuf2, data->ioc_inllen2);
                 if (err)
                         GOTO(out_close, err);
                 cookie.lgc_lgl = plain;
-                cookie.lgc_index = simple_strtoul(data->ioc_inlbuf3, 
-                                                  &endp, 0);
+                cookie.lgc_index = simple_strtoul(data->ioc_inlbuf3, &endp, 0);
                 if (*endp != '\0')
                         GOTO(out_close, err = -EINVAL);
 
@@ -331,26 +332,27 @@ int llog_ioctl(struct llog_ctxt *ctxt, int cmd, struct obd_ioctl_data *data)
         }
         case OBD_IOC_LLOG_REMOVE: {
                 struct llog_logid plain;
-                
+
                 if (!(handle->lgh_hdr->llh_flags & cpu_to_le32(LLOG_F_IS_CAT)))
                         GOTO(out_close, err = -EINVAL);
-        
+
                 if (data->ioc_inlbuf2) {
                         /*remove indicate log from the catalog*/
-                        err = str2logid(&plain, data->ioc_inlbuf2, data->ioc_inllen2);
+                        err = str2logid(&plain, data->ioc_inlbuf2,
+                                        data->ioc_inllen2);
                         if (err)
                                 GOTO(out_close, err);
                         err = llog_remove_log(handle, &plain);
                 } else {
                         /*remove all the log of the catalog*/
-                        llog_process(handle, llog_delete_cb, NULL);
+                        llog_process(handle, llog_delete_cb, NULL, NULL);
                 }
                 GOTO(out_close, err);
         }
         }
-        
+
 out_close:
-        if (handle->lgh_hdr && 
+        if (handle->lgh_hdr &&
             handle->lgh_hdr->llh_flags & cpu_to_le32(LLOG_F_IS_CAT))
                 llog_cat_put(handle);
         else
@@ -360,7 +362,7 @@ out:
 }
 EXPORT_SYMBOL(llog_ioctl);
 
-int llog_catlog_list(struct obd_device *obd, int count, 
+int llog_catlog_list(struct obd_device *obd, int count,
                      struct obd_ioctl_data *data)
 {
         int size, i;
@@ -368,25 +370,25 @@ int llog_catlog_list(struct obd_device *obd, int count,
         char name[32] = "CATLIST";
         char *out;
         int l, remains, rc = 0;
-        
+
         size = sizeof(*idarray) * count;
-        
+
         OBD_ALLOC(idarray, size);
         if (!idarray)
                 RETURN(-ENOMEM);
         memset(idarray, 0, size);
-        
+
         rc = llog_get_cat_list(obd, obd, name, count, idarray);
         if (rc) {
                 OBD_FREE(idarray, size);
                 RETURN(rc);
         }
-        
+
         out = data->ioc_bulk;
         remains = data->ioc_inllen1;
         id = idarray;
         for (i = 0; i < count; i++) {
-                l = snprintf(out, remains, 
+                l = snprintf(out, remains,
                              "catalog log: #%llx#%llx#%08x\n",
                              id->lgl_oid, id->lgl_ogr, id->lgl_ogen);
                 id++;
index 5eb2922..18a1afe 100644 (file)
@@ -70,7 +70,7 @@ static int llog_lvfs_pad(struct obd_device *obd, struct l_file *file,
         }
 
         file->f_pos += len - sizeof(rec) - sizeof(tail);
-        rc = fsfilt_write_record(obd, file, &tail, sizeof(tail), &file->f_pos, 0);
+        rc = fsfilt_write_record(obd, file, &tail, sizeof(tail),&file->f_pos,0);
         if (rc) {
                 CERROR("error writing padding record: rc %d\n", rc);
                 goto out;
@@ -92,7 +92,7 @@ static int llog_lvfs_write_blob(struct obd_device *obd, struct l_file *file,
         file->f_pos = off;
 
         if (!buf) {
-                rc = fsfilt_write_record(obd, file, rec, buflen, &file->f_pos, 0);
+                rc = fsfilt_write_record(obd, file, rec, buflen,&file->f_pos,0);
                 if (rc) {
                         CERROR("error writing log record: rc %d\n", rc);
                         goto out;
@@ -147,13 +147,12 @@ static int llog_lvfs_read_blob(struct obd_device *obd, struct l_file *file,
 
 static int llog_lvfs_read_header(struct llog_handle *handle)
 {
-        struct llog_rec_tail tail;
         struct obd_device *obd;
         int rc;
         ENTRY;
 
         LASSERT(sizeof(*handle->lgh_hdr) == LLOG_CHUNK_SIZE);
-        
+
         obd = handle->lgh_ctxt->loc_exp->exp_obd;
 
         if (handle->lgh_file->f_dentry->d_inode->i_size == 0) {
@@ -166,13 +165,7 @@ static int llog_lvfs_read_header(struct llog_handle *handle)
         if (rc)
                 CERROR("error reading log header\n");
 
-        rc = llog_lvfs_read_blob(obd, handle->lgh_file, &tail, sizeof(tail),
-                                 handle->lgh_file->f_dentry->d_inode->i_size -
-                                 sizeof(tail));
-        if (rc)
-                CERROR("error reading log tail\n");
-
-        handle->lgh_last_idx = le32_to_cpu(tail.lrt_index);
+        handle->lgh_last_idx = le32_to_cpu(handle->lgh_hdr->llh_tail.lrt_index);
         handle->lgh_file->f_pos = handle->lgh_file->f_dentry->d_inode->i_size;
 
         RETURN(rc);
@@ -207,7 +200,7 @@ static int llog_lvfs_write_rec(struct llog_handle *loghandle,
         if (rc)
                 RETURN(rc);
 
-        if (idx != -1) { 
+        if (idx != -1) {
                 loff_t saved_offset;
 
                 /* no header: only allowed to insert record 1 */
@@ -224,13 +217,13 @@ static int llog_lvfs_write_rec(struct llog_handle *loghandle,
                 if (rc || idx == 0)
                         RETURN(rc);
 
-                saved_offset = sizeof(*llh) + (idx-1) * le32_to_cpu(rec->lrh_len);
+                saved_offset = sizeof(*llh) + (idx-1)*le32_to_cpu(rec->lrh_len);
                 rc = llog_lvfs_write_blob(obd, file, rec, buf, saved_offset);
                 if (rc == 0 && reccookie) {
                         reccookie->lgc_lgl = loghandle->lgh_id;
                         reccookie->lgc_index = idx;
                         rc = 1;
-                }       
+                }
                 RETURN(rc);
         }
 
@@ -242,12 +235,12 @@ static int llog_lvfs_write_rec(struct llog_handle *loghandle,
          * big enough to hold reclen, so all we care about is padding here.
          */
         left = LLOG_CHUNK_SIZE - (file->f_pos & (LLOG_CHUNK_SIZE - 1));
-        if (buf) 
-                reclen = sizeof(*rec) + le32_to_cpu(rec->lrh_len) + 
+        if (buf)
+                reclen = sizeof(*rec) + le32_to_cpu(rec->lrh_len) +
                         sizeof(struct llog_rec_tail);
 
         /* NOTE: padding is a record, but no bit is set */
-        if (left != 0 && left != reclen && 
+        if (left != 0 && left != reclen &&
             left < (reclen + LLOG_MIN_REC_SIZE)) {
                 loghandle->lgh_last_idx++;
                 rc = llog_lvfs_pad(obd, file, left, loghandle->lgh_last_idx);
@@ -268,6 +261,7 @@ static int llog_lvfs_write_rec(struct llog_handle *loghandle,
                 LBUG(); /* should never happen */
         }
         llh->llh_count = cpu_to_le32(le32_to_cpu(llh->llh_count) + 1);
+        llh->llh_tail.lrt_index = cpu_to_le32(index);
 
         offset = 0;
         rc = llog_lvfs_write_blob(obd, file, &llh->llh_hdr, NULL, 0);
@@ -289,10 +283,13 @@ static int llog_lvfs_write_rec(struct llog_handle *loghandle,
                         reccookie->lgc_subsys = LLOG_SIZE_ORIG_CTXT;
                 else if (le32_to_cpu(rec->lrh_type) == OST_RAID1_REC)
                         reccookie->lgc_subsys = LLOG_RD1_ORIG_CTXT;
-                else 
+                else
                         reccookie->lgc_subsys = -1;
                 rc = 1;
         }
+        if (rc == 0 && le32_to_cpu(rec->lrh_type) == LLOG_GEN_REC)
+                rc = 1;
+
         RETURN(rc);
 }
 
@@ -306,7 +303,7 @@ static void llog_skip_over(__u64 *off, int curr, int goal)
 {
         if (goal <= curr)
                 return;
-        *off = (*off + (goal-curr-1) * LLOG_MIN_REC_SIZE) & 
+        *off = (*off + (goal-curr-1) * LLOG_MIN_REC_SIZE) &
                 ~(LLOG_CHUNK_SIZE - 1);
 }
 
@@ -345,7 +342,7 @@ static int llog_lvfs_next_block(struct llog_handle *loghandle, int *cur_idx,
                         CERROR("Cant read llog block at log id "LPU64
                                "/%u offset "LPU64"\n",
                                loghandle->lgh_id.lgl_oid,
-                               loghandle->lgh_id.lgl_ogen, 
+                               loghandle->lgh_id.lgl_ogen,
                                *cur_offset);
                          RETURN(rc);
                 }
@@ -405,11 +402,11 @@ static struct file *llog_filp_open(char *name, int flags, int mode)
         } else {
                 filp = l_filp_open(logname, flags, mode);
                 if (IS_ERR(filp))
-                        CERROR("logfile creation %s: %ld\n", logname, 
+                        CERROR("logfile creation %s: %ld\n", logname,
                                PTR_ERR(filp));
         }
 
-        OBD_FREE(logname, PATH_MAX); 
+        OBD_FREE(logname, PATH_MAX);
         return filp;
 }
 
@@ -441,7 +438,7 @@ static int llog_lvfs_create(struct llog_ctxt *ctxt, struct llog_handle **res,
 
                 if (IS_ERR(dchild)) {
                         rc = PTR_ERR(dchild);
-                        CERROR("error looking up log file "LPX64":0x%x: rc %d\n",
+                        CERROR("error looking up logfile "LPX64":0x%x: rc %d\n",
                                logid->lgl_oid, logid->lgl_ogen, rc);
                         GOTO(cleanup, rc);
                 }
@@ -656,14 +653,16 @@ static int llog_lvfs_read_header(struct llog_handle *handle)
         LBUG();
         return 0;
 }
+
 static int llog_lvfs_write_rec(struct llog_handle *loghandle,
                                struct llog_rec_hdr *rec,
-                               struct llog_cookie *reccookie, int cookiecount, 
+                               struct llog_cookie *reccookie, int cookiecount,
                                void *buf, int idx)
 {
         LBUG();
         return 0;
 }
+
 static int llog_lvfs_next_block(struct llog_handle *loghandle, int *cur_idx,
                                 int next_idx, __u64 *cur_offset, void *buf,
                                 int len)
@@ -671,31 +670,34 @@ static int llog_lvfs_next_block(struct llog_handle *loghandle, int *cur_idx,
         LBUG();
         return 0;
 }
-static int llog_lvfs_create(struct llog_obd_ctxt *ctxt, struct llog_handle **res,
+
+static int llog_lvfs_create(struct llog_obd_ctxt *ctxt,struct llog_handle **res,
                             struct llog_logid *logid, char *name)
 {
         LBUG();
         return 0;
 }
+
 static int llog_lvfs_close(struct llog_handle *handle)
 {
         LBUG();
         return 0;
 }
+
 static int llog_lvfs_destroy(struct llog_handle *handle)
 {
         LBUG();
         return 0;
 }
 
-int llog_get_cat_list(struct obd_device *obd, struct obd_device *disk_obd, 
+int llog_get_cat_list(struct obd_device *obd, struct obd_device *disk_obd,
                       char *name, int count, struct llog_logid *idarray)
 {
         LBUG();
         return 0;
 }
 
-int llog_put_cat_list(struct obd_device *obd, struct obd_device *disk_obd, 
+int llog_put_cat_list(struct obd_device *obd, struct obd_device *disk_obd,
                       char *name, int count, struct llog_logid *idarray)
 {
         LBUG();
index 9c9abb7..ea68bb3 100644 (file)
@@ -26,7 +26,7 @@
 
 /* helper functions for calling the llog obd methods */
 
-int llog_setup(struct obd_device *obd, int index, struct obd_device *disk_obd, 
+int llog_setup(struct obd_device *obd, int index, struct obd_device *disk_obd,
                int count, struct llog_logid *logid, struct llog_operations *op)
 {
         int rc = 0;
@@ -49,7 +49,7 @@ int llog_setup(struct obd_device *obd, int index, struct obd_device *disk_obd,
 
         if (op->lop_setup)
                 rc = op->lop_setup(obd, index, disk_obd, count, logid);
-        if (ctxt && rc) 
+        if (ctxt && rc)
                 OBD_FREE(ctxt, sizeof(*ctxt));
 
         RETURN(rc);
@@ -61,7 +61,6 @@ int llog_cleanup(struct llog_ctxt *ctxt)
         int rc = 0;
         ENTRY;
 
-        down(&ctxt->loc_sem);
         LASSERT(ctxt);
 
         if (CTXTP(ctxt, cleanup))
@@ -70,7 +69,6 @@ int llog_cleanup(struct llog_ctxt *ctxt)
         ctxt->loc_obd->obd_llog_ctxt[ctxt->loc_idx] = NULL;
         class_export_put(ctxt->loc_exp);
         ctxt->loc_exp = NULL;
-        up(&ctxt->loc_sem);
         OBD_FREE(ctxt, sizeof(*ctxt));
 
         RETURN(rc);
@@ -84,29 +82,25 @@ int llog_sync(struct llog_ctxt *ctxt, struct obd_export *exp)
 
         if (!ctxt)
                 RETURN(0);
-        down(&ctxt->loc_sem);
-        if (ctxt->loc_llcd && CTXTP(ctxt, sync))
+
+        if (CTXTP(ctxt, sync))
                 rc = CTXTP(ctxt, sync)(ctxt, exp);
-        else
-                up(&ctxt->loc_sem);
 
         RETURN(rc);
 }
 EXPORT_SYMBOL(llog_sync);
 
-int llog_add(struct llog_ctxt *ctxt,
-                 struct llog_rec_hdr *rec, struct lov_stripe_md *lsm,
-                 struct llog_cookie *logcookies, int numcookies)
+int llog_add(struct llog_ctxt *ctxt, struct llog_rec_hdr *rec,
+                struct lov_stripe_md *lsm, struct llog_cookie *logcookies,
+                int numcookies)
 {
         int rc;
         ENTRY;
 
         LASSERT(ctxt);
-        down(&ctxt->loc_sem);
         CTXT_CHECK_OP(ctxt, add, -EOPNOTSUPP);
 
         rc = CTXTP(ctxt, add)(ctxt, rec, lsm, logcookies, numcookies);
-        up(&ctxt->loc_sem);
         RETURN(rc);
 }
 EXPORT_SYMBOL(llog_add);
@@ -125,7 +119,7 @@ int llog_cancel(struct llog_ctxt *ctxt, struct lov_stripe_md *lsm,
 EXPORT_SYMBOL(llog_cancel);
 
 /* callback func for llog_process in llog_obd_origin_setup */
-static int cat_cancel_cb(struct llog_handle *cathandle, 
+static int cat_cancel_cb(struct llog_handle *cathandle,
                           struct llog_rec_hdr *rec, void *data)
 {
         struct llog_logid_rec *lir = (struct llog_logid_rec *)rec;
@@ -138,35 +132,36 @@ static int cat_cancel_cb(struct llog_handle *cathandle,
                 CERROR("invalid record in catalog\n");
                 RETURN(-EINVAL);
         }
-        CWARN("processing log "LPX64":%x at index %u of catalog "LPX64"\n", 
+        CWARN("processing log "LPX64":%x at index %u of catalog "LPX64"\n",
                lir->lid_id.lgl_oid, lir->lid_id.lgl_ogen,
                le32_to_cpu(rec->lrh_index), cathandle->lgh_id.lgl_oid);
 
         rc = llog_cat_id2handle(cathandle, &loghandle, &lir->lid_id);
         if (rc) {
-                CERROR("Cannot find handle for log "LPX64"\n", lir->lid_id.lgl_oid);
+                CERROR("Cannot find handle for log "LPX64"\n",
+                       lir->lid_id.lgl_oid);
                 RETURN(rc);
-        }        
-        
+        }
+
         llh = loghandle->lgh_hdr;
         if ((le32_to_cpu(llh->llh_flags) & LLOG_F_ZAP_WHEN_EMPTY) &&
             (le32_to_cpu(llh->llh_count) == 1)) {
                 rc = llog_destroy(loghandle);
                 if (rc)
-                        CERROR("failure destroying log during postsetup: %d\n", rc);
+                        CERROR("failure destroying log in postsetup: %d\n", rc);
                 LASSERT(rc == 0);
 
                 index = loghandle->u.phd.phd_cookie.lgc_index;
-                if (cathandle->u.chd.chd_current_log == loghandle)
-                        cathandle->u.chd.chd_current_log = NULL;
                 llog_free_handle(loghandle);
-                                                                                                                
+
                 LASSERT(index);
+                llog_cat_set_first_idx(cathandle, index);
                 rc = llog_cancel_rec(cathandle, index);
                 if (rc == 0)
-                        CWARN("cancel log "LPX64":%x at index %u of catalog "LPX64"\n", 
-                               lir->lid_id.lgl_oid, lir->lid_id.lgl_ogen,
-                               le32_to_cpu(rec->lrh_index), cathandle->lgh_id.lgl_oid);
+                        CWARN("cancel log "LPX64":%x at index %u of catalog "
+                              LPX64"\n", lir->lid_id.lgl_oid,
+                              lir->lid_id.lgl_ogen, le32_to_cpu(rec->lrh_index),
+                              cathandle->lgh_id.lgl_oid);
         }
 
         RETURN(rc);
@@ -174,8 +169,9 @@ static int cat_cancel_cb(struct llog_handle *cathandle,
 
 /* lop_setup method for filter/osc */
 // XXX how to set exports
-int llog_obd_origin_setup(struct obd_device *obd, int index, struct obd_device *disk_obd,
-                          int count, struct llog_logid *logid)
+int llog_obd_origin_setup(struct obd_device *obd, int index,
+                          struct obd_device *disk_obd, int count,
+                          struct llog_logid *logid)
 {
         struct llog_ctxt *ctxt;
         struct llog_handle *handle;
@@ -187,20 +183,19 @@ int llog_obd_origin_setup(struct obd_device *obd, int index, struct obd_device *
                 RETURN(0);
 
         LASSERT(count == 1);
-        
+
         ctxt = llog_get_context(obd, index);
         LASSERT(ctxt);
-        log_gen_init(ctxt);
+        llog_gen_init(ctxt);
 
-        down(&ctxt->loc_sem);
         if (logid->lgl_oid)
                 rc = llog_create(ctxt, &handle, logid, NULL);
         else {
                 rc = llog_create(ctxt, &handle, NULL, NULL);
-                if (!rc) 
+                if (!rc)
                         *logid = handle->lgh_id;
         }
-        if (rc) 
+        if (rc)
                 GOTO(out, rc);
 
         ctxt->loc_handle = handle;
@@ -210,11 +205,10 @@ int llog_obd_origin_setup(struct obd_device *obd, int index, struct obd_device *
         if (rc)
                 GOTO(out, rc);
 
-        rc = llog_process(handle, (llog_cb_t)cat_cancel_cb, NULL);
-        if (rc) 
+        rc = llog_process(handle, (llog_cb_t)cat_cancel_cb, NULL, NULL);
+        if (rc)
                 CERROR("llog_process with cat_cancel_cb failed: %d\n", rc);
  out:
-        up(&ctxt->loc_sem);
         if (ctxt && rc) {
                 obd->obd_llog_ctxt[index] = NULL;
                 OBD_FREE(ctxt, sizeof(*ctxt));
@@ -229,32 +223,34 @@ int llog_obd_origin_cleanup(struct llog_ctxt *ctxt)
         struct llog_log_hdr *llh;
         int rc, index;
         ENTRY;
-        
+
         if (!ctxt)
                 return 0;
 
         cathandle = ctxt->loc_handle;
         if (cathandle) {
-                list_for_each_entry_safe(loghandle, n, &cathandle->u.chd.chd_head, 
-                                 u.phd.phd_entry) {
+                list_for_each_entry_safe(loghandle, n,
+                                         &cathandle->u.chd.chd_head,
+                                         u.phd.phd_entry) {
                         llh = loghandle->lgh_hdr;
-                        if ((le32_to_cpu(llh->llh_flags) & LLOG_F_ZAP_WHEN_EMPTY) &&
+                        if ((le32_to_cpu(llh->llh_flags) &
+                                LLOG_F_ZAP_WHEN_EMPTY) &&
                             (le32_to_cpu(llh->llh_count) == 1)) {
                                 rc = llog_destroy(loghandle);
                                 if (rc)
-                                        CERROR("failure destroying log during cleanup: %d\n",
-                                               rc);
+                                        CERROR("failure destroying log during "
+                                               "cleanup: %d\n", rc);
                                 LASSERT(rc == 0);
 
                                 index = loghandle->u.phd.phd_cookie.lgc_index;
-                                if (cathandle->u.chd.chd_current_log == loghandle)
-                                        cathandle->u.chd.chd_current_log = NULL;
                                 llog_free_handle(loghandle);
-                                                                                                                             
+
                                 LASSERT(index);
+                                llog_cat_set_first_idx(cathandle, index);
                                 rc = llog_cancel_rec(cathandle, index);
                                 if (rc == 0)
-                                        CWARN("cancel plain log at index %u of catalog "LPX64"\n", 
+                                        CWARN("cancel plain log at index %u "
+                                              "of catalog "LPX64"\n",
                                               index, cathandle->lgh_id.lgl_oid);
                         }
                 }
@@ -314,7 +310,7 @@ int llog_cat_initialize(struct obd_device *obd, int count)
                 CERROR("rc: %d\n", rc);
                 GOTO(out, rc);
         }
-                        
+
  out:
         OBD_FREE(idarray, size);
         RETURN(rc);
index 1660657..0607d12 100644 (file)
@@ -86,7 +86,7 @@ static int llog_test_1(struct obd_device *obd, char *name)
         int rc2;
         ENTRY;
 
-        CERROR("1a: create a log with name: %s\n", name);
+        CWARN("1a: create a log with name: %s\n", name);
         LASSERT(ctxt);
 
         rc = llog_create(ctxt, &llh, NULL, name);
@@ -100,7 +100,7 @@ static int llog_test_1(struct obd_device *obd, char *name)
                 GOTO(out, rc);
 
  out:
-        CERROR("1b: close newly-created log\n");
+        CWARN("1b: close newly-created log\n");
         rc2 = llog_close(llh);
         if (rc2) {
                 CERROR("1b: close log %s failed: %d\n", name, rc2);
@@ -120,7 +120,7 @@ static int llog_test_2(struct obd_device *obd, char *name,
         struct llog_ctxt *ctxt = llog_get_context(obd, LLOG_TEST_ORIG_CTXT);
         ENTRY;
 
-        CERROR("2a: re-open a log with name: %s\n", name);
+        CWARN("2a: re-open a log with name: %s\n", name);
         rc = llog_create(ctxt, llh, NULL, name);
         if (rc) {
                 CERROR("2a: re-open log with name %s failed: %d\n", name, rc);
@@ -131,7 +131,7 @@ static int llog_test_2(struct obd_device *obd, char *name,
         if ((rc = verify_handle("2", *llh, 1)))
                 RETURN(rc);
 
-        CERROR("2b: create a log without specified NAME & LOGID\n");
+        CWARN("2b: create a log without specified NAME & LOGID\n");
         rc = llog_create(ctxt, &loghandle, NULL, NULL);
         if (rc) {
                 CERROR("2b: create log failed\n");
@@ -141,7 +141,7 @@ static int llog_test_2(struct obd_device *obd, char *name,
         logid = loghandle->lgh_id;
         llog_close(loghandle);
 
-        CERROR("2b: re-open the log by LOGID\n");
+        CWARN("2b: re-open the log by LOGID\n");
         rc = llog_create(ctxt, &loghandle, &logid, NULL);
         if (rc) {
                 CERROR("2b: re-open log by LOGID failed\n");
@@ -149,7 +149,7 @@ static int llog_test_2(struct obd_device *obd, char *name,
         }
         llog_init_handle(loghandle, LLOG_F_IS_PLAIN, &uuid);
 
-        CERROR("2b: destroy this log\n");
+        CWARN("2b: destroy this log\n");
         rc = llog_destroy(loghandle);
         if (rc) {
                 CERROR("2b: destroy log failed\n");
@@ -171,7 +171,7 @@ static int llog_test_3(struct obd_device *obd, struct llog_handle *llh)
         lcr.lcr_hdr.lrh_len = lcr.lcr_tail.lrt_len = cpu_to_le32(sizeof(lcr));
         lcr.lcr_hdr.lrh_type = cpu_to_le32(OST_SZ_REC);
 
-        CERROR("3a: write one create_rec\n");
+        CWARN("3a: write one create_rec\n");
         rc = llog_write_rec(llh,  &lcr.lcr_hdr, NULL, 0, NULL, -1);
         num_recs++;
         if (rc) {
@@ -182,7 +182,7 @@ static int llog_test_3(struct obd_device *obd, struct llog_handle *llh)
         if ((rc = verify_handle("3a", llh, num_recs)))
                 RETURN(rc);
 
-        CERROR("3b: write 10 cfg log records with 8 bytes bufs\n");
+        CWARN("3b: write 10 cfg log records with 8 bytes bufs\n");
         for (i = 0; i < 10; i++) {
                 struct llog_rec_hdr hdr;
                 char buf[8];
@@ -203,7 +203,7 @@ static int llog_test_3(struct obd_device *obd, struct llog_handle *llh)
         if ((rc = verify_handle("3b", llh, num_recs)))
                 RETURN(rc);
 
-        CERROR("3c: write 1000 more log records\n");
+        CWARN("3c: write 1000 more log records\n");
         for (i = 0; i < 1000; i++) {
                 rc = llog_write_rec(llh, &lcr.lcr_hdr, NULL, 0, NULL, -1);
                 if (rc) {
@@ -242,7 +242,7 @@ static int llog_test_4(struct obd_device *obd)
         lmr.lmr_hdr.lrh_type = cpu_to_le32(0xf00f00);
 
         sprintf(name, "%x", llog_test_rand+1);
-        CERROR("4a: create a catalog log with name: %s\n", name);
+        CWARN("4a: create a catalog log with name: %s\n", name);
         rc = llog_create(ctxt, &cath, NULL, name);
         if (rc) {
                 CERROR("1a: llog_create with name %s failed: %d\n", name, rc);
@@ -252,7 +252,7 @@ static int llog_test_4(struct obd_device *obd)
         num_recs++;
         cat_logid = cath->lgh_id;
 
-        CERROR("4b: write 1 record into the catalog\n");
+        CWARN("4b: write 1 record into the catalog\n");
         rc = llog_cat_add_rec(cath, &lmr.lmr_hdr, &cookie, NULL);
         if (rc != 1) {
                 CERROR("4b: write 1 catalog record failed at: %d\n", rc);
@@ -265,7 +265,7 @@ static int llog_test_4(struct obd_device *obd)
         if ((rc = verify_handle("4b", cath->u.chd.chd_current_log, num_recs)))
                 RETURN(rc);
 
-        CERROR("4c: cancel 1 log record\n");
+        CWARN("4c: cancel 1 log record\n");
         rc = llog_cat_cancel_records(cath, 1, &cookie);
         if (rc) {
                 CERROR("4c: cancel 1 catalog based record failed: %d\n", rc);
@@ -276,7 +276,7 @@ static int llog_test_4(struct obd_device *obd)
         if ((rc = verify_handle("4c", cath->u.chd.chd_current_log, num_recs)))
                 RETURN(rc);
 
-        CERROR("4d: write 40,000 more log records\n");
+        CWARN("4d: write 40,000 more log records\n");
         for (i = 0; i < 40000; i++) {
                 rc = llog_cat_add_rec(cath, &lmr.lmr_hdr, NULL, NULL);
                 if (rc) {
@@ -287,7 +287,7 @@ static int llog_test_4(struct obd_device *obd)
                 num_recs++;
         }
 
-        CERROR("4e: add 5 large records, one record per block\n");
+        CWARN("4e: add 5 large records, one record per block\n");
         buflen = LLOG_CHUNK_SIZE - sizeof(struct llog_rec_hdr)
                         - sizeof(struct llog_rec_tail);
         OBD_ALLOC(buf, buflen);
@@ -308,7 +308,7 @@ static int llog_test_4(struct obd_device *obd)
         OBD_FREE(buf, buflen);
 
  out:
-        CERROR("4f: put newly-created catalog\n");
+        CWARN("4f: put newly-created catalog\n");
         rc = llog_cat_put(cath);
         if (rc)
                 CERROR("1b: close log %s failed: %d\n", name, rc);
@@ -325,7 +325,7 @@ static int cat_print_cb(struct llog_handle *llh, struct llog_rec_hdr *rec,
                 RETURN(-EINVAL);
         }
 
-        CERROR("seeing record at index %d - "LPX64":%x in log "LPX64"\n",
+        CWARN("seeing record at index %d - "LPX64":%x in log "LPX64"\n",
                le32_to_cpu(rec->lrh_index), lir->lid_id.lgl_oid,
                lir->lid_id.lgl_ogen, llh->lgh_id.lgl_oid);
         RETURN(0);
@@ -334,12 +334,12 @@ static int cat_print_cb(struct llog_handle *llh, struct llog_rec_hdr *rec,
 static int plain_print_cb(struct llog_handle *llh, struct llog_rec_hdr *rec,
                           void *data)
 {
-        if (!le32_to_cpu(llh->lgh_hdr->llh_flags) & LLOG_F_IS_PLAIN) {
+        if (!(le32_to_cpu(llh->lgh_hdr->llh_flags) & LLOG_F_IS_PLAIN)) {
                 CERROR("log is not plain\n");
                 RETURN(-EINVAL);
         }
 
-        CERROR("seeing record at index %d in log "LPX64"\n",
+        CWARN("seeing record at index %d in log "LPX64"\n",
                le32_to_cpu(rec->lrh_index), llh->lgh_id.lgl_oid);
         RETURN(0);
 }
@@ -350,7 +350,7 @@ static int llog_cancel_rec_cb(struct llog_handle *llh, struct llog_rec_hdr *rec,
         struct llog_cookie cookie;
         static int i = 0;
 
-        if (!le32_to_cpu(llh->lgh_hdr->llh_flags) & LLOG_F_IS_PLAIN) {
+        if (!(le32_to_cpu(llh->lgh_hdr->llh_flags) & LLOG_F_IS_PLAIN)) {
                 CERROR("log is not plain\n");
                 RETURN(-EINVAL);
         }
@@ -382,7 +382,7 @@ static int llog_test_5(struct obd_device *obd)
                 cpu_to_le32(LLOG_MIN_REC_SIZE);
         lmr.lmr_hdr.lrh_type = cpu_to_le32(0xf00f00);
 
-        CERROR("5a: re-open catalog by id\n");
+        CWARN("5a: re-open catalog by id\n");
         rc = llog_create(ctxt, &llh, &cat_logid, NULL);
         if (rc) {
                 CERROR("5a: llog_create with logid failed: %d\n", rc);
@@ -390,21 +390,21 @@ static int llog_test_5(struct obd_device *obd)
         }
         llog_init_handle(llh, LLOG_F_IS_CAT, &uuid);
 
-        CERROR("5b: print the catalog entries.. we expect 2\n");
-        rc = llog_process(llh, (llog_cb_t)cat_print_cb, "test 5");
+        CWARN("5b: print the catalog entries.. we expect 2\n");
+        rc = llog_process(llh, (llog_cb_t)cat_print_cb, "test 5", NULL);
         if (rc) {
                 CERROR("5b: process with cat_print_cb failed: %d\n", rc);
                 GOTO(out, rc);
         }
 
-        CERROR("5c: Cancel 40000 records, see one log zapped\n");
+        CWARN("5c: Cancel 40000 records, see one log zapped\n");
         rc = llog_cat_process(llh, llog_cancel_rec_cb, "foobar");
         if (rc != -4711) {
                 CERROR("5c: process with cat_cancel_cb failed: %d\n", rc);
                 GOTO(out, rc);
         }
 
-        CERROR("5d: add 1 record to the log with many canceled empty pages\n");
+        CWARN("5d: add 1 record to the log with many canceled empty pages\n");
         rc = llog_cat_add_rec(llh, &lmr.lmr_hdr, NULL, NULL);
         if (rc) {
                 CERROR("5d: add record to the log with many canceled empty\
@@ -412,14 +412,14 @@ static int llog_test_5(struct obd_device *obd)
                 GOTO(out, rc);
         }
 
-        CERROR("5b: print the catalog entries.. we expect 1\n");
-        rc = llog_process(llh, (llog_cb_t)cat_print_cb, "test 5");
+        CWARN("5b: print the catalog entries.. we expect 1\n");
+        rc = llog_process(llh, (llog_cb_t)cat_print_cb, "test 5", NULL);
         if (rc) {
                 CERROR("5b: process with cat_print_cb failed: %d\n", rc);
                 GOTO(out, rc);
         }
 
-        CERROR("5e: print plain log entries.. expect 6\n");
+        CWARN("5e: print plain log entries.. expect 6\n");
         rc = llog_cat_process(llh, plain_print_cb, "foobar");
         if (rc) {
                 CERROR("5e: process with plain_print_cb failed: %d\n", rc);
@@ -427,7 +427,7 @@ static int llog_test_5(struct obd_device *obd)
         }
 
  out:
-        CERROR("5: close re-opened catalog\n");
+        CWARN("5: close re-opened catalog\n");
         if (llh)
                 rc = llog_cat_put(llh);
         if (rc)
@@ -448,7 +448,7 @@ static int llog_test_6(struct obd_device *obd, char *name)
         struct llog_ctxt *nctxt;
         int rc;
 
-        CERROR("6a: re-open log %s using client API\n", name);
+        CWARN("6a: re-open log %s using client API\n", name);
         mdc_obd = class_find_client_obd(mds_uuid, LUSTRE_MDC_NAME, NULL);
         if (mdc_obd == NULL) {
                 CERROR("6: no MDC devices connected to %s found.\n",
@@ -476,7 +476,7 @@ static int llog_test_6(struct obd_device *obd, char *name)
                 GOTO(parse_out, rc);
         }
 
-        rc = llog_process(llh, (llog_cb_t)plain_print_cb, NULL);
+        rc = llog_process(llh, (llog_cb_t)plain_print_cb, NULL, NULL);
         if (rc)
                 CERROR("6: llog_process failed %d\n", rc);
 
index d34dffc..931d5d3 100644 (file)
@@ -623,7 +623,7 @@ int class_config_parse_llog(struct llog_ctxt *ctxt, char *name,
         if (rc) 
                 GOTO(parse_out, rc);
 
-        rc = llog_process(llh, class_config_llog_handler, cfg);
+        rc = llog_process(llh, class_config_llog_handler, cfg, NULL);
 parse_out:
         rc2 = llog_close(llh);
         if (rc == 0)
@@ -720,7 +720,7 @@ int class_config_dump_llog(struct llog_ctxt *ctxt, char *name,
         if (rc) 
                 GOTO(parse_out, rc);
 
-        rc = llog_process(llh, class_config_dump_handler, cfg);
+        rc = llog_process(llh, class_config_dump_handler, cfg, NULL);
 parse_out:
         rc2 = llog_close(llh);
         if (rc == 0)
index 93d363e..a9b09fd 100644 (file)
@@ -1181,11 +1181,9 @@ static int filter_postsetup(struct obd_device *obd)
         ENTRY;
 
         // XXX add a storage location for the logid for size changes
-#ifdef ENABLE_ORPHANS
         rc = llog_cat_initialize(obd, 1);
         if (rc)
                 CERROR("failed to setup llogging subsystems\n");
-#endif
         RETURN(rc);
 }
 
@@ -1317,11 +1315,9 @@ static int filter_precleanup(struct obd_device *obd, int flags)
         int rc = 0;
         ENTRY;
 
-#ifdef ENABLE_ORPHANS
         rc = obd_llog_finish(obd, 0);
         if (rc)
                 CERROR("failed to cleanup llogging subsystem\n");
-#endif
 
         RETURN(rc);
 }
@@ -1806,9 +1802,15 @@ static int filter_destroy(struct obd_export *exp, struct obdo *oa,
 cleanup:
         switch(cleanup_phase) {
         case 3:
-                if (fcc != NULL)
-                        fsfilt_add_journal_cb(obd, 0, oti->oti_handle,
-                                              filter_cancel_cookies_cb, fcc);
+                if (fcc != NULL) {
+                        if (oti != NULL) {
+                                fsfilt_add_journal_cb(obd, 0, oti->oti_handle,
+                                                      filter_cancel_cookies_cb, fcc);
+                        } else {
+                                fsfilt_add_journal_cb(obd, 0, handle,
+                                                      filter_cancel_cookies_cb, fcc);
+                        }
+                }
                 rc = filter_finish_transno(exp, oti, rc);
                 rc2 = fsfilt_commit(obd, dparent->d_inode, handle, 0);
                 if (rc2) {
@@ -1863,6 +1865,7 @@ static int filter_sync(struct obd_export *exp, struct obdo *oa,
         struct obd_run_ctxt saved;
         struct filter_obd *filter;
         struct dentry *dentry;
+        struct llog_ctxt *ctxt;
         int rc, rc2;
         ENTRY;
 
@@ -1871,6 +1874,9 @@ static int filter_sync(struct obd_export *exp, struct obdo *oa,
         /* an objid of zero is taken to mean "sync whole filesystem" */
         if (!oa || !(oa->o_valid & OBD_MD_FLID)) {
                 rc = fsfilt_sync(exp->exp_obd, filter->fo_sb);
+                /* flush any remaining cancel messages out to the target */
+                ctxt = llog_get_context(exp->exp_obd, LLOG_UNLINK_REPL_CTXT);
+                llog_sync(ctxt, exp);
                 RETURN(rc);
         }
 
@@ -1955,9 +1961,7 @@ static int filter_set_info(struct obd_export *exp, __u32 keylen,
 {
         struct obd_device *obd;
         struct lustre_handle conn;
-#ifdef ENABLE_ORPHANS
         struct llog_ctxt *ctxt;
-#endif
         int rc = 0;
         ENTRY;
 
@@ -1976,10 +1980,8 @@ static int filter_set_info(struct obd_export *exp, __u32 keylen,
 
         CWARN("Received MDS connection ("LPX64")\n", conn.cookie);
         memcpy(&obd->u.filter.fo_mdc_conn, &conn, sizeof(conn));
-#ifdef ENABLE_ORPHANS
         ctxt = llog_get_context(obd, LLOG_UNLINK_REPL_CTXT);
         rc = llog_receptor_accept(ctxt, exp->exp_imp_reverse);
-#endif
         RETURN(rc);
 }
 
index b63c4c2..686fd30 100644 (file)
@@ -113,18 +113,19 @@ int filter_recov_log_unlink_cb(struct llog_handle *llh,
         struct obd_device *obd = ctxt->loc_obd;
         struct obd_export *exp = obd->obd_self_export;
         struct llog_cookie cookie;
+        struct llog_gen_rec *lgr;
         struct llog_unlink_rec *lur;
         struct obdo *oa;
-        struct obd_trans_info oti = { 0 };
         obd_id oid;
         int rc = 0;
         ENTRY;
                                                                                                                              
-        if (!le32_to_cpu(llh->lgh_hdr->llh_flags) & LLOG_F_IS_PLAIN) {
+        if (!(le32_to_cpu(llh->lgh_hdr->llh_flags) & LLOG_F_IS_PLAIN)) {
                 CERROR("log is not plain\n");
                 RETURN(-EINVAL);
         }
-        if (rec->lrh_type != MDS_UNLINK_REC) {
+        if (rec->lrh_type != MDS_UNLINK_REC &&
+            rec->lrh_type != LLOG_GEN_REC) {
                 CERROR("log record type error\n");
                 RETURN(-EINVAL);
         }
@@ -132,9 +133,19 @@ int filter_recov_log_unlink_cb(struct llog_handle *llh,
         cookie.lgc_lgl = llh->lgh_id;
         cookie.lgc_subsys = LLOG_UNLINK_ORIG_CTXT;
         cookie.lgc_index = le32_to_cpu(rec->lrh_index);
-                                                                                                                             
+
+        if (rec->lrh_type == LLOG_GEN_REC) {
+                lgr = (struct llog_gen_rec *)rec;
+                if (llog_gen_lt(lgr->lgr_gen, ctxt->loc_gen))
+                        rc = 0;
+                else
+                        rc = LLOG_PROC_BREAK;
+                CWARN("fetch generation log, send cookie\n");
+                llog_cancel(ctxt, NULL, 1, &cookie, 0);
+                RETURN(rc);
+        }
+
         lur = (struct llog_unlink_rec *)rec;
-        
         oa = obdo_alloc();
         if (oa == NULL) 
                 RETURN(-ENOMEM);
@@ -144,10 +155,10 @@ int filter_recov_log_unlink_cb(struct llog_handle *llh,
         memcpy(obdo_logcookie(oa), &cookie, sizeof(cookie));
         oid = oa->o_id;
 
-        rc = obd_destroy(exp, oa, NULL, &oti);
+        rc = obd_destroy(exp, oa, NULL, NULL);
         obdo_free(oa);
         if (rc == -ENOENT) {
-                CWARN("object already removed: send cookie\n");
+                CWARN("object already removed, send cookie\n");
                 llog_cancel(ctxt, NULL, 1, &cookie, 0);
                 RETURN(0);
         }
index a83592f..67120d7 100644 (file)
@@ -1029,7 +1029,6 @@ static int ost_handle(struct ptlrpc_request *req)
                 DEBUG_REQ(D_INODE, req, "ping");
                 rc = target_handle_ping(req);
                 break;
-#ifdef ENABLE_ORPHANS
         /* FIXME - just reply status */
         case LLOG_ORIGIN_CONNECT:
                 DEBUG_REQ(D_INODE, req, "log connect\n");
@@ -1039,7 +1038,6 @@ static int ost_handle(struct ptlrpc_request *req)
                 if (rc)
                         RETURN(rc);
                 RETURN(ptlrpc_reply(req));
-                //break;
         case OBD_LOG_CANCEL:
                 CDEBUG(D_INODE, "log cancel\n");
                 OBD_FAIL_RETURN(OBD_FAIL_OBD_LOG_CANCEL_NET, 0);
@@ -1049,8 +1047,6 @@ static int ost_handle(struct ptlrpc_request *req)
                 if (rc)
                         RETURN(rc);
                 RETURN(ptlrpc_reply(req));
-                //break;
-#endif
         case LDLM_ENQUEUE:
                 CDEBUG(D_INODE, "enqueue\n");
                 OBD_FAIL_RETURN(OBD_FAIL_LDLM_ENQUEUE, 0);
index f9605ab..f295154 100644 (file)
@@ -1,8 +1,5 @@
 /* portals/include/config.h.in.  Generated from configure.in by autoheader.  */
 
-/* Compile with orphan support */
-#undef ENABLE_ORPHANS
-
 /* Use the Pinger */
 #undef ENABLE_PINGER
 
index 5524843..8accba6 100644 (file)
@@ -194,6 +194,7 @@ static int llog_client_read_header(struct llog_handle *handle)
                 GOTO(out, rc =-EFAULT);
        }
         memcpy(handle->lgh_hdr, hdr, sizeof (*hdr));
+        handle->lgh_last_idx = le32_to_cpu(handle->lgh_hdr->llh_tail.lrt_index);
 
 out:
         if (req)
index 1dd2f9a..0694bd1 100644 (file)
@@ -45,9 +45,9 @@
 
 #ifdef __KERNEL__
 int llog_origin_connect(struct llog_ctxt *ctxt, int count,
-                        struct llog_logid *logid,
-                        struct llog_ctxt_gen *gen)
+                        struct llog_logid *logid, struct llog_gen *gen)
 {
+        struct llog_gen_rec *lgr;
         struct obd_import *imp;
         struct ptlrpc_request *request;
         struct llogd_conn_body *req_body;
@@ -55,11 +55,31 @@ int llog_origin_connect(struct llog_ctxt *ctxt, int count,
         int rc;
         ENTRY;
 
+        if (list_empty(&ctxt->loc_handle->u.chd.chd_head)) {
+                CDEBUG(D_HA, "there is no record related to ctxt %p", ctxt);
+                RETURN(0);
+        }
+
+        /* FIXME what value for gen->conn_cnt */
+        LLOG_GEN_INC(ctxt->loc_gen);
+
+        /* first add llog_gen_rec */
+        OBD_ALLOC(lgr, sizeof(*lgr));
+        if (!lgr)
+                RETURN(-ENOMEM);
+        lgr->lgr_hdr.lrh_len = lgr->lgr_tail.lrt_len = sizeof(*lgr);
+        lgr->lgr_hdr.lrh_type = LLOG_GEN_REC;
+        lgr->lgr_gen = ctxt->loc_gen;
+        rc = llog_add(ctxt, &lgr->lgr_hdr, NULL, NULL, 1);
+        OBD_FREE(lgr, sizeof(*lgr));
+        if (rc != 1)
+                RETURN(rc);
+
         LASSERT(ctxt->loc_imp);
         imp = ctxt->loc_imp;
 
         request = ptlrpc_prep_req(imp, LLOG_ORIGIN_CONNECT, 1, &size, NULL);
-        if (!request) 
+        if (!request)
                 RETURN(-ENOMEM);
 
         req_body = lustre_msg_buf(request->rq_reqmsg, 0, sizeof(*req_body));
@@ -87,9 +107,9 @@ int llog_handle_connect(struct ptlrpc_request *req)
         req_body = lustre_msg_buf(req->rq_reqmsg, 0, sizeof(*req_body));
 
         ctxt = llog_get_context(obd, req_body->lgdc_ctxt_idx);
-        rc = llog_connect(ctxt, 1, &req_body->lgdc_logid, 
+        rc = llog_connect(ctxt, 1, &req_body->lgdc_logid,
                           &req_body->lgdc_gen);
-        if (rc != 0) 
+        if (rc != 0)
                 CERROR("failed at llog_relp_connect\n");
 
         RETURN(rc);
index b0b739e..4d9e68c 100644 (file)
@@ -34,6 +34,7 @@
 #include <linux/lustre_log.h>
 #include <linux/lustre_net.h>
 #include <portals/list.h>
+#include <linux/lustre_fsfilt.h>
 
 int llog_origin_handle_create(struct ptlrpc_request *req)
 {
@@ -238,16 +239,17 @@ int llog_origin_handle_close(struct ptlrpc_request *req)
         RETURN(rc);
 }
 
-#ifdef ENABLE_ORPHANS
 int llog_origin_handle_cancel(struct ptlrpc_request *req)
 {
         struct obd_device *obd = req->rq_export->exp_obd;
         struct obd_device *disk_obd;
         struct llog_cookie *logcookies;
         struct llog_ctxt *ctxt;
-        int num_cookies, rc = 0;
+        int num_cookies, rc = 0, err, i;
         struct obd_run_ctxt saved;
         struct llog_handle *cathandle;
+        struct inode *inode;
+        void *handle;
         ENTRY;
 
         logcookies = lustre_msg_buf(req->rq_reqmsg, 0, sizeof(*logcookies));
@@ -262,27 +264,43 @@ int llog_origin_handle_cancel(struct ptlrpc_request *req)
                 CWARN("llog subsys not setup or already cleanup\n");
                 RETURN(-ENOENT);
         }
-        down(&ctxt->loc_sem);
-        disk_obd = ctxt->loc_exp->exp_obd;
-        cathandle = ctxt->loc_handle;
-        LASSERT(cathandle);
 
+        disk_obd = ctxt->loc_exp->exp_obd;
         push_ctxt(&saved, &disk_obd->obd_ctxt, NULL);
-        rc = llog_cat_cancel_records(cathandle, num_cookies, logcookies);
+        for (i = 0; i < num_cookies; i++, logcookies++) {
+                cathandle = ctxt->loc_handle;
+                LASSERT(cathandle != NULL);
+                inode = cathandle->lgh_file->f_dentry->d_inode;
+
+                handle = fsfilt_start(disk_obd, inode,
+                                      FSFILT_OP_CANCEL_UNLINK_LOG, NULL);
+                if (IS_ERR(handle)) {
+                        CERROR("fsfilt_start failed: %ld\n", PTR_ERR(handle));
+                        GOTO(pop_ctxt, rc = PTR_ERR(handle));
+                }
+
+                rc = llog_cat_cancel_records(cathandle, 1, logcookies);
+
+                err = fsfilt_commit(disk_obd, inode, handle, 0);
+                if (err) {
+                        CERROR("error committing transaction: %d\n", err);
+                        if (!rc)
+                                rc = err;
+                        GOTO(pop_ctxt, rc);
+                }
+        }
+pop_ctxt:
+        pop_ctxt(&saved, &disk_obd->obd_ctxt, NULL);
         if (rc)
                 CERROR("cancel %d llog-records failed: %d\n", num_cookies, rc);
         else
-                CWARN("cancel %d llog-records\n", num_cookies);
-
-        pop_ctxt(&saved, &disk_obd->obd_ctxt, NULL);
-        up(&ctxt->loc_sem);
+                CDEBUG(D_HA, "cancel %d llog-records\n", num_cookies);
 
         RETURN(rc);
 }
 EXPORT_SYMBOL(llog_origin_handle_cancel);
-#endif
 
-static int llog_catinfo_config(struct obd_device *obd, char *buf, int buf_len, 
+static int llog_catinfo_config(struct obd_device *obd, char *buf, int buf_len,
                                char *client)
 {
         struct mds_obd *mds = &obd->u.mds;
@@ -292,17 +310,17 @@ static int llog_catinfo_config(struct obd_device *obd, char *buf, int buf_len,
         char name[4][64];
         int rc, i, l, remains = buf_len;
         char *out = buf;
-        
+
         if (ctxt == NULL || mds == NULL)
                 RETURN(-EOPNOTSUPP);
 
         push_ctxt(&saved, &ctxt->loc_exp->exp_obd->obd_ctxt, NULL);
-        
+
         sprintf(name[0], "%s", mds->mds_profile);
         sprintf(name[1], "%s-clean", mds->mds_profile);
         sprintf(name[2], "%s", client);
         sprintf(name[3], "%s-clean", client);
-        
+
         for (i = 0; i < 4; i++) {
                 int index, uncanceled = 0;
                 rc = llog_create(ctxt, &handle, NULL, name[i]);
@@ -313,18 +331,17 @@ static int llog_catinfo_config(struct obd_device *obd, char *buf, int buf_len,
                         llog_close(handle);
                         GOTO(out_pop, rc = -ENOENT);
                 }
-                
+
                 for (index = 1; index < (LLOG_BITMAP_BYTES * 8); index ++) {
                         if (ext2_test_bit(index, handle->lgh_hdr->llh_bitmap))
                                 uncanceled++;
                 }
-                
+
                 l = snprintf(out, remains, "[Log Name]: %s\nLog Size: %llu\n"
                              "Last Index: %d\nUncanceled Records: %d\n\n",
-                             name[i], 
+                             name[i],
                              handle->lgh_file->f_dentry->d_inode->i_size,
-                             handle->lgh_last_idx,
-                             uncanceled);
+                             handle->lgh_last_idx, uncanceled);
                 out += l;
                 remains -= l;
 
@@ -344,7 +361,7 @@ struct cb_data {
         int  init;
 };
 
-static int llog_catinfo_cb(struct llog_handle *cat, 
+static int llog_catinfo_cb(struct llog_handle *cat,
                            struct llog_rec_hdr *rec, void *data)
 {
         static char *out = NULL;
@@ -365,7 +382,7 @@ static int llog_catinfo_cb(struct llog_handle *cat,
 
         if (!(cat->lgh_hdr->llh_flags & cpu_to_le32(LLOG_F_IS_CAT)))
                 RETURN(-EINVAL);
-        
+
         lir = (struct llog_logid_rec *)rec;
         logid = &lir->lid_id;
         rc = llog_create(ctxt, &handle, logid, NULL);
@@ -374,7 +391,7 @@ static int llog_catinfo_cb(struct llog_handle *cat,
         rc = llog_init_handle(handle, 0, NULL);
         if (rc)
                 GOTO(out_close, rc);
-        
+
         for (index = 1; index < (LLOG_BITMAP_BYTES * 8); index++) {
                 if (ext2_test_bit(index, handle->lgh_hdr->llh_bitmap))
                         count++;
@@ -394,13 +411,13 @@ static int llog_catinfo_cb(struct llog_handle *cat,
                 CWARN("Not enough memory\n");
                 rc = -ENOMEM;
         }
-        
+
 out_close:
         llog_close(handle);
         RETURN(rc);
 }
-                
-static int llog_catinfo_deletions(struct obd_device *obd, char *buf, 
+
+static int llog_catinfo_deletions(struct obd_device *obd, char *buf,
                                   int buf_len)
 {
         struct mds_obd *mds = &obd->u.mds;
@@ -412,24 +429,24 @@ static int llog_catinfo_deletions(struct obd_device *obd, char *buf,
         int rc;
         struct cb_data data;
         struct llog_ctxt *ctxt = llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT);
+
         if (ctxt == NULL || mds == NULL)
                 RETURN(-EOPNOTSUPP);
-       
+
         count = mds->mds_lov_desc.ld_tgt_count;
         size = sizeof(*idarray) * count;
-        
+
         OBD_ALLOC(idarray, size);
         if (!idarray)
                 RETURN(-ENOMEM);
         memset(idarray, 0, size);
-        
+
         rc = llog_get_cat_list(obd, obd, name, count, idarray);
-        if (rc) 
+        if (rc)
                 GOTO(out_free, rc);
 
         push_ctxt(&saved, &ctxt->loc_exp->exp_obd->obd_ctxt, NULL);
-        
+
         id = idarray;
         data.ctxt = ctxt;
         data.out = buf;
@@ -448,19 +465,19 @@ static int llog_catinfo_deletions(struct obd_device *obd, char *buf,
                         if (ext2_test_bit(index, handle->lgh_hdr->llh_bitmap))
                                 uncanceled++;
                 }
-                l = snprintf(data.out, data.remains, 
+                l = snprintf(data.out, data.remains,
                              "\n[Catlog ID]: #"LPX64"#"LPX64"#%08x  "
                              "[Log Count]: %d\n",
                              id->lgl_oid, id->lgl_ogr, id->lgl_ogen,
                              uncanceled);
-                
+
                 data.out += l;
                 data.remains -= l;
                 data.init = 1;
-                
-                llog_process(handle, llog_catinfo_cb, &data);
+
+                llog_process(handle, llog_catinfo_cb, &data, NULL);
                 llog_close(handle);
-                
+
                 if (data.remains <= 0)
                         break;
         }
@@ -470,7 +487,7 @@ out_free:
         OBD_FREE(idarray, size);
         RETURN(rc);
 }
-        
+
 int llog_catinfo(struct ptlrpc_request *req)
 {
         struct obd_export *exp = req->rq_export;
@@ -503,7 +520,7 @@ int llog_catinfo(struct ptlrpc_request *req)
         if (strlen(buf) == 0)
                 sprintf(buf, "%s", "No log informations\n");
         memcpy(reply, buf, buf_len);
-        
+
 out_free:
         OBD_FREE(buf, buf_len);
         return rc;
index 74aa72b..d28b0b6 100644 (file)
@@ -50,6 +50,8 @@
 #include <linux/lustre_log.h>
 #include "ptlrpc_internal.h"
 
+#define LLCD_SIZE 4096
+
 #ifdef __KERNEL__
 
 static struct llog_commit_master lustre_lcm;
@@ -61,7 +63,7 @@ static int llcd_alloc(void)
         struct llog_canceld_ctxt *llcd;
         int offset = offsetof(struct llog_canceld_ctxt, llcd_cookies);
 
-        OBD_ALLOC(llcd, PAGE_SIZE + offset);
+        OBD_ALLOC(llcd, LLCD_SIZE + offset);
         if (llcd == NULL)
                 return -ENOMEM;
 
@@ -107,7 +109,7 @@ static void llcd_put(struct llog_canceld_ctxt *llcd)
         int offset = offsetof(struct llog_canceld_ctxt, llcd_cookies);
 
         if (atomic_read(&lcm->lcm_llcd_numfree) >= lcm->lcm_llcd_maxfree) {
-                OBD_FREE(llcd, PAGE_SIZE + offset);
+                OBD_FREE(llcd, LLCD_SIZE + offset);
         } else {
                 spin_lock(&lcm->lcm_llcd_lock);
                 list_add(&llcd->llcd_list, &lcm->lcm_llcd_free);
@@ -128,8 +130,8 @@ void llcd_send(struct llog_canceld_ctxt *llcd)
 EXPORT_SYMBOL(llcd_send);
 
 /* deleted objects have a commit callback that cancels the MDS
- * log record for the deletion.  The commit callback calls this 
- * function 
+ * log record for the deletion.  The commit callback calls this
+ * function
  */
 int llog_obd_repl_cancel(struct llog_ctxt *ctxt,
                          struct lov_stripe_md *lsm, int count,
@@ -160,8 +162,7 @@ int llog_obd_repl_cancel(struct llog_ctxt *ctxt,
                                cookies->lgc_lgl.lgl_ogen, cookies->lgc_index);
                         GOTO(out, rc = -ENOMEM);
                 }
-                llcd->llcd_import = ctxt->loc_imp;
-                llcd->llcd_gen = ctxt->loc_gen;
+                llcd->llcd_ctxt = ctxt;
                 ctxt->loc_llcd = llcd;
         }
 
@@ -170,9 +171,9 @@ int llog_obd_repl_cancel(struct llog_ctxt *ctxt,
         llcd->llcd_cookiebytes += sizeof(*cookies);
 
 send_now:
-        if ((PAGE_SIZE - llcd->llcd_cookiebytes < sizeof(*cookies) ||
+        if ((LLCD_SIZE - llcd->llcd_cookiebytes < sizeof(*cookies) ||
              flags & OBD_LLOG_FL_SENDNOW)) {
-                CDEBUG(D_HA, "send llcd: %p\n", llcd);
+                CDEBUG(D_HA, "send llcd %p:%p\n", llcd, llcd->llcd_ctxt);
                 ctxt->loc_llcd = NULL;
                 llcd_send(llcd);
         }
@@ -187,16 +188,17 @@ int llog_obd_repl_sync(struct llog_ctxt *ctxt, struct obd_export *exp)
         int rc = 0;
         ENTRY;
 
-        LASSERT(ctxt->loc_llcd);
-
         if (exp && (ctxt->loc_imp == exp->exp_imp_reverse)) {
-                CWARN("import will be destroyed, put llcd %p\n", 
-                      ctxt->loc_llcd);
-                llcd_put(ctxt->loc_llcd);
-                ctxt->loc_llcd = NULL;
+                down(&ctxt->loc_sem);
+                if (ctxt->loc_llcd != NULL) {
+                        CWARN("import will be destroyed, put "
+                              "llcd %p:%p\n", ctxt->loc_llcd, ctxt);
+                        llcd_put(ctxt->loc_llcd);
+                        ctxt->loc_llcd = NULL;
+                        ctxt->loc_imp = NULL;
+                }
                 up(&ctxt->loc_sem);
         } else {
-                up(&ctxt->loc_sem);
                 rc = llog_cancel(ctxt, NULL, 0, NULL, OBD_LLOG_FL_SENDNOW);
         }
 
@@ -294,11 +296,11 @@ static int log_commit_thread(void *arg)
                         llcd = list_entry(lcd->lcd_llcd_list.next,
                                           typeof(*llcd), llcd_list);
                         LASSERT(llcd->llcd_lcm == lcm);
-                        import = llcd->llcd_import;
+                        import = llcd->llcd_ctxt->loc_imp;
                 }
                 list_for_each_entry_safe(llcd, n, sending_list, llcd_list) {
                         LASSERT(llcd->llcd_lcm == lcm);
-                        if (import == llcd->llcd_import)
+                        if (import == llcd->llcd_ctxt->loc_imp)
                                 list_move_tail(&llcd->llcd_list,
                                                &lcd->lcd_llcd_list);
                 }
@@ -306,7 +308,7 @@ static int log_commit_thread(void *arg)
                         list_for_each_entry_safe(llcd, n, &lcm->lcm_llcd_resend,
                                                  llcd_list) {
                                 LASSERT(llcd->llcd_lcm == lcm);
-                                if (import == llcd->llcd_import)
+                                if (import == llcd->llcd_ctxt->loc_imp)
                                         list_move_tail(&llcd->llcd_list,
                                                        &lcd->lcd_llcd_list);
                         }
@@ -316,31 +318,29 @@ static int log_commit_thread(void *arg)
                 /* We are the only one manipulating our local list - no lock */
                 list_for_each_entry_safe(llcd,n, &lcd->lcd_llcd_list,llcd_list){
                         char *bufs[1] = {(char *)llcd->llcd_cookies};
-                        struct obd_device *obd = import->imp_obd;
-                        struct llog_ctxt *ctxt;
 
                         list_del(&llcd->llcd_list);
                         if (llcd->llcd_cookiebytes == 0) {
-                                CDEBUG(D_HA, "just put empty llcd %p\n", llcd);
+                                CDEBUG(D_HA, "put empty llcd %p:%p\n",
+                                       llcd, llcd->llcd_ctxt);
                                 llcd_put(llcd);
                                 continue;
                         }
-                        /* check whether the cookies are new. if new then send, otherwise
-                         * just put llcd */
-                        ctxt = llog_get_context(obd, llcd->llcd_cookies[0].lgc_subsys + 1);
-                        LASSERT(ctxt != NULL);
-                        down(&ctxt->loc_sem);
-                        if (log_gen_lt(llcd->llcd_gen, ctxt->loc_gen)) {
-                                up(&ctxt->loc_sem); 
-                                CDEBUG(D_HA, "just put stale llcd %p\n", llcd);
+
+                        down(&llcd->llcd_ctxt->loc_sem);
+                        if (llcd->llcd_ctxt->loc_imp == NULL) {
+                                up(&llcd->llcd_ctxt->loc_sem);
+                                CWARN("import will be destroyed, put "
+                                      "llcd %p:%p\n", llcd, llcd->llcd_ctxt);
                                 llcd_put(llcd);
                                 continue;
                         }
-                        up(&ctxt->loc_sem); 
 
                         request = ptlrpc_prep_req(import, OBD_LOG_CANCEL, 1,
                                                   &llcd->llcd_cookiebytes,
                                                   bufs);
+                        up(&llcd->llcd_ctxt->loc_sem);
+
                         if (request == NULL) {
                                 rc = -ENOMEM;
                                 CERROR("error preparing commit: rc %d\n", rc);
@@ -354,8 +354,18 @@ static int log_commit_thread(void *arg)
                         }
 
                         request->rq_replen = lustre_msg_size(0, NULL);
+                        down(&llcd->llcd_ctxt->loc_sem);
+                        if (llcd->llcd_ctxt->loc_imp == NULL) {
+                                up(&llcd->llcd_ctxt->loc_sem);
+                                CWARN("import will be destroyed, put "
+                                      "llcd %p:%p\n", llcd, llcd->llcd_ctxt);
+                                llcd_put(llcd);
+                                ptlrpc_req_finished(request);
+                                continue;
+                        }
                         rc = ptlrpc_queue_wait(request);
                         ptlrpc_req_finished(request);
+                        up(&llcd->llcd_ctxt->loc_sem);
 
                         /* If the RPC failed, we put this and the remaining
                          * messages onto the resend list for another time. */
@@ -364,7 +374,7 @@ static int log_commit_thread(void *arg)
                                 continue;
                         }
 
-#if 0                   /* FIXME just put llcd, not send it again */
+#if 0                   /* FIXME just put llcd, not put it on resend list */
                         spin_lock(&lcm->lcm_llcd_lock);
                         list_splice(&lcd->lcd_llcd_list, &lcm->lcm_llcd_resend);
                         if (++llcd->llcd_tries < 5) {
@@ -377,13 +387,15 @@ static int log_commit_thread(void *arg)
                         } else {
                                 spin_unlock(&lcm->lcm_llcd_lock);
 #endif
-                                CERROR("commit %p dropped %d cookies: rc %d\n",
-                                       llcd, (int)(llcd->llcd_cookiebytes /
-                                                   sizeof(*llcd->llcd_cookies)),
-                                       rc);
+                                CERROR("commit %p:%p drop %d cookies: rc %d\n",
+                                       llcd, llcd->llcd_ctxt,
+                                       (int)(llcd->llcd_cookiebytes /
+                                             sizeof(*llcd->llcd_cookies)), rc);
                                 llcd_put(llcd);
-//                        }
+#if 0
+                        }
                         break;
+#endif
                 }
 
                 if (rc == 0) {
@@ -439,11 +451,12 @@ int llog_start_commit_thread(void)
 EXPORT_SYMBOL(llog_start_commit_thread);
 
 static struct llog_process_args {
-        struct semaphore         llpa_sem; 
+        struct semaphore         llpa_sem;
         struct llog_ctxt        *llpa_ctxt;
         void                    *llpa_cb;
         void                    *llpa_arg;
 } llpa;
+
 int llog_init_commit_master(void)
 {
         INIT_LIST_HEAD(&lcm->lcm_thread_busy);
@@ -475,7 +488,6 @@ int llog_cleanup_commit_master(int force)
         return 0;
 }
 
-
 static int log_process_thread(void *args)
 {
         struct llog_process_args *data = args;
@@ -486,17 +498,17 @@ static int log_process_thread(void *args)
         unsigned long flags;
         int rc;
         ENTRY;
-                                                                                                                             
+
         up(&data->llpa_sem);
         lock_kernel();
         ptlrpc_daemonize(); /* thread never needs to do IO */
-                                                                                                                             
+
         SIGNAL_MASK_LOCK(current, flags);
         sigfillset(&current->blocked);
         RECALC_SIGPENDING;
         SIGNAL_MASK_UNLOCK(current, flags);
         unlock_kernel();
-                                                                                                                             
+
         rc = llog_create(ctxt, &llh, &logid, NULL);
         if (rc) {
                 CERROR("llog_create failed %d\n", rc);
@@ -507,26 +519,27 @@ static int log_process_thread(void *args)
                 CERROR("llog_init_handle failed %d\n", rc);
                 GOTO(out, rc);
         }
-                                                                                                                             
+
         if (cb) {
                 rc = llog_cat_process(llh, (llog_cb_t)cb, NULL);
-                if (rc)
+                if (rc != LLOG_PROC_BREAK)
                         CERROR("llog_cat_process failed %d\n", rc);
-        } else
+        } else {
                 CWARN("no callback function for recovery\n");
+        }
 
-        CDEBUG(D_HA, "send to llcd :%p forcibly\n", ctxt->loc_llcd);
+        CDEBUG(D_HA, "send llcd %p:%p forcibly after recovery\n",
+               ctxt->loc_llcd, ctxt);
         llog_sync(ctxt, NULL);
 out:
         rc = llog_cat_put(llh);
         if (rc)
                 CERROR("llog_cat_put failed %d\n", rc);
-                                                                                                                             
+
         RETURN(rc);
 }
-static int llog_recovery_generic(struct llog_ctxt *ctxt,
-                                 void *handle,
-                                 void *arg)
+
+static int llog_recovery_generic(struct llog_ctxt *ctxt, void *handle,void *arg)
 {
         int rc;
         ENTRY;
@@ -546,31 +559,33 @@ static int llog_recovery_generic(struct llog_ctxt *ctxt,
 
         RETURN(rc);
 }
+
 int llog_repl_connect(struct llog_ctxt *ctxt, int count,
-                      struct llog_logid *logid, struct llog_ctxt_gen *gen)
+                      struct llog_logid *logid, struct llog_gen *gen)
 {
         struct llog_canceld_ctxt *llcd;
         int rc;
         ENTRY;
-                                                                                                                             
+
+        /* send back llcd before recovery from llog */
+        if (ctxt->loc_llcd != NULL) {
+                CWARN("llcd %p:%p not empty\n", ctxt->loc_llcd, ctxt);
+                llog_sync(ctxt, NULL);
+        }
+
         down(&ctxt->loc_sem);
         ctxt->loc_gen = *gen;
-        llcd = ctxt->loc_llcd;
-        if (llcd) {
-                CDEBUG(D_HA, "put current llcd when new connection arrives\n");
-                llcd_put(llcd);
-        }
         llcd = llcd_grab();
         if (llcd == NULL) {
                 CERROR("couldn't get an llcd\n");
+                up(&ctxt->loc_sem);
                 RETURN(-ENOMEM);
         }
-        llcd->llcd_import = ctxt->loc_imp;
-        llcd->llcd_gen = ctxt->loc_gen;
+        llcd->llcd_ctxt = ctxt;
         ctxt->loc_llcd = llcd;
         up(&ctxt->loc_sem);
 
-        rc = llog_recovery_generic(ctxt, ctxt->llog_proc_cb, logid); 
+        rc = llog_recovery_generic(ctxt, ctxt->llog_proc_cb, logid);
         if (rc != 0)
                 CERROR("error recovery process: %d\n", rc);
 
index 034ee8a..bed8b61 100755 (executable)
@@ -14,7 +14,7 @@ init_test_env $@
 . ${CONFIG:=$LUSTRE/tests/cfg/local.sh}
 
 # Skip these tests
-ALWAYS_EXCEPT="35"
+ALWAYS_EXCEPT=""
 
 
 gen_config() {
@@ -707,5 +707,41 @@ test_37() {
 }
 run_test 37 "abort recovery before client does replay (test mds_cleanup_orphans for directories)"
 
+test_38() {
+    for i in `seq 1 800`; do
+       touch $DIR/$tfile-$i
+    done
+    for i in `seq 1 400`; do
+       rm $DIR/$tfile-$i
+    done
+
+    replay_barrier mds
+    fail mds
+    for i in `seq 401 800`; do
+       rm $DIR/$tfile-$i
+    done
+    sleep 2
+    $CHECKSTAT -t file $DIR/$tfile-* && return 1 || true
+}
+run_test 38 "test recovery from unlink llog (test llog_gen_rec) "
+
+test_39() {
+    for i in `seq 1 800`; do
+       touch $DIR/$tfile-$i
+    done
+
+    replay_barrier mds
+    for i in `seq 1 400`; do
+       rm $DIR/$tfile-$i
+    done
+    fail mds
+    for i in `seq 401 800`; do
+       rm $DIR/$tfile-$i
+    done
+    sleep 2
+    $CHECKSTAT -t file $DIR/$tfile-* && return 1 || true
+}
+run_test 39 "test recovery from unlink llog (test llog_gen_rec) "
+
 equals_msg test complete, cleaning up
 $CLEANUP
index 7397fcb..18810ae 100644 (file)
@@ -137,6 +137,12 @@ check_obdo(void)
         CHECK_VALUE(OBD_MD_FLOSCOPQ);
         CHECK_VALUE(OBD_MD_FLCOOKIE);
         CHECK_VALUE(OBD_MD_FLGROUP);
+
+        CHECK_VALUE(OBD_FL_INLINEDATA);
+        CHECK_VALUE(OBD_FL_OBDMDEXISTS);
+        CHECK_VALUE(OBD_FL_DELORPHAN);
+        CHECK_VALUE(OBD_FL_NORPC);
+        CHECK_VALUE(OBD_FL_IDONLY);
 }
 
 void
@@ -206,6 +212,7 @@ check_niobuf_remote(void)
         CHECK_VALUE(OBD_BRW_WRITE);
         CHECK_VALUE(OBD_BRW_CREATE);
         CHECK_VALUE(OBD_BRW_SYNC);
+        CHECK_VALUE(OBD_BRW_FROM_GRANT);
 }
 
 void
@@ -269,6 +276,7 @@ check_mds_body(void)
         CHECK_VALUE(FMODE_EXEC);
         CHECK_VALUE(MDS_OPEN_CREAT);
         CHECK_VALUE(MDS_OPEN_EXCL);
+        CHECK_VALUE(MDS_OPEN_TRUNC);
         CHECK_VALUE(MDS_OPEN_APPEND);
         CHECK_VALUE(MDS_OPEN_SYNC);
         CHECK_VALUE(MDS_OPEN_DIRECTORY);
@@ -508,6 +516,7 @@ check_llog_logid(void)
         CHECK_VALUE(MDS_UNLINK_REC);
         CHECK_VALUE(OBD_CFG_REC);
         CHECK_VALUE(PTL_CFG_REC);
+        CHECK_VALUE(LLOG_GEN_REC);
         CHECK_VALUE(LLOG_HDR_MAGIC);
         CHECK_VALUE(LLOG_LOGID_MAGIC);
 }
@@ -532,6 +541,79 @@ check_llog_rec_tail(void)
 }
 
 void
+check_llog_logid_rec(void)
+{
+        BLANK_LINE();
+        CHECK_STRUCT(llog_logid_rec);
+        CHECK_MEMBER(llog_logid_rec, lid_hdr);
+        CHECK_MEMBER(llog_logid_rec, lid_id);
+        CHECK_MEMBER(llog_logid_rec, lid_tail);
+}
+
+void
+check_llog_create_rec(void)
+{
+        BLANK_LINE();
+        CHECK_STRUCT(llog_create_rec);
+        CHECK_MEMBER(llog_create_rec, lcr_hdr);
+        CHECK_MEMBER(llog_create_rec, lcr_fid);
+        CHECK_MEMBER(llog_create_rec, lcr_oid);
+        CHECK_MEMBER(llog_create_rec, lcr_ogen);
+}
+
+void
+check_llog_orphan_rec(void)
+{
+        BLANK_LINE();
+        CHECK_STRUCT(llog_orphan_rec);
+        CHECK_MEMBER(llog_orphan_rec, lor_hdr);
+        CHECK_MEMBER(llog_orphan_rec, lor_oid);
+        CHECK_MEMBER(llog_orphan_rec, lor_ogen);
+        CHECK_MEMBER(llog_orphan_rec, lor_tail);
+}
+
+void
+check_llog_unlink_rec(void)
+{
+        BLANK_LINE();
+        CHECK_STRUCT(llog_unlink_rec);
+        CHECK_MEMBER(llog_unlink_rec, lur_hdr);
+        CHECK_MEMBER(llog_unlink_rec, lur_oid);
+        CHECK_MEMBER(llog_unlink_rec, lur_ogen);
+        CHECK_MEMBER(llog_unlink_rec, lur_tail);
+}
+
+void
+check_llog_size_change_rec(void)
+{
+        BLANK_LINE();
+        CHECK_STRUCT(llog_size_change_rec);
+        CHECK_MEMBER(llog_size_change_rec, lsc_hdr);
+        CHECK_MEMBER(llog_size_change_rec, lsc_fid);
+        CHECK_MEMBER(llog_size_change_rec, lsc_io_epoch);
+        CHECK_MEMBER(llog_size_change_rec, lsc_tail);
+}
+
+void
+check_llog_gen(void)
+{
+        BLANK_LINE();
+        CHECK_STRUCT(llog_gen);
+        CHECK_MEMBER(llog_gen, mnt_cnt);
+        CHECK_MEMBER(llog_gen, conn_cnt);
+}
+
+void
+check_llog_gen_rec(void)
+{
+        BLANK_LINE();
+        CHECK_STRUCT(llog_gen_rec);
+        CHECK_MEMBER(llog_gen_rec, lgr_hdr);
+        CHECK_MEMBER(llog_gen_rec, lgr_gen);
+        CHECK_MEMBER(llog_gen_rec, lgr_tail);
+}
+
+void
 check_llog_log_hdr(void)
 {
         BLANK_LINE();
@@ -577,15 +659,8 @@ check_llogd_body(void)
         CHECK_VALUE(LLOG_ORIGIN_HANDLE_READ_HEADER);
         CHECK_VALUE(LLOG_ORIGIN_HANDLE_WRITE_REC);
         CHECK_VALUE(LLOG_ORIGIN_HANDLE_CLOSE);
-}
-
-void
-check_llog_ctxt_gen(void)
-{
-        BLANK_LINE();
-        CHECK_STRUCT(llog_ctxt_gen);
-        CHECK_MEMBER(llog_ctxt_gen, mnt_cnt);
-        CHECK_MEMBER(llog_ctxt_gen, conn_cnt);
+        CHECK_VALUE(LLOG_ORIGIN_CONNECT);
+        CHECK_VALUE(LLOG_CATINFO);
 }
 
 void
@@ -640,10 +715,6 @@ main(int argc, char **argv)
         CHECK_VALUE(OST_SAN_WRITE);
         CHECK_VALUE(OST_SYNC);
         CHECK_VALUE(OST_LAST_OPC);
-        CHECK_VALUE(OST_FIRST_OPC);
-
-        CHECK_VALUE(OBD_FL_INLINEDATA);
-        CHECK_VALUE(OBD_FL_OBDMDEXISTS);
 
         CHECK_DEFINE(OBD_OBJECT_EOF);
 
@@ -658,8 +729,11 @@ main(int argc, char **argv)
         CHECK_VALUE(MDS_DISCONNECT);
         CHECK_VALUE(MDS_GETSTATUS);
         CHECK_VALUE(MDS_STATFS);
+        CHECK_VALUE(MDS_PIN);
+        CHECK_VALUE(MDS_UNPIN);
+        CHECK_VALUE(MDS_SYNC);
+        CHECK_VALUE(MDS_DONE_WRITING);
         CHECK_VALUE(MDS_LAST_OPC);
-        CHECK_VALUE(MDS_FIRST_OPC);
 
         CHECK_VALUE(REINT_SETATTR);
         CHECK_VALUE(REINT_CREATE);
@@ -687,7 +761,13 @@ main(int argc, char **argv)
         CHECK_VALUE(LDLM_BL_CALLBACK);
         CHECK_VALUE(LDLM_CP_CALLBACK);
         CHECK_VALUE(LDLM_LAST_OPC);
-        CHECK_VALUE(LDLM_FIRST_OPC);
+
+        CHECK_VALUE(LCK_EX);
+        CHECK_VALUE(LCK_PW);
+        CHECK_VALUE(LCK_PR);
+        CHECK_VALUE(LCK_CW);
+        CHECK_VALUE(LCK_CR);
+        CHECK_VALUE(LCK_NL);
 
         CHECK_VALUE(PTLBD_QUERY);
         CHECK_VALUE(PTLBD_READ);
@@ -696,9 +776,14 @@ main(int argc, char **argv)
         CHECK_VALUE(PTLBD_CONNECT);
         CHECK_VALUE(PTLBD_DISCONNECT);
         CHECK_VALUE(PTLBD_LAST_OPC);
-        CHECK_VALUE(PTLBD_FIRST_OPC);
+
+        CHECK_VALUE(MGMT_CONNECT);
+        CHECK_VALUE(MGMT_DISCONNECT);
+        CHECK_VALUE(MGMT_EXCEPTION);
 
         CHECK_VALUE(OBD_PING);
+        CHECK_VALUE(OBD_LOG_CANCEL);
+        CHECK_VALUE(OBD_LAST_OPC);
 
         COMMENT("Sizes and Offsets");
         BLANK_LINE();
index 9a37aa4..3535aaa 100644 (file)
@@ -49,9 +49,6 @@ void lustre_assert_wire_constants(void)
         LASSERT(OST_SAN_WRITE == 15);
         LASSERT(OST_SYNC == 16);
         LASSERT(OST_LAST_OPC == 18);
-        LASSERT(OST_FIRST_OPC == 0);
-        LASSERT(OBD_FL_INLINEDATA == 1);
-        LASSERT(OBD_FL_OBDMDEXISTS == 2);
         LASSERT(OBD_OBJECT_EOF == 0xffffffffffffffffULL);
         LASSERT(OST_REQ_HAS_OA1 == 1);
         LASSERT(MDS_GETATTR == 33);
@@ -63,8 +60,11 @@ void lustre_assert_wire_constants(void)
         LASSERT(MDS_DISCONNECT == 39);
         LASSERT(MDS_GETSTATUS == 40);
         LASSERT(MDS_STATFS == 41);
-        LASSERT(MDS_LAST_OPC == 45);
-        LASSERT(MDS_FIRST_OPC == 33);
+        LASSERT(MDS_PIN == 42);
+        LASSERT(MDS_UNPIN == 43);
+        LASSERT(MDS_SYNC == 44);
+        LASSERT(MDS_DONE_WRITING == 45);
+        LASSERT(MDS_LAST_OPC == 46);
         LASSERT(REINT_SETATTR == 1);
         LASSERT(REINT_CREATE == 2);
         LASSERT(REINT_LINK == 3);
@@ -87,7 +87,12 @@ void lustre_assert_wire_constants(void)
         LASSERT(LDLM_BL_CALLBACK == 104);
         LASSERT(LDLM_CP_CALLBACK == 105);
         LASSERT(LDLM_LAST_OPC == 106);
-        LASSERT(LDLM_FIRST_OPC == 101);
+        LASSERT(LCK_EX == 1);
+        LASSERT(LCK_PW == 2);
+        LASSERT(LCK_PR == 3);
+        LASSERT(LCK_CW == 4);
+        LASSERT(LCK_CR == 5);
+        LASSERT(LCK_NL == 6);
         LASSERT(PTLBD_QUERY == 200);
         LASSERT(PTLBD_READ == 201);
         LASSERT(PTLBD_WRITE == 202);
@@ -95,8 +100,12 @@ void lustre_assert_wire_constants(void)
         LASSERT(PTLBD_CONNECT == 204);
         LASSERT(PTLBD_DISCONNECT == 205);
         LASSERT(PTLBD_LAST_OPC == 206);
-        LASSERT(PTLBD_FIRST_OPC == 200);
+        LASSERT(MGMT_CONNECT == 250);
+        LASSERT(MGMT_DISCONNECT == 251);
+        LASSERT(MGMT_EXCEPTION == 252);
         LASSERT(OBD_PING == 400);
+        LASSERT(OBD_LOG_CANCEL == 401);
+        LASSERT(OBD_LAST_OPC == 402);
         /* Sizes and Offsets */
 
 
@@ -197,6 +206,11 @@ void lustre_assert_wire_constants(void)
         LASSERT(OBD_MD_FLOSCOPQ == 4194304);
         LASSERT(OBD_MD_FLCOOKIE == 8388608);
         LASSERT(OBD_MD_FLGROUP == 16777216);
+        LASSERT(OBD_FL_INLINEDATA == 1);
+        LASSERT(OBD_FL_OBDMDEXISTS == 2);
+        LASSERT(OBD_FL_DELORPHAN == 4);
+        LASSERT(OBD_FL_NORPC == 8);
+        LASSERT(OBD_FL_IDONLY == 16);
 
         /* Checks for struct lov_mds_md_v1 */
         LASSERT((int)sizeof(struct lov_mds_md_v1) == 32);
@@ -274,6 +288,7 @@ void lustre_assert_wire_constants(void)
         LASSERT(OBD_BRW_WRITE == 2);
         LASSERT(OBD_BRW_CREATE == 4);
         LASSERT(OBD_BRW_SYNC == 8);
+        LASSERT(OBD_BRW_FROM_GRANT == 32);
 
         /* Checks for struct ost_body */
         LASSERT((int)sizeof(struct ost_body) == 168);
@@ -349,6 +364,7 @@ void lustre_assert_wire_constants(void)
         LASSERT(FMODE_EXEC == 4);
         LASSERT(MDS_OPEN_CREAT == 64);
         LASSERT(MDS_OPEN_EXCL == 128);
+        LASSERT(MDS_OPEN_TRUNC == 512);
         LASSERT(MDS_OPEN_APPEND == 1024);
         LASSERT(MDS_OPEN_SYNC == 4096);
         LASSERT(MDS_OPEN_DIRECTORY == 65536);
@@ -617,6 +633,7 @@ void lustre_assert_wire_constants(void)
         LASSERT(MDS_UNLINK_REC == 274801668);
         LASSERT(OBD_CFG_REC == 274857984);
         LASSERT(PTL_CFG_REC == 274923520);
+        LASSERT(LLOG_GEN_REC == 274989056);
         LASSERT(LLOG_HDR_MAGIC == 275010873);
         LASSERT(LLOG_LOGID_MAGIC == 275010874);
 
@@ -691,6 +708,8 @@ void lustre_assert_wire_constants(void)
         LASSERT(LLOG_ORIGIN_HANDLE_READ_HEADER == 503);
         LASSERT(LLOG_ORIGIN_HANDLE_WRITE_REC == 504);
         LASSERT(LLOG_ORIGIN_HANDLE_CLOSE == 505);
+        LASSERT(LLOG_ORIGIN_CONNECT == 506);
+        LASSERT(LLOG_CATINFO == 507);
 
         /* Checks for struct llogd_conn_body */
         LASSERT((int)sizeof(struct llogd_conn_body) == 40);